瀏覽代碼

支持rowid字段配置

AE86 4 年之前
父節點
當前提交
d32b13dec2

+ 2 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/ConnectorConfigChecker.java

@@ -18,8 +18,9 @@ public interface ConnectorConfigChecker {
      *
      * @param mapping
      * @param tableGroup
+     * @param params
      */
-    default void dealIncrementStrategy(Mapping mapping, TableGroup tableGroup) {}
+    default void dealIncrementStrategy(Mapping mapping, TableGroup tableGroup, Map<String, String> params) {}
 
     /**
      * 修改配置

+ 108 - 35
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/OracleConfigChecker.java

@@ -1,24 +1,28 @@
 package org.dbsyncer.biz.checker.impl.connector;
 
 import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.biz.enums.OracleIncrementEnum;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.config.MetaInfo;
+import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
  * <p>1、增量同步时,目标源必须有一个主键字段用于接收ROW_ID值。</p>
- * <p>2、全局可配置目标源ROW_ID字段名称,默认为ROW_ID_NAME。 </p>
- * <p>3、如果配置了接收字段,添加字段映射关系[ROW_ID_NAME] > [ROW_ID_NAME],并将ROW_ID_NAME字段设置为目标源的唯一主键。</p>
- * <p>4、全量同步时,ROW_ID_NAME参数非必须。</p>
+ * <p>2、全局可配置目标源ROW_ID字段名称,默认为ROW_ID_LABEL_NAME。 </p>
+ * <p>3、如果配置了接收字段,添加字段映射关系[ROW_ID_LABEL_NAME] > [ROW_ID_LABEL_NAME],并将ROW_ID_LABEL_NAME字段设置为目标源的唯一主键。</p>
+ * <p>4、全量同步时,ROW_ID_LABEL_NAME参数非必须。</p>
  *
  * @author AE86
  * @version 1.0.0
@@ -27,57 +31,126 @@ import java.util.stream.Collectors;
 @Component
 public class OracleConfigChecker extends AbstractDataBaseConfigChecker {
 
-    /**
-     * 默认ROWID列名称
-     */
-    private static final String ROW_ID_NAME = "ORACLE_ROW_ID";
-    private static final String ROW_ID      = "ROWIDTOCHAR(ROWID)";
-
     @Autowired
     private Manager manager;
 
     @Override
-    public void dealIncrementStrategy(Mapping mapping, TableGroup tableGroup) {
-        // TODO 模拟测试
-        Map<String, String> params = mapping.getParams();
-        mapping.getParams().put(ROW_ID_NAME, "row_id");
-
-        // 没有定义目标源ROW_ID字段
-        if (CollectionUtils.isEmpty(mapping.getParams()) || !params.containsKey(ROW_ID_NAME)) {
+    public void dealIncrementStrategy(Mapping mapping, TableGroup tableGroup, Map<String, String> params) {
+        String targetRowIdName = params.get(OracleIncrementEnum.ROW_ID_LABEL_NAME.getName());
+        if (CollectionUtils.isEmpty(params) || StringUtils.isBlank(targetRowIdName)) {
+            revert(mapping, tableGroup);
             return;
         }
 
+        // 检查目标源是否支持该自定义字段
+        List<Field> targetColumn = tableGroup.getTargetTable().getColumn();
+        Field targetField = null;
+        for (Field f : targetColumn) {
+            if (StringUtils.equals(f.getName(), targetRowIdName)) {
+                targetField = f;
+                targetField.setPk(true);
+                continue;
+            }
+            f.setPk(false);
+        }
+        Assert.isTrue(null != targetField,
+                String.format("[%s 同步到 %s],目标源表不存在字段%s", tableGroup.getSourceTable().getName(), tableGroup.getTargetTable().getName(),
+                        targetRowIdName));
+
         // 检查是否更新
-        String targetRowIdName = params.get(ROW_ID_NAME);
         for (FieldMapping m : tableGroup.getFieldMapping()) {
-            if (StringUtils.equals(m.getSource().getName(), ROW_ID)) {
+            if (null != m.getSource() && OracleIncrementEnum.isRowId(m.getSource().getName())) {
                 m.getTarget().setName(targetRowIdName);
-                m.getTarget().setPk(true);
                 return;
             }
         }
 
-        List<Field> sourceColumn = tableGroup.getSourceTable().getColumn();
-        List<Field> sFieldList = sourceColumn.stream().filter(f -> StringUtils.endsWithIgnoreCase(f.getName(), ROW_ID)).collect(Collectors.toList());
-        Field sourceField = new Field(ROW_ID, "VARCHAR2", 12, false, ROW_ID_NAME, true);
-        if(CollectionUtils.isEmpty(sFieldList)){
-            sourceColumn.add(0, sourceField);
+        Field sourceField = new Field(OracleIncrementEnum.ROW_ID.getName(), "VARCHAR2", 12, false,
+                OracleIncrementEnum.ROW_ID_LABEL_NAME.getName(), true);
+        tableGroup.getSourceTable().getColumn().add(0, sourceField);
+
+        // 取消主键
+        tableGroup.getFieldMapping().forEach(m -> {
+            if (null != m.getTarget()) {
+                m.getTarget().setPk(false);
+            }
+        });
+        tableGroup.getFieldMapping().add(0, new FieldMapping(sourceField, targetField));
+    }
+
+    /**
+     * 还原字段和映射关系
+     *
+     * @param mapping
+     * @param tableGroup
+     */
+    private void revert(Mapping mapping, TableGroup tableGroup) {
+        List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
+        if (CollectionUtils.isEmpty(fieldMapping)) {
+            return;
         }
 
-        List<Field> targetColumn = tableGroup.getTargetTable().getColumn();
-        Field targetField = null;
-        for(Field f : targetColumn){
-            f.setPk(StringUtils.endsWithIgnoreCase(f.getName(), targetRowIdName));
-            if(f.isPk()){
-                targetField = f;
+        // 还原字段
+        Table sourceTable = tableGroup.getSourceTable();
+        List<Field> sourceColumn = sourceTable.getColumn();
+        List<Field> sourceFields = new ArrayList<>();
+        boolean existRowId = false;
+        for (Field f : sourceColumn) {
+            if (OracleIncrementEnum.isRowId(f.getName())) {
+                existRowId = true;
+                continue;
             }
+            sourceFields.add(f);
         }
-        if(null == targetField){
-            targetField = new Field(targetRowIdName, "VARCHAR2", 12, true);
-            targetColumn.add(0, targetField);
+        sourceTable.setColumn(sourceFields);
+
+        if (!existRowId) {
+            return;
         }
 
-        tableGroup.getFieldMapping().add(0, new FieldMapping(sourceField, targetField));
+        // 存在自定义主键
+        String pk = null;
+        for (FieldMapping m : tableGroup.getFieldMapping()) {
+            if (null != m.getSource() && OracleIncrementEnum.isRowId(m.getSource().getName())) {
+                continue;
+            }
+            if (null != m.getTarget() && m.getTarget().isPk()) {
+                pk = m.getTarget().getName();
+                break;
+            }
+        }
+
+        // 没有自定义主键,获取表元信息
+        if (null == pk) {
+            Table targetTable = tableGroup.getTargetTable();
+            MetaInfo metaInfo = manager.getMetaInfo(mapping.getTargetConnectorId(), targetTable.getName());
+            List<Field> targetColumn = metaInfo.getColumn();
+            targetTable.setColumn(targetColumn);
+
+            if (!CollectionUtils.isEmpty(targetColumn)) {
+                for (Field f : targetColumn) {
+                    if (f.isPk()) {
+                        pk = f.getName();
+                        break;
+                    }
+                }
+            }
+        }
+
+        // 剔除映射关系
+        List<FieldMapping> list = new ArrayList<>();
+        for (FieldMapping m : tableGroup.getFieldMapping()) {
+            if (null != m.getSource() && OracleIncrementEnum.isRowId(m.getSource().getName())) {
+                continue;
+            }
+
+            if (null != m.getTarget() && StringUtils.equals(m.getTarget().getName(), pk)) {
+                m.getTarget().setPk(true);
+            }
+            list.add(m);
+        }
+        fieldMapping.clear();
+        fieldMapping.addAll(list);
     }
 
 }

+ 3 - 7
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -103,9 +103,6 @@ public class MappingChecker extends AbstractChecker {
         // 修改高级配置:过滤条件/转换配置/插件配置
         this.modifySuperConfigModel(mapping, params);
 
-        // 更新映射关系过滤条件
-        batchUpdateTableGroupCommand(mapping);
-
         // 更新meta
         updateMeta(mapping);
 
@@ -130,16 +127,15 @@ public class MappingChecker extends AbstractChecker {
     }
 
     /**
-     * <b>更新映射关系过滤条件</b>
-     * <p>如果映射关系没有过滤条件,使用全局的过滤条件</p>
+     * 合并关联的映射关系配置
      *
      * @param mapping
      */
-    private void batchUpdateTableGroupCommand(Mapping mapping) {
+    public void batchMergeTableGroupConfig(Mapping mapping) {
         List<TableGroup> groupAll = manager.getTableGroupAll(mapping.getId());
         if (!CollectionUtils.isEmpty(groupAll)) {
             for (TableGroup g : groupAll) {
-                tableGroupChecker.genCommand(mapping, g);
+                tableGroupChecker.mergeConfig(mapping, g);
                 manager.editTableGroup(g);
             }
         }

+ 11 - 13
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -74,11 +74,8 @@ public class TableGroupChecker extends AbstractChecker {
         // 匹配相似字段映射关系
         mergeFieldMapping(tableGroup);
 
-        // 处理策略
-        dealIncrementStrategy(mapping, tableGroup);
-
-        // 生成command
-        genCommand(mapping, tableGroup);
+        // 合并配置
+        mergeConfig(mapping, tableGroup);
 
         return tableGroup;
     }
@@ -104,18 +101,19 @@ public class TableGroupChecker extends AbstractChecker {
         // 字段映射关系
         setFieldMapping(tableGroup, fieldMappingJson);
 
-        // 处理策略
-        dealIncrementStrategy(mapping, tableGroup);
-
-        // 生成command
-        genCommand(mapping, tableGroup);
+        // 合并配置
+        mergeConfig(mapping, tableGroup);
 
         return tableGroup;
     }
 
-    public void genCommand(Mapping mapping, TableGroup tableGroup) {
+    public void mergeConfig(Mapping mapping, TableGroup tableGroup) {
+        // 合并高级配置
         TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
 
+        // 处理策略
+        dealIncrementStrategy(mapping, tableGroup, group.getParams());
+
         Map<String, String> command = manager.getCommand(mapping, group);
         tableGroup.setCommand(command);
 
@@ -124,12 +122,12 @@ public class TableGroupChecker extends AbstractChecker {
         tableGroup.getSourceTable().setCount(count);
     }
 
-    public void dealIncrementStrategy(Mapping mapping, TableGroup tableGroup) {
+    public void dealIncrementStrategy(Mapping mapping, TableGroup tableGroup, Map<String, String> params) {
         String connectorType = manager.getConnector(mapping.getSourceConnectorId()).getConfig().getConnectorType();
         String type = StringUtil.toLowerCaseFirstOne(connectorType).concat("ConfigChecker");
         ConnectorConfigChecker checker = map.get(type);
         Assert.notNull(checker, "Checker can not be null.");
-        checker.dealIncrementStrategy(mapping, tableGroup);
+        checker.dealIncrementStrategy(mapping, tableGroup, params);
     }
 
     private Table getTable(String connectorId, String tableName) {

+ 41 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/enums/OracleIncrementEnum.java

@@ -0,0 +1,41 @@
+package org.dbsyncer.biz.enums;
+
+/**
+ * Oralce参数配置
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2021/4/28 23:21
+ */
+public enum OracleIncrementEnum {
+
+    /**
+     * ROWIDTOCHAR(ROWID) 列名
+     */
+    ROW_ID("ROWIDTOCHAR(ROWID)"),
+    /**
+     * ORACLE_ROW_ID 别名
+     */
+    ROW_ID_LABEL_NAME("ORACLE_ROW_ID");
+
+    private String name;
+
+    OracleIncrementEnum(String name) {
+        this.name = name;
+    }
+
+    /**
+     * 是否ROWIDTOCHAR(ROWID) 列名
+     *
+     * @param name
+     * @return
+     */
+    public static boolean isRowId(String name) {
+        return ROW_ID.getName().equals(name);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+}

+ 3 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MappingServiceImpl.java

@@ -61,9 +61,10 @@ public class MappingServiceImpl extends BaseServiceImpl implements MappingServic
         Mapping mapping = assertMappingExist(id);
         synchronized (LOCK) {
             assertRunning(mapping.getMetaId());
-            ConfigModel model = mappingChecker.checkEditConfigModel(params);
-            log(LogType.MappingLog.UPDATE, (Mapping) model);
+            Mapping model = (Mapping) mappingChecker.checkEditConfigModel(params);
+            log(LogType.MappingLog.UPDATE, model);
 
+            mappingChecker.batchMergeTableGroupConfig(model);
             return manager.editMapping(model);
         }
     }

+ 0 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -22,14 +22,12 @@ import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.config.FieldPicker;
 import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.parser.Parser;
-import org.dbsyncer.parser.enums.PrimaryKeyMappingEnum;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
-import org.dbsyncer.parser.strategy.PrimaryKeyMappingStrategy;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+ 3 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/PickerUtil.java

@@ -18,7 +18,7 @@ public abstract class PickerUtil {
     }
 
     /**
-     * 合并过滤条件、转换配置、插件配置、目标源字段、数据源字段
+     * 合并参数配置、过滤条件、转换配置、插件配置、目标源字段、数据源字段
      *
      * @param mapping
      * @param tableGroup
@@ -36,6 +36,8 @@ public abstract class PickerUtil {
         group.setTargetTable(tableGroup.getTargetTable());
         group.setCommand(tableGroup.getCommand());
 
+        // 参数配置(默认使用全局)
+        group.setParams(CollectionUtils.isEmpty(tableGroup.getParams()) ? mapping.getParams() : tableGroup.getParams());
         // 过滤条件(默认使用全局)
         group.setFilter(CollectionUtils.isEmpty(tableGroup.getFilter()) ? mapping.getFilter() : tableGroup.getFilter());
         // 转换配置(默认使用全局)

+ 4 - 0
dbsyncer-web/src/main/resources/public/mapping/edit.html

@@ -116,6 +116,9 @@
                                 <div th:replace="mapping/editIncrement :: content"></div>
                             </div>
 
+                            <!-- 参数配置 -->
+                            <div th:replace="mapping/editParameter :: content"></div>
+
                             <!-- 过滤条件 -->
                             <div th:replace="mapping/editFilter :: content"></div>
 
@@ -135,6 +138,7 @@
 </div>
 
 <script th:src="@{/js/mapping/edit.js}"></script>
+<script th:src="@{/js/mapping/editParam.js}"></script>
 <script th:src="@{/js/mapping/editFilterAndConvert.js}"></script>
 <script th:src="@{/js/mapping/editIncrement.js}"></script>
 </html>

+ 30 - 0
dbsyncer-web/src/main/resources/public/mapping/editParameter.html

@@ -0,0 +1,30 @@
+<!DOCTYPE html>
+<html xmlns="http://www.w3.org/1999/xhtml"
+      xmlns:th="http://www.thymeleaf.org" lang="zh-CN">
+
+<div th:fragment="content">
+    <p class="text-muted">参数配置</p>
+
+    <div id="paramsList" class="form-group">
+        <div class="row">
+            <div class="col-md-4">
+                <label class="col-sm-3 control-label text-right">目标源ROWID</label>
+                <div class="col-sm-9">
+                    <!-- Mapping params -->
+                    <input th:if="${tableGroup} == null" th:value="${mapping?.params?.get('ORACLE_ROW_ID')}" name="ORACLE_ROW_ID" type="text" class="form-control" title="区分Oracle增量同步数据字段,会将该字段设置为目标源唯一主键"/>
+
+                    <!-- TableGroup params -->
+                    <input th:if="${tableGroup} != null" th:value="${tableGroup?.params?.get('ORACLE_ROW_ID')}" name="ORACLE_ROW_ID" type="text" class="form-control" title="区分Oracle增量同步数据字段,会将该字段设置为目标源唯一主键"/>
+                </div>
+            </div>
+            <div class="col-md-8"></div>
+        </div>
+    </div>
+
+    <!-- 隐藏表单值 -->
+    <div class="form-group hidden">
+        <input id="params" name="params" class="form-control" type="text"/>
+    </div>
+</div>
+
+</html>

+ 4 - 0
dbsyncer-web/src/main/resources/public/mapping/editTableGroup.html

@@ -163,6 +163,9 @@
                             </h4>
                         </div>
                         <div id="tableGroupSuperConfig" class="panel-body panel-collapse collapse">
+                            <!-- 参数配置 -->
+                            <div th:replace="mapping/editParameter :: content"></div>
+
                             <!-- 过滤条件 -->
                             <div th:replace="mapping/editFilter :: content"></div>
 
@@ -181,5 +184,6 @@
 </div>
 
 <script th:src="@{/js/mapping/editTableGroup.js}"></script>
+<script th:src="@{/js/mapping/editParam.js}"></script>
 <script th:src="@{/js/mapping/editFilterAndConvert.js}"></script>
 </html>

+ 26 - 0
dbsyncer-web/src/main/resources/static/js/mapping/editParam.js

@@ -0,0 +1,26 @@
+// 绑定高级配置参数配置
+function bingMappingParamsInputClick(){
+    var $paramsList = $("#paramsList");
+    var $inputs = $paramsList.find("input");
+    $inputs.unbind();
+    genMappingParams($inputs);
+    $inputs.blur(function(){
+        genMappingParams($inputs);
+    });
+}
+
+// 生成参数配置
+function genMappingParams($inputs){
+    var params = {};
+    $inputs.each(function () {
+        if('' != $(this).val()){
+            params[$(this).attr("name")] = $(this).val();
+        }
+    })
+    $("#params").val(JSON.stringify(params));
+}
+
+$(function() {
+    // 绑定高级配置参数配置
+    bingMappingParamsInputClick();
+});