Jelajahi Sumber

fix config

Signed-off-by: AE86 <836391306@qq.com>
AE86 1 tahun lalu
induk
melakukan
a054f2e393
17 mengubah file dengan 343 tambahan dan 247 penghapusan
  1. 25 34
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/BufferActuatorConfig.java
  2. 1 39
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/FlushExecutorConfig.java
  3. 49 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/GeneralBufferConfig.java
  4. 1 43
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/IncrementDataConfig.java
  5. 89 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/StorageConfig.java
  6. 54 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/TableGroupBufferConfig.java
  7. 1 39
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/WriteExecutorConfig.java
  8. 5 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  9. 2 2
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java
  10. 19 23
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  11. 5 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java
  12. 5 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  13. 16 9
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java
  14. 11 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java
  15. 13 24
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java
  16. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableFlushStrategy.java
  17. 46 21
      dbsyncer-web/src/main/resources/application.properties

+ 25 - 34
dbsyncer-common/src/main/java/org/dbsyncer/common/config/BufferActuatorConfig.java

@@ -1,8 +1,5 @@
 package org.dbsyncer.common.config;
 
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-
 /**
  * 缓冲区配置
  *
@@ -10,64 +7,58 @@ import org.springframework.context.annotation.Configuration;
  * @version 1.0.0
  * @date 2022/7/14 23:50
  */
-@Configuration
-@ConfigurationProperties(prefix = "dbsyncer.parser.buffer.actuator")
-public class BufferActuatorConfig implements Cloneable {
+public abstract class BufferActuatorConfig {
 
     /**
-     * 写批量
+     * 单次执行任务
      */
-    private int writerBatchCount = 100;
+    private int bufferWriterCount = 100;
 
     /**
-     * 批量同步
+     * 每次消费缓存队列的任务
      */
-    private int batchCount = 1000;
+    private int bufferPullCount = 1000;
 
     /**
-     * 工作线任务队列
+     * 缓存队列容量
      */
-    private int queueCapacity = 5_0000;
+    private int bufferQueueCapacity = 3_0000;
 
     /**
-     * 同步间隔(毫秒)
+     * 定时消费缓存队列间隔(毫秒)
      */
-    private int periodMillisecond = 300;
+    private int bufferPeriodMillisecond = 300;
 
-    public int getWriterBatchCount() {
-        return writerBatchCount;
+    public int getBufferWriterCount() {
+        return bufferWriterCount;
     }
 
-    public void setWriterBatchCount(int writerBatchCount) {
-        this.writerBatchCount = writerBatchCount;
+    public void setBufferWriterCount(int bufferWriterCount) {
+        this.bufferWriterCount = bufferWriterCount;
     }
 
-    public int getBatchCount() {
-        return batchCount;
+    public int getBufferPullCount() {
+        return bufferPullCount;
     }
 
-    public void setBatchCount(int batchCount) {
-        this.batchCount = batchCount;
+    public void setBufferPullCount(int bufferPullCount) {
+        this.bufferPullCount = bufferPullCount;
     }
 
-    public int getQueueCapacity() {
-        return queueCapacity;
+    public int getBufferQueueCapacity() {
+        return bufferQueueCapacity;
     }
 
-    public void setQueueCapacity(int queueCapacity) {
-        this.queueCapacity = queueCapacity;
+    public void setBufferQueueCapacity(int bufferQueueCapacity) {
+        this.bufferQueueCapacity = bufferQueueCapacity;
     }
 
-    public int getPeriodMillisecond() {
-        return periodMillisecond;
+    public int getBufferPeriodMillisecond() {
+        return bufferPeriodMillisecond;
     }
 
-    public void setPeriodMillisecond(int periodMillisecond) {
-        this.periodMillisecond = periodMillisecond;
+    public void setBufferPeriodMillisecond(int bufferPeriodMillisecond) {
+        this.bufferPeriodMillisecond = bufferPeriodMillisecond;
     }
 
-    @Override
-    public Object clone() throws CloneNotSupportedException {
-        return super.clone();
-    }
 }

+ 1 - 39
dbsyncer-common/src/main/java/org/dbsyncer/common/config/FlushExecutorConfig.java

@@ -1,11 +1,5 @@
 package org.dbsyncer.common.config;
 
-import org.dbsyncer.common.util.ThreadPoolUtil;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
 /**
  * 持久化线程池配置
  *
@@ -13,39 +7,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  * @version 1.0.0
  * @date 2020-04-26 23:40
  */
-@Configuration
-@ConfigurationProperties(prefix = "dbsyncer.parser.flush.executor")
+@Deprecated
 public class FlushExecutorConfig {
 
-    /**
-     * 工作线程数
-     */
-    private int coreSize = Runtime.getRuntime().availableProcessors();
-
-    /**
-     * 工作线任务队列
-     */
-    private int queueCapacity = 300;
-
-    @Bean(name = "flushExecutor", destroyMethod = "shutdown")
-    public ThreadPoolTaskExecutor flushExecutor() {
-        return ThreadPoolUtil.newThreadPoolTaskExecutor(coreSize, coreSize, queueCapacity, 30, "flushExecutor-");
-    }
-
-    public int getQueueCapacity() {
-        return queueCapacity;
-    }
-
-    public void setQueueCapacity(int queueCapacity) {
-        this.queueCapacity = queueCapacity;
-    }
-
-    public int getCoreSize() {
-        return coreSize;
-    }
-
-    public void setCoreSize(int coreSize) {
-        this.coreSize = coreSize;
-    }
-
 }

+ 49 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/GeneralBufferConfig.java

@@ -0,0 +1,49 @@
+package org.dbsyncer.common.config;
+
+import org.dbsyncer.common.util.ThreadPoolUtil;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+/**
+ * 通用执行器配置
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2023/8/28 23:50
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.parser.general")
+public class GeneralBufferConfig extends BufferActuatorConfig {
+    /**
+     * 工作线程数
+     */
+    private int threadCoreSize = Runtime.getRuntime().availableProcessors() * 2;
+
+    /**
+     * 工作线任务队列
+     */
+    private int threadQueueCapacity = 1000;
+
+    @Bean(name = "generalExecutor", destroyMethod = "shutdown")
+    public ThreadPoolTaskExecutor generalExecutor() {
+        return ThreadPoolUtil.newThreadPoolTaskExecutor(threadCoreSize, threadCoreSize, threadQueueCapacity, 30, "GeneralExecutor-");
+    }
+
+    public int getThreadCoreSize() {
+        return threadCoreSize;
+    }
+
+    public void setThreadCoreSize(int threadCoreSize) {
+        this.threadCoreSize = threadCoreSize;
+    }
+
+    public int getThreadQueueCapacity() {
+        return threadQueueCapacity;
+    }
+
+    public void setThreadQueueCapacity(int threadQueueCapacity) {
+        this.threadQueueCapacity = threadQueueCapacity;
+    }
+}

+ 1 - 43
dbsyncer-common/src/main/java/org/dbsyncer/common/config/IncrementDataConfig.java

@@ -1,53 +1,11 @@
 package org.dbsyncer.common.config;
 
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-
 /**
  * @author AE86
  * @version 1.0.0
  * @date 2022/9/21 22:20
  */
-@Configuration
-@ConfigurationProperties(prefix = "dbsyncer.parser.flush.data")
+@Deprecated
 public class IncrementDataConfig {
 
-    /**
-     * 是否记录同步成功数据
-     */
-    private boolean writerSuccess;
-
-    /**
-     * 是否记录同步失败数据
-     */
-    private boolean writerFail;
-
-    /**
-     * 最大记录异常信息长度
-     */
-    private int maxErrorLength;
-
-    public boolean isWriterSuccess() {
-        return writerSuccess;
-    }
-
-    public void setWriterSuccess(boolean writerSuccess) {
-        this.writerSuccess = writerSuccess;
-    }
-
-    public boolean isWriterFail() {
-        return writerFail;
-    }
-
-    public void setWriterFail(boolean writerFail) {
-        this.writerFail = writerFail;
-    }
-
-    public int getMaxErrorLength() {
-        return maxErrorLength;
-    }
-
-    public void setMaxErrorLength(int maxErrorLength) {
-        this.maxErrorLength = maxErrorLength;
-    }
 }

+ 89 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/StorageConfig.java

@@ -0,0 +1,89 @@
+package org.dbsyncer.common.config;
+
+import org.dbsyncer.common.util.ThreadPoolUtil;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+/**
+ * 持久化配置
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2023/8/28 23:50
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.storage")
+public class StorageConfig extends BufferActuatorConfig {
+
+    /**
+     * 工作线程数
+     */
+    private int threadCoreSize = Runtime.getRuntime().availableProcessors();
+
+    /**
+     * 工作线任务队列
+     */
+    private int threadQueueCapacity = 500;
+
+    /**
+     * 是否记录同步成功数据
+     */
+    private boolean writerSuccess;
+
+    /**
+     * 是否记录同步失败数据
+     */
+    private boolean writerFail;
+
+    /**
+     * 最大记录异常信息长度
+     */
+    private int maxErrorLength;
+
+    @Bean(name = "storageExecutor", destroyMethod = "shutdown")
+    public ThreadPoolTaskExecutor storageExecutor() {
+        return ThreadPoolUtil.newThreadPoolTaskExecutor(threadCoreSize, threadCoreSize, threadQueueCapacity, 30, "StorageExecutor-");
+    }
+
+    public int getThreadCoreSize() {
+        return threadCoreSize;
+    }
+
+    public void setThreadCoreSize(int threadCoreSize) {
+        this.threadCoreSize = threadCoreSize;
+    }
+
+    public int getThreadQueueCapacity() {
+        return threadQueueCapacity;
+    }
+
+    public void setThreadQueueCapacity(int threadQueueCapacity) {
+        this.threadQueueCapacity = threadQueueCapacity;
+    }
+
+    public boolean isWriterSuccess() {
+        return writerSuccess;
+    }
+
+    public void setWriterSuccess(boolean writerSuccess) {
+        this.writerSuccess = writerSuccess;
+    }
+
+    public boolean isWriterFail() {
+        return writerFail;
+    }
+
+    public void setWriterFail(boolean writerFail) {
+        this.writerFail = writerFail;
+    }
+
+    public int getMaxErrorLength() {
+        return maxErrorLength;
+    }
+
+    public void setMaxErrorLength(int maxErrorLength) {
+        this.maxErrorLength = maxErrorLength;
+    }
+}

+ 54 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/TableGroupBufferConfig.java

@@ -0,0 +1,54 @@
+package org.dbsyncer.common.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * 表执行器配置
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2023/8/28 23:50
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.parser.table.group")
+public class TableGroupBufferConfig extends BufferActuatorConfig {
+    /**
+     * 最多可分配的表执行器个数
+     */
+    private int maxBufferActuatorSize;
+
+    /**
+     * 工作线程数
+     */
+    private int threadCoreSize = Runtime.getRuntime().availableProcessors() * 2;
+
+    /**
+     * 工作线任务队列
+     */
+    private int threadQueueCapacity = 1000;
+
+    public int getMaxBufferActuatorSize() {
+        return maxBufferActuatorSize;
+    }
+
+    public void setMaxBufferActuatorSize(int maxBufferActuatorSize) {
+        this.maxBufferActuatorSize = maxBufferActuatorSize;
+    }
+
+    public int getThreadCoreSize() {
+        return threadCoreSize;
+    }
+
+    public void setThreadCoreSize(int threadCoreSize) {
+        this.threadCoreSize = threadCoreSize;
+    }
+
+    public int getThreadQueueCapacity() {
+        return threadQueueCapacity;
+    }
+
+    public void setThreadQueueCapacity(int threadQueueCapacity) {
+        this.threadQueueCapacity = threadQueueCapacity;
+    }
+}

+ 1 - 39
dbsyncer-common/src/main/java/org/dbsyncer/common/config/WriteExecutorConfig.java

@@ -1,49 +1,11 @@
 package org.dbsyncer.common.config;
 
-import org.dbsyncer.common.util.ThreadPoolUtil;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
 /**
  * @author AE86
  * @version 1.0.0
  * @date 2020-04-26 23:40
  */
-@Configuration
-@ConfigurationProperties(prefix = "dbsyncer.parser.write.executor")
+@Deprecated
 public class WriteExecutorConfig {
 
-    /**
-     * 工作线程数
-     */
-    private int coreSize = Runtime.getRuntime().availableProcessors() * 2;
-
-    /**
-     * 工作线任务队列
-     */
-    private int queueCapacity = 1000;
-
-    @Bean(name = "writeExecutor", destroyMethod = "shutdown")
-    public ThreadPoolTaskExecutor writeExecutor() {
-        return ThreadPoolUtil.newThreadPoolTaskExecutor(coreSize, coreSize, queueCapacity, 30, "writeExecutor-");
-    }
-
-    public int getQueueCapacity() {
-        return queueCapacity;
-    }
-
-    public void setQueueCapacity(int queueCapacity) {
-        this.queueCapacity = queueCapacity;
-    }
-
-    public int getCoreSize() {
-        return coreSize;
-    }
-
-    public void setCoreSize(int coreSize) {
-        this.coreSize = coreSize;
-    }
-
 }

+ 5 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.manager.puller;
 
+import org.dbsyncer.common.config.TableGroupBufferConfig;
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.common.event.RefreshOffsetEvent;
@@ -63,6 +64,9 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    @Resource
+    private TableGroupBufferConfig tableGroupBufferConfig;
+
     @Resource
     private Parser parser;
 
@@ -242,7 +246,7 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
 
         protected void buildBufferActuator(String tableGroupId) {
             // TODO 暂定执行器上限,待替换为LRU模型
-            if (router.size() >= MAX_BUFFER_ACTUATOR_SIZE) {
+            if (router.size() >= tableGroupBufferConfig.getMaxBufferActuatorSize()) {
                 return;
             }
             router.computeIfAbsent(tableGroupId, k -> {
@@ -251,7 +255,6 @@ public class IncrementPuller extends AbstractPuller implements ApplicationListen
                     newBufferActuator = (TableGroupBufferActuator) tableGroupBufferActuator.clone();
                     newBufferActuator.setTableGroupId(tableGroupId);
                     newBufferActuator.buildConfig();
-                    return newBufferActuator;
                 } catch (CloneNotSupportedException ex) {
                     logger.error(ex.getMessage(), ex);
                 }

+ 2 - 2
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -57,7 +57,7 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
     private Manager manager;
 
     @Resource
-    private Executor writeExecutor;
+    private Executor generalExecutor;
 
     @Resource
     private BufferActuator generalBufferActuator;
@@ -172,7 +172,7 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
 
     @Override
     public List<MetricResponse> getMetricInfo() {
-        ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) writeExecutor;
+        ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) generalExecutor;
         ThreadPoolExecutor pool = threadTask.getThreadPoolExecutor();
 
         List<MetricResponse> list = new ArrayList<>();

+ 19 - 23
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -10,7 +10,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import java.lang.reflect.ParameterizedType;
 import java.time.Duration;
@@ -43,12 +42,10 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
     private final Lock taskLock = new ReentrantLock();
     private final Lock queueLock = new ReentrantLock(true);
     private final Condition isFull = queueLock.newCondition();
-    private final Duration OFFER_INTERVAL = Duration.of(500, ChronoUnit.MILLIS);
+    private final Duration offerInterval = Duration.of(500, ChronoUnit.MILLIS);
+    private BufferActuatorConfig config;
     private BlockingQueue<Request> queue;
 
-    @Resource
-    private BufferActuatorConfig bufferActuatorConfig;
-
     @Resource
     private ScheduledTaskService scheduledTaskService;
 
@@ -69,18 +66,21 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
         Assert.notNull(responseClazz, String.format("%s的父类%s泛型参数Response为空.", getClass().getName(), AbstractBufferActuator.class.getName()));
     }
 
-    @PostConstruct
-    public void init() {
-        buildBufferActuatorConfig();
-        scheduledTaskService.start(bufferActuatorConfig.getPeriodMillisecond(), this);
+    /**
+     * 初始化配置
+     */
+    protected void buildConfig() {
+        Assert.notNull(config, "请先配置缓存执行器,setConfig(BufferActuatorConfig config)");
+        buildQueueConfig();
+        scheduledTaskService.start(config.getBufferPeriodMillisecond(), this);
     }
 
     /**
-     * 初始化配置
+     * 初始化缓存队列配置
      */
-    protected void buildBufferActuatorConfig() {
-        this.queue = new LinkedBlockingQueue(bufferActuatorConfig.getQueueCapacity());
-        logger.info("初始化{}容量:{}", this.getClass().getSimpleName(), bufferActuatorConfig.getQueueCapacity());
+    protected void buildQueueConfig() {
+        this.queue = new LinkedBlockingQueue(config.getBufferQueueCapacity());
+        logger.info("初始化{}容量:{}", this.getClass().getSimpleName(), config.getBufferQueueCapacity());
     }
 
     /**
@@ -137,7 +137,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
                 while (isRunning(request) && !queue.offer((Request) request)) {
                     logger.warn("[{}]缓存队列容量已达上限[{}], 正在阻塞重试.", this.getClass().getSimpleName(), getQueueCapacity());
                     try {
-                        isFull.await(OFFER_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
+                        isFull.await(offerInterval.toMillis(), TimeUnit.MILLISECONDS);
                     } catch (InterruptedException e) {
                         break;
                     }
@@ -172,7 +172,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
 
     @Override
     public int getQueueCapacity() {
-        return bufferActuatorConfig.getQueueCapacity();
+        return config.getBufferQueueCapacity();
     }
 
     private void submit() throws IllegalAccessException, InstantiationException {
@@ -182,7 +182,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
 
         AtomicLong batchCounter = new AtomicLong();
         Map<String, Response> map = new LinkedHashMap<>();
-        while (!queue.isEmpty() && batchCounter.get() < bufferActuatorConfig.getBatchCount()) {
+        while (!queue.isEmpty() && batchCounter.get() < config.getBufferPullCount()) {
             Request poll = queue.poll();
             String key = getPartitionKey(poll);
             if (!map.containsKey(key)) {
@@ -205,18 +205,14 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }
-            logger.info("{}[{}{}]{}, {}ms", this.getClass().getSimpleName(), key, response.getSuffixName(), response.getTaskSize(), (Instant.now().toEpochMilli() - now));
+            logger.info("[{}{}]{}, {}ms", key, response.getSuffixName(), response.getTaskSize(), (Instant.now().toEpochMilli() - now));
         });
         map.clear();
         map = null;
         batchCounter = null;
     }
 
-    public BufferActuatorConfig getBufferActuatorConfig() {
-        return bufferActuatorConfig;
-    }
-
-    public void setBufferActuatorConfig(BufferActuatorConfig bufferActuatorConfig) {
-        this.bufferActuatorConfig = bufferActuatorConfig;
+    public void setConfig(BufferActuatorConfig config) {
+        this.config = config;
     }
 }

+ 5 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.cache.CacheService;
-import org.dbsyncer.common.config.IncrementDataConfig;
+import org.dbsyncer.common.config.StorageConfig;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
@@ -26,7 +26,7 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     private CacheService cacheService;
 
     @Resource
-    private IncrementDataConfig flushDataConfig;
+    private StorageConfig storageConfig;
 
     @Override
     public void flushFullData(String metaId, Result result, String event) {
@@ -41,13 +41,13 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     protected void flush(String metaId, Result result, String event) {
         refreshTotal(metaId, result);
 
-        if (flushDataConfig.isWriterFail() && !CollectionUtils.isEmpty(result.getFailData())) {
-            final String error = StringUtil.substring(result.getError().toString(), 0, flushDataConfig.getMaxErrorLength());
+        if (storageConfig.isWriterFail() && !CollectionUtils.isEmpty(result.getFailData())) {
+            final String error = StringUtil.substring(result.getError().toString(), 0, storageConfig.getMaxErrorLength());
             flushService.asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, false, result.getFailData(), error);
         }
 
         // 是否写增量数据
-        if (flushDataConfig.isWriterSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
+        if (storageConfig.isWriterSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
             flushService.asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, true, result.getSuccessData(), "");
         }
     }

+ 5 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.parser.flush.impl;
 
-import org.dbsyncer.common.config.IncrementDataConfig;
+import org.dbsyncer.common.config.StorageConfig;
 import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.flush.BufferActuator;
@@ -46,14 +46,14 @@ public class FlushServiceImpl implements FlushService {
     private BufferActuator storageBufferActuator;
 
     @Resource
-    private IncrementDataConfig flushDataConfig;
+    private StorageConfig storageConfig;
 
     @Resource
-    private Executor flushExecutor;
+    private Executor storageExecutor;
 
     @Override
     public void asyncWrite(String type, String error) {
-        flushExecutor.execute(() -> {
+        storageExecutor.execute(() -> {
             Map<String, Object> params = new HashMap();
             params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
             params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
@@ -93,7 +93,7 @@ public class FlushServiceImpl implements FlushService {
      * @return
      */
     private String substring(String error) {
-        return StringUtil.substring(error, 0, flushDataConfig.getMaxErrorLength());
+        return StringUtil.substring(error, 0, storageConfig.getMaxErrorLength());
     }
 
 }

+ 16 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.config.GeneralBufferConfig;
 import org.dbsyncer.common.event.RefreshOffsetEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.model.IncrementConvertContext;
@@ -25,6 +26,7 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import java.util.List;
 import java.util.Map;
@@ -41,7 +43,10 @@ import java.util.concurrent.Executor;
 public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest, WriterResponse> {
 
     @Resource
-    private Executor writeExecutor;
+    private GeneralBufferConfig generalBufferConfig;
+
+    @Resource
+    private Executor generalExecutor;
 
     @Resource
     private ConnectorFactory connectorFactory;
@@ -61,6 +66,12 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Resource
     private ApplicationContext applicationContext;
 
+    @PostConstruct
+    public void init() {
+        setConfig(generalBufferConfig);
+        buildConfig();
+    }
+
     @Override
     protected String getPartitionKey(WriterRequest request) {
         return request.getTableGroupId();
@@ -108,8 +119,8 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         pluginFactory.convert(group.getPlugin(), context);
 
         // 5、批量执行同步
-        BatchWriter batchWriter = new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, getBufferActuatorConfig().getWriterBatchCount());
-        Result result = parserFactory.writeBatch(context, batchWriter, writeExecutor);
+        BatchWriter batchWriter = new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, generalBufferConfig.getBufferWriterCount());
+        Result result = parserFactory.writeBatch(context, batchWriter, generalExecutor);
 
         // 6.发布刷新增量点事件
         applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
@@ -136,11 +147,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         return conn.getConfig();
     }
 
-    public Executor getWriteExecutor() {
-        return writeExecutor;
-    }
-
-    public void setWriteExecutor(Executor writeExecutor) {
-        this.writeExecutor = writeExecutor;
+    public void setGeneralExecutor(Executor generalExecutor) {
+        this.generalExecutor = generalExecutor;
     }
 }

+ 11 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.parser.flush.impl;
 
+import org.dbsyncer.common.config.StorageConfig;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.StorageRequest;
 import org.dbsyncer.parser.model.StorageResponse;
@@ -7,6 +8,7 @@ import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 
 /**
@@ -19,9 +21,18 @@ import javax.annotation.Resource;
 @Component
 public final class StorageBufferActuator extends AbstractBufferActuator<StorageRequest, StorageResponse> {
 
+    @Resource
+    private StorageConfig storageConfig;
+
     @Resource
     private StorageService storageService;
 
+    @PostConstruct
+    public void init() {
+        setConfig(storageConfig);
+        buildConfig();
+    }
+
     @Override
     protected String getPartitionKey(StorageRequest request) {
         return request.getMetaId();

+ 13 - 24
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/TableGroupBufferActuator.java

@@ -1,14 +1,11 @@
 package org.dbsyncer.parser.flush.impl;
 
-import org.dbsyncer.common.config.BufferActuatorConfig;
-import org.dbsyncer.common.config.WriteExecutorConfig;
+import org.dbsyncer.common.config.TableGroupBufferConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.ThreadPoolUtil;
 import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.parser.flush.BufferRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
@@ -24,13 +21,11 @@ import javax.annotation.Resource;
 @Component
 public final class TableGroupBufferActuator extends GeneralBufferActuator implements Cloneable {
 
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
     @Resource
-    private ScheduledTaskService scheduledTaskService;
+    private TableGroupBufferConfig tableGroupBufferConfig;
 
     @Resource
-    private WriteExecutorConfig writeExecutorConfig;
+    private ScheduledTaskService scheduledTaskService;
 
     private ThreadPoolTaskExecutor threadPoolTaskExecutor;
 
@@ -51,22 +46,16 @@ public final class TableGroupBufferActuator extends GeneralBufferActuator implem
     }
 
     public void buildConfig() {
-        BufferActuatorConfig actuatorConfig = super.getBufferActuatorConfig();
-        try {
-            BufferActuatorConfig newConfig = (BufferActuatorConfig) actuatorConfig.clone();
-            // TODO 暂定容量上限
-            newConfig.setQueueCapacity(50000);
-            setBufferActuatorConfig(newConfig);
-            running = true;
-            super.buildBufferActuatorConfig();
-            taskKey = UUIDUtil.getUUID();
-            String threadNamePrefix = new StringBuilder("writeExecutor-").append(tableGroupId).append(StringUtil.SYMBOL).toString();
-            threadPoolTaskExecutor = ThreadPoolUtil.newThreadPoolTaskExecutor(5, 5, 1000, 30, threadNamePrefix);
-            setWriteExecutor(threadPoolTaskExecutor);
-            scheduledTaskService.start(taskKey, getBufferActuatorConfig().getPeriodMillisecond(), this);
-        } catch (CloneNotSupportedException e) {
-            logger.error(e.getMessage(), e);
-        }
+        super.setConfig(tableGroupBufferConfig);
+        super.buildQueueConfig();
+        taskKey = UUIDUtil.getUUID();
+        int coreSize = tableGroupBufferConfig.getThreadCoreSize();
+        int queueCapacity = tableGroupBufferConfig.getThreadQueueCapacity();
+        String threadNamePrefix = new StringBuilder("TableGroupExecutor-").append(tableGroupId).append(StringUtil.SYMBOL).toString();
+        threadPoolTaskExecutor = ThreadPoolUtil.newThreadPoolTaskExecutor(coreSize, coreSize, queueCapacity, 30, threadNamePrefix);
+        setGeneralExecutor(threadPoolTaskExecutor);
+        running = true;
+        scheduledTaskService.start(taskKey, tableGroupBufferConfig.getBufferPeriodMillisecond(), this);
     }
 
     @Override

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableFlushStrategy.java

@@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
  * @date 2021/11/18 22:21
  */
 @Component
-@ConditionalOnProperty(value = "dbsyncer.parser.flush.data.full.enabled", havingValue = "true")
+@ConditionalOnProperty(value = "dbsyncer.storage.full.enabled", havingValue = "true")
 public final class EnableFlushStrategy extends AbstractFlushStrategy {
 
 }

+ 46 - 21
dbsyncer-web/src/main/resources/application.properties

@@ -13,6 +13,36 @@ dbsyncer.web.worker.id=1
 # 定时任务线程数
 dbsyncer.web.scheduler.pool-size=8
 
+#parser
+# *********************** 通用执行器配置 ***********************
+# [GeneralBufferActuator]线程数
+dbsyncer.parser.general.thread-core-size=10
+# [GeneralBufferActuator]线程池队列
+dbsyncer.parser.general.thread-queue-capacity=1000
+# [GeneralBufferActuator]单次执行任务数
+dbsyncer.parser.general.buffer-writer-count=100
+# [GeneralBufferActuator]每次消费缓存队列的任务数
+dbsyncer.parser.general.buffer-pull-count=1000
+# [GeneralBufferActuator]缓存队列容量
+dbsyncer.parser.general.buffer-queue-capacity=100000
+# [GeneralBufferActuator]定时消费缓存队列间隔(毫秒)
+dbsyncer.parser.general.buffer-period-millisecond=300
+# *********************** 表执行器配置 ***********************
+# 每个驱动最多可分配的表执行器个数
+dbsyncer.parser.table.group.max-buffer-actuator-size=10
+# [TableGroupBufferActuator]线程数
+dbsyncer.parser.table.group.thread-core-size=5
+# [TableGroupBufferActuator]线程池队列
+dbsyncer.parser.table.group.thread-queue-capacity=1000
+# [TableGroupBufferActuator]单次执行任务数
+dbsyncer.parser.table.group.buffer-writer-count=100
+# [TableGroupBufferActuator]每次消费缓存队列的任务数
+dbsyncer.parser.table.group.buffer-pull-count=1000
+# [TableGroupBufferActuator]缓存队列容量
+dbsyncer.parser.table.group.buffer-queue-capacity=30000
+# [TableGroupBufferActuator]定时消费缓存队列间隔(毫秒)
+dbsyncer.parser.table.group.buffer-period-millisecond=300
+
 #storage
 # 是否使用MySQL存储配置(false-关闭; true-开启)
 # false: 保存磁盘/data/config(驱动配置)|data(按驱动分别存储增量数据)|log(系统日志)}
@@ -20,31 +50,26 @@ dbsyncer.storage.support.mysql.enabled=false
 dbsyncer.storage.support.mysql.config.url=jdbc:mysql://127.0.0.1:3306/dbsyncer?rewriteBatchedStatements=true&seUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false&autoReconnect=true
 dbsyncer.storage.support.mysql.config.username=root
 dbsyncer.storage.support.mysql.config.password=123
-
-#parser
-# **************************** 增量同步配置 ****************************
-# 增量同步线程数
-dbsyncer.parser.write.executor.core-size=10
-# 增量同步线程池队列
-dbsyncer.parser.write.executor.queue-capacity=1000
-# **************************** 缓冲区配置 ****************************
-# 单次执行批量同步任务数
-dbsyncer.parser.buffer.actuator.writer-batch-count=100
-# 每次同步任务缓存队列消费的任务数
-dbsyncer.parser.buffer.actuator.batch-count=1000
-# 同步任务缓存队列容量
-dbsyncer.parser.buffer.actuator.queue-capacity=100000
-# 定时消费缓存队列间隔(毫秒)
-dbsyncer.parser.buffer.actuator.period-millisecond=300
-# **************************** 持久化配置 ****************************
+# [StorageBufferActuator]线程数
+dbsyncer.storage.thread-core-size=5
+# [StorageBufferActuator]线程池队列
+dbsyncer.storage.thread-queue-capacity=500
+# [StorageBufferActuator]单次执行任务数
+dbsyncer.storage.buffer-writer-count=100
+# [StorageBufferActuator]每次消费缓存队列的任务数
+dbsyncer.storage.buffer-pull-count=1000
+# [StorageBufferActuator]缓存队列容量
+dbsyncer.storage.buffer-queue-capacity=50000
+# [StorageBufferActuator]定时消费缓存队列间隔(毫秒)
+dbsyncer.storage.buffer-period-millisecond=300
 # 是否记录全量数据(false-关闭; true-开启)
-dbsyncer.parser.flush.data.full.enabled=false
+dbsyncer.storage.full.enabled=false
 # 是否记录同步成功数据(false-关闭; true-开启)
-dbsyncer.parser.flush.data.writer-success=true
+dbsyncer.storage.writer-success=true
 # 是否记录同步失败数据(false-关闭; true-开启)
-dbsyncer.parser.flush.data.writer-fail=true
+dbsyncer.storage.writer-fail=true
 # 记录同步失败日志最大长度
-dbsyncer.parser.flush.data.max-error-length=2048
+dbsyncer.storage.max-error-length=2048
 
 #plugin
 # 是否开启邮箱通知功能(false-关闭; true-开启)