Browse Source

支持多表并行同步

AE86 1 year ago
parent
commit
bf67aa6ca1
23 changed files with 366 additions and 194 deletions
  1. 6 6
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java
  2. 9 2
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/BufferActuatorConfig.java
  3. 51 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/FlushExecutorConfig.java
  4. 2 2
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/SchedulerConfig.java
  5. 49 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/WriteExecutorConfig.java
  6. 6 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java
  7. 2 2
      dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java
  8. 16 58
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/ThreadPoolUtil.java
  9. 5 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java
  10. 5 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java
  11. 48 4
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  12. 5 5
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java
  13. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  14. 30 18
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  15. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java
  16. 1 6
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java
  17. 1 4
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java
  18. 13 7
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  19. 18 8
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java
  20. 1 22
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java
  21. 0 37
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/SyncBufferActuator.java
  22. 82 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java
  23. 12 9
      dbsyncer-web/src/main/resources/application.properties

+ 6 - 6
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -13,11 +13,10 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.monitor.Monitor;
-import org.dbsyncer.parser.flush.BufferActuator;
+import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
-import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.util.BinlogMessageUtil;
@@ -55,7 +54,7 @@ public class DataSyncServiceImpl implements DataSyncService {
     private Manager manager;
 
     @Resource
-    private BufferActuator syncBufferActuator;
+    private Parser parser;
 
     @Override
     public MessageVo getMessageVo(String metaId, String messageId) {
@@ -150,10 +149,11 @@ public class DataSyncServiceImpl implements DataSyncService {
             if (StringUtil.isNotBlank(retryDataParams)) {
                 JsonUtil.parseMap(retryDataParams).forEach((k, v) -> binlogData.put(k, convertValue(binlogData.get(k), (String) v)));
             }
-            // TODO 待获取源表名称
-            RowChangedEvent changedEvent = new RowChangedEvent(StringUtil.EMPTY, event, Collections.EMPTY_LIST);
+            TableGroup tableGroup = manager.getTableGroup(tableGroupId);
+            String sourceTableName = tableGroup.getSourceTable().getName();
+            RowChangedEvent changedEvent = new RowChangedEvent(sourceTableName, event, Collections.EMPTY_LIST);
             changedEvent.setChangedRow(binlogData);
-            syncBufferActuator.offer(new WriterRequest(tableGroupId, changedEvent));
+            parser.execute(tableGroupId, changedEvent);
             monitor.removeData(metaId, messageId);
             // 更新失败数
             Meta meta = manager.getMeta(metaId);

+ 9 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/config/BufferActuatorConfig.java

@@ -4,13 +4,15 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
 
 /**
+ * 缓冲区配置
+ *
  * @author AE86
  * @version 1.0.0
  * @date 2022/7/14 23:50
  */
 @Configuration
-@ConfigurationProperties(prefix = "dbsyncer.parser.flush.buffer.actuator")
-public class BufferActuatorConfig {
+@ConfigurationProperties(prefix = "dbsyncer.parser.buffer.actuator")
+public class BufferActuatorConfig implements Cloneable {
 
     /**
      * 写批量数
@@ -63,4 +65,9 @@ public class BufferActuatorConfig {
     public void setPeriodMillisecond(int periodMillisecond) {
         this.periodMillisecond = periodMillisecond;
     }
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
 }

+ 51 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/FlushExecutorConfig.java

@@ -0,0 +1,51 @@
+package org.dbsyncer.common.config;
+
+import org.dbsyncer.common.util.ThreadPoolUtil;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+/**
+ * 持久化线程池配置
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020-04-26 23:40
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.parser.flush.executor")
+public class FlushExecutorConfig {
+
+    /**
+     * 工作线程数
+     */
+    private int coreSize = Runtime.getRuntime().availableProcessors();
+
+    /**
+     * 工作线任务队列
+     */
+    private int queueCapacity = 300;
+
+    @Bean(name = "flushExecutor", destroyMethod = "shutdown")
+    public ThreadPoolTaskExecutor flushExecutor() {
+        return ThreadPoolUtil.newThreadPoolTaskExecutor(coreSize, coreSize, queueCapacity, 30, "flushExecutor-");
+    }
+
+    public int getQueueCapacity() {
+        return queueCapacity;
+    }
+
+    public void setQueueCapacity(int queueCapacity) {
+        this.queueCapacity = queueCapacity;
+    }
+
+    public int getCoreSize() {
+        return coreSize;
+    }
+
+    public void setCoreSize(int coreSize) {
+        this.coreSize = coreSize;
+    }
+
+}

+ 2 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/config/TaskSchedulerConfig.java → dbsyncer-common/src/main/java/org/dbsyncer/common/config/SchedulerConfig.java

@@ -14,8 +14,8 @@ import java.util.concurrent.RejectedExecutionHandler;
  * @date 2022/4/29 10:27
  */
 @Configuration
-@ConfigurationProperties(prefix = "dbsyncer.web.task.scheduler")
-public class TaskSchedulerConfig implements SchedulingConfigurer {
+@ConfigurationProperties(prefix = "dbsyncer.web.scheduler")
+public class SchedulerConfig implements SchedulingConfigurer {
 
     /**
      * 工作线程数

+ 49 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/WriteExecutorConfig.java

@@ -0,0 +1,49 @@
+package org.dbsyncer.common.config;
+
+import org.dbsyncer.common.util.ThreadPoolUtil;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020-04-26 23:40
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.parser.write.executor")
+public class WriteExecutorConfig {
+
+    /**
+     * 工作线程数
+     */
+    private int coreSize = Runtime.getRuntime().availableProcessors() * 2;
+
+    /**
+     * 工作线任务队列
+     */
+    private int queueCapacity = 1000;
+
+    @Bean(name = "writeExecutor", destroyMethod = "shutdown")
+    public ThreadPoolTaskExecutor writeExecutor() {
+        return ThreadPoolUtil.newThreadPoolTaskExecutor(coreSize, coreSize, queueCapacity, 30, "writeExecutor-");
+    }
+
+    public int getQueueCapacity() {
+        return queueCapacity;
+    }
+
+    public void setQueueCapacity(int queueCapacity) {
+        this.queueCapacity = queueCapacity;
+    }
+
+    public int getCoreSize() {
+        return coreSize;
+    }
+
+    public void setCoreSize(int coreSize) {
+        this.coreSize = coreSize;
+    }
+
+}

+ 6 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java

@@ -38,4 +38,10 @@ public interface Watcher {
      * @return
      */
     long getMetaUpdateTime();
+
+    /**
+     * 关闭事件
+     *
+     */
+    void closeEvent();
 }

+ 2 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java

@@ -5,11 +5,11 @@ import org.dbsyncer.common.util.UUIDUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.scheduling.support.CronTrigger;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
@@ -27,7 +27,7 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService, Disposabl
     /**
      * 定时任务线程池
      */
-    @Autowired
+    @Resource
     private ThreadPoolTaskScheduler taskScheduler;
 
     private Map<String, ScheduledFuture> map = new ConcurrentHashMap<>();

+ 16 - 58
dbsyncer-common/src/main/java/org/dbsyncer/common/config/TaskExecutorConfig.java → dbsyncer-common/src/main/java/org/dbsyncer/common/util/ThreadPoolUtil.java

@@ -1,53 +1,36 @@
-package org.dbsyncer.common.config;
+package org.dbsyncer.common.util;
 
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
-import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionHandler;
 
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2020-04-26 23:40
- */
-@Configuration
-@ConfigurationProperties(prefix = "dbsyncer.web.task.executor")
-public class TaskExecutorConfig {
+public abstract class ThreadPoolUtil {
 
     /**
-     * 工作线程数
+     * 新建线程池
+     *
+     * @param corePoolSize 核心线程数
+     * @param maxPoolSize 最大线程数
+     * @param queueCapacity 队列容量
+     * @param keepAliveSeconds 空闲线程销毁时间(秒)
+     * @param threadNamePrefix 线程前缀名称
+     * @return
      */
-    private int coreSize = Runtime.getRuntime().availableProcessors() * 2;
-
-    /**
-     * 最大工作线程数
-     */
-    private int maxSize = 64;
-
-    /**
-     * 工作线任务队列
-     */
-    private int queueCapacity = 1000;
-
-    @Bean("taskExecutor")
-    public Executor taskExecutor() {
+    public static ThreadPoolTaskExecutor newThreadPoolTaskExecutor(int corePoolSize, int maxPoolSize, int queueCapacity, int keepAliveSeconds, String threadNamePrefix) {
         //注意这一行日志:2. do submit,taskCount [101], completedTaskCount [87], activeCount [5], queueSize [9]
         //这说明提交任务到线程池的时候,调用的是submit(Callable task)这个方法,当前已经提交了101个任务,完成了87个,当前有5个线程在处理任务,还剩9个任务在队列中等待,线程池的基本情况一路了然;
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         //核心线程数10:线程池创建时候初始化的线程数
-        executor.setCorePoolSize(coreSize);
+        executor.setCorePoolSize(corePoolSize);
         //最大线程数128:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
         //maxPoolSize 当系统负载大道最大值时,核心线程数已无法按时处理完所有任务,这是就需要增加线程.每秒200个任务需要20个线程,那么当每秒1000个任务时,则需要(1000-queueCapacity)*(20/200),即60个线程,可将maxPoolSize设置为60;
-        executor.setMaxPoolSize(maxSize);
+        executor.setMaxPoolSize(maxPoolSize);
         //缓冲队列:用来缓冲执行任务的队列
         executor.setQueueCapacity(queueCapacity);
         //允许线程的空闲时间30秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
-        executor.setKeepAliveSeconds(30);
+        executor.setKeepAliveSeconds(keepAliveSeconds);
         //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
-        executor.setThreadNamePrefix("taskExecutor-");
+        executor.setThreadNamePrefix(threadNamePrefix);
         //理线程池对拒绝任务的处策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
         /*CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
         这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
@@ -64,7 +47,7 @@ public class TaskExecutorConfig {
         return executor;
     }
 
-    public RejectedExecutionHandler rejectedExecutionHandler() {
+    private static RejectedExecutionHandler rejectedExecutionHandler() {
         return (r, executor) -> {
             try {
                 executor.getQueue().put(r);
@@ -73,29 +56,4 @@ public class TaskExecutorConfig {
             }
         };
     }
-
-    public int getQueueCapacity() {
-        return queueCapacity;
-    }
-
-    public void setQueueCapacity(int queueCapacity) {
-        this.queueCapacity = queueCapacity;
-    }
-
-    public int getCoreSize() {
-        return coreSize;
-    }
-
-    public void setCoreSize(int coreSize) {
-        this.coreSize = coreSize;
-    }
-
-    public int getMaxSize() {
-        return maxSize;
-    }
-
-    public void setMaxSize(int maxSize) {
-        this.maxSize = maxSize;
-    }
-
 }

+ 5 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -44,6 +44,11 @@ public abstract class AbstractExtractor implements Extractor {
         this.watcher = watcher;
     }
 
+    @Override
+    public void closeEvent() {
+        watcher.closeEvent();
+    }
+
     @Override
     public void changeEvent(ChangedEvent event) {
         if (null != event) {

+ 5 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -23,6 +23,11 @@ public interface Extractor {
      */
     void register(Watcher watcher);
 
+    /**
+     * 关闭事件
+     */
+    void closeEvent();
+
     /**
      * 数据变更事件
      *

+ 48 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -2,9 +2,9 @@ package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
-import org.dbsyncer.common.event.ScanChangedEvent;
 import org.dbsyncer.common.event.RefreshOffsetEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.event.ScanChangedEvent;
 import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
@@ -23,12 +23,14 @@ import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.ManagerException;
 import org.dbsyncer.manager.model.FieldPicker;
 import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.flush.impl.TableGroupBufferActuator;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,6 +81,9 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
     @Resource
     private ConnectorFactory connectorFactory;
 
+    @Resource
+    private TableGroupBufferActuator tableGroupBufferActuator;
+
     private Map<String, Extractor> map = new ConcurrentHashMap<>();
 
     @PostConstruct
@@ -121,6 +126,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
     public void close(String metaId) {
         Extractor extractor = map.get(metaId);
         if (null != extractor) {
+            extractor.closeEvent();
             extractor.close();
         }
         map.remove(metaId);
@@ -194,6 +200,8 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
 
     abstract class AbstractConsumer<E extends ChangedEvent> implements Watcher {
         protected Meta meta;
+        protected Map<String, TableGroupBufferActuator> router = new ConcurrentHashMap<>();
+        private final int MAX_BUFFER_ACTUATOR_SIZE = 10;
 
         public abstract void onChange(E e);
 
@@ -218,6 +226,37 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
         public long getMetaUpdateTime() {
             return meta.getUpdateTime();
         }
+
+        @Override
+        public void closeEvent() {
+            router.values().forEach(bufferActuator -> bufferActuator.stop());
+        }
+
+        protected void execute(String tableGroupId, ChangedEvent event) {
+            if (router.containsKey(tableGroupId)) {
+                router.get(tableGroupId).offer(new WriterRequest(tableGroupId, event));
+                return;
+            }
+            parser.execute(tableGroupId, event);
+        }
+
+        protected void buildBufferActuator(String tableGroupId) {
+            // TODO 暂定执行器上限,待替换为LRU模型
+            if (router.size() >= MAX_BUFFER_ACTUATOR_SIZE) {
+                return;
+            }
+            router.computeIfAbsent(tableGroupId, k -> {
+                try {
+                    TableGroupBufferActuator newBufferActuator = (TableGroupBufferActuator) tableGroupBufferActuator.clone();
+                    newBufferActuator.setTableGroupId(tableGroupId);
+                    newBufferActuator.buildConfig();
+                    return newBufferActuator;
+                } catch (CloneNotSupportedException ex) {
+                    logger.error(ex.getMessage(), ex);
+                }
+                return null;
+            });
+        }
     }
 
     final class QuartzConsumer extends AbstractConsumer<ScanChangedEvent> {
@@ -225,7 +264,10 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
 
         public QuartzConsumer(Meta meta, Mapping mapping, List<TableGroup> tableGroups) {
             this.meta = meta;
-            tableGroups.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
+            tableGroups.forEach(t -> {
+                tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t)));
+                buildBufferActuator(t.getId());
+            });
         }
 
         @Override
@@ -235,7 +277,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
             event.setSourceTableName(tableGroup.getSourceTable().getName());
 
             // 定时暂不支持触发刷新增量点事件
-            parser.execute(tableGroup.getId(), event);
+            execute(tableGroup.getId(), event);
         }
     }
 
@@ -250,7 +292,9 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
                 tablePicker.putIfAbsent(tableName, new ArrayList<>());
                 TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
                 tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
+                buildBufferActuator(group.getId());
             });
+
         }
 
         @Override
@@ -264,7 +308,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
                     final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
                     if (picker.filter(changedRow)) {
                         event.setChangedRow(changedRow);
-                        parser.execute(picker.getTableGroup().getId(), event);
+                        execute(picker.getTableGroup().getId(), event);
                     }
                 });
             }

+ 5 - 5
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -57,10 +57,10 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
     private Manager manager;
 
     @Resource
-    private Executor taskExecutor;
+    private Executor writeExecutor;
 
     @Resource
-    private BufferActuator syncBufferActuator;
+    private BufferActuator generalBufferActuator;
 
     @Resource
     private BufferActuator storageBufferActuator;
@@ -172,7 +172,7 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
 
     @Override
     public List<MetricResponse> getMetricInfo() {
-        ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) taskExecutor;
+        ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) writeExecutor;
         ThreadPoolExecutor pool = threadTask.getThreadPoolExecutor();
 
         List<MetricResponse> list = new ArrayList<>();
@@ -196,8 +196,8 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
         report.setInsert(mappingReportMetric.getInsert());
         report.setUpdate(mappingReportMetric.getUpdate());
         report.setDelete(mappingReportMetric.getDelete());
-        report.setQueueUp(syncBufferActuator.getQueue().size());
-        report.setQueueCapacity(syncBufferActuator.getQueueCapacity());
+        report.setQueueUp(generalBufferActuator.getQueue().size());
+        report.setQueueCapacity(generalBufferActuator.getQueueCapacity());
         return report;
     }
 

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -85,7 +85,7 @@ public class ParserFactory implements Parser {
     private ApplicationContext applicationContext;
 
     @Resource
-    private BufferActuator syncBufferActuator;
+    private BufferActuator generalBufferActuator;
 
     @Override
     public ConnectorMapper connect(AbstractConnectorConfig config) {
@@ -303,7 +303,7 @@ public class ParserFactory implements Parser {
 
     @Override
     public void execute(String tableGroupId, ChangedEvent event) {
-        syncBufferActuator.offer(new WriterRequest(tableGroupId, event));
+        generalBufferActuator.offer(new WriterRequest(tableGroupId, event));
     }
 
     @Override

+ 30 - 18
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -1,10 +1,14 @@
 package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.common.config.BufferActuatorConfig;
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
 import java.lang.reflect.ParameterizedType;
 import java.time.Duration;
 import java.time.Instant;
@@ -29,7 +33,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * @version 1.0.0
  * @date 2022/3/27 17:36
  */
-public abstract class AbstractBufferActuator<Request extends BufferRequest, Response extends BufferResponse> implements BufferActuator {
+public abstract class AbstractBufferActuator<Request extends BufferRequest, Response extends BufferResponse> implements BufferActuator, ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private Class<Response> responseClazz;
@@ -37,9 +41,14 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
     private final Lock queueLock = new ReentrantLock(true);
     private final Condition isFull = queueLock.newCondition();
     private final Duration OFFER_INTERVAL = Duration.of(500, ChronoUnit.MILLIS);
-    private BufferActuatorConfig config;
     private BlockingQueue<Request> queue;
 
+    @Resource
+    private BufferActuatorConfig bufferActuatorConfig;
+
+    @Resource
+    private ScheduledTaskService scheduledTaskService;
+
     public AbstractBufferActuator() {
         int level = 5;
         Class<?> aClass = getClass();
@@ -54,14 +63,18 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
         Assert.notNull(responseClazz, String.format("%s的父类%s泛型参数Response为空.", getClass().getName(), AbstractBufferActuator.class.getName()));
     }
 
+    @PostConstruct
+    public void init() {
+        buildBufferActuatorConfig();
+        scheduledTaskService.start(bufferActuatorConfig.getPeriodMillisecond(), this);
+    }
+
     /**
      * 初始化配置
-     *
-     * @param config
      */
-    public void buildConfig(BufferActuatorConfig config) {
-        this.config = config;
-        this.queue = new LinkedBlockingQueue(config.getQueueCapacity());
+    protected void buildBufferActuatorConfig() {
+        this.queue = new LinkedBlockingQueue(bufferActuatorConfig.getQueueCapacity());
+        logger.info("初始化{}容量:{}", this.getClass().getSimpleName(), bufferActuatorConfig.getQueueCapacity());
     }
 
     /**
@@ -119,7 +132,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
     }
 
     @Override
-    public void batchExecute() {
+    public void run() {
         boolean locked = false;
         try {
             locked = taskLock.tryLock();
@@ -142,12 +155,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
 
     @Override
     public int getQueueCapacity() {
-        return config.getQueueCapacity();
-    }
-
-    @Override
-    public Object clone() throws CloneNotSupportedException {
-        return super.clone();
+        return bufferActuatorConfig.getQueueCapacity();
     }
 
     private void submit() throws IllegalAccessException, InstantiationException {
@@ -157,7 +165,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
 
         AtomicLong batchCounter = new AtomicLong();
         Map<String, Response> map = new LinkedHashMap<>();
-        while (!queue.isEmpty() && batchCounter.get() < config.getBatchCount()) {
+        while (!queue.isEmpty() && batchCounter.get() < bufferActuatorConfig.getBatchCount()) {
             Request poll = queue.poll();
             String key = getPartitionKey(poll);
             if (!map.containsKey(key)) {
@@ -180,14 +188,18 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }
-            logger.info("[{}{}]{}, {}ms", key, response.getSuffixName(), response.getTaskSize(), (Instant.now().toEpochMilli() - now));
+            logger.info("{}[{}{}]{}, {}ms", this.getClass().getSimpleName(), key, response.getSuffixName(), response.getTaskSize(), (Instant.now().toEpochMilli() - now));
         });
         map.clear();
         map = null;
         batchCounter = null;
     }
 
-    public BufferActuatorConfig getConfig() {
-        return config;
+    public BufferActuatorConfig getBufferActuatorConfig() {
+        return bufferActuatorConfig;
+    }
+
+    public void setBufferActuatorConfig(BufferActuatorConfig bufferActuatorConfig) {
+        this.bufferActuatorConfig = bufferActuatorConfig;
     }
 }

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -43,12 +43,12 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
 
         if (flushDataConfig.isWriterFail() && !CollectionUtils.isEmpty(result.getFailData())) {
             final String error = StringUtil.substring(result.getError().toString(), 0, flushDataConfig.getMaxErrorLength());
-            flushService.write(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, false, result.getFailData(), error);
+            flushService.asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, false, result.getFailData(), error);
         }
 
         // 是否写增量数据
         if (flushDataConfig.isWriterSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
-            flushService.write(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, true, result.getSuccessData(), "");
+            flushService.asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, true, result.getSuccessData(), "");
         }
     }
 

+ 1 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -7,7 +7,7 @@ import java.util.Queue;
  * @version 1.0.0
  * @date 2022/3/27 17:34
  */
-public interface BufferActuator extends Cloneable {
+public interface BufferActuator {
 
     /**
      * 提交任务
@@ -16,11 +16,6 @@ public interface BufferActuator extends Cloneable {
      */
     void offer(BufferRequest request);
 
-    /**
-     * 批量执行
-     */
-    void batchExecute();
-
     /**
      * 获取缓存队列
      *

+ 1 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -1,7 +1,5 @@
 package org.dbsyncer.parser.flush;
 
-import org.springframework.scheduling.annotation.Async;
-
 import java.util.List;
 import java.util.Map;
 
@@ -13,7 +11,6 @@ public interface FlushService {
      * @param type
      * @param error
      */
-    @Async("taskExecutor")
     void asyncWrite(String type, String error);
 
     /**
@@ -26,5 +23,5 @@ public interface FlushService {
      * @param success
      * @param data
      */
-    void write(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error);
+    void asyncWrite(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error);
 }

+ 13 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -20,6 +20,7 @@ import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * 持久化
@@ -47,18 +48,23 @@ public class FlushServiceImpl implements FlushService {
     @Resource
     private IncrementDataConfig flushDataConfig;
 
+    @Resource
+    private Executor flushExecutor;
+
     @Override
     public void asyncWrite(String type, String error) {
-        Map<String, Object> params = new HashMap();
-        params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
-        params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
-        params.put(ConfigConstant.CONFIG_MODEL_JSON, substring(error));
-        params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
-        storageService.add(StorageEnum.LOG, params);
+        flushExecutor.execute(() -> {
+            Map<String, Object> params = new HashMap();
+            params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
+            params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
+            params.put(ConfigConstant.CONFIG_MODEL_JSON, substring(error));
+            params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
+            storageService.add(StorageEnum.LOG, params);
+        });
     }
 
     @Override
-    public void write(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error) {
+    public void asyncWrite(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error) {
         long now = Instant.now().toEpochMilli();
         data.forEach(r -> {
             Map<String, Object> row = new HashMap();

+ 18 - 8
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -22,6 +22,7 @@ import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
 import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import javax.annotation.Resource;
@@ -30,13 +31,17 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 
 /**
- * 同步任务缓冲执行器
+ * 通用执行器(单线程消费,多线程批量写,按序执行)
  *
  * @author AE86
  * @version 1.0.0
  * @date 2022/3/27 16:50
  */
-public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest, WriterResponse> {
+@Component
+public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest, WriterResponse> {
+
+    @Resource
+    private Executor writeExecutor;
 
     @Resource
     private ConnectorFactory connectorFactory;
@@ -53,9 +58,6 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Resource
     private CacheService cacheService;
 
-    @Resource
-    private Executor taskExecutor;
-
     @Resource
     private ApplicationContext applicationContext;
 
@@ -105,9 +107,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         final IncrementConvertContext context = new IncrementConvertContext(sConnectorMapper, tConnectorMapper, sourceTableName, targetTableName, event, sourceDataList, targetDataList);
         pluginFactory.convert(group.getPlugin(), context);
 
-        // 5、批量执行同步 TODO 待实现多表并行
-        BatchWriter batchWriter = new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, getConfig().getWriterBatchCount());
-        Result result = parserFactory.writeBatch(context, batchWriter, taskExecutor);
+        // 5、批量执行同步
+        BatchWriter batchWriter = new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, getBufferActuatorConfig().getWriterBatchCount());
+        Result result = parserFactory.writeBatch(context, batchWriter, writeExecutor);
 
         // 6.发布刷新增量点事件
         applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
@@ -133,4 +135,12 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         Assert.notNull(conn, "Connector can not be null.");
         return conn.getConfig();
     }
+
+    public Executor getWriteExecutor() {
+        return writeExecutor;
+    }
+
+    public void setWriteExecutor(Executor writeExecutor) {
+        this.writeExecutor = writeExecutor;
+    }
 }

+ 1 - 22
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -1,8 +1,5 @@
 package org.dbsyncer.parser.flush.impl;
 
-import org.dbsyncer.common.config.BufferActuatorConfig;
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
-import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.StorageRequest;
 import org.dbsyncer.parser.model.StorageResponse;
@@ -10,7 +7,6 @@ import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 
 /**
@@ -21,23 +17,11 @@ import javax.annotation.Resource;
  * @date 2022/3/27 16:50
  */
 @Component
-public final class StorageBufferActuator extends AbstractBufferActuator<StorageRequest, StorageResponse> implements ScheduledTaskJob {
-
-    @Resource
-    private BufferActuatorConfig bufferActuatorConfig;
-
-    @Resource
-    private ScheduledTaskService scheduledTaskService;
+public final class StorageBufferActuator extends AbstractBufferActuator<StorageRequest, StorageResponse> {
 
     @Resource
     private StorageService storageService;
 
-    @PostConstruct
-    private void init() {
-        super.buildConfig(bufferActuatorConfig);
-        scheduledTaskService.start(bufferActuatorConfig.getPeriodMillisecond(), this);
-    }
-
     @Override
     protected String getPartitionKey(StorageRequest request) {
         return request.getMetaId();
@@ -54,9 +38,4 @@ public final class StorageBufferActuator extends AbstractBufferActuator<StorageR
         storageService.addBatch(StorageEnum.DATA, response.getMetaId(), response.getDataList());
     }
 
-    @Override
-    public void run() {
-        batchExecute();
-    }
-
 }

+ 0 - 37
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/SyncBufferActuator.java

@@ -1,37 +0,0 @@
-package org.dbsyncer.parser.flush.impl;
-
-import org.dbsyncer.common.config.BufferActuatorConfig;
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
-import org.dbsyncer.common.scheduled.ScheduledTaskService;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
-
-/**
- * 同步缓冲执行器
- *
- * @author AE86
- * @version 1.0.0
- * @date 2022/3/27 16:50
- */
-@Component
-public final class SyncBufferActuator extends WriterBufferActuator implements ScheduledTaskJob {
-
-    @Resource
-    private BufferActuatorConfig bufferActuatorConfig;
-
-    @Resource
-    private ScheduledTaskService scheduledTaskService;
-
-    @PostConstruct
-    private void init() {
-        super.buildConfig(bufferActuatorConfig);
-        scheduledTaskService.start(bufferActuatorConfig.getPeriodMillisecond(), this);
-    }
-
-    @Override
-    public void run() {
-        batchExecute();
-    }
-}

+ 82 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java

@@ -0,0 +1,82 @@
+package org.dbsyncer.parser.flush.impl;
+
+import org.dbsyncer.common.config.BufferActuatorConfig;
+import org.dbsyncer.common.config.WriteExecutorConfig;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.common.util.ThreadPoolUtil;
+import org.dbsyncer.common.util.UUIDUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * 表执行器(根据表消费数据,多线程批量写,按序执行)
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:50
+ */
+@Component
+public final class TableGroupBufferActuator extends GeneralBufferActuator implements Cloneable {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Resource
+    private ScheduledTaskService scheduledTaskService;
+
+    @Resource
+    private WriteExecutorConfig writeExecutorConfig;
+
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+    private String taskKey;
+
+    private String tableGroupId;
+
+    @Override
+    public void init() {
+        // nothing to do
+    }
+
+    public void buildConfig() {
+        BufferActuatorConfig actuatorConfig = super.getBufferActuatorConfig();
+        try {
+            BufferActuatorConfig newConfig = (BufferActuatorConfig) actuatorConfig.clone();
+            // TODO 暂定容量上限
+            newConfig.setQueueCapacity(50000);
+            setBufferActuatorConfig(newConfig);
+            super.buildBufferActuatorConfig();
+            taskKey = UUIDUtil.getUUID();
+            String threadNamePrefix = new StringBuilder("writeExecutor-").append(tableGroupId).append(StringUtil.SYMBOL).toString();
+            threadPoolTaskExecutor = ThreadPoolUtil.newThreadPoolTaskExecutor(5, 5, 1000, 30, threadNamePrefix);
+            setWriteExecutor(threadPoolTaskExecutor);
+            scheduledTaskService.start(taskKey, getBufferActuatorConfig().getPeriodMillisecond(), this);
+        } catch (CloneNotSupportedException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+    public void stop() {
+        if (threadPoolTaskExecutor != null) {
+            threadPoolTaskExecutor.shutdown();
+        }
+        scheduledTaskService.stop(taskKey);
+    }
+
+    public String getTableGroupId() {
+        return tableGroupId;
+    }
+
+    public void setTableGroupId(String tableGroupId) {
+        this.tableGroupId = tableGroupId;
+    }
+}

+ 12 - 9
dbsyncer-web/src/main/resources/application.properties

@@ -11,11 +11,7 @@ server.servlet.context-path=/
 # 机器的唯一编号(单机部署则填默认值; 部署集群时, 要保证编号在集群中唯一)
 dbsyncer.web.worker.id=1
 # 定时任务线程数
-dbsyncer.web.task.scheduler.pool-size=8
-# 增量同步任务线程数
-dbsyncer.web.task.executor.core-size=10
-# 增量同步任务线程池队列
-dbsyncer.web.task.executor.queue-capacity=1000
+dbsyncer.web.scheduler.pool-size=8
 
 #storage
 # 是否使用MySQL存储配置(false-关闭; true-开启)
@@ -26,14 +22,21 @@ dbsyncer.storage.support.mysql.config.username=root
 dbsyncer.storage.support.mysql.config.password=123
 
 #parser
+# **************************** 增量同步配置 ****************************
+# 增量同步线程数
+dbsyncer.parser.write.executor.core-size=10
+# 增量同步线程池队列
+dbsyncer.parser.write.executor.queue-capacity=1000
+# **************************** 缓冲区配置 ****************************
 # 单次执行批量同步任务数
-dbsyncer.parser.flush.buffer.actuator.writer-batch-count=100
+dbsyncer.parser.buffer.actuator.writer-batch-count=100
 # 每次同步任务缓存队列消费的任务数
-dbsyncer.parser.flush.buffer.actuator.batch-count=1000
+dbsyncer.parser.buffer.actuator.batch-count=1000
 # 同步任务缓存队列容量
-dbsyncer.parser.flush.buffer.actuator.queue-capacity=100000
+dbsyncer.parser.buffer.actuator.queue-capacity=100000
 # 定时消费缓存队列间隔(毫秒)
-dbsyncer.parser.flush.buffer.actuator.period-millisecond=300
+dbsyncer.parser.buffer.actuator.period-millisecond=300
+# **************************** 持久化配置 ****************************
 # 是否记录全量数据(false-关闭; true-开启)
 dbsyncer.parser.flush.data.full.enabled=false
 # 是否记录同步成功数据(false-关闭; true-开启)