
Fix binlog deserialization

AE86 2 years ago
parent commit de15bc5237

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/AbstractWriterBinlog.java

@@ -68,7 +68,7 @@ public abstract class AbstractWriterBinlog extends AbstractBinlogRecorder<Writer
         Map<String, Object> data = new HashMap<>();
         try {
             final Picker picker = new Picker(tableGroup.getFieldMapping());
-            final Map<String, Field> fieldMap = picker.getTargetFieldMap();
+            final Map<String, Field> fieldMap = picker.getSourceFieldMap();
             message.getData().getRowMap().forEach((k, v) -> {
                 if (fieldMap.containsKey(k)) {
                     data.put(k, BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v));
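
The one-line change above is the core of the fix: the persisted binlog row map is keyed by the source column names, so the field types used to deserialize its values have to come from the source side of the field mapping, not the target side. A self-contained sketch of the failure mode, with invented column names and plain JDK maps standing in for Picker and BinlogMessageUtil:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch (column names and types are invented) of why the lookup
// must use the source field map: the binlog row is keyed by source column names.
public class SourceKeyLookupSketch {
    public static void main(String[] args) {
        // field mapping: source column "USER_NAME" -> target column "name"
        Map<String, Integer> sourceFieldMap = new HashMap<>();
        sourceFieldMap.put("USER_NAME", java.sql.Types.VARCHAR);
        Map<String, Integer> targetFieldMap = new HashMap<>();
        targetFieldMap.put("name", java.sql.Types.VARCHAR);

        // the persisted binlog row uses the source column name as its key
        Map<String, byte[]> rowMap = new HashMap<>();
        rowMap.put("USER_NAME", "ae86".getBytes(StandardCharsets.UTF_8));

        Map<String, Object> data = new HashMap<>();
        rowMap.forEach((k, v) -> {
            if (targetFieldMap.containsKey(k)) {          // old lookup: key never matches
                data.put(k, new String(v, StandardCharsets.UTF_8));
            }
        });
        System.out.println(data);                         // {} -> value silently dropped

        rowMap.forEach((k, v) -> {
            if (sourceFieldMap.containsKey(k)) {          // fixed lookup: key matches, type known
                data.put(k, new String(v, StandardCharsets.UTF_8));
            }
        });
        System.out.println(data);                         // {USER_NAME=ae86}
    }
}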

+ 1 - 13
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -295,19 +295,7 @@ public class ParserFactory implements Parser {
     @Override
     public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
         logger.debug("Table[{}] {}, data:{}", event.getSourceTableName(), event.getEvent(), event.getDataMap());
-
-        // 1. Get the mapped fields
-        final Picker picker = new Picker(tableGroup.getFieldMapping());
-        final Map target = picker.pickData(event.getDataMap());
-
-        // 2. Parameter conversion
-        ConvertUtil.convert(tableGroup.getConvert(), target);
-
-        // 3. Plugin conversion
-        pluginFactory.convert(tableGroup.getPlugin(), event.getEvent(), event.getDataMap(), target);
-
-        // 4. Process the data
-        parserStrategy.execute(tableGroup.getId(), event.getEvent(), target);
+        parserStrategy.execute(tableGroup.getId(), event.getEvent(), event.getDataMap());
     }
 
     /**
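
After this change execute() no longer picks, converts, or runs the plugin itself; it hands the raw source row straight to the parser strategy, and that per-row work moves into WriterBufferActuator (the next file in this diff), where it runs once per pulled batch. A rough sketch of the producer/consumer split, using an in-memory queue and invented class names; it only assumes the producer should stay cheap while the consumer sees whole requests:

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Invented names throughout; this only mirrors the producer/consumer split.
public class DeferredWriteSketch {
    static final class WriteRequest {
        final String tableGroupId;
        final String event;
        final Map<String, Object> row;   // raw source data, not yet mapped
        WriteRequest(String tableGroupId, String event, Map<String, Object> row) {
            this.tableGroupId = tableGroupId;
            this.event = event;
            this.row = row;
        }
    }

    private final BlockingQueue<WriteRequest> buffer = new LinkedBlockingQueue<>();

    // producer side: the equivalent of the slimmed-down execute() above
    public void offer(String tableGroupId, String event, Map<String, Object> dataMap) {
        buffer.offer(new WriteRequest(tableGroupId, event, dataMap));
    }

    // consumer side: mapping, conversion, plugin and batch write happen here
    public void pull() throws InterruptedException {
        WriteRequest request = buffer.take();
        System.out.println(request.event + " -> " + request.row);
    }

    public static void main(String[] args) throws InterruptedException {
        DeferredWriteSketch sketch = new DeferredWriteSketch();
        sketch.offer("tg-1", "UPDATE", Collections.singletonMap("USER_NAME", (Object) "ae86"));
        sketch.pull();                   // prints: UPDATE -> {USER_NAME=ae86}
    }
}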

+ 25 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -7,16 +7,20 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.strategy.ParserStrategy;
+import org.dbsyncer.parser.util.ConvertUtil;
+import org.dbsyncer.plugin.PluginFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * @author AE86
  * @version 1.0.0
@@ -31,6 +35,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Autowired
     private ParserFactory parserFactory;
 
+    @Autowired
+    private PluginFactory pluginFactory;
+
     @Autowired
     private FlushStrategy flushStrategy;
 
@@ -68,17 +75,28 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         final TableGroup tableGroup = cacheService.get(response.getTableGroupId(), TableGroup.class);
         final Mapping mapping = cacheService.get(tableGroup.getMappingId(), Mapping.class);
         final String targetTableName = tableGroup.getTargetTable().getName();
+        final String event = response.getEvent();
+        final List<Map> sourceDataList = response.getDataList();
+
+        // 2. Map the fields
         final Picker picker = new Picker(tableGroup.getFieldMapping());
+        List<Map> targetDataList = picker.pickData(sourceDataList);
+
+        // 3. Parameter conversion
+        ConvertUtil.convert(tableGroup.getConvert(), targetDataList);
+
+        // 4. Plugin conversion
+        pluginFactory.convert(tableGroup.getPlugin(), event, sourceDataList, targetDataList);
 
-        // 2. Execute the batch synchronization
+        // 5. Execute the batch synchronization
         ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
-        Result result = parserFactory.writeBatch(new BatchWriter(targetConnectorMapper, tableGroup.getCommand(), targetTableName, response.getEvent(),
-                picker.getTargetFields(), response.getDataList(), bufferActuatorConfig.getWriterBatchCount()));
+        BatchWriter batchWriter = new BatchWriter(targetConnectorMapper, tableGroup.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount());
+        Result result = parserFactory.writeBatch(batchWriter);
 
-        // 3. Persist the synchronization result
-        flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
+        // 6. Persist the synchronization result
+        flushStrategy.flushIncrementData(mapping.getMetaId(), result, event);
 
-        // 4. Message processing complete
+        // 7. Complete processing
         parserStrategy.complete(response.getMessageIds());
     }
 

+ 6 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -42,14 +42,6 @@ public class Picker {
         return targetMapList;
     }
 
-    public Map pickData(Map<String, Object> data) {
-        Map targetMap = new HashMap<>();
-        if (!CollectionUtils.isEmpty(data)) {
-            exchange(sourceFields.size(), sourceFields, targetFields, data, targetMap);
-        }
-        return targetMap;
-    }
-
     private void exchange(int sFieldSize, List<Field> sFields, List<Field> tFields, Map<String, Object> source,
                           Map<String, Object> target) {
         Field sField = null;
@@ -77,11 +69,15 @@ public class Picker {
         return primaryKey;
     }
 
+    public List<Field> getSourceFields() {
+        return sourceFields.stream().filter(f -> null != f).collect(Collectors.toList());
+    }
+
     public List<Field> getTargetFields() {
         return targetFields.stream().filter(f -> null != f).collect(Collectors.toList());
     }
 
-    public Map<String, Field> getTargetFieldMap() {
-        return getTargetFields().stream().collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
+    public Map<String, Field> getSourceFieldMap() {
+        return getSourceFields().stream().collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
     }
 }
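
getSourceFieldMap() mirrors the removed getTargetFieldMap(): null slots in the field mapping are filtered out first, and the (k1, k2) -> k1 merge function keeps the first field when two mapping entries share a column name. A standalone illustration of that collector on a tiny invented field list:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// The Field class and sample values are invented; only the stream pipeline matters.
public class FieldMapSketch {
    static final class Field {
        final String name;
        final int type;
        Field(String name, int type) { this.name = name; this.type = type; }
        String getName() { return name; }
        public String toString() { return name + "(" + type + ")"; }
    }

    public static void main(String[] args) {
        List<Field> sourceFields = Arrays.asList(
                new Field("ID", java.sql.Types.BIGINT),
                null,                                      // unmapped slot in the field mapping
                new Field("ID", java.sql.Types.INTEGER));  // duplicate source column name

        Map<String, Field> fieldMap = sourceFields.stream()
                .filter(f -> null != f)
                .collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));

        System.out.println(fieldMap);                      // {ID=ID(-5)} -> first entry wins
    }
}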

+ 10 - 4
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java

@@ -96,15 +96,21 @@ public class PluginFactory {
         return Collections.unmodifiableList(plugins);
     }
 
-    public void convert(Plugin plugin, List<Map> source, List<Map> target) {
+    public void convert(Plugin plugin, List<Map> sourceList, List<Map> targetList) {
         if (null != plugin && service.containsKey(plugin.getClassName())) {
-            service.get(plugin.getClassName()).convert(new FullConvertContext(applicationContextProxy, source, target));
+            service.get(plugin.getClassName()).convert(new FullConvertContext(applicationContextProxy, sourceList, targetList));
         }
     }
 
-    public void convert(Plugin plugin, String event, Map<String, Object> source, Map<String, Object> target) {
+    public void convert(Plugin plugin, String event, List<Map> sourceList, List<Map> targetList) {
         if (null != plugin && service.containsKey(plugin.getClassName())) {
-            service.get(plugin.getClassName()).convert(new IncrementConvertContext(applicationContextProxy, event, source, target));
+            ConvertService convertService = service.get(plugin.getClassName());
+            int size = sourceList.size();
+            if(size == targetList.size()){
+                for (int i = 0; i < size; i++) {
+                    convertService.convert(new IncrementConvertContext(applicationContextProxy, event, sourceList.get(i), targetList.get(i)));
+                }
+            }
         }
     }
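
The increment hook now takes parallel lists and is dispatched once per row pair, guarded by a length check; if the two lists ever diverge in size, no conversion runs at all. A minimal sketch of that dispatch, with a lambda standing in for the per-row ConvertService call and all other names invented:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Names are invented; only the size guard and the pairwise loop are the point.
public class PairwiseConvertSketch {
    static void convert(List<Map<String, Object>> sourceList,
                        List<Map<String, Object>> targetList,
                        BiConsumer<Map<String, Object>, Map<String, Object>> perRowHook) {
        int size = sourceList.size();
        if (size == targetList.size()) {                   // skip silently when the lists diverge
            for (int i = 0; i < size; i++) {
                perRowHook.accept(sourceList.get(i), targetList.get(i));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> source = new HashMap<>();
        source.put("USER_NAME", "ae86");
        Map<String, Object> target = new HashMap<>();
        target.put("name", "ae86");

        convert(Arrays.asList(source), Arrays.asList(target),
                (s, t) -> t.put("name", s.get("USER_NAME") + "_converted"));
        System.out.println(target);                        // {name=ae86_converted}
    }
}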