瀏覽代碼

支持合并值转换

AE86 2 年之前
父節點
當前提交
98ee82bd08

+ 4 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java

@@ -72,6 +72,10 @@ public abstract class StringUtil {
         return StringUtils.startsWith(str, prefix);
     }
 
+    public static String toString(Object obj) {
+        return obj == null ? "" : String.valueOf(obj);
+    }
+
     /**
      * 首字母转小写
      *

+ 3 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -86,10 +86,10 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         final String sourceTableName = group.getSourceTable().getName();
         final String targetTableName = group.getTargetTable().getName();
         final String event = response.getEvent();
+        final Picker picker = new Picker(group.getFieldMapping());
         final List<Map> sourceDataList = response.getDataList();
 
         // 2、映射字段
-        final Picker picker = new Picker(group.getFieldMapping());
         List<Map> targetDataList = picker.pickData(sourceDataList);
 
         // 3、参数转换
@@ -102,8 +102,8 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         pluginFactory.convert(group.getPlugin(), context);
 
         // 5、批量执行同步
-        Result result = parserFactory.writeBatch(context, new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event,
-                picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount()));
+        BatchWriter batchWriter = new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount());
+        Result result = parserFactory.writeBatch(context, batchWriter);
 
         // 6、持久化同步结果
         result.setTableGroupId(tableGroup.getId());

+ 32 - 13
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -1,26 +1,32 @@
 package org.dbsyncer.parser.model;
 
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.model.Field;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class Picker {
 
-    private List<Field> sourceFields;
-    private List<Field> targetFields;
+    private List<Field> sourceFields = new ArrayList<>();
+    private List<Field> targetFields = new ArrayList<>();
 
     public Picker(List<FieldMapping> fieldMapping) {
-        sourceFields = new ArrayList<>();
-        targetFields = new ArrayList<>();
         if (!CollectionUtils.isEmpty(fieldMapping)) {
             fieldMapping.forEach(m -> {
-                sourceFields.add(m.getSource());
-                targetFields.add(m.getTarget());
+                if (m.getSource() != null) {
+                    sourceFields.add(m.getSource());
+                }
+                if (m.getTarget() != null) {
+                    targetFields.add(m.getTarget());
+                }
             });
         }
     }
@@ -45,25 +51,38 @@ public class Picker {
         Field sField = null;
         Field tField = null;
         Object v = null;
+        String tFieldName = null;
         for (int k = 0; k < sFieldSize; k++) {
             sField = sFields.get(k);
             tField = tFields.get(k);
             if (null != sField && null != tField) {
                 v = source.get(sField.isUnmodifiabled() ? sField.getLabelName() : sField.getName());
-                target.put(tField.getName(), v);
+                tFieldName = tField.getName();
+                // 映射值
+                if (!target.containsKey(tFieldName)) {
+                    target.put(tFieldName, v);
+                    continue;
+                }
+                // 合并值
+                String mergedValue = new StringBuilder(StringUtil.toString(target.get(tFieldName))).append(StringUtil.toString(v)).toString();
+                target.put(tFieldName, mergedValue);
             }
         }
     }
 
-    public List<Field> getSourceFields() {
-        return sourceFields.stream().filter(f -> null != f).collect(Collectors.toList());
-    }
-
     public List<Field> getTargetFields() {
-        return targetFields.stream().filter(f -> null != f).collect(Collectors.toList());
+        List<Field> fields = new ArrayList<>();
+        Set<String> keys = new HashSet<>();
+        targetFields.forEach(f -> {
+            if (!keys.contains(f.getName())) {
+                fields.add(f);
+                keys.add(f.getName());
+            }
+        });
+        return Collections.unmodifiableList(fields);
     }
 
     public Map<String, Field> getSourceFieldMap() {
-        return getSourceFields().stream().collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
+        return sourceFields.stream().filter(f -> null != f).collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
     }
 }