
Optimize incremental writes

AE86 committed 3 years ago
Parent
Current commit
6b6d337f6c
19 files changed, with 523 insertions and 107 deletions
  1. + 2 - 0    dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskService.java
  2. + 5 - 0    dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java
  3. + 8 - 8    dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java
  4. + 14 - 5   dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  5. + 16 - 0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java
  6. + 70 - 69  dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  7. + 121 - 0  dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  8. + 10 - 0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferTask.java
  9. + 12 - 12  dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java
  10. + 17 - 0  dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushTask.java
  11. + 12 - 0  dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java
  12. + 6 - 6   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushStrategy.java
  13. + 4 - 4   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/DisableFullFlushStrategy.java
  14. + 2 - 2   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  15. + 60 - 0  dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java
  16. + 68 - 0  dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterBufferTask.java
  17. + 93 - 0  dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterFlushTask.java
  18. + 2 - 0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/PickerUtil.java
  19. + 1 - 1   dbsyncer-web/src/main/resources/application.properties

+ 2 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskService.java

@@ -21,6 +21,8 @@ public interface ScheduledTaskService {
 
     void start(String cron, ScheduledTaskJob job);
 
+    void start(long period, ScheduledTaskJob job);
+
     void stop(String key);
 
 }
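
For context, the new overload schedules by fixed period (milliseconds) instead of a cron expression. A minimal usage sketch, assuming ScheduledTaskJob declares a single run() method; the HeartbeatJob name is illustrative, and the 300 ms cadence mirrors the AbstractBufferActuator added later in this commit:

    import javax.annotation.PostConstruct;
    import org.dbsyncer.common.scheduled.ScheduledTaskJob;
    import org.dbsyncer.common.scheduled.ScheduledTaskService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class HeartbeatJob implements ScheduledTaskJob {

        @Autowired
        private ScheduledTaskService scheduledTaskService;

        @PostConstruct
        private void init() {
            // fire run() every 300 ms; no cron expression needed
            scheduledTaskService.start(300, this);
        }

        @Override
        public void run() {
            // periodic work goes here
        }
    }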

+ 5 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java

@@ -49,6 +49,11 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService, Disposabl
         start(UUIDUtil.getUUID(), cron, job);
     }
 
+    @Override
+    public void start(long period, ScheduledTaskJob job) {
+        start(UUIDUtil.getUUID(), period, job);
+    }
+
     @Override
     public void stop(String key) {
         ScheduledFuture job = map.get(key);
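
The keyed variant start(String key, long period, ScheduledTaskJob job) that this delegates to is not part of the diff. A plausible sketch of it, assuming the class keeps a Spring TaskScheduler alongside the ScheduledFuture map that stop(key) reads:

    // Hypothetical, not the actual implementation: register the future
    // under its key so stop(key) can cancel it later.
    private void start(String key, long period, ScheduledTaskJob job) {
        map.putIfAbsent(key, taskScheduler.scheduleAtFixedRate(job::run, period));
    }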

+ 8 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -121,24 +121,24 @@ public class ConnectorFactory implements DisposableBean {
         return map;
     }
 
-    public long getCount(ConnectorMapper connectionMapper, Map<String, String> command) {
-        return getConnector(connectionMapper).getCount(connectionMapper, command);
+    public long getCount(ConnectorMapper connectorMapper, Map<String, String> command) {
+        return getConnector(connectorMapper).getCount(connectorMapper, command);
     }
 
-    public Result reader(ConnectorMapper connectionMapper, ReaderConfig config) {
-        Result result = getConnector(connectionMapper).reader(connectionMapper, config);
+    public Result reader(ConnectorMapper connectorMapper, ReaderConfig config) {
+        Result result = getConnector(connectorMapper).reader(connectorMapper, config);
         Assert.notNull(result, "Connector reader result can not null");
         return result;
     }
 
-    public Result writer(ConnectorMapper connectionMapper, WriterBatchConfig config) {
-        Result result = getConnector(connectionMapper).writer(connectionMapper, config);
+    public Result writer(ConnectorMapper connectorMapper, WriterBatchConfig config) {
+        Result result = getConnector(connectorMapper).writer(connectorMapper, config);
         Assert.notNull(result, "Connector writer batch result can not null");
         return result;
     }
 
-    public Result writer(ConnectorMapper connectionMapper, WriterSingleConfig config) {
-        Result result = getConnector(connectionMapper).writer(connectionMapper, config);
+    public Result writer(ConnectorMapper connectorMapper, WriterSingleConfig config) {
+        Result result = getConnector(connectorMapper).writer(connectorMapper, config);
         Assert.notNull(result, "Connector writer single result can not null");
         return result;
     }

+ 14 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -103,13 +103,13 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
     @Override
     public Result writer(DatabaseConnectorMapper connectorMapper, WriterBatchConfig config) {
-        List<Field> fields = config.getFields();
+        String event = config.getEvent();
         List<Map> data = config.getData();
 
         // 1、获取SQL
-        String executeSql = config.getCommand().get(config.getEvent());
+        String executeSql = config.getCommand().get(event);
         Assert.hasText(executeSql, "执行SQL语句不能为空.");
-        if (CollectionUtils.isEmpty(fields)) {
+        if (CollectionUtils.isEmpty(config.getFields())) {
             logger.error("writer fields can not be empty.");
             throw new ConnectorException("writer fields can not be empty.");
         }
@@ -117,9 +117,18 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             logger.error("writer data can not be empty.");
             throw new ConnectorException("writer data can not be empty.");
         }
+        List<Field> fields = new ArrayList<>(config.getFields());
+        Field pkField = getPrimaryKeyField(config.getFields());
+        // Update / Delete
+        if (!isInsert(event)) {
+            if (isDelete(event)) {
+                fields.clear();
+            }
+            fields.add(pkField);
+        }
+
         final int size = data.size();
         final int fSize = fields.size();
-
         Result result = new Result();
         try {
             // 2、设置参数
@@ -161,7 +170,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
         Field pkField = getPrimaryKeyField(fields);
         // Update / Delete
-        if (isUpdate(event) || isDelete(event)) {
+        if (!isInsert(event)) {
             if (isDelete(event)) {
                 fields.clear();
             }
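
The effect of the shared !isInsert(event) branch is easiest to see with concrete values. A standalone sketch, with strings standing in for Field objects and "id" playing the primary key (column names are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class FieldOrderDemo {
        public static void main(String[] args) {
            List<String> configFields = Arrays.asList("id", "name", "age");
            for (String event : new String[]{"INSERT", "UPDATE", "DELETE"}) {
                List<String> fields = new ArrayList<>(configFields);
                if (!"INSERT".equals(event)) {      // Update / Delete
                    if ("DELETE".equals(event)) {
                        fields.clear();             // DELETE binds only the pk
                    }
                    fields.add("id");               // trailing pk feeds the WHERE clause
                }
                System.out.println(event + " -> " + fields);
            }
            // INSERT -> [id, name, age]
            // UPDATE -> [id, name, age, id]
            // DELETE -> [id]
        }
    }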

+ 16 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -1,6 +1,8 @@
 package org.dbsyncer.parser;
 
 import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.model.Result;
+import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.parser.model.Task;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
@@ -161,4 +163,18 @@ public interface Parser {
      * @param rowChangedEvent
      */
     void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent);
+
+    /**
+     * 批执行
+     *
+     * @param connectorMapper
+     * @param command
+     * @param event
+     * @param fields
+     * @param dataList
+     * @param batchSize
+     * @return
+     */
+    Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, String event, List<Field> fields, List<Map> dataList, int batchSize);
+
 }

+ 70 - 69
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -17,10 +17,12 @@ import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.enums.ParserEnum;
 import org.dbsyncer.parser.event.FullRefreshEvent;
+import org.dbsyncer.parser.flush.BufferActuator;
+import org.dbsyncer.parser.flush.FlushStrategy;
+import org.dbsyncer.parser.flush.model.WriterBufferTask;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.*;
-import org.dbsyncer.parser.flush.FlushStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
@@ -77,6 +79,9 @@ public class ParserFactory implements Parser {
     @Autowired
     private ApplicationContext applicationContext;
 
+    @Autowired
+    private BufferActuator writerBufferActuator;
+
     @Override
     public ConnectorMapper connect(ConnectorConfig config) {
         return connectorFactory.connect(config);
@@ -266,8 +271,8 @@ public class ParserFactory implements Parser {
         params.putIfAbsent(ParserEnum.PAGE_INDEX.getCode(), ParserEnum.PAGE_INDEX.getDefaultValue());
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();
-        ConnectorMapper sConnectionMapper = connectorFactory.connect(sConfig);
-        ConnectorMapper tConnectionMapper = connectorFactory.connect(tConfig);
+        ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
+        ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
 
         for (; ; ) {
             if (!task.isRunning()) {
@@ -277,7 +282,7 @@ public class ParserFactory implements Parser {
 
             // 1、获取数据源数据
             int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
-            Result reader = connectorFactory.reader(sConnectionMapper, new ReaderConfig(command, new ArrayList<>(), pageIndex, pageSize));
+            Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), pageIndex, pageSize));
             List<Map> data = reader.getData();
             if (CollectionUtils.isEmpty(data)) {
                 params.clear();
@@ -295,7 +300,7 @@ public class ParserFactory implements Parser {
             pluginFactory.convert(group.getPlugin(), data, target);
 
             // 5、写入目标源
-            Result writer = writeBatch(tConnectionMapper, command, picker.getTargetFields(), target, batchSize);
+            Result writer = writeBatch(tConnectorMapper, command, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
 
             // 6、更新结果
             flush(task, writer, target);
@@ -324,11 +329,67 @@ public class ParserFactory implements Parser {
         // 3、插件转换
         pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
 
-        // 4、写入目标源
-        Result writer = connectorFactory.writer(tConnectorMapper, new WriterSingleConfig(picker.getTargetFields(), tableGroup.getCommand(), event, target, rowChangedEvent.getTableName(), rowChangedEvent.isForceUpdate()));
+        // 4、写入缓冲执行器
+        writerBufferActuator.offer(new WriterBufferTask(metaId, tableGroup.getId(), event, tConnectorMapper, picker.getTargetFields(), tableGroup.getCommand(), target));
+    }
+
+    /**
+     * 批量写入
+     *
+     * @param connectorMapper
+     * @param command
+     * @param fields
+     * @param dataList
+     * @param batchSize
+     * @return
+     */
+    @Override
+    public Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, String event, List<Field> fields, List<Map> dataList, int batchSize) {
+        // 总数
+        int total = dataList.size();
+        // 单次任务
+        if (total <= batchSize) {
+            return connectorFactory.writer(connectorMapper, new WriterBatchConfig(event, command, fields, dataList));
+        }
+
+        // 批量任务, 拆分
+        int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
+
+        final Result result = new Result();
+        final CountDownLatch latch = new CountDownLatch(taskSize);
+        int fromIndex = 0;
+        int toIndex = batchSize;
+        for (int i = 0; i < taskSize; i++) {
+            final List<Map> data;
+            if (toIndex > total) {
+                toIndex = fromIndex + (total % batchSize);
+                data = dataList.subList(fromIndex, toIndex);
+            } else {
+                data = dataList.subList(fromIndex, toIndex);
+                fromIndex += batchSize;
+                toIndex += batchSize;
+            }
 
-        // 5、更新结果
-        flushStrategy.flushIncrementData(metaId, writer, event, picker.getTargetMapList());
+            taskExecutor.execute(() -> {
+                try {
+                    Result w = connectorFactory.writer(connectorMapper, new WriterBatchConfig(event, command, fields, data));
+                    // CAS
+                    result.getFailData().addAll(w.getFailData());
+                    result.getFail().getAndAdd(w.getFail().get());
+                    result.getError().append(w.getError());
+                } catch (Exception e) {
+                    result.getError().append(e.getMessage()).append(System.lineSeparator());
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        }
+        return result;
     }
 
     /**
@@ -385,64 +446,4 @@ public class ParserFactory implements Parser {
         return connector.getConfig();
     }
 
-    /**
-     * 批量写入
-     *
-     * @param connectorMapper
-     * @param command
-     * @param fields
-     * @param target
-     * @param batchSize
-     * @return
-     */
-    private Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, List<Field> fields, List<Map> target, int batchSize) {
-        // 事件
-        String event = ConnectorConstant.OPERTION_INSERT;
-        // 总数
-        int total = target.size();
-        // 单次任务
-        if (total <= batchSize) {
-            return connectorFactory.writer(connectorMapper, new WriterBatchConfig(event, command, fields, target));
-        }
-
-        // 批量任务, 拆分
-        int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
-
-        final Result result = new Result();
-        final CountDownLatch latch = new CountDownLatch(taskSize);
-        int fromIndex = 0;
-        int toIndex = batchSize;
-        for (int i = 0; i < taskSize; i++) {
-            final List<Map> data;
-            if (toIndex > total) {
-                toIndex = fromIndex + (total % batchSize);
-                data = target.subList(fromIndex, toIndex);
-            } else {
-                data = target.subList(fromIndex, toIndex);
-                fromIndex += batchSize;
-                toIndex += batchSize;
-            }
-
-            taskExecutor.execute(() -> {
-                try {
-                    Result w = connectorFactory.writer(connectorMapper, new WriterBatchConfig(event, command, fields, data));
-                    // CAS
-                    result.getFailData().addAll(w.getFailData());
-                    result.getFail().getAndAdd(w.getFail().get());
-                    result.getError().append(w.getError());
-                } catch (Exception e) {
-                    result.getError().append(e.getMessage()).append(System.lineSeparator());
-                } finally {
-                    latch.countDown();
-                }
-            });
-        }
-        try {
-            latch.await();
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage());
-        }
-        return result;
-    }
-
 }
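
The chunking arithmetic in writeBatch is easy to verify with numbers: for total = 250 and batchSize = 100, taskSize = 250 / 100 + 1 = 3, and the loop emits [0, 100), [100, 200) and a final short chunk of total % batchSize = 50 rows. A standalone sketch of just the splitting, with the executor and CountDownLatch omitted:

    import java.util.ArrayList;
    import java.util.List;

    public class SplitDemo {
        public static void main(String[] args) {
            int total = 250, batchSize = 100;
            int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;

            List<Integer> rows = new ArrayList<>();
            for (int i = 0; i < total; i++) {
                rows.add(i);
            }

            int fromIndex = 0;
            int toIndex = batchSize;
            for (int i = 0; i < taskSize; i++) {
                List<Integer> data;
                if (toIndex > total) {
                    toIndex = fromIndex + (total % batchSize);   // final short chunk
                    data = rows.subList(fromIndex, toIndex);
                } else {
                    data = rows.subList(fromIndex, toIndex);
                    fromIndex += batchSize;
                    toIndex += batchSize;
                }
                System.out.println("task " + i + ": size=" + data.size()
                        + " first=" + data.get(0) + " last=" + data.get(data.size() - 1));
            }
            // task 0: size=100 first=0   last=99
            // task 1: size=100 first=100 last=199
            // task 2: size=50  first=200 last=249
        }
    }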

+ 121 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -0,0 +1,121 @@
+package org.dbsyncer.parser.flush;
+
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.PostConstruct;
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 17:36
+ */
+public abstract class AbstractBufferActuator<B, F> implements BufferActuator, ScheduledTaskJob {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private ScheduledTaskService scheduledTaskService;
+
+    private Queue<B> buffer = new ConcurrentLinkedQueue();
+
+    private Queue<B> temp = new ConcurrentLinkedQueue();
+
+    private final Object LOCK = new Object();
+
+    private volatile boolean running;
+
+    @PostConstruct
+    private void init() {
+        scheduledTaskService.start(300, this);
+    }
+
+    /**
+     * 生成缓存value
+     *
+     * @return
+     */
+    protected abstract AbstractFlushTask getValue();
+
+    /**
+     * 生成分区key
+     *
+     * @param bufferTask
+     * @return
+     */
+    protected abstract String getPartitionKey(B bufferTask);
+
+    /**
+     * 分区
+     *
+     * @param bufferTask
+     * @param flushTask
+     */
+    protected abstract void partition(B bufferTask, F flushTask);
+
+    /**
+     * 异步批处理
+     *
+     * @param flushTask
+     */
+    protected abstract void flush(F flushTask);
+
+    @Override
+    public void offer(AbstractBufferTask task) {
+        if (running) {
+            temp.offer((B) task);
+            return;
+        }
+        buffer.offer((B) task);
+    }
+
+    @Override
+    public void run() {
+        if (running) {
+            return;
+        }
+        synchronized (LOCK) {
+            if (running) {
+                return;
+            }
+            running = true;
+            flush(buffer);
+            running = false;
+            flush(temp);
+        }
+    }
+
+    private void flush(Queue<B> queue) {
+        if (!queue.isEmpty()) {
+            final Map<String, AbstractFlushTask> map = new LinkedHashMap<>();
+            while (!queue.isEmpty()) {
+                B poll = queue.poll();
+                String key = getPartitionKey(poll);
+                if (!map.containsKey(key)) {
+                    map.putIfAbsent(key, getValue());
+                }
+                partition(poll, (F) map.get(key));
+            }
+
+            map.forEach((key, flushTask) -> {
+                long now = Instant.now().toEpochMilli();
+                try {
+                    flush((F) flushTask);
+                } catch (Exception e) {
+                    logger.error("[{}]-flush异常", key, e);
+                }
+                logger.info("[{}]-flush{}条,耗时{}秒", key, flushTask.getFlushTaskSize(), (Instant.now().toEpochMilli() - now) / 1000);
+            });
+            map.clear();
+        }
+    }
+
+}
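
The two queues form a double buffer: while run() holds the lock and drains buffer (running == true), concurrent offer() calls are diverted into temp, which is drained immediately afterwards, so producers never block on a flush cycle. A toy subclass, to show how the four template methods cooperate; all names here are illustrative, and WriterBufferActuator below is the real implementation:

    import java.util.LinkedList;
    import java.util.List;
    import org.dbsyncer.parser.flush.AbstractBufferActuator;
    import org.dbsyncer.parser.flush.AbstractBufferTask;
    import org.dbsyncer.parser.flush.AbstractFlushTask;

    public class DemoBufferActuator extends AbstractBufferActuator<DemoBufferActuator.Row, DemoBufferActuator.Batch> {

        static class Row extends AbstractBufferTask {
            final String table;
            final String value;

            Row(String table, String value) {
                this.table = table;
                this.value = value;
            }
        }

        static class Batch extends AbstractFlushTask {
            final List<String> values = new LinkedList<>();

            @Override
            public int getFlushTaskSize() {
                return values.size();
            }
        }

        @Override
        protected AbstractFlushTask getValue() {
            return new Batch();
        }

        @Override
        protected String getPartitionKey(Row bufferTask) {
            return bufferTask.table;                 // one Batch per table
        }

        @Override
        protected void partition(Row bufferTask, Batch flushTask) {
            flushTask.values.add(bufferTask.value);  // merge the row into its partition
        }

        @Override
        protected void flush(Batch flushTask) {
            System.out.println(flushTask.values);    // stand-in for the real batch write
        }
    }

    // usage: actuator.offer(new Row("t_user", "row-1")); drained on the next 300 ms cycle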

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferTask.java

@@ -0,0 +1,10 @@
+package org.dbsyncer.parser.flush;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:57
+ */
+public abstract class AbstractBufferTask {
+
+}

+ 12 - 12
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -23,31 +23,31 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     private CacheService cacheService;
 
     @Override
-    public void flushFullData(String metaId, Result writer, String event, List<Map> data) {
-        flush(metaId, writer, event, data);
+    public void flushFullData(String metaId, Result result, String event, List<Map> dataList) {
+        flush(metaId, result, event, dataList);
     }
 
     @Override
-    public void flushIncrementData(String metaId, Result writer, String event, List<Map> data) {
-        flush(metaId, writer, event, data);
+    public void flushIncrementData(String metaId, Result result, String event, List<Map> dataList) {
+        flush(metaId, result, event, dataList);
     }
 
-    protected void flush(String metaId, Result writer, String event, List<Map> data) {
-        refreshTotal(metaId, writer, data);
+    protected void flush(String metaId, Result result, String event, List<Map> dataList) {
+        refreshTotal(metaId, result, dataList);
 
-        boolean success = 0 == writer.getFail().get();
+        boolean success = 0 == result.getFail().get();
         if (!success) {
-            data.clear();
-            data.addAll(writer.getFailData());
+            dataList.clear();
+            dataList.addAll(result.getFailData());
         }
-        flushService.asyncWrite(metaId, event, success, data, writer.getError().toString());
+        flushService.asyncWrite(metaId, event, success, dataList, result.getError().toString());
     }
 
-    protected void refreshTotal(String metaId, Result writer, List<Map> data){
+    protected void refreshTotal(String metaId, Result writer, List<Map> dataList){
         long fail = writer.getFail().get();
         Meta meta = getMeta(metaId);
         meta.getFail().getAndAdd(fail);
-        meta.getSuccess().getAndAdd(data.size() - fail);
+        meta.getSuccess().getAndAdd(dataList.size() - fail);
     }
 
     protected Meta getMeta(String metaId) {

+ 17 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushTask.java

@@ -0,0 +1,17 @@
+package org.dbsyncer.parser.flush;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 18:11
+ */
+public abstract class AbstractFlushTask {
+
+    /**
+     * 获取批处理数
+     *
+     * @return
+     */
+    public abstract int getFlushTaskSize();
+
+}

+ 12 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -0,0 +1,12 @@
+package org.dbsyncer.parser.flush;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 17:34
+ */
+public interface BufferActuator {
+
+    void offer(AbstractBufferTask task);
+
+}

+ 6 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushStrategy.java

@@ -18,20 +18,20 @@ public interface FlushStrategy {
      * 记录全量同步数据
      *
      * @param metaId
-     * @param writer
+     * @param result
      * @param event
-     * @param data
+     * @param dataList
      */
-    void flushFullData(String metaId, Result writer, String event, List<Map> data);
+    void flushFullData(String metaId, Result result, String event, List<Map> dataList);
 
     /**
      * 记录增量同步数据
      *
      * @param metaId
-     * @param writer
+     * @param result
      * @param event
-     * @param data
+     * @param dataList
      */
-    void flushIncrementData(String metaId, Result writer, String event, List<Map> data);
+    void flushIncrementData(String metaId, Result result, String event, List<Map> dataList);
 
 }

+ 4 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/DisableFullFlushStrategy.java

@@ -22,13 +22,13 @@ public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
     private LogService logService;
 
     @Override
-    public void flushFullData(String metaId, Result writer, String event, List<Map> data) {
+    public void flushFullData(String metaId, Result result, String event, List<Map> dataList) {
         // 不记录全量数据,只统计成功失败总数
-        refreshTotal(metaId, writer, data);
+        refreshTotal(metaId, result, dataList);
 
-        if (0 < writer.getFail().get()) {
+        if (0 < result.getFail().get()) {
             LogType logType = LogType.TableGroupLog.FULL_FAILED;
-            logService.log(logType, "%s:%s", logType.getMessage(), writer.getError().toString());
+            logService.log(logType, "%s:%s", logType.getMessage(), result.getError().toString());
         }
     }
 

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -139,9 +139,9 @@ public class FlushServiceImpl implements FlushService, ScheduledTaskJob {
                     try {
                         storageService.addData(StorageEnum.DATA, metaId, list);
                     } catch (Exception e) {
-                        logger.error("[{}]-flush异常{}", metaId, list.size());
+                        logger.error("[{}]-flushData异常{}", metaId, list.size());
                     }
-                    logger.info("[{}]-flush{}条,耗时{}秒", metaId, list.size(), (Instant.now().toEpochMilli() - now) / 1000);
+                    logger.info("[{}]-flushData{}条,耗时{}秒", metaId, list.size(), (Instant.now().toEpochMilli() - now) / 1000);
                 });
             });
             task.clear();

+ 60 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -0,0 +1,60 @@
+package org.dbsyncer.parser.flush.impl;
+
+import org.dbsyncer.common.model.Result;
+import org.dbsyncer.parser.ParserFactory;
+import org.dbsyncer.parser.flush.AbstractBufferActuator;
+import org.dbsyncer.parser.flush.AbstractFlushTask;
+import org.dbsyncer.parser.flush.FlushStrategy;
+import org.dbsyncer.parser.flush.model.WriterBufferTask;
+import org.dbsyncer.parser.flush.model.WriterFlushTask;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Collections;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:50
+ */
+@Component
+public class WriterBufferActuator extends AbstractBufferActuator<WriterBufferTask, WriterFlushTask> {
+
+    @Autowired
+    private ParserFactory parserFactory;
+
+    @Autowired
+    private FlushStrategy flushStrategy;
+
+    private final static int BATCH_SIZE = 100;
+
+    @Override
+    protected AbstractFlushTask getValue() {
+        return new WriterFlushTask();
+    }
+
+    @Override
+    protected String getPartitionKey(WriterBufferTask bufferTask) {
+        return new StringBuilder(bufferTask.getTableGroupId()).append(bufferTask.getEvent()).toString();
+    }
+
+    @Override
+    protected void partition(WriterBufferTask bufferTask, WriterFlushTask flushTask) {
+        flushTask.getDataList().add(bufferTask.getRow());
+        if (flushTask.isMerged()) {
+            return;
+        }
+        flushTask.setMetaId(bufferTask.getMetaId());
+        flushTask.setEvent(bufferTask.getEvent());
+        flushTask.setConnectorMapper(bufferTask.getConnectorMapper());
+        flushTask.setFields(Collections.unmodifiableList(bufferTask.getFields()));
+        flushTask.setCommand(bufferTask.getCommand());
+        flushTask.setMerged(true);
+    }
+
+    @Override
+    protected void flush(WriterFlushTask flushTask) {
+        Result result = parserFactory.writeBatch(flushTask.getConnectorMapper(), flushTask.getCommand(), flushTask.getEvent(), flushTask.getFields(), flushTask.getDataList(), BATCH_SIZE);
+        flushStrategy.flushIncrementData(flushTask.getMetaId(), result, flushTask.getEvent(), flushTask.getDataList());
+    }
+}
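
Taken together, this is the incremental-write optimization the commit title refers to: execute() no longer writes each row through writer(), but offers a WriterBufferTask; every 300 ms the actuator partitions queued tasks by table group plus event, flush() hands each merged WriterFlushTask to writeBatch() in chunks of BATCH_SIZE = 100, and the outcome is recorded through flushIncrementData() as before.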

+ 68 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterBufferTask.java

@@ -0,0 +1,68 @@
+package org.dbsyncer.parser.flush.model;
+
+import org.dbsyncer.connector.ConnectorMapper;
+import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.parser.flush.AbstractBufferTask;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:57
+ */
+public class WriterBufferTask extends AbstractBufferTask {
+
+    private String metaId;
+
+    private String tableGroupId;
+
+    private String event;
+
+    private ConnectorMapper connectorMapper;
+
+    private List<Field> fields;
+
+    private Map<String, String> command;
+
+    private Map row;
+
+    public WriterBufferTask(String metaId, String tableGroupId, String event, ConnectorMapper connectorMapper, List<Field> fields, Map<String, String> command, Map row) {
+        this.metaId = metaId;
+        this.tableGroupId = tableGroupId;
+        this.event = event;
+        this.connectorMapper = connectorMapper;
+        this.fields = fields;
+        this.command = command;
+        this.row = row;
+    }
+
+    public String getMetaId() {
+        return metaId;
+    }
+
+    public String getTableGroupId() {
+        return tableGroupId;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public ConnectorMapper getConnectorMapper() {
+        return connectorMapper;
+    }
+
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+    public Map getRow() {
+        return row;
+    }
+}

+ 93 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterFlushTask.java

@@ -0,0 +1,93 @@
+package org.dbsyncer.parser.flush.model;
+
+import org.dbsyncer.connector.ConnectorMapper;
+import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.parser.flush.AbstractFlushTask;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 18:11
+ */
+public class WriterFlushTask extends AbstractFlushTask {
+
+    private boolean isMerged;
+
+    private String metaId;
+
+    private String event;
+
+    private ConnectorMapper connectorMapper;
+
+    private List<Field> fields;
+
+    private Map<String, String> command;
+
+    private List<Map> dataList = new LinkedList<>();
+
+    public boolean isMerged() {
+        return isMerged;
+    }
+
+    public void setMerged(boolean merged) {
+        isMerged = merged;
+    }
+
+    public String getMetaId() {
+        return metaId;
+    }
+
+    public void setMetaId(String metaId) {
+        this.metaId = metaId;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public void setEvent(String event) {
+        this.event = event;
+    }
+
+    public ConnectorMapper getConnectorMapper() {
+        return connectorMapper;
+    }
+
+    public void setConnectorMapper(ConnectorMapper connectorMapper) {
+        this.connectorMapper = connectorMapper;
+    }
+
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    public void setFields(List<Field> fields) {
+        this.fields = fields;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+    public void setCommand(Map<String, String> command) {
+        this.command = command;
+    }
+
+    public List<Map> getDataList() {
+        return dataList;
+    }
+
+    public void setDataList(List<Map> dataList) {
+        this.dataList = dataList;
+    }
+
+    @Override
+    public int getFlushTaskSize() {
+        return dataList.size();
+    }
+
+}

+ 2 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/PickerUtil.java

@@ -26,6 +26,8 @@ public abstract class PickerUtil {
      */
     public static TableGroup mergeTableGroupConfig(Mapping mapping, TableGroup tableGroup) {
         TableGroup group = new TableGroup();
+        group.setId(tableGroup.getId());
+        group.setMappingId(mapping.getId());
         group.setFieldMapping(tableGroup.getFieldMapping());
         group.setSourceTable(tableGroup.getSourceTable());
         group.setTargetTable(tableGroup.getTargetTable());
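
Copying the id onto the merged TableGroup is what makes the new buffer path work: WriterBufferActuator builds its partition key from tableGroup.getId() plus the event, and with a null id the new StringBuilder(...) call would throw a NullPointerException before any partitioning happened. The mappingId is presumably carried along so the merged copy stays traceable to its Mapping.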

+ 1 - 1
dbsyncer-web/src/main/resources/application.properties

@@ -4,7 +4,7 @@ server.port=18686
 #web
 dbsyncer.web.login.username=admin
 dbsyncer.web.login.password=0DPiKuNIrrVmD8IUCuw1hQxNqZc=
-dbsyncer.web.thread.pool.core.size=10
+dbsyncer.web.thread.pool.core.size=20
 dbsyncer.web.thread.pool.queue.capacity=5000
 server.servlet.session.timeout=1800
 server.servlet.context-path=/