Merge pull request !56 (merge) from AE86/V_1.0.0_Beta

AE86 committed 3 years ago · commit 2b895c890d
32 changed files with 810 additions and 344 deletions
  1. +4 -0    dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskService.java
  2. +21 -1   dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java
  3. +8 -8    dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java
  4. +3 -2    dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterBatchConfig.java
  5. +12 -0   dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterConfig.java
  6. +3 -17   dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterSingleConfig.java
  7. +21 -16  dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  8. +18 -3   dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleDataSource.java
  9. +2 -15   dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
 10. +16 -0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java
 11. +75 -75  dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
 12. +2 -2    dbsyncer-parser/src/main/java/org/dbsyncer/parser/config/ParserFlushStrategyConfiguration.java
 13. +130 -0  dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
 14. +13 -14  dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java
 15. +14 -0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java
 16. +0 -173  dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushServiceImpl.java
 17. +7 -7    dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushStrategy.java
 18. +6 -6    dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/DisableFullFlushStrategy.java
 19. +2 -2    dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/EnableFlushStrategy.java
 20. +48 -0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushBufferActuator.java
 21. +82 -0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
 22. +65 -0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java
 23. +10 -0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractRequest.java
 24. +17 -0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractResponse.java
 25. +29 -0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/FlushRequest.java
 26. +37 -0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/FlushResponse.java
 27. +67 -0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterRequest.java
 28. +92 -0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterResponse.java
 29. +1 -1    dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Mapping.java
 30. +2 -0    dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/PickerUtil.java
 31. +2 -1    dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java
 32. +1 -1    dbsyncer-web/src/main/resources/application.properties

+ 4 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskService.java

@@ -19,6 +19,10 @@ public interface ScheduledTaskService {
 
     void start(String key, long period, ScheduledTaskJob job);
 
+    void start(String cron, ScheduledTaskJob job);
+
+    void start(long period, ScheduledTaskJob job);
+
     void stop(String key);
 
 }

+ 21 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java

@@ -1,8 +1,10 @@
 package org.dbsyncer.common.scheduled;
 
 import org.dbsyncer.common.CommonException;
+import org.dbsyncer.common.util.UUIDUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.scheduling.support.CronTrigger;
@@ -18,7 +20,7 @@ import java.util.concurrent.ScheduledFuture;
  * @Date 2020-05-24 22:06
  */
 @Component
-public class ScheduledTaskServiceImpl implements ScheduledTaskService {
+public class ScheduledTaskServiceImpl implements ScheduledTaskService, DisposableBean {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -32,20 +34,33 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService {
 
     @Override
     public void start(String key, String cron, ScheduledTaskJob job) {
+        logger.info("{}-[{}], Started task [{}]", key, cron, job.getClass().getName());
         apply(key, () -> taskScheduler.schedule(job, (trigger) -> new CronTrigger(cron).nextExecutionTime(trigger)));
     }
 
     @Override
     public void start(String key, long period, ScheduledTaskJob job) {
+        logger.info("[period={}], Started task [{}]", period, key);
         apply(key, () -> taskScheduler.scheduleAtFixedRate(job, period));
     }
 
+    @Override
+    public void start(String cron, ScheduledTaskJob job) {
+        start(UUIDUtil.getUUID(), cron, job);
+    }
+
+    @Override
+    public void start(long period, ScheduledTaskJob job) {
+        start(UUIDUtil.getUUID(), period, job);
+    }
+
     @Override
     public void stop(String key) {
         ScheduledFuture job = map.get(key);
         if (null != job) {
             job.cancel(true);
             map.remove(key);
+            logger.info("Stopped task [{}]", key);
         }
     }
 
@@ -59,6 +74,11 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService {
         map.putIfAbsent(key, scheduledFutureMapper.apply());
     }
 
+    @Override
+    public void destroy() {
+        map.keySet().forEach(this::stop);
+    }
+
     private interface ScheduledFutureMapper {
         /**
         * Return the scheduled task
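
The two new keyless start overloads register the job under a random UUID, and the DisposableBean hook cancels every remaining future on shutdown, so callers no longer track task keys themselves. A minimal usage sketch, assuming a hypothetical MyJob component (the cron expression mirrors the one IncrementPuller uses below):

    import javax.annotation.PostConstruct;
    import org.dbsyncer.common.scheduled.ScheduledTaskJob;
    import org.dbsyncer.common.scheduled.ScheduledTaskService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class MyJob implements ScheduledTaskJob {

        @Autowired
        private ScheduledTaskService scheduledTaskService;

        @PostConstruct
        private void init() {
            // No key to manage: the service assigns a UUID and cancels it in destroy().
            scheduledTaskService.start("*/10 * * * * ?", this);
        }

        @Override
        public void run() {
            // periodic work goes here
        }
    }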

+ 8 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -121,24 +121,24 @@ public class ConnectorFactory implements DisposableBean {
         return map;
     }
 
-    public long getCount(ConnectorMapper connectionMapper, Map<String, String> command) {
-        return getConnector(connectionMapper).getCount(connectionMapper, command);
+    public long getCount(ConnectorMapper connectorMapper, Map<String, String> command) {
+        return getConnector(connectorMapper).getCount(connectorMapper, command);
     }
 
-    public Result reader(ConnectorMapper connectionMapper, ReaderConfig config) {
-        Result result = getConnector(connectionMapper).reader(connectionMapper, config);
+    public Result reader(ConnectorMapper connectorMapper, ReaderConfig config) {
+        Result result = getConnector(connectorMapper).reader(connectorMapper, config);
         Assert.notNull(result, "Connector reader result can not null");
         return result;
     }
 
-    public Result writer(ConnectorMapper connectionMapper, WriterBatchConfig config) {
-        Result result = getConnector(connectionMapper).writer(connectionMapper, config);
+    public Result writer(ConnectorMapper connectorMapper, WriterBatchConfig config) {
+        Result result = getConnector(connectorMapper).writer(connectorMapper, config);
         Assert.notNull(result, "Connector writer batch result can not null");
         return result;
     }
 
-    public Result writer(ConnectorMapper connectionMapper, WriterSingleConfig config) {
-        Result result = getConnector(connectionMapper).writer(connectionMapper, config);
+    public Result writer(ConnectorMapper connectorMapper, WriterSingleConfig config) {
+        Result result = getConnector(connectorMapper).writer(connectorMapper, config);
         Assert.notNull(result, "Connector writer single result can not null");
         return result;
     }

+ 3 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterBatchConfig.java

@@ -10,10 +10,11 @@ public class WriterBatchConfig extends WriterConfig {
      */
     private List<Map> data;
 
-    public WriterBatchConfig(Map<String, String> command, List<Field> fields, List<Map> data) {
+    public WriterBatchConfig(String event, Map<String, String> command, List<Field> fields, List<Map> data) {
+        setEvent(event);
         setCommand(command);
         setFields(fields);
-        setData(data);
+        this.data = data;
     }
 
     public List<Map> getData() {

+ 12 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterConfig.java

@@ -5,6 +5,10 @@ import java.util.Map;
 
 public class WriterConfig {
 
+    /**
+     * Event
+     */
+    private String event;
     /**
      * Execution command
      */
@@ -14,6 +18,14 @@ public class WriterConfig {
      */
     private List<Field> fields;
 
+    public String getEvent() {
+        return event;
+    }
+
+    public void setEvent(String event) {
+        this.event = event;
+    }
+
     public Map<String, String> getCommand() {
         return command;
     }

+ 3 - 17
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterSingleConfig.java

@@ -10,11 +10,6 @@ public class WriterSingleConfig extends WriterConfig {
      */
     private Map<String, Object> data;
 
-    /**
-     * Event
-     */
-    private String event;
-
     /**
      * Table name
      */
@@ -31,11 +26,11 @@ public class WriterSingleConfig extends WriterConfig {
     private boolean forceUpdate;
 
     public WriterSingleConfig(List<Field> fields, Map<String, String> command, String event, Map<String, Object> data, String table, boolean forceUpdate) {
+        setEvent(event);
         setCommand(command);
         setFields(fields);
-        setData(data);
-        setEvent(event);
-        setTable(table);
+        this.data = data;
+        this.table = table;
         this.forceUpdate = forceUpdate;
     }
 
@@ -48,15 +43,6 @@ public class WriterSingleConfig extends WriterConfig {
         return this;
     }
 
-    public String getEvent() {
-        return event;
-    }
-
-    public WriterSingleConfig setEvent(String event) {
-        this.event = event;
-        return this;
-    }
-
     public String getTable() {
         return table;
     }

+ 21 - 16
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -103,13 +103,13 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
     @Override
     public Result writer(DatabaseConnectorMapper connectorMapper, WriterBatchConfig config) {
-        List<Field> fields = config.getFields();
+        String event = config.getEvent();
         List<Map> data = config.getData();
 
-        // 1. Get the select SQL
-        String insertSql = config.getCommand().get(SqlBuilderEnum.INSERT.getName());
-        Assert.hasText(insertSql, "The insert SQL can not be empty.");
-        if (CollectionUtils.isEmpty(fields)) {
+        // 1. Get the SQL for the event
+        String executeSql = config.getCommand().get(event);
+        Assert.hasText(executeSql, "The execute SQL can not be empty.");
+        if (CollectionUtils.isEmpty(config.getFields())) {
             logger.error("writer fields can not be empty.");
             throw new ConnectorException("writer fields can not be empty.");
         }
@@ -117,14 +117,23 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             logger.error("writer data can not be empty.");
             throw new ConnectorException("writer data can not be empty.");
         }
+        List<Field> fields = new ArrayList<>(config.getFields());
+        Field pkField = getPrimaryKeyField(config.getFields());
+        // Update / Delete
+        if (!isInsert(event)) {
+            if (isDelete(event)) {
+                fields.clear();
+            }
+            fields.add(pkField);
+        }
+
         final int size = data.size();
         final int fSize = fields.size();
-
         Result result = new Result();
         try {
             // 2. Set parameters
             connectorMapper.execute(databaseTemplate -> {
-                databaseTemplate.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
+                databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
                     @Override
                     public void setValues(PreparedStatement preparedStatement, int i) {
                         batchRowsSetter(databaseTemplate.getConnection(), preparedStatement, fields, fSize, data.get(i));
@@ -161,7 +170,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
         Field pkField = getPrimaryKeyField(fields);
         // Update / Delete
-        if (isUpdate(event) || isDelete(event)) {
+        if (!isInsert(event)) {
             if (isDelete(event)) {
                 fields.clear();
             }
@@ -174,13 +183,9 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         try {
             // 2. Set parameters
             execute = connectorMapper.execute(databaseTemplate ->
-                    databaseTemplate.update(sql, (ps) -> {
-                        Field f = null;
-                        for (int i = 0; i < size; i++) {
-                            f = fields.get(i);
-                            SetterEnum.getSetter(f.getType()).set(databaseTemplate.getConnection(), ps, i + 1, f.getType(), data.get(f.getName()));
-                        }
-                    })
+                    databaseTemplate.update(sql, (ps) ->
+                            batchRowsSetter(databaseTemplate.getConnection(), ps, fields, size, data)
+                    )
             );
         } catch (Exception e) {
             // Record the failed data
@@ -572,7 +577,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
                 }
             }
         }
-        if(!TableTypeEnum.isView(table.getType())){
+        if (!TableTypeEnum.isView(table.getType())) {
             throw new ConnectorException("Table primary key can not be empty.");
         }
         return "";

+ 18 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleDataSource.java

@@ -28,12 +28,23 @@ public class SimpleDataSource implements DataSource, AutoCloseable {
 
     @Override
     public Connection getConnection() throws SQLException {
-        synchronized (pool) {
+        SimpleConnection poll = null;
+        int i = 3;
+        do {
             if (pool.isEmpty()) {
-                pool.offer(new SimpleConnection(this, DatabaseUtil.getConnection(url, username, password)));
+                pool.offer(createConnection());
             }
-            return pool.poll();
+            poll = pool.poll();
+            if (null != poll) {
+                break;
+            }
+            i--;
+        } while (i > 1);
+
+        if (null == poll) {
+            poll = createConnection();
         }
+        return poll;
     }
 
     @Override
@@ -81,6 +92,10 @@ public class SimpleDataSource implements DataSource, AutoCloseable {
         pool.forEach(c -> c.closeQuietly());
     }
 
+    private SimpleConnection createConnection() throws SQLException {
+        return new SimpleConnection(this, DatabaseUtil.getConnection(url, username, password));
+    }
+
     public BlockingQueue<SimpleConnection> getPool() {
         return pool;
     }
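
getConnection no longer serializes every caller on a pool-wide lock: it polls the queue a bounded number of times, topping the pool up when it is empty, and finally hands out a fresh connection rather than blocking (as committed, the condition i > 1 allows two poll attempts). A standalone sketch of the same acquire strategy over a BlockingQueue, with hypothetical names:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.function.Supplier;

    public class BoundedRetryPool<T> {
        private final BlockingQueue<T> pool = new LinkedBlockingQueue<>();
        private final Supplier<T> factory;

        public BoundedRetryPool(Supplier<T> factory) {
            this.factory = factory;
        }

        public T acquire() {
            T item = null;
            int retries = 3;
            do {
                if (pool.isEmpty()) {
                    pool.offer(factory.get()); // top up when empty
                }
                item = pool.poll();            // may lose the race to a concurrent caller
                if (item != null) {
                    break;
                }
                retries--;
            } while (retries > 0);
            // Fall back to a fresh resource instead of blocking the caller.
            return item != null ? item : factory.get();
        }
    }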

+ 2 - 15
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -6,7 +6,6 @@ import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.Field;
@@ -31,7 +30,6 @@ import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
@@ -53,7 +51,7 @@ import java.util.stream.Collectors;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob, DisposableBean {
+public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -78,16 +76,11 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
     @Autowired
     private Executor taskExecutor;
 
-    private String key;
-
     private Map<String, Extractor> map = new ConcurrentHashMap<>();
 
     @PostConstruct
     private void init() {
-        key = UUIDUtil.getUUID();
-        String cron = "*/10 * * * * ?";
-        scheduledTaskService.start(key, cron, this);
-        logger.info("[{}], Started scheduled task", cron);
+        scheduledTaskService.start("*/10 * * * * ?", this);
     }
 
     @Override
@@ -137,12 +130,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         map.forEach((k, v) -> v.flushEvent());
     }
 
-    @Override
-    public void destroy() {
-        scheduledTaskService.stop(key);
-        logger.info("Stopped scheduled task.");
-    }
-
     private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta)
             throws InstantiationException, IllegalAccessException {
         ConnectorConfig connectorConfig = connector.getConfig();

+ 16 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -1,6 +1,8 @@
 package org.dbsyncer.parser;
 
 import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.model.Result;
+import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.parser.model.Task;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
@@ -161,4 +163,18 @@ public interface Parser {
      * @param rowChangedEvent
      */
     void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent);
+
+    /**
+     * Batch execution
+     *
+     * @param connectorMapper
+     * @param command
+     * @param event
+     * @param fields
+     * @param dataList
+     * @param batchSize
+     * @return
+     */
+    Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, String event, List<Field> fields, List<Map> dataList, int batchSize);
+
 }

+ 75 - 75
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,10 +1,8 @@
 package org.dbsyncer.parser;
 
 import org.dbsyncer.cache.CacheService;
-import org.dbsyncer.parser.event.FullRefreshEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Result;
-import org.dbsyncer.parser.model.Task;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
@@ -18,10 +16,13 @@ import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.enums.ParserEnum;
+import org.dbsyncer.parser.event.FullRefreshEvent;
+import org.dbsyncer.parser.flush.BufferActuator;
+import org.dbsyncer.parser.flush.FlushStrategy;
+import org.dbsyncer.parser.flush.model.WriterRequest;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.*;
-import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
@@ -39,8 +40,10 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import java.time.Instant;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 
@@ -76,6 +79,9 @@ public class ParserFactory implements Parser {
     @Autowired
     private ApplicationContext applicationContext;
 
+    @Autowired
+    private BufferActuator writerBufferActuator;
+
     @Override
     public ConnectorMapper connect(ConnectorConfig config) {
         return connectorFactory.connect(config);
@@ -265,8 +271,8 @@ public class ParserFactory implements Parser {
         params.putIfAbsent(ParserEnum.PAGE_INDEX.getCode(), ParserEnum.PAGE_INDEX.getDefaultValue());
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();
-        ConnectorMapper sConnectionMapper = connectorFactory.connect(sConfig);
-        ConnectorMapper tConnectionMapper = connectorFactory.connect(tConfig);
+        ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
+        ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
 
         for (; ; ) {
             if (!task.isRunning()) {
@@ -276,7 +282,7 @@ public class ParserFactory implements Parser {
 
             // 1. Read data from the source
             int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
-            Result reader = connectorFactory.reader(sConnectionMapper, new ReaderConfig(command, new ArrayList<>(), pageIndex, pageSize));
+            Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), pageIndex, pageSize));
             List<Map> data = reader.getData();
             if (CollectionUtils.isEmpty(data)) {
                 params.clear();
@@ -294,7 +300,7 @@ public class ParserFactory implements Parser {
             pluginFactory.convert(group.getPlugin(), data, target);
 
             // 5. Write to the target
-            Result writer = writeBatch(tConnectionMapper, command, picker.getTargetFields(), target, batchSize);
+            Result writer = writeBatch(tConnectorMapper, command, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
 
             // 6. Update the result
             flush(task, writer, target);
@@ -323,11 +329,67 @@ public class ParserFactory implements Parser {
         // 3. Plugin conversion
         pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
 
-        // 4. Write to the target
-        Result writer = connectorFactory.writer(tConnectorMapper, new WriterSingleConfig(picker.getTargetFields(), tableGroup.getCommand(), event, target, rowChangedEvent.getTableName(), rowChangedEvent.isForceUpdate()));
+        // 4. Offer to the writer buffer actuator
+        writerBufferActuator.offer(new WriterRequest(metaId, tableGroup.getId(), event, tConnectorMapper, picker.getTargetFields(), tableGroup.getCommand(), target));
+    }
+
+    /**
+     * Batch write
+     *
+     * @param connectorMapper
+     * @param command
+     * @param fields
+     * @param dataList
+     * @param batchSize
+     * @return
+     */
+    @Override
+    public Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, String event, List<Field> fields, List<Map> dataList, int batchSize) {
+        // Total count
+        int total = dataList.size();
+        // Single task
+        if (total <= batchSize) {
+            return connectorFactory.writer(connectorMapper, new WriterBatchConfig(event, command, fields, dataList));
+        }
+
+        // Batch task, split into chunks
+        int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
 
-        // 5. Update the result
-        flushStrategy.flushIncrementData(metaId, writer, event, picker.getTargetMapList());
+        final Result result = new Result();
+        final CountDownLatch latch = new CountDownLatch(taskSize);
+        int fromIndex = 0;
+        int toIndex = batchSize;
+        for (int i = 0; i < taskSize; i++) {
+            final List<Map> data;
+            if (toIndex > total) {
+                toIndex = fromIndex + (total % batchSize);
+                data = dataList.subList(fromIndex, toIndex);
+            } else {
+                data = dataList.subList(fromIndex, toIndex);
+                fromIndex += batchSize;
+                toIndex += batchSize;
+            }
+
+            taskExecutor.execute(() -> {
+                try {
+                    Result w = connectorFactory.writer(connectorMapper, new WriterBatchConfig(event, command, fields, data));
+                    // CAS
+                    result.getFailData().addAll(w.getFailData());
+                    result.getFail().getAndAdd(w.getFail().get());
+                    result.getError().append(w.getError());
+                } catch (Exception e) {
+                    result.getError().append(e.getMessage()).append(System.lineSeparator());
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        }
+        return result;
     }
 
     /**
@@ -384,66 +446,4 @@ public class ParserFactory implements Parser {
         return connector.getConfig();
     }
 
-    /**
-     * Batch write
-     *
-     * @param connectorMapper
-     * @param command
-     * @param fields
-     * @param target
-     * @param batchSize
-     * @return
-     */
-    private Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, List<Field> fields, List<Map> target, int batchSize) {
-        // Total count
-        int total = target.size();
-        // Single task
-        if (total <= batchSize) {
-            return connectorFactory.writer(connectorMapper, new WriterBatchConfig(command, fields, target));
-        }
-
-        // Batch task, split into chunks
-        int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
-
-        // Convert to a message queue for concurrent writes
-        Queue<Map> queue = new ConcurrentLinkedQueue<>(target);
-
-        final Result result = new Result();
-        final CountDownLatch latch = new CountDownLatch(taskSize);
-        for (int i = 0; i < taskSize; i++) {
-            taskExecutor.execute(() -> {
-                try {
-                    Result w = parallelTask(batchSize, queue, connectorMapper, command, fields);
-                    // CAS
-                    result.getFailData().addAll(w.getFailData());
-                    result.getFail().getAndAdd(w.getFail().get());
-                    result.getError().append(w.getError());
-                } catch (Exception e) {
-                    result.getError().append(e.getMessage()).append(System.lineSeparator());
-                } finally {
-                    latch.countDown();
-                }
-            });
-        }
-        try {
-            latch.await();
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage());
-        }
-        return result;
-    }
-
-    private Result parallelTask(int batchSize, Queue<Map> queue, ConnectorMapper connectorMapper, Map<String, String> command,
-                                List<Field> fields) {
-        List<Map> data = new ArrayList<>();
-        for (int j = 0; j < batchSize; j++) {
-            Map poll = queue.poll();
-            if (null == poll) {
-                break;
-            }
-            data.add(poll);
-        }
-        return connectorFactory.writer(connectorMapper, new WriterBatchConfig(command, fields, data));
-    }
-
 }
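
The rewritten writeBatch drops the intermediate ConcurrentLinkedQueue and hands each worker a subList view of the original list, so no per-chunk copy is made and only the last chunk is shorter. A self-contained sketch of the same index arithmetic (hypothetical helper): for 1050 rows and batchSize 1000 it yields the chunks [0, 1000) and [1000, 1050).

    import java.util.ArrayList;
    import java.util.List;

    public class BatchSplitSketch {
        public static <T> List<List<T>> split(List<T> dataList, int batchSize) {
            int total = dataList.size();
            int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
            List<List<T>> chunks = new ArrayList<>(taskSize);
            int fromIndex = 0;
            int toIndex = batchSize;
            for (int i = 0; i < taskSize; i++) {
                if (toIndex > total) {
                    toIndex = fromIndex + (total % batchSize); // final, short chunk
                }
                chunks.add(dataList.subList(fromIndex, toIndex)); // view, not a copy
                fromIndex += batchSize;
                toIndex += batchSize;
            }
            return chunks;
        }
    }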

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/config/ParserFlushStrategyConfiguration.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.parser.config;
 
-import org.dbsyncer.parser.strategy.FlushStrategy;
-import org.dbsyncer.parser.strategy.impl.DisableFullFlushStrategy;
+import org.dbsyncer.parser.flush.FlushStrategy;
+import org.dbsyncer.parser.flush.impl.DisableFullFlushStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;

+ 130 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -0,0 +1,130 @@
+package org.dbsyncer.parser.flush;
+
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.parser.flush.model.AbstractRequest;
+import org.dbsyncer.parser.flush.model.AbstractResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.PostConstruct;
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 17:36
+ */
+public abstract class AbstractBufferActuator<Request, Response> implements BufferActuator, ScheduledTaskJob {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private ScheduledTaskService scheduledTaskService;
+
+    private Queue<Request> buffer = new ConcurrentLinkedQueue();
+
+    private Queue<Request> temp = new ConcurrentLinkedQueue();
+
+    private final Object LOCK = new Object();
+
+    private volatile boolean running;
+
+    @PostConstruct
+    private void init() {
+        scheduledTaskService.start(getPeriod(), this);
+    }
+
+    /**
+     * Get the scheduling interval (milliseconds)
+     *
+     * @return
+     */
+    protected abstract long getPeriod();
+
+    /**
+     * Create the cached value
+     *
+     * @return
+     */
+    protected abstract AbstractResponse getValue();
+
+    /**
+     * Create the partition key
+     *
+     * @param request
+     * @return
+     */
+    protected abstract String getPartitionKey(Request request);
+
+    /**
+     * Merge a request into its partitioned response
+     *
+     * @param request
+     * @param response
+     */
+    protected abstract void partition(Request request, Response response);
+
+    /**
+     * Batch processing
+     *
+     * @param response
+     */
+    protected abstract void pull(Response response);
+
+    @Override
+    public void offer(AbstractRequest request) {
+        if (running) {
+            temp.offer((Request) request);
+            return;
+        }
+        buffer.offer((Request) request);
+    }
+
+    @Override
+    public void run() {
+        if (running) {
+            return;
+        }
+        synchronized (LOCK) {
+            if (running) {
+                return;
+            }
+            running = true;
+            flush(buffer);
+            running = false;
+            flush(temp);
+        }
+    }
+
+    private void flush(Queue<Request> queue) {
+        if (!queue.isEmpty()) {
+            final Map<String, AbstractResponse> map = new LinkedHashMap<>();
+            while (!queue.isEmpty()) {
+                Request poll = queue.poll();
+                String key = getPartitionKey(poll);
+                if (!map.containsKey(key)) {
+                    map.putIfAbsent(key, getValue());
+                }
+                partition(poll, (Response) map.get(key));
+            }
+
+            map.forEach((key, flushTask) -> {
+                long now = Instant.now().toEpochMilli();
+                try {
+                    pull((Response) flushTask);
+                } catch (Exception e) {
+                    logger.error("[{}]-flush error", key, e);
+                }
+                logger.info("[{}]-flushed {} rows, cost {} seconds", key, flushTask.getTaskSize(), (Instant.now().toEpochMilli() - now) / 1000);
+            });
+            map.clear();
+        }
+    }
+
+}
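
AbstractBufferActuator is a template for the two actuators below: offer() appends to the active queue (or to temp while a drain is in flight), and the scheduled run() groups everything buffered by partition key, merging each group into one response that pull() persists in a single round trip. A minimal hypothetical subclass to show the contract (LogRequest/LogResponse are illustrative; the real implementations are FlushBufferActuator and WriterBufferActuator):

    import java.util.LinkedList;
    import java.util.List;
    import org.dbsyncer.parser.flush.AbstractBufferActuator;
    import org.dbsyncer.parser.flush.model.AbstractRequest;
    import org.dbsyncer.parser.flush.model.AbstractResponse;
    import org.springframework.stereotype.Component;

    class LogRequest extends AbstractRequest {
        private final String type;
        private final String line;

        LogRequest(String type, String line) {
            this.type = type;
            this.line = line;
        }

        String getType() { return type; }
        String getLine() { return line; }
    }

    class LogResponse extends AbstractResponse {
        private final List<String> lines = new LinkedList<>();

        List<String> getLines() { return lines; }

        @Override
        public int getTaskSize() { return lines.size(); }
    }

    @Component
    public class LogBufferActuator extends AbstractBufferActuator<LogRequest, LogResponse> {

        @Override
        protected long getPeriod() {
            return 1000; // drain once per second
        }

        @Override
        protected AbstractResponse getValue() {
            return new LogResponse();
        }

        @Override
        protected String getPartitionKey(LogRequest request) {
            return request.getType(); // requests of one type flush together
        }

        @Override
        protected void partition(LogRequest request, LogResponse response) {
            response.getLines().add(request.getLine());
        }

        @Override
        protected void pull(LogResponse response) {
            // One storage round trip per partition, however many offers arrived.
        }
    }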

+ 13 - 14
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/AbstractFlushStrategy.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -1,8 +1,7 @@
-package org.dbsyncer.parser.strategy;
+package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.model.Result;
-import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.model.Meta;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.Assert;
@@ -24,31 +23,31 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     private CacheService cacheService;
 
     @Override
-    public void flushFullData(String metaId, Result writer, String event, List<Map> data) {
-        flush(metaId, writer, event, data);
+    public void flushFullData(String metaId, Result result, String event, List<Map> dataList) {
+        flush(metaId, result, event, dataList);
     }
 
     @Override
-    public void flushIncrementData(String metaId, Result writer, String event, List<Map> data) {
-        flush(metaId, writer, event, data);
+    public void flushIncrementData(String metaId, Result result, String event, List<Map> dataList) {
+        flush(metaId, result, event, dataList);
     }
 
-    protected void flush(String metaId, Result writer, String event, List<Map> data) {
-        refreshTotal(metaId, writer, data);
+    protected void flush(String metaId, Result result, String event, List<Map> dataList) {
+        refreshTotal(metaId, result, dataList);
 
-        boolean success = 0 == writer.getFail().get();
+        boolean success = 0 == result.getFail().get();
         if (!success) {
-            data.clear();
-            data.addAll(writer.getFailData());
+            dataList.clear();
+            dataList.addAll(result.getFailData());
         }
-        flushService.asyncWrite(metaId, event, success, data, writer.getError().toString());
+        flushService.asyncWrite(metaId, event, success, dataList, result.getError().toString());
     }
 
-    protected void refreshTotal(String metaId, Result writer, List<Map> data){
+    protected void refreshTotal(String metaId, Result writer, List<Map> dataList){
         long fail = writer.getFail().get();
         Meta meta = getMeta(metaId);
         meta.getFail().getAndAdd(fail);
-        meta.getSuccess().getAndAdd(data.size() - fail);
+        meta.getSuccess().getAndAdd(dataList.size() - fail);
     }
 
     protected Meta getMeta(String metaId) {

+ 14 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -0,0 +1,14 @@
+package org.dbsyncer.parser.flush;
+
+import org.dbsyncer.parser.flush.model.AbstractRequest;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 17:34
+ */
+public interface BufferActuator {
+
+    void offer(AbstractRequest request);
+
+}

+ 0 - 173
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushServiceImpl.java

@@ -1,173 +0,0 @@
-package org.dbsyncer.parser.flush;
-
-import com.alibaba.fastjson.JSONException;
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
-import org.dbsyncer.common.scheduled.ScheduledTaskService;
-import org.dbsyncer.common.util.JsonUtil;
-import org.dbsyncer.common.util.UUIDUtil;
-import org.dbsyncer.storage.SnowflakeIdWorker;
-import org.dbsyncer.storage.StorageService;
-import org.dbsyncer.storage.constant.ConfigConstant;
-import org.dbsyncer.storage.enums.StorageDataStatusEnum;
-import org.dbsyncer.storage.enums.StorageEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-import java.time.Instant;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-/**
- * Persistence
- * <p>Full or incremental sync data</p>
- * <p>System logs</p>
- *
- * @author AE86
- * @version 1.0.0
- * @date 2020/05/19 18:38
- */
-@Component
-public class FlushServiceImpl implements FlushService, ScheduledTaskJob, DisposableBean {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    @Autowired
-    private StorageService storageService;
-
-    @Autowired
-    private SnowflakeIdWorker snowflakeIdWorker;
-
-    @Autowired
-    private ScheduledTaskService scheduledTaskService;
-
-    @Autowired
-    private Executor taskExecutor;
-
-    private Queue<Task> buffer = new ConcurrentLinkedQueue();
-
-    private Queue<Task> temp = new ConcurrentLinkedQueue();
-
-    private final Object LOCK = new Object();
-
-    private volatile boolean running;
-
-    private String key;
-
-    @PostConstruct
-    private void init() {
-        key = UUIDUtil.getUUID();
-        String cron = "*/3 * * * * ?";
-        scheduledTaskService.start(key, cron, this);
-        logger.info("[{}], Started scheduled task", cron);
-    }
-
-    @Override
-    public void asyncWrite(String type, String error) {
-        Map<String, Object> params = new HashMap();
-        params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
-        params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
-        params.put(ConfigConstant.CONFIG_MODEL_JSON, error);
-        params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
-        storageService.addLog(StorageEnum.LOG, params);
-    }
-
-    @Override
-    public void asyncWrite(String metaId, String event, boolean success, List<Map> data, String error) {
-        long now = Instant.now().toEpochMilli();
-        AtomicBoolean added = new AtomicBoolean(false);
-        List<Map> list = data.parallelStream().map(r -> {
-            Map<String, Object> params = new HashMap();
-            params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
-            params.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
-            params.put(ConfigConstant.DATA_EVENT, event);
-            params.put(ConfigConstant.DATA_ERROR, added.get() ? "" : error);
-            try {
-                params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(r));
-            } catch (JSONException e) {
-                logger.warn("May contain a Blob or InputStream large object that can not be serialized: {}", r);
-                params.put(ConfigConstant.CONFIG_MODEL_JSON, r.toString());
-            }
-            params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
-            added.set(true);
-            return params;
-        }).collect(Collectors.toList());
-
-        if (running) {
-            temp.offer(new Task(metaId, list));
-            return;
-        }
-
-        buffer.offer(new Task(metaId, list));
-    }
-
-    @Override
-    public void run() {
-        if (running) {
-            return;
-        }
-        synchronized (LOCK) {
-            if (running) {
-                return;
-            }
-            running = true;
-            flush(buffer);
-            running = false;
-            try {
-                TimeUnit.MILLISECONDS.sleep(10);
-            } catch (InterruptedException e) {
-                logger.error(e.getMessage());
-            }
-            flush(temp);
-        }
-    }
-
-    private void flush(Queue<Task> buffer) {
-        if (!buffer.isEmpty()) {
-            final Map<String, List<Map>> task = new LinkedHashMap<>();
-            while (!buffer.isEmpty()) {
-                Task t = buffer.poll();
-                if (!task.containsKey(t.metaId)) {
-                    task.putIfAbsent(t.metaId, new LinkedList<>());
-                }
-                task.get(t.metaId).addAll(t.list);
-            }
-            task.forEach((metaId, list) -> {
-                taskExecutor.execute(() -> {
-                    long now = Instant.now().toEpochMilli();
-                    try {
-                        storageService.addData(StorageEnum.DATA, metaId, list);
-                    } catch (Exception e) {
-                        logger.error("[{}]-flush error, {} rows", metaId, list.size());
-                    }
-                    logger.info("[{}]-flushed {} rows, cost {} seconds", metaId, list.size(), (Instant.now().toEpochMilli() - now) / 1000);
-                });
-            });
-            task.clear();
-        }
-    }
-
-    @Override
-    public void destroy() {
-        scheduledTaskService.stop(key);
-        logger.info("Stopped scheduled task.");
-    }
-
-    final class Task {
-        String metaId;
-        List<Map> list;
-
-        public Task(String metaId, List<Map> list) {
-            this.metaId = metaId;
-            this.list = list;
-        }
-    }
-
-}

+ 7 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/FlushStrategy.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushStrategy.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.parser.strategy;
+package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.common.model.Result;
 
@@ -18,20 +18,20 @@ public interface FlushStrategy {
      * Record full sync data
      *
      * @param metaId
-     * @param writer
+     * @param result
      * @param event
-     * @param data
+     * @param dataList
      */
-    void flushFullData(String metaId, Result writer, String event, List<Map> data);
+    void flushFullData(String metaId, Result result, String event, List<Map> dataList);
 
     /**
      * Record incremental sync data
      *
      * @param metaId
-     * @param writer
+     * @param result
      * @param event
-     * @param data
+     * @param dataList
      */
-    void flushIncrementData(String metaId, Result writer, String event, List<Map> data);
+    void flushIncrementData(String metaId, Result result, String event, List<Map> dataList);
 
 }

+ 6 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableFullFlushStrategy.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/DisableFullFlushStrategy.java

@@ -1,9 +1,9 @@
-package org.dbsyncer.parser.strategy.impl;
+package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.parser.flush.AbstractFlushStrategy;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
-import org.dbsyncer.parser.strategy.AbstractFlushStrategy;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.List;
@@ -22,13 +22,13 @@ public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
     private LogService logService;
 
     @Override
-    public void flushFullData(String metaId, Result writer, String event, List<Map> data) {
+    public void flushFullData(String metaId, Result result, String event, List<Map> dataList) {
         // Do not record full data; only count the success and failure totals
-        refreshTotal(metaId, writer, data);
+        refreshTotal(metaId, result, dataList);
 
-        if (0 < writer.getFail().get()) {
+        if (0 < result.getFail().get()) {
             LogType logType = LogType.TableGroupLog.FULL_FAILED;
-            logService.log(logType, "%s:%s", logType.getMessage(), writer.getError().toString());
+            logService.log(logType, "%s:%s", logType.getMessage(), result.getError().toString());
         }
     }
 

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableFlushStrategy.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/EnableFlushStrategy.java

@@ -1,6 +1,6 @@
-package org.dbsyncer.parser.strategy.impl;
+package org.dbsyncer.parser.flush.impl;
 
-import org.dbsyncer.parser.strategy.AbstractFlushStrategy;
+import org.dbsyncer.parser.flush.AbstractFlushStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 

+ 48 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushBufferActuator.java

@@ -0,0 +1,48 @@
+package org.dbsyncer.parser.flush.impl;
+
+import org.dbsyncer.parser.flush.AbstractBufferActuator;
+import org.dbsyncer.parser.flush.model.AbstractResponse;
+import org.dbsyncer.parser.flush.model.FlushRequest;
+import org.dbsyncer.parser.flush.model.FlushResponse;
+import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.enums.StorageEnum;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:50
+ */
+@Component
+public class FlushBufferActuator extends AbstractBufferActuator<FlushRequest, FlushResponse> {
+
+    @Autowired
+    private StorageService storageService;
+
+    @Override
+    protected long getPeriod() {
+        return 3000;
+    }
+
+    @Override
+    protected AbstractResponse getValue() {
+        return new FlushResponse();
+    }
+
+    @Override
+    protected String getPartitionKey(FlushRequest bufferTask) {
+        return bufferTask.getMetaId();
+    }
+
+    @Override
+    protected void partition(FlushRequest request, FlushResponse response) {
+        response.setMetaId(request.getMetaId());
+        response.getDataList().addAll(request.getList());
+    }
+
+    @Override
+    protected void pull(FlushResponse response) {
+        storageService.addData(StorageEnum.DATA, response.getMetaId(), response.getDataList());
+    }
+}

+ 82 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -0,0 +1,82 @@
+package org.dbsyncer.parser.flush.impl;
+
+import com.alibaba.fastjson.JSONException;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.parser.flush.BufferActuator;
+import org.dbsyncer.parser.flush.FlushService;
+import org.dbsyncer.parser.flush.model.FlushRequest;
+import org.dbsyncer.storage.SnowflakeIdWorker;
+import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.StorageDataStatusEnum;
+import org.dbsyncer.storage.enums.StorageEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * Persistence
+ * <p>Full or incremental sync data</p>
+ * <p>System logs</p>
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/05/19 18:38
+ */
+@Component
+public class FlushServiceImpl implements FlushService {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private StorageService storageService;
+
+    @Autowired
+    private SnowflakeIdWorker snowflakeIdWorker;
+
+    @Autowired
+    private BufferActuator flushBufferActuator;
+
+    @Override
+    public void asyncWrite(String type, String error) {
+        Map<String, Object> params = new HashMap();
+        params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
+        params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
+        params.put(ConfigConstant.CONFIG_MODEL_JSON, error);
+        params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
+        storageService.addLog(StorageEnum.LOG, params);
+    }
+
+    @Override
+    public void asyncWrite(String metaId, String event, boolean success, List<Map> data, String error) {
+        long now = Instant.now().toEpochMilli();
+        AtomicBoolean added = new AtomicBoolean(false);
+        List<Map> list = data.parallelStream().map(r -> {
+            Map<String, Object> params = new HashMap();
+            params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
+            params.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
+            params.put(ConfigConstant.DATA_EVENT, event);
+            params.put(ConfigConstant.DATA_ERROR, added.get() ? "" : error);
+            try {
+                params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(r));
+            } catch (JSONException e) {
+                logger.warn("May contain a Blob or InputStream large object that can not be serialized: {}", r);
+                params.put(ConfigConstant.CONFIG_MODEL_JSON, r.toString());
+            }
+            params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
+            added.set(true);
+            return params;
+        }).collect(Collectors.toList());
+
+        flushBufferActuator.offer(new FlushRequest(metaId, list));
+    }
+
+}

+ 65 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -0,0 +1,65 @@
+package org.dbsyncer.parser.flush.impl;
+
+import org.dbsyncer.common.model.Result;
+import org.dbsyncer.parser.ParserFactory;
+import org.dbsyncer.parser.flush.AbstractBufferActuator;
+import org.dbsyncer.parser.flush.model.AbstractResponse;
+import org.dbsyncer.parser.flush.FlushStrategy;
+import org.dbsyncer.parser.flush.model.WriterRequest;
+import org.dbsyncer.parser.flush.model.WriterResponse;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Collections;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:50
+ */
+@Component
+public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest, WriterResponse> {
+
+    @Autowired
+    private ParserFactory parserFactory;
+
+    @Autowired
+    private FlushStrategy flushStrategy;
+
+    private final static int BATCH_SIZE = 100;
+
+    @Override
+    protected long getPeriod() {
+        return 300;
+    }
+
+    @Override
+    protected AbstractResponse getValue() {
+        return new WriterResponse();
+    }
+
+    @Override
+    protected String getPartitionKey(WriterRequest request) {
+        return new StringBuilder(request.getTableGroupId()).append("-").append(request.getEvent()).toString();
+    }
+
+    @Override
+    protected void partition(WriterRequest request, WriterResponse response) {
+        response.getDataList().add(request.getRow());
+        if (response.isMerged()) {
+            return;
+        }
+        response.setMetaId(request.getMetaId());
+        response.setEvent(request.getEvent());
+        response.setConnectorMapper(request.getConnectorMapper());
+        response.setFields(Collections.unmodifiableList(request.getFields()));
+        response.setCommand(request.getCommand());
+        response.setMerged(true);
+    }
+
+    @Override
+    protected void pull(WriterResponse response) {
+        Result result = parserFactory.writeBatch(response.getConnectorMapper(), response.getCommand(), response.getEvent(), response.getFields(), response.getDataList(), BATCH_SIZE);
+        flushStrategy.flushIncrementData(response.getMetaId(), result, response.getEvent(), response.getDataList());
+    }
+}
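
Because the partition key is tableGroupId + "-" + event, all single-row change events that arrive within one 300 ms period for the same table group and operation collapse into one merged response, and pull() issues a single writeBatch call that BATCH_SIZE then splits into 100-row chunks. A small hypothetical demonstration of the grouping effect:

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class PartitionKeySketch {
        public static void main(String[] args) {
            List<String[]> events = Arrays.asList(
                    new String[]{"tg-01", "INSERT"},
                    new String[]{"tg-01", "INSERT"},
                    new String[]{"tg-01", "UPDATE"},
                    new String[]{"tg-02", "INSERT"});
            Map<String, Integer> batches = new LinkedHashMap<>();
            for (String[] e : events) {
                String key = e[0] + "-" + e[1];     // same key => same merged batch
                batches.merge(key, 1, Integer::sum);
            }
            // Prints {tg-01-INSERT=2, tg-01-UPDATE=1, tg-02-INSERT=1}: three batched
            // writes instead of four row-by-row statements.
            System.out.println(batches);
        }
    }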

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractRequest.java

@@ -0,0 +1,10 @@
+package org.dbsyncer.parser.flush.model;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:57
+ */
+public abstract class AbstractRequest {
+
+}

+ 17 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractResponse.java

@@ -0,0 +1,17 @@
+package org.dbsyncer.parser.flush.model;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 18:11
+ */
+public abstract class AbstractResponse {
+
+    /**
+     * Get the batch task size
+     *
+     * @return
+     */
+    public abstract int getTaskSize();
+
+}

+ 29 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/FlushRequest.java

@@ -0,0 +1,29 @@
+package org.dbsyncer.parser.flush.model;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:57
+ */
+public class FlushRequest extends AbstractRequest {
+
+    private String metaId;
+
+    private List<Map> list;
+
+    public FlushRequest(String metaId, List<Map> list) {
+        this.metaId = metaId;
+        this.list = list;
+    }
+
+    public String getMetaId() {
+        return metaId;
+    }
+
+    public List<Map> getList() {
+        return list;
+    }
+}

+ 37 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/FlushResponse.java

@@ -0,0 +1,37 @@
+package org.dbsyncer.parser.flush.model;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:57
+ */
+public class FlushResponse extends AbstractResponse {
+
+    private String metaId;
+    private List<Map> dataList = new LinkedList<>();
+
+    public String getMetaId() {
+        return metaId;
+    }
+
+    public void setMetaId(String metaId) {
+        this.metaId = metaId;
+    }
+
+    public List<Map> getDataList() {
+        return dataList;
+    }
+
+    public void setDataList(List<Map> dataList) {
+        this.dataList = dataList;
+    }
+
+    @Override
+    public int getTaskSize() {
+        return dataList.size();
+    }
+}

+ 67 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterRequest.java

@@ -0,0 +1,67 @@
+package org.dbsyncer.parser.flush.model;
+
+import org.dbsyncer.connector.ConnectorMapper;
+import org.dbsyncer.connector.config.Field;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:57
+ */
+public class WriterRequest extends AbstractRequest {
+
+    private String metaId;
+
+    private String tableGroupId;
+
+    private String event;
+
+    private ConnectorMapper connectorMapper;
+
+    private List<Field> fields;
+
+    private Map<String, String> command;
+
+    private Map row;
+
+    public WriterRequest(String metaId, String tableGroupId, String event, ConnectorMapper connectorMapper, List<Field> fields, Map<String, String> command, Map row) {
+        this.metaId = metaId;
+        this.tableGroupId = tableGroupId;
+        this.event = event;
+        this.connectorMapper = connectorMapper;
+        this.fields = fields;
+        this.command = command;
+        this.row = row;
+    }
+
+    public String getMetaId() {
+        return metaId;
+    }
+
+    public String getTableGroupId() {
+        return tableGroupId;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public ConnectorMapper getConnectorMapper() {
+        return connectorMapper;
+    }
+
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+    public Map getRow() {
+        return row;
+    }
+}

+ 92 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterResponse.java

@@ -0,0 +1,92 @@
+package org.dbsyncer.parser.flush.model;
+
+import org.dbsyncer.connector.ConnectorMapper;
+import org.dbsyncer.connector.config.Field;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 18:11
+ */
+public class WriterResponse extends AbstractResponse {
+
+    private boolean isMerged;
+
+    private String metaId;
+
+    private String event;
+
+    private ConnectorMapper connectorMapper;
+
+    private List<Field> fields;
+
+    private Map<String, String> command;
+
+    private List<Map> dataList = new LinkedList<>();
+
+    public boolean isMerged() {
+        return isMerged;
+    }
+
+    public void setMerged(boolean merged) {
+        isMerged = merged;
+    }
+
+    public String getMetaId() {
+        return metaId;
+    }
+
+    public void setMetaId(String metaId) {
+        this.metaId = metaId;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public void setEvent(String event) {
+        this.event = event;
+    }
+
+    public ConnectorMapper getConnectorMapper() {
+        return connectorMapper;
+    }
+
+    public void setConnectorMapper(ConnectorMapper connectorMapper) {
+        this.connectorMapper = connectorMapper;
+    }
+
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    public void setFields(List<Field> fields) {
+        this.fields = fields;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+    public void setCommand(Map<String, String> command) {
+        this.command = command;
+    }
+
+    public List<Map> getDataList() {
+        return dataList;
+    }
+
+    public void setDataList(List<Map> dataList) {
+        this.dataList = dataList;
+    }
+
+    @Override
+    public int getTaskSize() {
+        return dataList.size();
+    }
+
+}

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Mapping.java

@@ -44,7 +44,7 @@ public class Mapping extends AbstractConfigModel {
     private int readNum = 10000;
 
     // Rows per single write
-    private int batchNum = 200;
+    private int batchNum = 1000;
 
     public String getSourceConnectorId() {
         return sourceConnectorId;

+ 2 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/PickerUtil.java

@@ -26,6 +26,8 @@ public abstract class PickerUtil {
      */
     public static TableGroup mergeTableGroupConfig(Mapping mapping, TableGroup tableGroup) {
         TableGroup group = new TableGroup();
+        group.setId(tableGroup.getId());
+        group.setMappingId(mapping.getId());
         group.setFieldMapping(tableGroup.getFieldMapping());
         group.setSourceTable(tableGroup.getSourceTable());
         group.setTargetTable(tableGroup.getTargetTable());

+ 2 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -6,6 +6,7 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.Database;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
@@ -176,7 +177,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             Map<String, String> command = new HashMap<>();
             command.put(SqlBuilderEnum.INSERT.getName(), executor.getInsert());
             ConnectorMapper connectorMapper = connectorFactory.connect(config);
-            connectorFactory.writer(connectorMapper, new WriterBatchConfig(command, executor.getFields(), list));
+            connectorFactory.writer(connectorMapper, new WriterBatchConfig(ConnectorConstant.OPERTION_INSERT, command, executor.getFields(), list));
         }
 
     }

+ 1 - 1
dbsyncer-web/src/main/resources/application.properties

@@ -4,7 +4,7 @@ server.port=18686
 #web
 dbsyncer.web.login.username=admin
 dbsyncer.web.login.password=0DPiKuNIrrVmD8IUCuw1hQxNqZc=
-dbsyncer.web.thread.pool.core.size=32
+dbsyncer.web.thread.pool.core.size=20
 dbsyncer.web.thread.pool.queue.capacity=5000
 server.servlet.session.timeout=1800
 server.servlet.context-path=/