瀏覽代碼

记录错误表名

AE86 4 年之前
父節點
當前提交
a41dbfc475

+ 9 - 9
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -83,23 +83,23 @@ public class ConnectorFactory {
         return connector.getCount(config, command);
     }
 
-    public Result reader(ConnectorConfig config, Map<String, String> command, List<Object> args, int pageIndex, int pageSize) {
-        Connector connector = getConnector(config.getConnectorType());
-        Result result = connector.reader(new ReaderConfig(config, command, args, pageIndex, pageSize));
+    public Result reader(ReaderConfig config) {
+        Connector connector = getConnector(config.getConfig().getConnectorType());
+        Result result = connector.reader(config);
         Assert.notNull(result, "Connector reader result can not null");
         return result;
     }
 
-    public Result writer(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map> data) {
-        Connector connector = getConnector(config.getConnectorType());
-        Result result = connector.writer(new WriterBatchConfig(config, command, fields, data));
+    public Result writer(WriterBatchConfig config) {
+        Connector connector = getConnector(config.getConfig().getConnectorType());
+        Result result = connector.writer(config);
         Assert.notNull(result, "Connector writer result can not null");
         return result;
     }
 
-    public Result writer(ConnectorConfig config, List<Field> fields, Map<String, String> command, String event, Map<String, Object> data) {
-        Connector connector = getConnector(config.getConnectorType());
-        Result result = connector.writer(new WriterSingleConfig(config, fields, command, event, data));
+    public Result writer(WriterSingleConfig config) {
+        Connector connector = getConnector(config.getConfig().getConnectorType());
+        Result result = connector.writer(config);
         Assert.notNull(result, "Connector writer result can not null");
         return result;
     }

+ 2 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterSingleConfig.java

@@ -17,12 +17,13 @@ public class WriterSingleConfig extends WriterConfig {
 
     private String table;
 
-    public WriterSingleConfig(ConnectorConfig config, List<Field> fields, Map<String,String> command, String event, Map<String,Object> data) {
+    public WriterSingleConfig(ConnectorConfig config, List<Field> fields, Map<String, String> command, String event, Map<String, Object> data, String table) {
         setConfig(config);
         setCommand(command);
         setFields(fields);
         setData(data);
         setEvent(event);
+        setTable(table);
     }
 
     public Map<String, Object> getData() {

+ 2 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java

@@ -6,6 +6,7 @@ import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.QuartzFilter;
@@ -80,7 +81,7 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
         Point point = checkLastPoint(command, index);
         int pageIndex = 1;
         for (; ; ) {
-            Result reader = connectorFactory.reader(connectorConfig, point.getCommand(), point.getArgs(), pageIndex++, readNum);
+            Result reader = connectorFactory.reader(new ReaderConfig(connectorConfig, point.getCommand(), point.getArgs(), pageIndex++, readNum));
             List<Map> data = reader.getData();
             if (CollectionUtils.isEmpty(data)) {
                 break;

+ 4 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -212,7 +212,7 @@ public class ParserFactory implements Parser {
 
             // 1、获取数据源数据
             int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
-            Result reader = connectorFactory.reader(sConfig, command, new ArrayList<>(), pageIndex, pageSize);
+            Result reader = connectorFactory.reader(new ReaderConfig(sConfig, command, new ArrayList<>(), pageIndex, pageSize));
             List<Map> data = reader.getData();
             if (CollectionUtils.isEmpty(data)) {
                 params.clear();
@@ -268,7 +268,7 @@ public class ParserFactory implements Parser {
         pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
 
         // 5、写入目标源
-        Result writer = connectorFactory.writer(tConfig, picker.getTargetFields(), tableGroup.getCommand(), event, target);
+        Result writer = connectorFactory.writer(new WriterSingleConfig(tConfig, picker.getTargetFields(), tableGroup.getCommand(), event, target, rowChangedEvent.getTableName()));
 
         // 6、更新结果
         List<Map> list = new ArrayList<>(1);
@@ -355,7 +355,7 @@ public class ParserFactory implements Parser {
         int total = target.size();
         // 单次任务
         if (total <= batchSize) {
-            return connectorFactory.writer(config, command, fields, target);
+            return connectorFactory.writer(new WriterBatchConfig(config, command, fields, target));
         }
 
         // 批量任务, 拆分
@@ -412,7 +412,7 @@ public class ParserFactory implements Parser {
             }
             data.add(poll);
         }
-        return connectorFactory.writer(config, command, fields, data);
+        return connectorFactory.writer(new WriterBatchConfig(config, command, fields, data));
     }
 
     private ThreadPoolTaskExecutor getThreadPoolTaskExecutor(int threadSize, int queueCapacity) {

+ 2 - 5
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -4,10 +4,7 @@ import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.connector.config.SqlBuilderConfig;
+import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.Database;
 import org.dbsyncer.connector.enums.ConnectorEnum;
@@ -166,7 +163,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             Executor executor = getExecutor(type, table);
             Map<String, String> command = new HashMap<>();
             command.put(SqlBuilderEnum.INSERT.getName(), executor.getInsert());
-            connectorFactory.writer(config, command, executor.getFields(), list);
+            connectorFactory.writer(new WriterBatchConfig(config, command, executor.getFields(), list));
         }
 
     }