Kaynağa Gözat

改造rowid

AE86 4 yıl önce
ebeveyn
işleme
1932ae4f24

+ 5 - 4
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/AbstractChecker.java

@@ -19,10 +19,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.Assert;
 
 import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * @author AE86
@@ -66,6 +63,10 @@ public abstract class AbstractChecker implements Checker {
      * @param params
      */
     protected void modifySuperConfigModel(AbstractConfigModel model, Map<String, String> params) {
+        // 全局参数
+        String mappingParams = params.get("params");
+        model.setParams(StringUtils.isNotBlank(mappingParams) ? JsonUtil.jsonToObj(mappingParams, Map.class) : new LinkedHashMap());
+
         // 过滤条件
         String filterJson = params.get("filter");
         if (StringUtils.isNotBlank(filterJson)) {

+ 36 - 14
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/OracleConfigChecker.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.biz.checker.impl.connector;
 
+import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.model.FieldMapping;
@@ -8,10 +10,15 @@ import org.dbsyncer.parser.model.TableGroup;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
+ * <p>1、增量同步时,目标源必须有一个主键字段用于接收ROW_ID值。</p>
+ * <p>2、全局可配置目标源ROW_ID字段名称,默认为ROW_ID_NAME。 </p>
+ * <p>3、如果配置了接收字段,添加字段映射关系[ROW_ID_NAME] > [ROW_ID_NAME],并将ROW_ID_NAME字段设置为目标源的唯一主键。</p>
+ * <p>4、全量同步时,ROW_ID_NAME参数非必须。</p>
+ *
  * @author AE86
  * @version 1.0.0
  * @date 2020/1/8 15:17
@@ -22,28 +29,43 @@ public class OracleConfigChecker extends AbstractDataBaseConfigChecker {
     /**
      * 默认ROWID列名称
      */
-    private static final String ROW_ID_NAME = "DBSYNCER_ROWID";
+    private static final String ROW_ID_NAME = "ORACLE_ROW_ID";
+    private static final String ROW_ID      = "ROWIDTOCHAR(ROWID) as " + ROW_ID_NAME;
 
     @Autowired
     private Manager manager;
 
     @Override
     public void dealIncrementStrategy(Mapping mapping, TableGroup tableGroup) {
-        /**
-         * 1、增量同步时,目标源必须有一个主键字段用于接收ROW_ID值。
-         * 2、全局可配置目标源ROW_ID字段名称,默认为ROW_ID_NAME。
-         * 3、如果配置了接收字段,添加字段映射关系[ROW_ID_NAME]> [ROW_ID_NAME],并将ROW_ID_NAME字段设置为目标源的唯一主键。
-         * 4、全量同步时,ROW_ID_NAME参数非必须。
-         */
-        // TODO mapping.enableRowId
-        boolean enableRowId = false;
-        if(!enableRowId){
+        // TODO 模拟测试
+        Map<String, String> params = mapping.getParams();
+        mapping.getParams().put(ROW_ID_NAME, "RID");
+
+        // 没有定义目标源ROW_ID字段
+        if (CollectionUtils.isEmpty(mapping.getParams()) || !params.containsKey(ROW_ID_NAME)) {
             return;
         }
 
-    }
+        // 检查是否更新
+        String targetRowIdName = params.get(ROW_ID_NAME);
+        for (FieldMapping m : tableGroup.getFieldMapping()) {
+            if (StringUtils.equals(m.getSource().getName(), ROW_ID)) {
+                m.getTarget().setName(targetRowIdName);
+                m.getTarget().setPk(true);
+                return;
+            }
+        }
 
-    private Field getSourceField() {
-        return new Field(ROW_ID_NAME, "VARCHAR2", 12, false, true);
+        List<Field> sourceColumn = tableGroup.getSourceTable().getColumn();
+        Field sourceField = new Field(ROW_ID, "VARCHAR2", 12, false, true);
+        sourceColumn.add(0, sourceField);
+
+        List<Field> targetColumn = tableGroup.getTargetTable().getColumn();
+        targetColumn.parallelStream().forEach(f -> f.setPk(false));
+        Field targetField = new Field(targetRowIdName, "VARCHAR2", 12, true, false);
+        targetColumn.add(0, targetField);
+
+        tableGroup.getFieldMapping().add(0, new FieldMapping(sourceField, targetField));
     }
+
 }

+ 9 - 8
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -71,12 +71,12 @@ public class TableGroupChecker extends AbstractChecker {
         // 修改基本配置
         this.modifyConfigModel(tableGroup, params);
 
+        // 匹配相似字段映射关系
+        mergeFieldMapping(tableGroup);
+
         // 处理策略
         dealIncrementStrategy(mapping, tableGroup);
 
-        // 匹配相似字段
-        mergeFieldMapping(tableGroup);
-
         // 生成command
         genCommand(mapping, tableGroup);
 
@@ -98,11 +98,14 @@ public class TableGroupChecker extends AbstractChecker {
         // 修改基本配置
         this.modifyConfigModel(tableGroup, params);
 
+        // 修改高级配置:过滤条件/转换配置/插件配置
+        this.modifySuperConfigModel(tableGroup, params);
+
         // 字段映射关系
         setFieldMapping(tableGroup, fieldMappingJson);
 
-        // 修改高级配置:过滤条件/转换配置/插件配置
-        this.modifySuperConfigModel(tableGroup, params);
+        // 处理策略
+        dealIncrementStrategy(mapping, tableGroup);
 
         // 生成command
         genCommand(mapping, tableGroup);
@@ -167,11 +170,9 @@ public class TableGroupChecker extends AbstractChecker {
         k1.retainAll(k2);
 
         // 有相似字段
-        List<FieldMapping> fields = new ArrayList<>();
         if (!CollectionUtils.isEmpty(k1)) {
-            k1.forEach(k -> fields.add(new FieldMapping(m1.get(k), m2.get(k))));
+            k1.forEach(k -> tableGroup.getFieldMapping().add(new FieldMapping(m1.get(k), m2.get(k))));
         }
-        tableGroup.getFieldMapping().addAll(fields);
     }
 
     private void shuffleColumn(List<Field> col, List<String> key, Map<String, Field> map) {

+ 13 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/AbstractConfigModel.java

@@ -4,9 +4,13 @@ import org.dbsyncer.connector.config.Filter;
 import org.dbsyncer.plugin.config.Plugin;
 
 import java.util.List;
+import java.util.Map;
 
 public abstract class AbstractConfigModel extends ConfigModel {
 
+    // 全局参数
+    private Map<String, String> params;
+
     // 过滤条件
     private List<Filter> filter;
 
@@ -16,6 +20,15 @@ public abstract class AbstractConfigModel extends ConfigModel {
     // 插件配置
     private Plugin plugin;
 
+    public Map<String, String> getParams() {
+        return params;
+    }
+
+    public AbstractConfigModel setParams(Map<String, String> params) {
+        this.params = params;
+        return this;
+    }
+
     public List<Filter> getFilter() {
         return filter;
     }

+ 3 - 0
dbsyncer-parser/src/main/resources/Mapping.json

@@ -22,6 +22,9 @@
       "pk":true
     }
   ],
+  "params":{
+    "ORACLE_ROW_ID":"RID"
+  },
   "filter": [
     {
       "name": "AGE",

+ 3 - 0
dbsyncer-parser/src/main/resources/TableGroup.json

@@ -49,6 +49,9 @@
     "UPDATE":"UPDATE MY_USER set name=? where MY_USER.id=?",
     "DELETE":"DELETE MY_USER WHERE MY_USER.id=?"
   },
+  "params":{
+    "ORACLE_ROW_ID":"RID"
+  },
   "filter": [
     {
       "name": "AGE",