Przeglądaj źródła

支持全量覆盖写入

AE86 3 lat temu
rodzic
commit
cb5ac32038

+ 1 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/config/ScheduleConfig.java

@@ -22,7 +22,7 @@ public class ScheduleConfig implements SchedulingConfigurer {
 
     @Bean(name = "taskScheduler", destroyMethod = "shutdown")
     public ThreadPoolTaskScheduler taskScheduler() {
-        int poolSize = Runtime.getRuntime().availableProcessors() * 2;
+        int poolSize = Runtime.getRuntime().availableProcessors();
         ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
         //核心线程池大小
         scheduler.setPoolSize(poolSize);

+ 0 - 12
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterBatchConfig.java

@@ -27,22 +27,13 @@ public class WriterBatchConfig {
      * 集合数据
      */
     private List<Map> data;
-    /**
-     * 强制更新
-     */
-    private boolean isForceUpdate;
 
     public WriterBatchConfig(String tableName, String event, Map<String, String> command, List<Field> fields, List<Map> data) {
-        this(tableName, event, command, fields, data, false);
-    }
-
-    public WriterBatchConfig(String tableName, String event, Map<String, String> command, List<Field> fields, List<Map> data, boolean isForceUpdate) {
         this.tableName = tableName;
         this.event = event;
         this.command = command;
         this.fields = fields;
         this.data = data;
-        this.isForceUpdate = isForceUpdate;
     }
 
     public String getTableName() {
@@ -65,7 +56,4 @@ public class WriterBatchConfig {
         return data;
     }
 
-    public boolean isForceUpdate() {
-        return isForceUpdate;
-    }
 }

+ 3 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -145,19 +145,15 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
                     })
             );
         } catch (Exception e) {
-            result.addFailData(data);
-            result.getError().append(e.getMessage());
+            logger.error(e.getMessage());
+            data.forEach(row -> forceUpdate(result, connectorMapper, config, pkField, row));
         }
 
         if (null != execute) {
             int batchSize = execute.length;
             for (int i = 0; i < batchSize; i++) {
                 if (execute[i] == 0) {
-                    if (config.isForceUpdate()) {
-                        forceUpdate(result, connectorMapper, config, pkField, data.get(i));
-                    } else {
-                        result.getFailData().add(data.get(i));
-                    }
+                    forceUpdate(result, connectorMapper, config, pkField, data.get(i));
                     continue;
                 }
                 result.getSuccessData().add(data.get(i));

+ 2 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -330,12 +330,11 @@ public class ParserFactory implements Parser {
         String event = batchWriter.getEvent();
         Map<String, String> command = batchWriter.getCommand();
         List<Field> fields = batchWriter.getFields();
-        boolean forceUpdate = batchWriter.isForceUpdate();
         // 总数
         int total = dataList.size();
         // 单次任务
         if (total <= batchSize) {
-            return connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, dataList, forceUpdate));
+            return connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, dataList));
         }
 
         // 批量任务, 拆分
@@ -358,7 +357,7 @@ public class ParserFactory implements Parser {
 
             taskExecutor.execute(() -> {
                 try {
-                    Result w = connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, data, forceUpdate));
+                    Result w = connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, data));
                     result.addSuccessData(w.getSuccessData());
                     result.addFailData(w.getFailData());
                     result.getError().append(w.getError());

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -76,7 +76,7 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     protected void pull(WriterResponse response) {
         ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(response.getTargetConnectorId()));
         Result result = parserFactory.writeBatch(new BatchWriter(targetConnectorMapper, response.getCommand(), response.getTargetTableName(), response.getEvent(),
-                response.getFields(), response.getDataList(), BATCH_SIZE, true));
+                response.getFields(), response.getDataList(), BATCH_SIZE));
         flushStrategy.flushIncrementData(response.getMetaId(), result, response.getEvent());
     }
 

+ 6 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/BatchWriter.java

@@ -8,22 +8,16 @@ import java.util.Map;
 
 public final class BatchWriter {
 
-    private ConnectorMapper     connectorMapper;
+    private ConnectorMapper connectorMapper;
     private Map<String, String> command;
-    private String              tableName;
-    private String              event;
-    private List<Field>         fields;
-    private List<Map>           dataList;
-    private int                 batchSize;
-    private boolean             isForceUpdate;
+    private String tableName;
+    private String event;
+    private List<Field> fields;
+    private List<Map> dataList;
+    private int batchSize;
 
     public BatchWriter(ConnectorMapper connectorMapper, Map<String, String> command, String tableName, String event,
                        List<Field> fields, List<Map> dataList, int batchSize) {
-        this(connectorMapper, command, tableName, event, fields, dataList, batchSize, false);
-    }
-
-    public BatchWriter(ConnectorMapper connectorMapper, Map<String, String> command, String tableName, String event,
-                       List<Field> fields, List<Map> dataList, int batchSize, boolean isForceUpdate) {
         this.connectorMapper = connectorMapper;
         this.command = command;
         this.tableName = tableName;
@@ -31,7 +25,6 @@ public final class BatchWriter {
         this.fields = fields;
         this.dataList = dataList;
         this.batchSize = batchSize;
-        this.isForceUpdate = isForceUpdate;
     }
 
     public ConnectorMapper getConnectorMapper() {
@@ -62,7 +55,4 @@ public final class BatchWriter {
         return batchSize;
     }
 
-    public boolean isForceUpdate() {
-        return isForceUpdate;
-    }
 }