AE86 5 年之前
父节点
当前提交
9161520440

+ 4 - 57
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -5,7 +5,6 @@ import org.dbsyncer.biz.BizException;
 import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.connector.config.Filter;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.enums.ConnectorEnum;
@@ -14,6 +13,7 @@ import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.json.JSONArray;
 import org.json.JSONException;
@@ -102,15 +102,7 @@ public class TableGroupChecker extends AbstractChecker {
     }
 
     public void setCommand(Mapping mapping, TableGroup tableGroup) {
-        TableGroup group = new TableGroup();
-        group.setFieldMapping(new ArrayList<>(tableGroup.getFieldMapping()));
-        group.setSourceTable(tableGroup.getSourceTable());
-        group.setTargetTable(tableGroup.getTargetTable());
-        // 默认使用全局的过滤条件
-        group.setFilter(CollectionUtils.isEmpty(tableGroup.getFilter()) ? mapping.getFilter() : tableGroup.getFilter());
-
-        // 添加增量配置事件和过滤条件字段
-        appendEventAndFilter(mapping, group);
+        TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
 
         Map<String, String> command = manager.getCommand(mapping, group);
         tableGroup.setCommand(command);
@@ -152,45 +144,6 @@ public class TableGroupChecker extends AbstractChecker {
         }
     }
 
-    private void appendEventAndFilter(Mapping mapping, TableGroup group) {
-        final List<FieldMapping> fieldMapping = group.getFieldMapping();
-
-        // 检查增量字段是否在映射关系中
-        String eventFieldName = mapping.getListener().getEventFieldName();
-        if (StringUtils.isNotBlank(eventFieldName)) {
-            Map<String, Field> fields = convert2Map(group.getSourceTable().getColumn());
-            addFieldMapping(fieldMapping, eventFieldName, fields, true);
-        }
-
-        // 检查过滤条件是否在映射关系中
-        List<Filter> filter = group.getFilter();
-        if (!CollectionUtils.isEmpty(filter)) {
-            Map<String, Field> fields = convert2Map(group.getSourceTable().getColumn());
-            filter.forEach(f -> addFieldMapping(fieldMapping, f.getName(), fields, true));
-        }
-
-    }
-
-    private void addFieldMapping(List<FieldMapping> fieldMapping, String name, Map<String, Field> fields, boolean checkSource) {
-        if (StringUtils.isNotBlank(name)) {
-            boolean exist = false;
-            for (FieldMapping m : fieldMapping) {
-                Field f = checkSource ? m.getSource() : m.getTarget();
-                if (null == f) {
-                    continue;
-                }
-                if (StringUtils.equals(f.getName(), name)) {
-                    exist = true;
-                    break;
-                }
-            }
-            if (!exist && null != fields.get(name)) {
-                FieldMapping fm = checkSource ? new FieldMapping(fields.get(name), null) : new FieldMapping(null, fields.get(name));
-                fieldMapping.add(fm);
-            }
-        }
-    }
-
     private void mergeFieldMapping(TableGroup tableGroup) {
         List<Field> sCol = tableGroup.getSourceTable().getColumn();
         List<Field> tCol = tableGroup.getTargetTable().getColumn();
@@ -238,8 +191,8 @@ public class TableGroupChecker extends AbstractChecker {
                 throw new BizException("映射关系不能为空");
             }
 
-            final Map<String, Field> sMap = convert2Map(tableGroup.getSourceTable().getColumn());
-            final Map<String, Field> tMap = convert2Map(tableGroup.getTargetTable().getColumn());
+            final Map<String, Field> sMap = PickerUtil.convert2Map(tableGroup.getSourceTable().getColumn());
+            final Map<String, Field> tMap = PickerUtil.convert2Map(tableGroup.getTargetTable().getColumn());
             int length = mapping.length();
             List<FieldMapping> list = new ArrayList<>();
             JSONObject row = null;
@@ -265,10 +218,4 @@ public class TableGroupChecker extends AbstractChecker {
         }
     }
 
-    private Map<String, Field> convert2Map(List<Field> col) {
-        final Map<String, Field> map = new HashMap<>();
-        col.forEach(f -> map.put(f.getName(), f));
-        return map;
-    }
-
 }

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/FieldPicker.java

@@ -107,7 +107,7 @@ public class FieldPicker {
         Assert.notEmpty(fieldMapping, "映射关系不能为空.");
 
         // 找到同步字段 => [{source.name}]
-        Set<String> key = fieldMapping.stream().map(m -> m.getSource().getName()).collect(Collectors.toSet());
+        Set<String> key = fieldMapping.stream().filter(m -> null != m.getSource()).map(m -> m.getSource().getName()).collect(Collectors.toSet());
 
         // 记录字段索引 [{"ID":0},{"NAME":1}]
         index = new LinkedList<>();

+ 4 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -22,6 +22,7 @@ import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
@@ -228,7 +229,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             this.mapping = mapping;
             this.metaId = mapping.getMetaId();
             this.tablePicker = new LinkedList<>();
-            list.forEach(t -> tablePicker.add(new FieldPicker(t)));
+            list.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
         }
 
         @Override
@@ -277,7 +278,8 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
                 final Table table = t.getSourceTable();
                 final String tableName = table.getName();
                 tablePicker.putIfAbsent(tableName, new ArrayList<>());
-                tablePicker.get(tableName).add(new FieldPicker(t, t.getFilter(), table.getColumn(), t.getFieldMapping()));
+                TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
+                tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
             });
         }
 

+ 13 - 23
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -20,7 +20,6 @@ import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
-import org.dbsyncer.plugin.config.Plugin;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.slf4j.Logger;
@@ -94,10 +93,10 @@ public class ParserFactory implements Parser {
         Table sTable = new Table().setName(sTableName).setColumn(new ArrayList<>());
         Table tTable = new Table().setName(tTableName).setColumn(new ArrayList<>());
         fieldMapping.forEach(m -> {
-            if(null != m.getSource()){
+            if (null != m.getSource()) {
                 sTable.getColumn().add(m.getSource());
             }
-            if(null != m.getTarget()){
+            if (null != m.getTarget()) {
                 tTable.getColumn().add(m.getTarget());
             }
         });
@@ -181,16 +180,13 @@ public class ParserFactory implements Parser {
         Assert.notNull(sConfig, "数据源配置不能为空.");
         ConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
         Assert.notNull(tConfig, "目标源配置不能为空.");
-        Map<String, String> command = tableGroup.getCommand();
+        TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
+        Map<String, String> command = group.getCommand();
         Assert.notEmpty(command, "执行命令不能为空.");
-        List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
-        String sTableName = tableGroup.getSourceTable().getName();
-        String tTableName = tableGroup.getTargetTable().getName();
+        List<FieldMapping> fieldMapping = group.getFieldMapping();
+        String sTableName = group.getSourceTable().getName();
+        String tTableName = group.getTargetTable().getName();
         Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
-        // 转换配置(默认使用全局)
-        List<Convert> convert = CollectionUtils.isEmpty(tableGroup.getConvert()) ? mapping.getConvert() : tableGroup.getConvert();
-        // 插件配置(默认使用全局)
-        Plugin plugin = null == tableGroup.getPlugin() ? mapping.getPlugin() : tableGroup.getPlugin();
         // 获取同步字段
         Picker picker = new Picker();
         PickerUtil.pickFields(picker, fieldMapping);
@@ -223,10 +219,10 @@ public class ParserFactory implements Parser {
 
             // 3、参数转换
             List<Map<String, Object>> target = picker.getTargetList();
-            ConvertUtil.convert(convert, target);
+            ConvertUtil.convert(group.getConvert(), target);
 
             // 4、插件转换
-            pluginFactory.convert(plugin, data, target);
+            pluginFactory.convert(group.getPlugin(), data, target);
 
             // 5、写入目标源
             Result writer = writeBatch(tConfig, command, picker.getTargetFields(), target, threadSize, batchSize);
@@ -245,15 +241,9 @@ public class ParserFactory implements Parser {
         final String metaId = mapping.getMetaId();
 
         ConnectorConfig tConfig = getConnectorConfig(mapping.getTargetConnectorId());
-        Map<String, String> command = tableGroup.getCommand();
-        List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
-        // 转换配置(默认使用全局)
-        List<Convert> convert = CollectionUtils.isEmpty(tableGroup.getConvert()) ? mapping.getConvert() : tableGroup.getConvert();
-        // 插件配置(默认使用全局)
-        Plugin plugin = null == tableGroup.getPlugin() ? mapping.getPlugin() : tableGroup.getPlugin();
         // 获取同步字段
         Picker picker = new Picker();
-        PickerUtil.pickFields(picker, fieldMapping);
+        PickerUtil.pickFields(picker, tableGroup.getFieldMapping());
 
         // 1、映射字段
         String event = dataEvent.getEvent();
@@ -262,13 +252,13 @@ public class ParserFactory implements Parser {
 
         // 2、参数转换
         Map<String, Object> target = picker.getTarget();
-        ConvertUtil.convert(convert, target);
+        ConvertUtil.convert(tableGroup.getConvert(), target);
 
         // 3、插件转换
-        pluginFactory.convert(plugin, event, data, target);
+        pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
 
         // 4、写入目标源
-        Result writer = connectorFactory.writer(tConfig, picker.getTargetFields(), command, event, target);
+        Result writer = connectorFactory.writer(tConfig, picker.getTargetFields(), tableGroup.getCommand(), event, target);
 
         // 5、更新结果
         List<Map<String, Object>> list = new ArrayList<>(1);

+ 2 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/DefaultTimestampHandler.java

@@ -3,6 +3,7 @@ package org.dbsyncer.parser.convert.handler;
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.parser.convert.Handler;
 
+import java.sql.Timestamp;
 import java.time.Instant;
 
 /**
@@ -14,6 +15,6 @@ public class DefaultTimestampHandler implements Handler {
 
     @Override
     public Object handle(String args, Object value) {
-        return null == value || StringUtils.isBlank(String.valueOf(value)) ? Instant.now().toEpochMilli() : value;
+        return null == value || StringUtils.isBlank(String.valueOf(value)) ? new Timestamp(Instant.now().toEpochMilli()) : value;
     }
 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/logger/LogServiceImpl.java

@@ -22,7 +22,7 @@ public class LogServiceImpl implements LogService {
 
     @Override
     public void log(LogType logType, String msg) {
-        flushService.asyncWrite(logType.getType(), msg);
+        flushService.asyncWrite(logType.getType(), null == msg ? logType.getMessage() : msg);
     }
 
     @Override

+ 92 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/PickerUtil.java

@@ -1,9 +1,11 @@
 package org.dbsyncer.parser.util;
 
+import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.parser.model.FieldMapping;
-import org.dbsyncer.parser.model.Picker;
+import org.dbsyncer.connector.config.Filter;
+import org.dbsyncer.parser.model.*;
+import org.springframework.beans.BeanUtils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -15,6 +17,37 @@ public abstract class PickerUtil {
     private PickerUtil() {
     }
 
+    /**
+     * 合并过滤条件、转换配置、插件配置、目标源字段、数据源字段
+     *
+     * @param mapping
+     * @param tableGroup
+     */
+    public static TableGroup mergeTableGroupConfig(Mapping mapping, TableGroup tableGroup) {
+        TableGroup group = new TableGroup();
+        List<FieldMapping> fm = new ArrayList<>();
+        tableGroup.getFieldMapping().forEach(f -> {
+            FieldMapping m = new FieldMapping();
+            BeanUtils.copyProperties(f, m);
+            fm.add(m);
+        });
+        group.setFieldMapping(fm);
+        group.setSourceTable(tableGroup.getSourceTable());
+        group.setTargetTable(tableGroup.getTargetTable());
+        group.setCommand(tableGroup.getCommand());
+
+        // 过滤条件(默认使用全局)
+        group.setFilter(CollectionUtils.isEmpty(tableGroup.getFilter()) ? mapping.getFilter() : tableGroup.getFilter());
+        // 转换配置(默认使用全局)
+        group.setConvert(CollectionUtils.isEmpty(tableGroup.getConvert()) ? mapping.getConvert() : tableGroup.getConvert());
+        // 插件配置(默认使用全局)
+        group.setPlugin(null == tableGroup.getPlugin() ? mapping.getPlugin() : tableGroup.getPlugin());
+
+        // 合并增量配置/过滤条件/转换配置字段
+        appendFieldMapping(mapping, group);
+        return group;
+    }
+
     public static void pickFields(Picker picker, List<FieldMapping> fieldMapping) {
         if (!CollectionUtils.isEmpty(fieldMapping)) {
             List<Field> sFields = new ArrayList<>();
@@ -60,15 +93,69 @@ public abstract class PickerUtil {
         }
     }
 
-    private static void exchange(int sFieldSize, List<Field> sFields, List<Field> tFields, Map<String, Object> source, Map<String, Object> target) {
+    public static Map<String, Field> convert2Map(List<Field> col) {
+        final Map<String, Field> map = new HashMap<>();
+        col.forEach(f -> map.put(f.getName(), f));
+        return map;
+    }
+
+    private static void exchange(int sFieldSize, List<Field> sFields, List<Field> tFields, Map<String, Object> source,
+                                 Map<String, Object> target) {
         Field sField = null;
+        Field tField = null;
         Object v = null;
         for (int k = 0; k < sFieldSize; k++) {
             sField = sFields.get(k);
-            if (null != sField) {
+            tField = tFields.get(k);
+            if (null != sField && null != tField) {
                 v = source.get(sField.getName());
+                target.put(tField.getName(), v);
+            }
+        }
+    }
+
+    private static void appendFieldMapping(Mapping mapping, TableGroup group) {
+        final List<FieldMapping> fieldMapping = group.getFieldMapping();
 
-                target.put(tFields.get(k).getName(), v);
+        // 检查增量字段是否在映射关系中
+        String eventFieldName = mapping.getListener().getEventFieldName();
+        if (StringUtils.isNotBlank(eventFieldName)) {
+            Map<String, Field> fields = convert2Map(group.getSourceTable().getColumn());
+            addFieldMapping(fieldMapping, eventFieldName, fields, true);
+        }
+
+        // 检查过滤条件是否在映射关系中
+        List<Filter> filter = group.getFilter();
+        if (!CollectionUtils.isEmpty(filter)) {
+            Map<String, Field> fields = convert2Map(group.getSourceTable().getColumn());
+            filter.forEach(f -> addFieldMapping(fieldMapping, f.getName(), fields, true));
+        }
+
+        // 检查转换配置是否在映射关系中
+        List<Convert> convert = group.getConvert();
+        if (!CollectionUtils.isEmpty(convert)) {
+            Map<String, Field> fields = convert2Map(group.getTargetTable().getColumn());
+            convert.forEach(c -> addFieldMapping(fieldMapping, c.getName(), fields, false));
+        }
+
+    }
+
+    private static void addFieldMapping(List<FieldMapping> fieldMapping, String name, Map<String, Field> fields, boolean checkSource) {
+        if (StringUtils.isNotBlank(name)) {
+            boolean exist = false;
+            for (FieldMapping m : fieldMapping) {
+                Field f = checkSource ? m.getSource() : m.getTarget();
+                if (null == f) {
+                    continue;
+                }
+                if (StringUtils.equals(f.getName(), name)) {
+                    exist = true;
+                    break;
+                }
+            }
+            if (!exist && null != fields.get(name)) {
+                FieldMapping fm = checkSource ? new FieldMapping(fields.get(name), null) : new FieldMapping(null, fields.get(name));
+                fieldMapping.add(fm);
             }
         }
     }