Преглед изворни кода

!154 merge
Merge pull request !154 from AE86/design_consumer

AE86 пре 1 година
родитељ
комит
bc395212ce
35 измењених фајлова са 623 додато и 315 уклоњено
  1. 6 6
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java
  2. 28 30
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/BufferActuatorConfig.java
  3. 13 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/FlushExecutorConfig.java
  4. 49 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/GeneralBufferConfig.java
  5. 1 43
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/IncrementDataConfig.java
  6. 2 2
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/SchedulerConfig.java
  7. 89 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/StorageConfig.java
  8. 54 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/TableGroupBufferConfig.java
  9. 11 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/WriteExecutorConfig.java
  10. 27 27
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/ScanChangedEvent.java
  11. 6 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java
  12. 2 2
      dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java
  13. 16 58
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/ThreadPoolUtil.java
  14. 5 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java
  15. 5 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java
  16. 5 5
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java
  17. 1 0
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java
  18. 54 7
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  19. 1 1
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/OperationTemplate.java
  20. 5 5
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java
  21. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  22. 44 19
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  23. 7 7
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java
  24. 1 6
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java
  25. 6 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferRequest.java
  26. 1 4
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java
  27. 16 10
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  28. 25 8
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java
  29. 6 16
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java
  30. 0 37
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/SyncBufferActuator.java
  31. 81 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java
  32. 1 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/StorageRequest.java
  33. 5 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java
  34. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableFlushStrategy.java
  35. 47 19
      dbsyncer-web/src/main/resources/application.properties

+ 6 - 6
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -13,11 +13,10 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.monitor.Monitor;
-import org.dbsyncer.parser.flush.BufferActuator;
+import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
-import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.util.BinlogMessageUtil;
@@ -55,7 +54,7 @@ public class DataSyncServiceImpl implements DataSyncService {
     private Manager manager;
 
     @Resource
-    private BufferActuator syncBufferActuator;
+    private Parser parser;
 
     @Override
     public MessageVo getMessageVo(String metaId, String messageId) {
@@ -150,10 +149,11 @@ public class DataSyncServiceImpl implements DataSyncService {
             if (StringUtil.isNotBlank(retryDataParams)) {
                 JsonUtil.parseMap(retryDataParams).forEach((k, v) -> binlogData.put(k, convertValue(binlogData.get(k), (String) v)));
             }
-            // TODO 待获取源表名称
-            RowChangedEvent changedEvent = new RowChangedEvent(StringUtil.EMPTY, event, Collections.EMPTY_LIST);
+            TableGroup tableGroup = manager.getTableGroup(tableGroupId);
+            String sourceTableName = tableGroup.getSourceTable().getName();
+            RowChangedEvent changedEvent = new RowChangedEvent(sourceTableName, event, Collections.EMPTY_LIST);
             changedEvent.setChangedRow(binlogData);
-            syncBufferActuator.offer(new WriterRequest(tableGroupId, changedEvent));
+            parser.execute(tableGroupId, changedEvent);
             monitor.removeData(metaId, messageId);
             // 更新失败数
             Meta meta = manager.getMeta(metaId);

+ 28 - 30
dbsyncer-common/src/main/java/org/dbsyncer/common/config/BufferActuatorConfig.java

@@ -1,66 +1,64 @@
 package org.dbsyncer.common.config;
 
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-
 /**
+ * 缓冲区配置
+ *
  * @author AE86
  * @version 1.0.0
  * @date 2022/7/14 23:50
  */
-@Configuration
-@ConfigurationProperties(prefix = "dbsyncer.parser.flush.buffer.actuator")
-public class BufferActuatorConfig {
+public abstract class BufferActuatorConfig {
 
     /**
-     * 写批量
+     * 单次执行任务
      */
-    private int writerBatchCount = 100;
+    private int bufferWriterCount = 100;
 
     /**
-     * 批量同步
+     * 每次消费缓存队列的任务
      */
-    private int batchCount = 1000;
+    private int bufferPullCount = 1000;
 
     /**
-     * 工作线任务队列
+     * 缓存队列容量
      */
-    private int queueCapacity = 5_0000;
+    private int bufferQueueCapacity = 3_0000;
 
     /**
-     * 同步间隔(毫秒)
+     * 定时消费缓存队列间隔(毫秒)
      */
-    private int periodMillisecond = 300;
+    private int bufferPeriodMillisecond = 300;
 
-    public int getWriterBatchCount() {
-        return writerBatchCount;
+    public int getBufferWriterCount() {
+        return bufferWriterCount;
     }
 
-    public void setWriterBatchCount(int writerBatchCount) {
-        this.writerBatchCount = writerBatchCount;
+    public void setBufferWriterCount(int bufferWriterCount) {
+        this.bufferWriterCount = bufferWriterCount;
     }
 
-    public int getBatchCount() {
-        return batchCount;
+    public int getBufferPullCount() {
+        return bufferPullCount;
     }
 
-    public void setBatchCount(int batchCount) {
-        this.batchCount = batchCount;
+    public void setBufferPullCount(int bufferPullCount) {
+        this.bufferPullCount = bufferPullCount;
     }
 
-    public int getQueueCapacity() {
-        return queueCapacity;
+    public int getBufferQueueCapacity() {
+        return bufferQueueCapacity;
     }
 
-    public void setQueueCapacity(int queueCapacity) {
-        this.queueCapacity = queueCapacity;
+    public void setBufferQueueCapacity(int bufferQueueCapacity) {
+        this.bufferQueueCapacity = bufferQueueCapacity;
     }
 
-    public int getPeriodMillisecond() {
-        return periodMillisecond;
+    public int getBufferPeriodMillisecond() {
+        return bufferPeriodMillisecond;
     }
 
-    public void setPeriodMillisecond(int periodMillisecond) {
-        this.periodMillisecond = periodMillisecond;
+    public void setBufferPeriodMillisecond(int bufferPeriodMillisecond) {
+        this.bufferPeriodMillisecond = bufferPeriodMillisecond;
     }
+
 }

+ 13 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/FlushExecutorConfig.java

@@ -0,0 +1,13 @@
+package org.dbsyncer.common.config;
+
+/**
+ * 持久化线程池配置
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020-04-26 23:40
+ */
+@Deprecated
+public class FlushExecutorConfig {
+
+}

+ 49 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/GeneralBufferConfig.java

@@ -0,0 +1,49 @@
+package org.dbsyncer.common.config;
+
+import org.dbsyncer.common.util.ThreadPoolUtil;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+/**
+ * 通用执行器配置
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2023/8/28 23:50
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.parser.general")
+public class GeneralBufferConfig extends BufferActuatorConfig {
+    /**
+     * 工作线程数
+     */
+    private int threadCoreSize = Runtime.getRuntime().availableProcessors() * 2;
+
+    /**
+     * 工作线任务队列
+     */
+    private int threadQueueCapacity = 1000;
+
+    @Bean(name = "generalExecutor", destroyMethod = "shutdown")
+    public ThreadPoolTaskExecutor generalExecutor() {
+        return ThreadPoolUtil.newThreadPoolTaskExecutor(threadCoreSize, threadCoreSize, threadQueueCapacity, 30, "GeneralExecutor-");
+    }
+
+    public int getThreadCoreSize() {
+        return threadCoreSize;
+    }
+
+    public void setThreadCoreSize(int threadCoreSize) {
+        this.threadCoreSize = threadCoreSize;
+    }
+
+    public int getThreadQueueCapacity() {
+        return threadQueueCapacity;
+    }
+
+    public void setThreadQueueCapacity(int threadQueueCapacity) {
+        this.threadQueueCapacity = threadQueueCapacity;
+    }
+}

+ 1 - 43
dbsyncer-common/src/main/java/org/dbsyncer/common/config/IncrementDataConfig.java

@@ -1,53 +1,11 @@
 package org.dbsyncer.common.config;
 
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-
 /**
  * @author AE86
  * @version 1.0.0
  * @date 2022/9/21 22:20
  */
-@Configuration
-@ConfigurationProperties(prefix = "dbsyncer.parser.flush.data")
+@Deprecated
 public class IncrementDataConfig {
 
-    /**
-     * 是否记录同步成功数据
-     */
-    private boolean writerSuccess;
-
-    /**
-     * 是否记录同步失败数据
-     */
-    private boolean writerFail;
-
-    /**
-     * 最大记录异常信息长度
-     */
-    private int maxErrorLength;
-
-    public boolean isWriterSuccess() {
-        return writerSuccess;
-    }
-
-    public void setWriterSuccess(boolean writerSuccess) {
-        this.writerSuccess = writerSuccess;
-    }
-
-    public boolean isWriterFail() {
-        return writerFail;
-    }
-
-    public void setWriterFail(boolean writerFail) {
-        this.writerFail = writerFail;
-    }
-
-    public int getMaxErrorLength() {
-        return maxErrorLength;
-    }
-
-    public void setMaxErrorLength(int maxErrorLength) {
-        this.maxErrorLength = maxErrorLength;
-    }
 }

+ 2 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/config/TaskSchedulerConfig.java → dbsyncer-common/src/main/java/org/dbsyncer/common/config/SchedulerConfig.java

@@ -14,8 +14,8 @@ import java.util.concurrent.RejectedExecutionHandler;
  * @date 2022/4/29 10:27
  */
 @Configuration
-@ConfigurationProperties(prefix = "dbsyncer.web.task.scheduler")
-public class TaskSchedulerConfig implements SchedulingConfigurer {
+@ConfigurationProperties(prefix = "dbsyncer.web.scheduler")
+public class SchedulerConfig implements SchedulingConfigurer {
 
     /**
      * 工作线程数

+ 89 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/StorageConfig.java

@@ -0,0 +1,89 @@
+package org.dbsyncer.common.config;
+
+import org.dbsyncer.common.util.ThreadPoolUtil;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+/**
+ * 持久化配置
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2023/8/28 23:50
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.storage")
+public class StorageConfig extends BufferActuatorConfig {
+
+    /**
+     * 工作线程数
+     */
+    private int threadCoreSize = Runtime.getRuntime().availableProcessors();
+
+    /**
+     * 工作线任务队列
+     */
+    private int threadQueueCapacity = 500;
+
+    /**
+     * 是否记录同步成功数据
+     */
+    private boolean writerSuccess;
+
+    /**
+     * 是否记录同步失败数据
+     */
+    private boolean writerFail;
+
+    /**
+     * 最大记录异常信息长度
+     */
+    private int maxErrorLength;
+
+    @Bean(name = "storageExecutor", destroyMethod = "shutdown")
+    public ThreadPoolTaskExecutor storageExecutor() {
+        return ThreadPoolUtil.newThreadPoolTaskExecutor(threadCoreSize, threadCoreSize, threadQueueCapacity, 30, "StorageExecutor-");
+    }
+
+    public int getThreadCoreSize() {
+        return threadCoreSize;
+    }
+
+    public void setThreadCoreSize(int threadCoreSize) {
+        this.threadCoreSize = threadCoreSize;
+    }
+
+    public int getThreadQueueCapacity() {
+        return threadQueueCapacity;
+    }
+
+    public void setThreadQueueCapacity(int threadQueueCapacity) {
+        this.threadQueueCapacity = threadQueueCapacity;
+    }
+
+    public boolean isWriterSuccess() {
+        return writerSuccess;
+    }
+
+    public void setWriterSuccess(boolean writerSuccess) {
+        this.writerSuccess = writerSuccess;
+    }
+
+    public boolean isWriterFail() {
+        return writerFail;
+    }
+
+    public void setWriterFail(boolean writerFail) {
+        this.writerFail = writerFail;
+    }
+
+    public int getMaxErrorLength() {
+        return maxErrorLength;
+    }
+
+    public void setMaxErrorLength(int maxErrorLength) {
+        this.maxErrorLength = maxErrorLength;
+    }
+}

+ 54 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/TableGroupBufferConfig.java

@@ -0,0 +1,54 @@
+package org.dbsyncer.common.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * 表执行器配置
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2023/8/28 23:50
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.parser.table.group")
+public class TableGroupBufferConfig extends BufferActuatorConfig {
+    /**
+     * 最多可分配的表执行器个数
+     */
+    private int maxBufferActuatorSize;
+
+    /**
+     * 工作线程数
+     */
+    private int threadCoreSize = Runtime.getRuntime().availableProcessors() * 2;
+
+    /**
+     * 工作线任务队列
+     */
+    private int threadQueueCapacity = 1000;
+
+    public int getMaxBufferActuatorSize() {
+        return maxBufferActuatorSize;
+    }
+
+    public void setMaxBufferActuatorSize(int maxBufferActuatorSize) {
+        this.maxBufferActuatorSize = maxBufferActuatorSize;
+    }
+
+    public int getThreadCoreSize() {
+        return threadCoreSize;
+    }
+
+    public void setThreadCoreSize(int threadCoreSize) {
+        this.threadCoreSize = threadCoreSize;
+    }
+
+    public int getThreadQueueCapacity() {
+        return threadQueueCapacity;
+    }
+
+    public void setThreadQueueCapacity(int threadQueueCapacity) {
+        this.threadQueueCapacity = threadQueueCapacity;
+    }
+}

+ 11 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/WriteExecutorConfig.java

@@ -0,0 +1,11 @@
+package org.dbsyncer.common.config;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020-04-26 23:40
+ */
+@Deprecated
+public class WriteExecutorConfig {
+
+}

+ 27 - 27
dbsyncer-common/src/main/java/org/dbsyncer/common/event/PageChangedEvent.java → dbsyncer-common/src/main/java/org/dbsyncer/common/event/ScanChangedEvent.java

@@ -1,28 +1,28 @@
-/**
- * DBSyncer Copyright 2019-2024 All Rights Reserved.
- */
-package org.dbsyncer.common.event;
-
-import java.util.Map;
-
-/**
- * 分页变更事件
- *
- * @version 1.0.0
- * @Author AE86
- * @Date 2023-08-20 20:00
- */
-public final class PageChangedEvent extends CommonChangedEvent {
-
-    private int tableGroupIndex;
-
-    public PageChangedEvent(int index, String event, Map<String, Object> changedRow) {
-        this.tableGroupIndex = index;
-        setEvent(event);
-        setChangedRow(changedRow);
-    }
-
-    public int getTableGroupIndex() {
-        return tableGroupIndex;
-    }
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package org.dbsyncer.common.event;
+
+import java.util.Map;
+
+/**
+ * 定时扫表变更事件
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2023-08-20 20:00
+ */
+public final class ScanChangedEvent extends CommonChangedEvent {
+
+    private int tableGroupIndex;
+
+    public ScanChangedEvent(int index, String event, Map<String, Object> changedRow) {
+        this.tableGroupIndex = index;
+        setEvent(event);
+        setChangedRow(changedRow);
+    }
+
+    public int getTableGroupIndex() {
+        return tableGroupIndex;
+    }
 }

+ 6 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java

@@ -38,4 +38,10 @@ public interface Watcher {
      * @return
      */
     long getMetaUpdateTime();
+
+    /**
+     * 关闭事件
+     *
+     */
+    void closeEvent();
 }

+ 2 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java

@@ -5,11 +5,11 @@ import org.dbsyncer.common.util.UUIDUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.scheduling.support.CronTrigger;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
@@ -27,7 +27,7 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService, Disposabl
     /**
      * 定时任务线程池
      */
-    @Autowired
+    @Resource
     private ThreadPoolTaskScheduler taskScheduler;
 
     private Map<String, ScheduledFuture> map = new ConcurrentHashMap<>();

+ 16 - 58
dbsyncer-common/src/main/java/org/dbsyncer/common/config/TaskExecutorConfig.java → dbsyncer-common/src/main/java/org/dbsyncer/common/util/ThreadPoolUtil.java

@@ -1,53 +1,36 @@
-package org.dbsyncer.common.config;
+package org.dbsyncer.common.util;
 
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
-import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionHandler;
 
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2020-04-26 23:40
- */
-@Configuration
-@ConfigurationProperties(prefix = "dbsyncer.web.task.executor")
-public class TaskExecutorConfig {
+public abstract class ThreadPoolUtil {
 
     /**
-     * 工作线程数
+     * 新建线程池
+     *
+     * @param corePoolSize 核心线程数
+     * @param maxPoolSize 最大线程数
+     * @param queueCapacity 队列容量
+     * @param keepAliveSeconds 空闲线程销毁时间(秒)
+     * @param threadNamePrefix 线程前缀名称
+     * @return
      */
-    private int coreSize = Runtime.getRuntime().availableProcessors() * 2;
-
-    /**
-     * 最大工作线程数
-     */
-    private int maxSize = 64;
-
-    /**
-     * 工作线任务队列
-     */
-    private int queueCapacity = 1000;
-
-    @Bean("taskExecutor")
-    public Executor taskExecutor() {
+    public static ThreadPoolTaskExecutor newThreadPoolTaskExecutor(int corePoolSize, int maxPoolSize, int queueCapacity, int keepAliveSeconds, String threadNamePrefix) {
         //注意这一行日志:2. do submit,taskCount [101], completedTaskCount [87], activeCount [5], queueSize [9]
         //这说明提交任务到线程池的时候,调用的是submit(Callable task)这个方法,当前已经提交了101个任务,完成了87个,当前有5个线程在处理任务,还剩9个任务在队列中等待,线程池的基本情况一路了然;
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         //核心线程数10:线程池创建时候初始化的线程数
-        executor.setCorePoolSize(coreSize);
+        executor.setCorePoolSize(corePoolSize);
         //最大线程数128:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
         //maxPoolSize 当系统负载大道最大值时,核心线程数已无法按时处理完所有任务,这是就需要增加线程.每秒200个任务需要20个线程,那么当每秒1000个任务时,则需要(1000-queueCapacity)*(20/200),即60个线程,可将maxPoolSize设置为60;
-        executor.setMaxPoolSize(maxSize);
+        executor.setMaxPoolSize(maxPoolSize);
         //缓冲队列:用来缓冲执行任务的队列
         executor.setQueueCapacity(queueCapacity);
         //允许线程的空闲时间30秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
-        executor.setKeepAliveSeconds(30);
+        executor.setKeepAliveSeconds(keepAliveSeconds);
         //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
-        executor.setThreadNamePrefix("taskExecutor-");
+        executor.setThreadNamePrefix(threadNamePrefix);
         //理线程池对拒绝任务的处策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
         /*CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
         这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
@@ -64,7 +47,7 @@ public class TaskExecutorConfig {
         return executor;
     }
 
-    public RejectedExecutionHandler rejectedExecutionHandler() {
+    private static RejectedExecutionHandler rejectedExecutionHandler() {
         return (r, executor) -> {
             try {
                 executor.getQueue().put(r);
@@ -73,29 +56,4 @@ public class TaskExecutorConfig {
             }
         };
     }
-
-    public int getQueueCapacity() {
-        return queueCapacity;
-    }
-
-    public void setQueueCapacity(int queueCapacity) {
-        this.queueCapacity = queueCapacity;
-    }
-
-    public int getCoreSize() {
-        return coreSize;
-    }
-
-    public void setCoreSize(int coreSize) {
-        this.coreSize = coreSize;
-    }
-
-    public int getMaxSize() {
-        return maxSize;
-    }
-
-    public void setMaxSize(int maxSize) {
-        this.maxSize = maxSize;
-    }
-
 }

+ 5 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -44,6 +44,11 @@ public abstract class AbstractExtractor implements Extractor {
         this.watcher = watcher;
     }
 
+    @Override
+    public void closeEvent() {
+        watcher.closeEvent();
+    }
+
     @Override
     public void changeEvent(ChangedEvent event) {
         if (null != event) {

+ 5 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -23,6 +23,11 @@ public interface Extractor {
      */
     void register(Watcher watcher);
 
+    /**
+     * 关闭事件
+     */
+    void closeEvent();
+
     /**
      * 数据变更事件
      *

+ 5 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.listener.quartz;
 
-import org.dbsyncer.common.event.PageChangedEvent;
+import org.dbsyncer.common.event.ScanChangedEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.spi.ConnectorMapper;
@@ -119,21 +119,21 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
             for (Map<String, Object> row : data) {
                 if (forceUpdate) {
-                    changeEvent(new PageChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
+                    changeEvent(new ScanChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
                     continue;
                 }
 
                 Object eventValue = row.get(eventFieldName);
                 if (update.contains(eventValue)) {
-                    changeEvent(new PageChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
+                    changeEvent(new ScanChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
                     continue;
                 }
                 if (insert.contains(eventValue)) {
-                    changeEvent(new PageChangedEvent(index, ConnectorConstant.OPERTION_INSERT, row));
+                    changeEvent(new ScanChangedEvent(index, ConnectorConstant.OPERTION_INSERT, row));
                     continue;
                 }
                 if (delete.contains(eventValue)) {
-                    changeEvent(new PageChangedEvent(index, ConnectorConstant.OPERTION_DELETE, row));
+                    changeEvent(new ScanChangedEvent(index, ConnectorConstant.OPERTION_DELETE, row));
                 }
             }
             // 更新记录点

+ 1 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -145,6 +145,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
 
         meta.setBeginTime(task.getBeginTime());
         meta.setEndTime(task.getEndTime());
+        meta.setUpdateTime(Instant.now().toEpochMilli());
         Map<String, String> snapshot = meta.getSnapshot();
         snapshot.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(task.getPageIndex()));
         snapshot.put(ParserEnum.CURSOR.getCode(), StringUtil.join(task.getCursors(), ","));

+ 54 - 7
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -1,10 +1,11 @@
 package org.dbsyncer.manager.puller;
 
+import org.dbsyncer.common.config.TableGroupBufferConfig;
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
-import org.dbsyncer.common.event.PageChangedEvent;
 import org.dbsyncer.common.event.RefreshOffsetEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.event.ScanChangedEvent;
 import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
@@ -23,12 +24,14 @@ import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.ManagerException;
 import org.dbsyncer.manager.model.FieldPicker;
 import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.flush.impl.TableGroupBufferActuator;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +64,9 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    @Resource
+    private TableGroupBufferConfig tableGroupBufferConfig;
+
     @Resource
     private Parser parser;
 
@@ -79,6 +85,9 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
     @Resource
     private ConnectorFactory connectorFactory;
 
+    @Resource
+    private TableGroupBufferActuator tableGroupBufferActuator;
+
     private Map<String, Extractor> map = new ConcurrentHashMap<>();
 
     @PostConstruct
@@ -121,6 +130,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
     public void close(String metaId) {
         Extractor extractor = map.get(metaId);
         if (null != extractor) {
+            extractor.closeEvent();
             extractor.close();
         }
         map.remove(metaId);
@@ -143,7 +153,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
     @Override
     public void run() {
         // 定时同步增量信息
-        map.forEach((k, v) -> v.flushEvent());
+        map.values().forEach(extractor -> extractor.flushEvent());
     }
 
     private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) throws InstantiationException, IllegalAccessException {
@@ -194,6 +204,8 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
 
     abstract class AbstractConsumer<E extends ChangedEvent> implements Watcher {
         protected Meta meta;
+        private Map<String, TableGroupBufferActuator> router = new ConcurrentHashMap<>();
+        private final int MAX_BUFFER_ACTUATOR_SIZE = 10;
 
         public abstract void onChange(E e);
 
@@ -218,24 +230,58 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
         public long getMetaUpdateTime() {
             return meta.getUpdateTime();
         }
+
+        @Override
+        public void closeEvent() {
+            router.values().forEach(bufferActuator -> bufferActuator.stop());
+        }
+
+        protected void execute(String tableGroupId, ChangedEvent event) {
+            if (router.containsKey(tableGroupId)) {
+                router.get(tableGroupId).offer(new WriterRequest(tableGroupId, event));
+                return;
+            }
+            parser.execute(tableGroupId, event);
+        }
+
+        protected void buildBufferActuator(String tableGroupId) {
+            // TODO 暂定执行器上限,待替换为LRU模型
+            if (router.size() >= tableGroupBufferConfig.getMaxBufferActuatorSize()) {
+                return;
+            }
+            router.computeIfAbsent(tableGroupId, k -> {
+                TableGroupBufferActuator newBufferActuator = null;
+                try {
+                    newBufferActuator = (TableGroupBufferActuator) tableGroupBufferActuator.clone();
+                    newBufferActuator.setTableGroupId(tableGroupId);
+                    newBufferActuator.buildConfig();
+                } catch (CloneNotSupportedException ex) {
+                    logger.error(ex.getMessage(), ex);
+                }
+                return newBufferActuator;
+            });
+        }
     }
 
-    final class QuartzConsumer extends AbstractConsumer<PageChangedEvent> {
+    final class QuartzConsumer extends AbstractConsumer<ScanChangedEvent> {
         private List<FieldPicker> tablePicker = new LinkedList<>();
 
         public QuartzConsumer(Meta meta, Mapping mapping, List<TableGroup> tableGroups) {
             this.meta = meta;
-            tableGroups.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
+            tableGroups.forEach(t -> {
+                tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t)));
+                buildBufferActuator(t.getId());
+            });
         }
 
         @Override
-        public void onChange(PageChangedEvent event) {
+        public void onChange(ScanChangedEvent event) {
             final FieldPicker picker = tablePicker.get(event.getTableGroupIndex());
             TableGroup tableGroup = picker.getTableGroup();
             event.setSourceTableName(tableGroup.getSourceTable().getName());
 
             // 定时暂不支持触发刷新增量点事件
-            parser.execute(tableGroup.getId(), event);
+            execute(tableGroup.getId(), event);
         }
     }
 
@@ -250,6 +296,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
                 tablePicker.putIfAbsent(tableName, new ArrayList<>());
                 TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
                 tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
+                buildBufferActuator(group.getId());
             });
         }
 
@@ -264,7 +311,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
                     final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
                     if (picker.filter(changedRow)) {
                         event.setChangedRow(changedRow);
-                        parser.execute(picker.getTableGroup().getId(), event);
+                        execute(picker.getTableGroup().getId(), event);
                     }
                 });
             }

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/OperationTemplate.java

@@ -160,7 +160,7 @@ public final class OperationTemplate {
         }
     }
 
-    static class Group {
+    class Group {
 
         private List<String> index;
 

+ 5 - 5
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -57,10 +57,10 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
     private Manager manager;
 
     @Resource
-    private Executor taskExecutor;
+    private Executor generalExecutor;
 
     @Resource
-    private BufferActuator syncBufferActuator;
+    private BufferActuator generalBufferActuator;
 
     @Resource
     private BufferActuator storageBufferActuator;
@@ -172,7 +172,7 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
 
     @Override
     public List<MetricResponse> getMetricInfo() {
-        ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) taskExecutor;
+        ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) generalExecutor;
         ThreadPoolExecutor pool = threadTask.getThreadPoolExecutor();
 
         List<MetricResponse> list = new ArrayList<>();
@@ -196,8 +196,8 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
         report.setInsert(mappingReportMetric.getInsert());
         report.setUpdate(mappingReportMetric.getUpdate());
         report.setDelete(mappingReportMetric.getDelete());
-        report.setQueueUp(syncBufferActuator.getQueue().size());
-        report.setQueueCapacity(syncBufferActuator.getQueueCapacity());
+        report.setQueueUp(generalBufferActuator.getQueue().size());
+        report.setQueueCapacity(generalBufferActuator.getQueueCapacity());
         return report;
     }
 

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -85,7 +85,7 @@ public class ParserFactory implements Parser {
     private ApplicationContext applicationContext;
 
     @Resource
-    private BufferActuator syncBufferActuator;
+    private BufferActuator generalBufferActuator;
 
     @Override
     public ConnectorMapper connect(AbstractConnectorConfig config) {
@@ -303,7 +303,7 @@ public class ParserFactory implements Parser {
 
     @Override
     public void execute(String tableGroupId, ChangedEvent event) {
-        syncBufferActuator.offer(new WriterRequest(tableGroupId, event));
+        generalBufferActuator.offer(new WriterRequest(tableGroupId, event));
     }
 
     @Override

+ 44 - 19
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -1,10 +1,16 @@
 package org.dbsyncer.parser.flush;
 
+import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.config.BufferActuatorConfig;
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.parser.enums.MetaEnum;
+import org.dbsyncer.parser.model.Meta;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
+import javax.annotation.Resource;
 import java.lang.reflect.ParameterizedType;
 import java.time.Duration;
 import java.time.Instant;
@@ -29,17 +35,23 @@ import java.util.concurrent.locks.ReentrantLock;
  * @version 1.0.0
  * @date 2022/3/27 17:36
  */
-public abstract class AbstractBufferActuator<Request extends BufferRequest, Response extends BufferResponse> implements BufferActuator {
+public abstract class AbstractBufferActuator<Request extends BufferRequest, Response extends BufferResponse> implements BufferActuator, ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private Class<Response> responseClazz;
     private final Lock taskLock = new ReentrantLock();
     private final Lock queueLock = new ReentrantLock(true);
     private final Condition isFull = queueLock.newCondition();
-    private final Duration OFFER_INTERVAL = Duration.of(500, ChronoUnit.MILLIS);
+    private final Duration offerInterval = Duration.of(500, ChronoUnit.MILLIS);
     private BufferActuatorConfig config;
     private BlockingQueue<Request> queue;
 
+    @Resource
+    private ScheduledTaskService scheduledTaskService;
+
+    @Resource
+    private CacheService cacheService;
+
     public AbstractBufferActuator() {
         int level = 5;
         Class<?> aClass = getClass();
@@ -56,12 +68,19 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
 
     /**
      * 初始化配置
-     *
-     * @param config
      */
-    public void buildConfig(BufferActuatorConfig config) {
-        this.config = config;
-        this.queue = new LinkedBlockingQueue(config.getQueueCapacity());
+    protected void buildConfig() {
+        Assert.notNull(config, "请先配置缓存执行器,setConfig(BufferActuatorConfig config)");
+        buildQueueConfig();
+        scheduledTaskService.start(config.getBufferPeriodMillisecond(), this);
+    }
+
+    /**
+     * 初始化缓存队列配置
+     */
+    protected void buildQueueConfig() {
+        this.queue = new LinkedBlockingQueue(config.getBufferQueueCapacity());
+        logger.info("初始化{}容量:{}", this.getClass().getSimpleName(), config.getBufferQueueCapacity());
     }
 
     /**
@@ -80,6 +99,17 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
      */
     protected abstract void partition(Request request, Response response);
 
+    /**
+     * 驱动是否运行中
+     *
+     * @param request
+     * @return
+     */
+    protected boolean isRunning(BufferRequest request) {
+        Meta meta = cacheService.get(request.getMetaId(), Meta.class);
+        return meta != null && MetaEnum.isRunning(meta.getState());
+    }
+
     /**
      * 是否跳过分区处理
      *
@@ -104,10 +134,10 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
             try {
                 // 公平锁,有序执行,容量上限,阻塞重试
                 queueLock.lock();
-                while (!queue.offer((Request) request)) {
+                while (isRunning(request) && !queue.offer((Request) request)) {
                     logger.warn("[{}]缓存队列容量已达上限[{}], 正在阻塞重试.", this.getClass().getSimpleName(), getQueueCapacity());
                     try {
-                        isFull.await(OFFER_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
+                        isFull.await(offerInterval.toMillis(), TimeUnit.MILLISECONDS);
                     } catch (InterruptedException e) {
                         break;
                     }
@@ -119,7 +149,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
     }
 
     @Override
-    public void batchExecute() {
+    public void run() {
         boolean locked = false;
         try {
             locked = taskLock.tryLock();
@@ -142,12 +172,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
 
     @Override
     public int getQueueCapacity() {
-        return config.getQueueCapacity();
-    }
-
-    @Override
-    public Object clone() throws CloneNotSupportedException {
-        return super.clone();
+        return config.getBufferQueueCapacity();
     }
 
     private void submit() throws IllegalAccessException, InstantiationException {
@@ -157,7 +182,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
 
         AtomicLong batchCounter = new AtomicLong();
         Map<String, Response> map = new LinkedHashMap<>();
-        while (!queue.isEmpty() && batchCounter.get() < config.getBatchCount()) {
+        while (!queue.isEmpty() && batchCounter.get() < config.getBufferPullCount()) {
             Request poll = queue.poll();
             String key = getPartitionKey(poll);
             if (!map.containsKey(key)) {
@@ -187,7 +212,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
         batchCounter = null;
     }
 
-    public BufferActuatorConfig getConfig() {
-        return config;
+    public void setConfig(BufferActuatorConfig config) {
+        this.config = config;
     }
 }

+ 7 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.cache.CacheService;
-import org.dbsyncer.common.config.IncrementDataConfig;
+import org.dbsyncer.common.config.StorageConfig;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
@@ -26,7 +26,7 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     private CacheService cacheService;
 
     @Resource
-    private IncrementDataConfig flushDataConfig;
+    private StorageConfig storageConfig;
 
     @Override
     public void flushFullData(String metaId, Result result, String event) {
@@ -41,14 +41,14 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     protected void flush(String metaId, Result result, String event) {
         refreshTotal(metaId, result);
 
-        if (flushDataConfig.isWriterFail() && !CollectionUtils.isEmpty(result.getFailData())) {
-            final String error = StringUtil.substring(result.getError().toString(), 0, flushDataConfig.getMaxErrorLength());
-            flushService.write(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, false, result.getFailData(), error);
+        if (storageConfig.isWriterFail() && !CollectionUtils.isEmpty(result.getFailData())) {
+            final String error = StringUtil.substring(result.getError().toString(), 0, storageConfig.getMaxErrorLength());
+            flushService.asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, false, result.getFailData(), error);
         }
 
         // 是否写增量数据
-        if (flushDataConfig.isWriterSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
-            flushService.write(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, true, result.getSuccessData(), "");
+        if (storageConfig.isWriterSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
+            flushService.asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, true, result.getSuccessData(), "");
         }
     }
 

+ 1 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -7,7 +7,7 @@ import java.util.Queue;
  * @version 1.0.0
  * @date 2022/3/27 17:34
  */
-public interface BufferActuator extends Cloneable {
+public interface BufferActuator {
 
     /**
      * 提交任务
@@ -16,11 +16,6 @@ public interface BufferActuator extends Cloneable {
      */
     void offer(BufferRequest request);
 
-    /**
-     * 批量执行
-     */
-    void batchExecute();
-
     /**
      * 获取缓存队列
      *

+ 6 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferRequest.java

@@ -7,4 +7,10 @@ package org.dbsyncer.parser.flush;
  */
 public interface BufferRequest {
 
+    /**
+     * 获取驱动ID
+     *
+     * @return
+     */
+    String getMetaId();
 }

+ 1 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -1,7 +1,5 @@
 package org.dbsyncer.parser.flush;
 
-import org.springframework.scheduling.annotation.Async;
-
 import java.util.List;
 import java.util.Map;
 
@@ -13,7 +11,6 @@ public interface FlushService {
      * @param type
      * @param error
      */
-    @Async("taskExecutor")
     void asyncWrite(String type, String error);
 
     /**
@@ -26,5 +23,5 @@ public interface FlushService {
      * @param success
      * @param data
      */
-    void write(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error);
+    void asyncWrite(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error);
 }

+ 16 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.parser.flush.impl;
 
-import org.dbsyncer.common.config.IncrementDataConfig;
+import org.dbsyncer.common.config.StorageConfig;
 import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.flush.BufferActuator;
@@ -20,6 +20,7 @@ import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * 持久化
@@ -45,20 +46,25 @@ public class FlushServiceImpl implements FlushService {
     private BufferActuator storageBufferActuator;
 
     @Resource
-    private IncrementDataConfig flushDataConfig;
+    private StorageConfig storageConfig;
+
+    @Resource
+    private Executor storageExecutor;
 
     @Override
     public void asyncWrite(String type, String error) {
-        Map<String, Object> params = new HashMap();
-        params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
-        params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
-        params.put(ConfigConstant.CONFIG_MODEL_JSON, substring(error));
-        params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
-        storageService.add(StorageEnum.LOG, params);
+        storageExecutor.execute(() -> {
+            Map<String, Object> params = new HashMap();
+            params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
+            params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
+            params.put(ConfigConstant.CONFIG_MODEL_JSON, substring(error));
+            params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
+            storageService.add(StorageEnum.LOG, params);
+        });
     }
 
     @Override
-    public void write(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error) {
+    public void asyncWrite(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error) {
         long now = Instant.now().toEpochMilli();
         data.forEach(r -> {
             Map<String, Object> row = new HashMap();
@@ -87,7 +93,7 @@ public class FlushServiceImpl implements FlushService {
      * @return
      */
     private String substring(String error) {
-        return StringUtil.substring(error, 0, flushDataConfig.getMaxErrorLength());
+        return StringUtil.substring(error, 0, storageConfig.getMaxErrorLength());
     }
 
 }

+ 25 - 8
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.config.GeneralBufferConfig;
 import org.dbsyncer.common.event.RefreshOffsetEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.model.IncrementConvertContext;
@@ -22,21 +23,30 @@ import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
 import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
 /**
- * 同步任务缓冲执行器
+ * 通用执行器(单线程消费,多线程批量写,按序执行)
  *
  * @author AE86
  * @version 1.0.0
  * @date 2022/3/27 16:50
  */
-public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest, WriterResponse> {
+@Component
+public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest, WriterResponse> {
+
+    @Resource
+    private GeneralBufferConfig generalBufferConfig;
+
+    @Resource
+    private Executor generalExecutor;
 
     @Resource
     private ConnectorFactory connectorFactory;
@@ -53,12 +63,15 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Resource
     private CacheService cacheService;
 
-    @Resource
-    private Executor taskExecutor;
-
     @Resource
     private ApplicationContext applicationContext;
 
+    @PostConstruct
+    public void init() {
+        setConfig(generalBufferConfig);
+        buildConfig();
+    }
+
     @Override
     protected String getPartitionKey(WriterRequest request) {
         return request.getTableGroupId();
@@ -105,9 +118,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         final IncrementConvertContext context = new IncrementConvertContext(sConnectorMapper, tConnectorMapper, sourceTableName, targetTableName, event, sourceDataList, targetDataList);
         pluginFactory.convert(group.getPlugin(), context);
 
-        // 5、批量执行同步 TODO 待实现多表并行
-        BatchWriter batchWriter = new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, getConfig().getWriterBatchCount());
-        Result result = parserFactory.writeBatch(context, batchWriter, taskExecutor);
+        // 5、批量执行同步
+        BatchWriter batchWriter = new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, generalBufferConfig.getBufferWriterCount());
+        Result result = parserFactory.writeBatch(context, batchWriter, generalExecutor);
 
         // 6.发布刷新增量点事件
         applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
@@ -133,4 +146,8 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         Assert.notNull(conn, "Connector can not be null.");
         return conn.getConfig();
     }
+
+    public void setGeneralExecutor(Executor generalExecutor) {
+        this.generalExecutor = generalExecutor;
+    }
 }

+ 6 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -1,8 +1,6 @@
 package org.dbsyncer.parser.flush.impl;
 
-import org.dbsyncer.common.config.BufferActuatorConfig;
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
-import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.common.config.StorageConfig;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.StorageRequest;
 import org.dbsyncer.parser.model.StorageResponse;
@@ -21,21 +19,18 @@ import javax.annotation.Resource;
  * @date 2022/3/27 16:50
  */
 @Component
-public final class StorageBufferActuator extends AbstractBufferActuator<StorageRequest, StorageResponse> implements ScheduledTaskJob {
+public final class StorageBufferActuator extends AbstractBufferActuator<StorageRequest, StorageResponse> {
 
     @Resource
-    private BufferActuatorConfig bufferActuatorConfig;
-
-    @Resource
-    private ScheduledTaskService scheduledTaskService;
+    private StorageConfig storageConfig;
 
     @Resource
     private StorageService storageService;
 
     @PostConstruct
-    private void init() {
-        super.buildConfig(bufferActuatorConfig);
-        scheduledTaskService.start(bufferActuatorConfig.getPeriodMillisecond(), this);
+    public void init() {
+        setConfig(storageConfig);
+        buildConfig();
     }
 
     @Override
@@ -54,9 +49,4 @@ public final class StorageBufferActuator extends AbstractBufferActuator<StorageR
         storageService.addBatch(StorageEnum.DATA, response.getMetaId(), response.getDataList());
     }
 
-    @Override
-    public void run() {
-        batchExecute();
-    }
-
 }

+ 0 - 37
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/SyncBufferActuator.java

@@ -1,37 +0,0 @@
-package org.dbsyncer.parser.flush.impl;
-
-import org.dbsyncer.common.config.BufferActuatorConfig;
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
-import org.dbsyncer.common.scheduled.ScheduledTaskService;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
-
-/**
- * 同步缓冲执行器
- *
- * @author AE86
- * @version 1.0.0
- * @date 2022/3/27 16:50
- */
-@Component
-public final class SyncBufferActuator extends WriterBufferActuator implements ScheduledTaskJob {
-
-    @Resource
-    private BufferActuatorConfig bufferActuatorConfig;
-
-    @Resource
-    private ScheduledTaskService scheduledTaskService;
-
-    @PostConstruct
-    private void init() {
-        super.buildConfig(bufferActuatorConfig);
-        scheduledTaskService.start(bufferActuatorConfig.getPeriodMillisecond(), this);
-    }
-
-    @Override
-    public void run() {
-        batchExecute();
-    }
-}

+ 81 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java

@@ -0,0 +1,81 @@
+package org.dbsyncer.parser.flush.impl;
+
+import org.dbsyncer.common.config.TableGroupBufferConfig;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.common.util.ThreadPoolUtil;
+import org.dbsyncer.common.util.UUIDUtil;
+import org.dbsyncer.parser.flush.BufferRequest;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * 表执行器(根据表消费数据,多线程批量写,按序执行)
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:50
+ */
+@Component
+public final class TableGroupBufferActuator extends GeneralBufferActuator implements Cloneable {
+
+    @Resource
+    private TableGroupBufferConfig tableGroupBufferConfig;
+
+    @Resource
+    private ScheduledTaskService scheduledTaskService;
+
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+    private String taskKey;
+
+    private String tableGroupId;
+
+    private volatile boolean running;
+
+    @Override
+    public void init() {
+        // nothing to do
+    }
+
+    @Override
+    protected boolean isRunning(BufferRequest request) {
+        return running;
+    }
+
+    public void buildConfig() {
+        super.setConfig(tableGroupBufferConfig);
+        super.buildQueueConfig();
+        taskKey = UUIDUtil.getUUID();
+        int coreSize = tableGroupBufferConfig.getThreadCoreSize();
+        int queueCapacity = tableGroupBufferConfig.getThreadQueueCapacity();
+        String threadNamePrefix = new StringBuilder("TableGroupExecutor-").append(tableGroupId).append(StringUtil.SYMBOL).toString();
+        threadPoolTaskExecutor = ThreadPoolUtil.newThreadPoolTaskExecutor(coreSize, coreSize, queueCapacity, 30, threadNamePrefix);
+        setGeneralExecutor(threadPoolTaskExecutor);
+        running = true;
+        scheduledTaskService.start(taskKey, tableGroupBufferConfig.getBufferPeriodMillisecond(), this);
+    }
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+    public void stop() {
+        running = false;
+        if (threadPoolTaskExecutor != null) {
+            threadPoolTaskExecutor.shutdown();
+        }
+        scheduledTaskService.stop(taskKey);
+    }
+
+    public String getTableGroupId() {
+        return tableGroupId;
+    }
+
+    public void setTableGroupId(String tableGroupId) {
+        this.tableGroupId = tableGroupId;
+    }
+}

+ 1 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/StorageRequest.java

@@ -20,6 +20,7 @@ public class StorageRequest implements BufferRequest {
         this.row = row;
     }
 
+    @Override
     public String getMetaId() {
         return metaId;
     }

+ 5 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java

@@ -24,6 +24,11 @@ public class WriterRequest extends AbstractWriter implements BufferRequest {
         this.changedOffset = event.getChangedOffset();
     }
 
+    @Override
+    public String getMetaId() {
+        return changedOffset.getMetaId();
+    }
+    
     public Map getRow() {
         return row;
     }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableFlushStrategy.java

@@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
  * @date 2021/11/18 22:21
  */
 @Component
-@ConditionalOnProperty(value = "dbsyncer.parser.flush.data.full.enabled", havingValue = "true")
+@ConditionalOnProperty(value = "dbsyncer.storage.full.enabled", havingValue = "true")
 public final class EnableFlushStrategy extends AbstractFlushStrategy {
 
 }

+ 47 - 19
dbsyncer-web/src/main/resources/application.properties

@@ -11,11 +11,37 @@ server.servlet.context-path=/
 # 机器的唯一编号(单机部署则填默认值; 部署集群时, 要保证编号在集群中唯一)
 dbsyncer.web.worker.id=1
 # 定时任务线程数
-dbsyncer.web.task.scheduler.pool-size=8
-# 增量同步任务线程数
-dbsyncer.web.task.executor.core-size=10
-# 增量同步任务线程池队列
-dbsyncer.web.task.executor.queue-capacity=1000
+dbsyncer.web.scheduler.pool-size=8
+
+#parser
+# *********************** 通用执行器配置 ***********************
+# [GeneralBufferActuator]线程数
+dbsyncer.parser.general.thread-core-size=10
+# [GeneralBufferActuator]线程池队列
+dbsyncer.parser.general.thread-queue-capacity=1000
+# [GeneralBufferActuator]单次执行任务数
+dbsyncer.parser.general.buffer-writer-count=100
+# [GeneralBufferActuator]每次消费缓存队列的任务数
+dbsyncer.parser.general.buffer-pull-count=1000
+# [GeneralBufferActuator]缓存队列容量
+dbsyncer.parser.general.buffer-queue-capacity=100000
+# [GeneralBufferActuator]定时消费缓存队列间隔(毫秒)
+dbsyncer.parser.general.buffer-period-millisecond=300
+# *********************** 表执行器配置 ***********************
+# 每个驱动最多可分配的表执行器个数
+dbsyncer.parser.table.group.max-buffer-actuator-size=10
+# [TableGroupBufferActuator]线程数
+dbsyncer.parser.table.group.thread-core-size=5
+# [TableGroupBufferActuator]线程池队列
+dbsyncer.parser.table.group.thread-queue-capacity=1000
+# [TableGroupBufferActuator]单次执行任务数
+dbsyncer.parser.table.group.buffer-writer-count=100
+# [TableGroupBufferActuator]每次消费缓存队列的任务数
+dbsyncer.parser.table.group.buffer-pull-count=1000
+# [TableGroupBufferActuator]缓存队列容量
+dbsyncer.parser.table.group.buffer-queue-capacity=30000
+# [TableGroupBufferActuator]定时消费缓存队列间隔(毫秒)
+dbsyncer.parser.table.group.buffer-period-millisecond=300
 
 #storage
 # 是否使用MySQL存储配置(false-关闭; true-开启)
@@ -24,24 +50,26 @@ dbsyncer.storage.support.mysql.enabled=false
 dbsyncer.storage.support.mysql.config.url=jdbc:mysql://127.0.0.1:3306/dbsyncer?rewriteBatchedStatements=true&seUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false&autoReconnect=true
 dbsyncer.storage.support.mysql.config.username=root
 dbsyncer.storage.support.mysql.config.password=123
-
-#parser
-# 单次执行批量同步任务数
-dbsyncer.parser.flush.buffer.actuator.writer-batch-count=100
-# 每次同步任务缓存队列消费的任务数
-dbsyncer.parser.flush.buffer.actuator.batch-count=1000
-# 同步任务缓存队列容量
-dbsyncer.parser.flush.buffer.actuator.queue-capacity=100000
-# 定时消费缓存队列间隔(毫秒)
-dbsyncer.parser.flush.buffer.actuator.period-millisecond=300
+# [StorageBufferActuator]线程数
+dbsyncer.storage.thread-core-size=5
+# [StorageBufferActuator]线程池队列
+dbsyncer.storage.thread-queue-capacity=500
+# [StorageBufferActuator]单次执行任务数
+dbsyncer.storage.buffer-writer-count=100
+# [StorageBufferActuator]每次消费缓存队列的任务数
+dbsyncer.storage.buffer-pull-count=1000
+# [StorageBufferActuator]缓存队列容量
+dbsyncer.storage.buffer-queue-capacity=50000
+# [StorageBufferActuator]定时消费缓存队列间隔(毫秒)
+dbsyncer.storage.buffer-period-millisecond=300
 # 是否记录全量数据(false-关闭; true-开启)
-dbsyncer.parser.flush.data.full.enabled=false
+dbsyncer.storage.full.enabled=false
 # 是否记录同步成功数据(false-关闭; true-开启)
-dbsyncer.parser.flush.data.writer-success=true
+dbsyncer.storage.writer-success=true
 # 是否记录同步失败数据(false-关闭; true-开启)
-dbsyncer.parser.flush.data.writer-fail=true
+dbsyncer.storage.writer-fail=true
 # 记录同步失败日志最大长度
-dbsyncer.parser.flush.data.max-error-length=2048
+dbsyncer.storage.max-error-length=2048
 
 #plugin
 # 是否开启邮箱通知功能(false-关闭; true-开启)