穿云 vor 4 Monaten
Ursprung
Commit
0eb72e8261

+ 1 - 4
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -166,10 +166,7 @@ public class DataSyncServiceImpl implements DataSyncService {
 
 
         // 转换为源字段
         // 转换为源字段
         final Picker picker = new Picker(tableGroup.getFieldMapping());
         final Picker picker = new Picker(tableGroup.getFieldMapping());
-        Map map = picker.pickSourceData(binlogData);
-        List<Field> sourceFields = picker.getSourceFields();
-        List<Object> changedRow = new ArrayList<>(sourceFields.size());
-        sourceFields.forEach(field -> changedRow.add(map.get(field.getName())));
+        List<Object> changedRow = picker.pickSourceData(binlogData);
         RowChangedEvent changedEvent = new RowChangedEvent(sourceTableName, event, changedRow);
         RowChangedEvent changedEvent = new RowChangedEvent(sourceTableName, event, changedRow);
 
 
         // 执行同步是否成功
         // 执行同步是否成功

+ 4 - 6
dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/IncrementPuller.java

@@ -20,10 +20,7 @@ import org.dbsyncer.sdk.enums.ListenerTypeEnum;
 import org.dbsyncer.sdk.listener.AbstractListener;
 import org.dbsyncer.sdk.listener.AbstractListener;
 import org.dbsyncer.sdk.listener.AbstractQuartzListener;
 import org.dbsyncer.sdk.listener.AbstractQuartzListener;
 import org.dbsyncer.sdk.listener.Listener;
 import org.dbsyncer.sdk.listener.Listener;
-import org.dbsyncer.sdk.model.ChangedOffset;
-import org.dbsyncer.sdk.model.ConnectorConfig;
-import org.dbsyncer.sdk.model.Table;
-import org.dbsyncer.sdk.model.TableGroupQuartzCommand;
+import org.dbsyncer.sdk.model.*;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
@@ -151,8 +148,9 @@ public final class IncrementPuller extends AbstractPuller implements Application
             List<TableGroupQuartzCommand> quartzCommands = list.stream().map(t -> {
             List<TableGroupQuartzCommand> quartzCommands = list.stream().map(t -> {
                 final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
                 final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
                 final Picker picker = new Picker(group.getFieldMapping());
                 final Picker picker = new Picker(group.getFieldMapping());
-                Assert.notEmpty(picker.getSourceFields(), "表字段映射关系不能为空:" + group.getSourceTable().getName() + " > " + group.getTargetTable().getName());
-                return new TableGroupQuartzCommand(t.getSourceTable(), picker.getSourceFields(), t.getCommand());
+                List<Field> fields = picker.getSourceFields();
+                Assert.notEmpty(fields, "表字段映射关系不能为空:" + group.getSourceTable().getName() + " > " + group.getTargetTable().getName());
+                return new TableGroupQuartzCommand(t.getSourceTable(), fields, t.getCommand());
             }).collect(Collectors.toList());
             }).collect(Collectors.toList());
             quartzListener.setCommands(quartzCommands);
             quartzListener.setCommands(quartzCommands);
         }
         }

+ 6 - 0
dbsyncer-parser/pom.xml

@@ -25,6 +25,12 @@
             <version>${project.parent.version}</version>
             <version>${project.parent.version}</version>
         </dependency>
         </dependency>
 
 
+        <dependency>
+            <groupId>com.scxhtb</groupId>
+            <artifactId>dbsyncer-parser-plus</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
         <dependency>
         <dependency>
             <groupId>com.github.jsqlparser</groupId>
             <groupId>com.github.jsqlparser</groupId>
             <artifactId>jsqlparser</artifactId>
             <artifactId>jsqlparser</artifactId>

+ 4 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -35,6 +35,7 @@ import org.springframework.util.Assert;
 
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import javax.annotation.Resource;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -138,11 +139,10 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
             return;
             return;
         }
         }
 
 
-        final Picker picker = new Picker(group.getFieldMapping());
-
-        final List<Map> sourceDataList = null; //response.getDataList();
         // 2、映射字段
         // 2、映射字段
-        List<Map> targetDataList = picker.pickTargetData(sourceDataList);
+        final Picker picker = new Picker(group.getFieldMapping());
+        List<Map> sourceDataList = new ArrayList<>();
+        List<Map> targetDataList = picker.pickTargetData(response.getDataList(), sourceDataList);
 
 
         // 3、参数转换
         // 3、参数转换
         ConvertUtil.convert(group.getConvert(), targetDataList);
         ConvertUtil.convert(group.getConvert(), targetDataList);

+ 0 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/ParserComponentImpl.java

@@ -181,7 +181,6 @@ public class ParserComponentImpl implements ParserComponent {
             pluginFactory.process(group.getPlugin(), context, ProcessEnum.CONVERT);
             pluginFactory.process(group.getPlugin(), context, ProcessEnum.CONVERT);
 
 
             // 5、写入目标源
             // 5、写入目标源
-            context.setTargetFields(picker.getTargetFields());
             Result result = writeBatch(context, executor);
             Result result = writeBatch(context, executor);
 
 
             // 6、更新结果
             // 6、更新结果

+ 41 - 17
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -28,25 +28,61 @@ public class Picker {
     public List<Map> pickTargetData(List<Map> source) {
     public List<Map> pickTargetData(List<Map> source) {
         List<Map> targetMapList = new ArrayList<>();
         List<Map> targetMapList = new ArrayList<>();
         if (!CollectionUtils.isEmpty(source)) {
         if (!CollectionUtils.isEmpty(source)) {
-            final int size = source.size();
             final int sFieldSize = sourceFields.size();
             final int sFieldSize = sourceFields.size();
             final int tFieldSize = targetFields.size();
             final int tFieldSize = targetFields.size();
             Map<String, Object> target = null;
             Map<String, Object> target = null;
-            for (int i = 0; i < size; i++) {
+            for (Map row : source) {
                 target = new HashMap<>();
                 target = new HashMap<>();
-                exchange(sFieldSize, tFieldSize, sourceFields, targetFields, source.get(i), target);
+                exchange(sFieldSize, tFieldSize, sourceFields, targetFields, row, target);
                 targetMapList.add(target);
                 targetMapList.add(target);
             }
             }
         }
         }
         return targetMapList;
         return targetMapList;
     }
     }
 
 
-    public Map pickSourceData(Map target) {
+    public List<Map> pickTargetData(List<List<Object>> rows, List<Map> sourceMapList) {
+        List<Map> targetMapList = new ArrayList<>();
+        if (!CollectionUtils.isEmpty(rows)) {
+            final int sFieldSize = sourceFields.size();
+            final int tFieldSize = targetFields.size();
+            Map<String, Object> source = null;
+            Map<String, Object> target = null;
+            List<Object> row = null;
+            List<Field> sFields = getFields(sourceFields);
+            for (int i = 0; i < rows.size(); i++) {
+                source = new HashMap<>();
+                target = new HashMap<>();
+                row = rows.get(i);
+                for (int j = 0; j < row.size(); i++) {
+                    source.put(sFields.get(j).getName(), row.get(j));
+                }
+                exchange(sFieldSize, tFieldSize, sourceFields, targetFields, source, target);
+                sourceMapList.add(source);
+                targetMapList.add(target);
+            }
+        }
+        return targetMapList;
+    }
+
+    public List<Object> pickSourceData(Map target) {
         Map<String, Object> source = new HashMap<>();
         Map<String, Object> source = new HashMap<>();
         if (!CollectionUtils.isEmpty(target)) {
         if (!CollectionUtils.isEmpty(target)) {
             exchange(targetFields.size(), sourceFields.size(), targetFields, sourceFields, target, source);
             exchange(targetFields.size(), sourceFields.size(), targetFields, sourceFields, target, source);
         }
         }
-        return source;
+
+        return getFields(sourceFields).stream().map(field -> source.get(field.getName())).collect(Collectors.toList());
+    }
+
+    public List<Field> getSourceFields() {
+        return getFields(sourceFields);
+    }
+
+    public List<Field> getTargetFields() {
+        return getFields(targetFields);
+    }
+
+    public Map<String, Field> getTargetFieldMap() {
+        return targetFields.stream().filter(Objects::nonNull).collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
     }
     }
 
 
     private void exchange(int sFieldSize, int tFieldSize, List<Field> sFields, List<Field> tFields, Map<String, Object> source, Map<String, Object> target) {
     private void exchange(int sFieldSize, int tFieldSize, List<Field> sFields, List<Field> tFields, Map<String, Object> source, Map<String, Object> target) {
@@ -86,16 +122,4 @@ public class Picker {
         return Collections.unmodifiableList(fields);
         return Collections.unmodifiableList(fields);
     }
     }
 
 
-    public List<Field> getSourceFields() {
-        return getFields(sourceFields);
-    }
-
-    public List<Field> getTargetFields() {
-        return getFields(targetFields);
-    }
-
-    public Map<String, Field> getTargetFieldMap() {
-        return targetFields.stream().filter(Objects::nonNull).collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
-    }
-
 }
 }

+ 3 - 4
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/model/TableGroupQuartzCommand.java

@@ -5,6 +5,7 @@ import org.dbsyncer.sdk.util.PrimaryKeyUtil;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 
 public class TableGroupQuartzCommand {
 public class TableGroupQuartzCommand {
 
 
@@ -20,7 +21,7 @@ public class TableGroupQuartzCommand {
         this.table = table;
         this.table = table;
         this.fields = fields;
         this.fields = fields;
         this.command = command;
         this.command = command;
-        this.primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(table);;
+        this.primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(table);
     }
     }
 
 
     public Table getTable() {
     public Table getTable() {
@@ -40,8 +41,6 @@ public class TableGroupQuartzCommand {
     }
     }
 
 
     public List<Object> getChangedRow(Map<String, Object> row) {
     public List<Object> getChangedRow(Map<String, Object> row) {
-        List<Object> changedRow = new ArrayList<>(fields.size());
-        fields.forEach(field -> changedRow.add(row.get(field.getName())));
-        return changedRow;
+        return fields.stream().map(field -> row.get(field.getName())).collect(Collectors.toList());
     }
     }
 }
 }