浏览代码

rowid改造方案

AE86 4 年之前
父节点
当前提交
e1cbd5aef0

+ 2 - 6
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/ConnectorConfigChecker.java

@@ -1,11 +1,9 @@
 package org.dbsyncer.biz.checker;
 
-import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
 
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -16,14 +14,12 @@ import java.util.Map;
 public interface ConnectorConfigChecker {
 
     /**
-     * 设置默认字段
+     * 处理增量同步策略
      *
      * @param mapping
      * @param tableGroup
-     * @param column
-     * @param isSourceTable
      */
-    default void updateFields(Mapping mapping, TableGroup tableGroup, List<Field> column, boolean isSourceTable) {}
+    default void dealIncrementStrategy(Mapping mapping, TableGroup tableGroup) {}
 
     /**
      * 修改配置

+ 12 - 25
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/OracleConfigChecker.java

@@ -1,9 +1,11 @@
 package org.dbsyncer.biz.checker.impl.connector;
 
 import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
@@ -22,38 +24,23 @@ public class OracleConfigChecker extends AbstractDataBaseConfigChecker {
      */
     private static final String ROW_ID_NAME = "DBSYNCER_ROWID";
 
+    @Autowired
+    private Manager manager;
+
     @Override
-    public void updateFields(Mapping mapping, TableGroup tableGroup, List<Field> column, boolean isSourceTable) {
+    public void dealIncrementStrategy(Mapping mapping, TableGroup tableGroup) {
+        /**
+         * 1、增量同步时,目标源必须有一个主键字段用于接收ROW_ID值。
+         * 2、全局可配置目标源ROW_ID字段名称,默认为ROW_ID_NAME。
+         * 3、如果配置了接收字段,添加字段映射关系[ROW_ID_NAME]> [ROW_ID_NAME],并将ROW_ID_NAME字段设置为目标源的唯一主键。
+         * 4、全量同步时,ROW_ID_NAME参数非必须。
+         */
         // TODO mapping.enableRowId
         boolean enableRowId = false;
         if(!enableRowId){
             return;
         }
 
-        // TODO source   Oralce >> Oracle   Oralce >> Mysql   Mysql >> Oracle
-        if (isSourceTable) {
-            List<Field> list = new ArrayList<>();
-            list.add(getSourceField());
-            list.addAll(column);
-            column.clear();
-            column.addAll(list);
-            return;
-        }
-
-        // Target
-        // TODO mapping.rowIdName
-        String rowIdName = "RID";
-        // Oracle 默认加上ROWID名称,设置为唯一主键
-        column.parallelStream().forEach(f -> f.setPk(false));
-        Field targetField = new Field(rowIdName, "VARCHAR2", 12, true);
-        List<Field> list = new ArrayList<>();
-        list.add(targetField);
-        list.addAll(column);
-        column.clear();
-        column.addAll(list);
-
-        // 添加默认ROWID映射关系
-        tableGroup.getFieldMapping().add(new FieldMapping(getSourceField(), targetField));
     }
 
     private Field getSourceField() {

+ 7 - 15
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.biz.checker.impl.mapping;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.biz.checker.MappingConfigChecker;
@@ -86,19 +85,12 @@ public class MappingChecker extends AbstractChecker {
 
         // 同步方式(仅支持全量或增量同步方式)
         String model = params.get("model");
-        if (StringUtils.isNotBlank(model)) {
-            if (null != ModelEnum.getModelEnum(model)) {
-                mapping.setModel(model);
-            }
-        }
+        mapping.setModel(null != ModelEnum.getModelEnum(model) ? model : ModelEnum.FULL.getCode());
 
         // 全量配置
-        String readNum = params.get("readNum");
-        mapping.setReadNum(NumberUtils.toInt(readNum, mapping.getReadNum()));
-        String threadNum = params.get("threadNum");
-        mapping.setThreadNum(NumberUtils.toInt(threadNum, mapping.getThreadNum()));
-        String batchNum = params.get("batchNum");
-        mapping.setBatchNum(NumberUtils.toInt(batchNum, mapping.getBatchNum()));
+        mapping.setReadNum(NumberUtils.toInt(params.get("readNum"), mapping.getReadNum()));
+        mapping.setThreadNum(NumberUtils.toInt(params.get("threadNum"), mapping.getThreadNum()));
+        mapping.setBatchNum(NumberUtils.toInt(params.get("batchNum"), mapping.getBatchNum()));
 
         // 增量配置(日志/定时)
         String incrementStrategy = params.get("incrementStrategy");
@@ -112,7 +104,7 @@ public class MappingChecker extends AbstractChecker {
         this.modifySuperConfigModel(mapping, params);
 
         // 更新映射关系过滤条件
-        setFilterCommand(mapping);
+        batchUpdateTableGroupCommand(mapping);
 
         // 更新meta
         updateMeta(mapping);
@@ -143,11 +135,11 @@ public class MappingChecker extends AbstractChecker {
      *
      * @param mapping
      */
-    private void setFilterCommand(Mapping mapping) {
+    private void batchUpdateTableGroupCommand(Mapping mapping) {
         List<TableGroup> groupAll = manager.getTableGroupAll(mapping.getId());
         if (!CollectionUtils.isEmpty(groupAll)) {
             for (TableGroup g : groupAll) {
-                tableGroupChecker.setCommand(mapping, g);
+                tableGroupChecker.genCommand(mapping, g);
                 manager.editTableGroup(g);
             }
         }

+ 17 - 14
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -65,17 +65,20 @@ public class TableGroupChecker extends AbstractChecker {
         tableGroup.setName(ConfigConstant.TABLE_GROUP);
         tableGroup.setType(ConfigConstant.TABLE_GROUP);
         tableGroup.setMappingId(mappingId);
-        tableGroup.setSourceTable(getTable(mapping, tableGroup, sourceTable, true));
-        tableGroup.setTargetTable(getTable(mapping, tableGroup, targetTable, false));
+        tableGroup.setSourceTable(getTable(mapping.getSourceConnectorId(), sourceTable));
+        tableGroup.setTargetTable(getTable(mapping.getTargetConnectorId(), targetTable));
 
         // 修改基本配置
         this.modifyConfigModel(tableGroup, params);
 
+        // 处理策略
+        dealIncrementStrategy(mapping, tableGroup);
+
         // 匹配相似字段
         mergeFieldMapping(tableGroup);
 
         // 生成command
-        setCommand(mapping, tableGroup);
+        genCommand(mapping, tableGroup);
 
         return tableGroup;
     }
@@ -102,12 +105,12 @@ public class TableGroupChecker extends AbstractChecker {
         this.modifySuperConfigModel(tableGroup, params);
 
         // 生成command
-        setCommand(mapping, tableGroup);
+        genCommand(mapping, tableGroup);
 
         return tableGroup;
     }
 
-    public void setCommand(Mapping mapping, TableGroup tableGroup) {
+    public void genCommand(Mapping mapping, TableGroup tableGroup) {
         TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
 
         Map<String, String> command = manager.getCommand(mapping, group);
@@ -118,17 +121,17 @@ public class TableGroupChecker extends AbstractChecker {
         tableGroup.getSourceTable().setCount(count);
     }
 
-    private Table getTable(Mapping mapping, TableGroup tableGroup, String tableName, boolean isSourceTable) {
-        String connectorId = isSourceTable ? mapping.getSourceConnectorId() : mapping.getTargetConnectorId();
-        MetaInfo metaInfo = manager.getMetaInfo(connectorId, tableName);
-        Assert.notNull(metaInfo, "无法获取连接器表信息.");
-
-        String connectorType = manager.getConnector(connectorId).getConfig().getConnectorType();
+    public void dealIncrementStrategy(Mapping mapping, TableGroup tableGroup) {
+        String connectorType = manager.getConnector(mapping.getSourceConnectorId()).getConfig().getConnectorType();
         String type = StringUtil.toLowerCaseFirstOne(connectorType).concat("ConfigChecker");
         ConnectorConfigChecker checker = map.get(type);
         Assert.notNull(checker, "Checker can not be null.");
-        // TODO 暂时实现
-        checker.updateFields(mapping, tableGroup, metaInfo.getColumn(), isSourceTable);
+        checker.dealIncrementStrategy(mapping, tableGroup);
+    }
+
+    private Table getTable(String connectorId, String tableName) {
+        MetaInfo metaInfo = manager.getMetaInfo(connectorId, tableName);
+        Assert.notNull(metaInfo, "无法获取连接器表信息.");
         return new Table().setName(tableName).setColumn(metaInfo.getColumn());
     }
 
@@ -139,7 +142,7 @@ public class TableGroupChecker extends AbstractChecker {
                 // 数据源表和目标表都存在
                 if (StringUtils.equals(sourceTable, g.getSourceTable().getName())
                         && StringUtils.equals(targetTable, g.getTargetTable().getName())) {
-                    final String error = String.format("映射关系已存在.", sourceTable, targetTable);
+                    final String error = String.format("映射关系已存在.%s > %s", sourceTable, targetTable);
                     logger.error(error);
                     throw new BizException(error);
                 }