AE86 1 년 전
부모
커밋
9122c7a9ea

+ 4 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -24,6 +24,7 @@ import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -68,7 +69,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
                 logService.log(LogType.SystemLog.ERROR, e.getMessage());
             } finally {
                 try {
-                    if(executor != null){
+                    if (executor != null) {
                         executor.shutdown();
                     }
                 } catch (Exception e) {
@@ -98,7 +99,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         flush(event.getTask());
     }
 
-    private void doTask(Task task, Mapping mapping, List<TableGroup> list, ExecutorService executorService) {
+    private void doTask(Task task, Mapping mapping, List<TableGroup> list, Executor executor) {
         // 记录开始时间
         long now = Instant.now().toEpochMilli();
         task.setBeginTime(now);
@@ -116,7 +117,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
 
         int i = task.getTableGroupIndex();
         while (i < list.size()) {
-            parser.execute(task, mapping, list.get(i), executorService);
+            parser.execute(task, mapping, list.get(i), executor);
             if (!task.isRunning()) {
                 break;
             }

+ 5 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -21,7 +21,7 @@ import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 
 /**
  * @author AE86
@@ -154,9 +154,9 @@ public interface Parser {
      * @param task
      * @param mapping
      * @param tableGroup
-     * @param executorService
+     * @param executor
      */
-    void execute(Task task, Mapping mapping, TableGroup tableGroup, ExecutorService executorService);
+    void execute(Task task, Mapping mapping, TableGroup tableGroup, Executor executor);
 
     /**
      * 增量同步
@@ -171,8 +171,9 @@ public interface Parser {
      *
      * @param context
      * @param batchWriter
+     * @param executor
      * @return
      */
-    Result writeBatch(ConvertContext context, BatchWriter batchWriter);
+    Result writeBatch(ConvertContext context, BatchWriter batchWriter, Executor executor);
 
 }

+ 8 - 31
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -55,7 +55,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 
 /**
  * @author AE86
@@ -82,9 +81,6 @@ public class ParserFactory implements Parser {
     @Resource
     private FlushStrategy flushStrategy;
 
-    @Resource
-    private Executor taskExecutor;
-
     @Resource
     private ApplicationContext applicationContext;
 
@@ -229,7 +225,7 @@ public class ParserFactory implements Parser {
     }
 
     @Override
-    public void execute(Task task, Mapping mapping, TableGroup tableGroup, ExecutorService executorService) {
+    public void execute(Task task, Mapping mapping, TableGroup tableGroup, Executor executor) {
         final String metaId = task.getId();
         final String sourceConnectorId = mapping.getSourceConnectorId();
         final String targetConnectorId = mapping.getTargetConnectorId();
@@ -285,18 +281,18 @@ public class ParserFactory implements Parser {
 
             // 5、写入目标源
             BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, tTableName, event, picker.getTargetFields(), target, batchSize);
-            Result result = writeBatch(context, batchWriter, executorService);
-
-            // 6、同步完成后通知插件做后置处理
-            pluginFactory.postProcessAfter(group.getPlugin(), context);
+            Result result = writeBatch(context, batchWriter, executor);
 
-            // 7、更新结果
+            // 6、更新结果
             task.setPageIndex(task.getPageIndex() + 1);
             task.setCursors(PrimaryKeyUtil.getLastCursors(source, primaryKeys));
             result.setTableGroupId(tableGroup.getId());
             result.setTargetTableGroupName(tTableName);
             flush(task, result);
 
+            // 7、同步完成后通知插件做后置处理
+            pluginFactory.postProcessAfter(group.getPlugin(), context);
+
             // 8、判断尾页
             if (source.size() < pageSize) {
                 logger.info("完成全量:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
@@ -310,27 +306,8 @@ public class ParserFactory implements Parser {
         syncBufferActuator.offer(new WriterRequest(tableGroupId, event));
     }
 
-    /**
-     * 批量写入
-     *
-     * @param context
-     * @param batchWriter
-     * @return
-     */
     @Override
-    public Result writeBatch(ConvertContext context, BatchWriter batchWriter) {
-        return writeBatch(context, batchWriter, taskExecutor);
-    }
-
-    /**
-     * 批量写入
-     *
-     * @param context
-     * @param batchWriter
-     * @param taskExecutor
-     * @return
-     */
-    private Result writeBatch(ConvertContext context, BatchWriter batchWriter, Executor taskExecutor) {
+    public Result writeBatch(ConvertContext context, BatchWriter batchWriter, Executor executor) {
         final Result result = new Result();
         // 终止同步数据到目标源库
         if (context.isTerminated()) {
@@ -368,7 +345,7 @@ public class ParserFactory implements Parser {
                 toIndex += batchSize;
             }
 
-            taskExecutor.execute(() -> {
+            executor.execute(() -> {
                 try {
                     Result w = connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, data));
                     result.addSuccessData(w.getSuccessData());

+ 4 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -7,9 +7,9 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.strategy.FlushStrategy;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.Assert;
 
+import javax.annotation.Resource;
 import java.time.Instant;
 
 /**
@@ -19,13 +19,13 @@ import java.time.Instant;
  */
 public abstract class AbstractFlushStrategy implements FlushStrategy {
 
-    @Autowired
+    @Resource
     private FlushService flushService;
 
-    @Autowired
+    @Resource
     private CacheService cacheService;
 
-    @Autowired
+    @Resource
     private IncrementDataConfig flushDataConfig;
 
     @Override

+ 6 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -27,6 +27,7 @@ import org.springframework.util.Assert;
 import javax.annotation.Resource;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * 同步任务缓冲执行器
@@ -52,6 +53,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Resource
     private CacheService cacheService;
 
+    @Resource
+    private Executor taskExecutor;
+
     @Resource
     private ApplicationContext applicationContext;
 
@@ -101,9 +105,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         final IncrementConvertContext context = new IncrementConvertContext(sConnectorMapper, tConnectorMapper, sourceTableName, targetTableName, event, sourceDataList, targetDataList);
         pluginFactory.convert(group.getPlugin(), context);
 
-        // 5、批量执行同步
+        // 5、批量执行同步 TODO 待实现多表并行
         BatchWriter batchWriter = new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, getConfig().getWriterBatchCount());
-        Result result = parserFactory.writeBatch(context, batchWriter);
+        Result result = parserFactory.writeBatch(context, batchWriter, taskExecutor);
 
         // 6.发布刷新增量点事件
         applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));

+ 3 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableFullFlushStrategy.java

@@ -7,7 +7,8 @@ import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.Resource;
 
 /**
  * 不记录全量数据, 只记录增量同步数据, 将异常记录到系统日志中
@@ -20,7 +21,7 @@ public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    @Autowired
+    @Resource
     private LogService logService;
 
     @Override

+ 2 - 2
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java

@@ -10,10 +10,10 @@ import org.dbsyncer.plugin.config.Plugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
@@ -54,7 +54,7 @@ public class PluginFactory implements DisposableBean {
 
     private final Map<String, ConvertService> service = new LinkedHashMap<>();
 
-    @Autowired
+    @Resource
     private ProxyApplicationContext proxyApplicationContext;
 
     @PostConstruct