Browse Source

优化消费模型

AE86 1 year ago
parent
commit
6478dfdaa4
20 changed files with 234 additions and 240 deletions
  1. 2 2
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java
  2. 9 85
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java
  3. 7 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java
  4. 1 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java
  5. 1 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java
  6. 0 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/OracleExtractor.java
  7. 1 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java
  8. 0 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java
  9. 1 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java
  10. 3 8
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  11. 9 11
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java
  12. 1 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java
  13. 3 4
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  14. 91 65
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  15. 13 9
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java
  16. 6 9
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  17. 28 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java
  18. 37 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/SyncBufferActuator.java
  19. 19 24
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java
  20. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/StorageResponse.java

+ 2 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -54,7 +54,7 @@ public class DataSyncServiceImpl implements DataSyncService {
     private Manager manager;
     private Manager manager;
 
 
     @Resource
     @Resource
-    private BufferActuator writerBufferActuator;
+    private BufferActuator syncBufferActuator;
 
 
     @Override
     @Override
     public MessageVo getMessageVo(String metaId, String messageId) {
     public MessageVo getMessageVo(String metaId, String messageId) {
@@ -149,7 +149,7 @@ public class DataSyncServiceImpl implements DataSyncService {
             if (StringUtil.isNotBlank(retryDataParams)) {
             if (StringUtil.isNotBlank(retryDataParams)) {
                 JsonUtil.parseMap(retryDataParams).forEach((k, v) -> binlogData.put(k, convertValue(binlogData.get(k), (String) v)));
                 JsonUtil.parseMap(retryDataParams).forEach((k, v) -> binlogData.put(k, convertValue(binlogData.get(k), (String) v)));
             }
             }
-            writerBufferActuator.offer(new WriterRequest(tableGroupId, event, binlogData));
+            syncBufferActuator.offer(new WriterRequest(tableGroupId, event, binlogData));
             monitor.removeData(metaId, messageId);
             monitor.removeData(metaId, messageId);
             // 更新失败数
             // 更新失败数
             Meta meta = manager.getMeta(metaId);
             Meta meta = manager.getMeta(metaId);

+ 9 - 85
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -13,18 +13,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
 import java.sql.Timestamp;
 import java.sql.Timestamp;
-import java.time.Duration;
-import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.LocalDateTime;
-import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 
 /**
 /**
  * @version 1.0.0
  * @version 1.0.0
@@ -43,51 +36,8 @@ public abstract class AbstractExtractor implements Extractor {
     protected Map<String, String> snapshot;
     protected Map<String, String> snapshot;
     protected String metaId;
     protected String metaId;
     private Watcher watcher;
     private Watcher watcher;
-    private BlockingQueue<ChangedEvent> queue;
-    private Thread consumer;
-    private volatile boolean enableConsumer;
-    private Lock lock = new ReentrantLock();
-    private Condition isFull;
-    private final Duration pollInterval = Duration.of(500, ChronoUnit.MILLIS);
     private final int FLUSH_DELAYED_SECONDS = 20;
     private final int FLUSH_DELAYED_SECONDS = 20;
 
 
-    @Override
-    public void start() {
-        this.lock = new ReentrantLock();
-        this.isFull = lock.newCondition();
-        enableConsumer = true;
-        consumer = new Thread(() -> {
-            while (enableConsumer) {
-                try {
-                    // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
-                    ChangedEvent event = queue.take();
-                    if (null != event) {
-                        // TODO 待优化多表并行模型
-                        watcher.changeEvent(event);
-                        // 更新增量点
-                        refreshEvent(event);
-                    }
-                    watcher.refreshMetaUpdateTime();
-                } catch (InterruptedException e) {
-                    break;
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-        });
-        consumer.setName(new StringBuilder("extractor-consumer-").append(metaId).toString());
-        consumer.setDaemon(false);
-        consumer.start();
-    }
-
-    @Override
-    public void close() {
-        enableConsumer = false;
-        if (consumer != null && !enableConsumer) {
-            consumer.interrupt();
-        }
-    }
-
     @Override
     @Override
     public void register(Watcher watcher) {
     public void register(Watcher watcher) {
         this.watcher = watcher;
         this.watcher = watcher;
@@ -115,6 +65,11 @@ public abstract class AbstractExtractor implements Extractor {
         }
         }
     }
     }
 
 
+    @Override
+    public void refreshEvent(ChangedEvent event) {
+        // nothing to do
+    }
+
     @Override
     @Override
     public void flushEvent() {
     public void flushEvent() {
         // 20s内更新,执行写入
         // 20s内更新,执行写入
@@ -138,15 +93,6 @@ public abstract class AbstractExtractor implements Extractor {
         watcher.errorEvent(e);
         watcher.errorEvent(e);
     }
     }
 
 
-    /**
-     * 更新增量点
-     *
-     * @param event
-     */
-    protected void refreshEvent(ChangedEvent event) {
-        // nothing to do
-    }
-
     protected void sleepInMills(long timeout) {
     protected void sleepInMills(long timeout) {
         try {
         try {
             TimeUnit.MILLISECONDS.sleep(timeout);
             TimeUnit.MILLISECONDS.sleep(timeout);
@@ -162,29 +108,10 @@ public abstract class AbstractExtractor implements Extractor {
      * @param event
      * @param event
      */
      */
     private void processEvent(boolean permitEvent, ChangedEvent event) {
     private void processEvent(boolean permitEvent, ChangedEvent event) {
-        if (!permitEvent) {
-            return;
-        }
-
-        boolean lock = false;
-        try {
-            lock = this.lock.tryLock();
-            if (lock) {
-                if (!queue.offer(event)) {
-                    // 容量上限,阻塞重试
-                    while (!queue.offer(event) && enableConsumer) {
-                        try {
-                            this.isFull.await(pollInterval.toMillis(), TimeUnit.MILLISECONDS);
-                        } catch (InterruptedException e) {
-                            break;
-                        }
-                    }
-                }
-            }
-        } finally {
-            if (lock) {
-                this.lock.unlock();
-            }
+        if (permitEvent) {
+            watcher.changeEvent(event);
+            // TODO 待优化回调
+            refreshEvent(event);
         }
         }
     }
     }
 
 
@@ -221,7 +148,4 @@ public abstract class AbstractExtractor implements Extractor {
         this.metaId = metaId;
         this.metaId = metaId;
     }
     }
 
 
-    public void setQueue(BlockingQueue<ChangedEvent> queue) {
-        this.queue = queue;
-    }
 }
 }

+ 7 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -29,6 +29,13 @@ public interface Extractor {
      */
      */
     void changeEvent(ChangedEvent event);
     void changeEvent(ChangedEvent event);
 
 
+    /**
+     * 更新增量点
+     *
+     * @param event
+     */
+    void refreshEvent(ChangedEvent event);
+
     /**
     /**
      * 刷新增量点事件
      * 刷新增量点事件
      */
      */

+ 1 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -72,7 +72,6 @@ public class FileExtractor extends AbstractExtractor {
             final FileConfig config = connectorMapper.getConfig();
             final FileConfig config = connectorMapper.getConfig();
             final String mapperCacheKey = connectorFactory.getConnector(connectorMapper).getConnectorMapperCacheKey(connectorConfig);
             final String mapperCacheKey = connectorFactory.getConnector(connectorMapper).getConnectorMapperCacheKey(connectorConfig);
             connected = true;
             connected = true;
-            super.start();
 
 
             separator = config.getSeparator();
             separator = config.getSeparator();
             initPipeline(config.getFileDir());
             initPipeline(config.getFileDir());
@@ -120,7 +119,6 @@ public class FileExtractor extends AbstractExtractor {
     @Override
     @Override
     public void close() {
     public void close() {
         try {
         try {
-            super.close();
             closePipelineAndWatch();
             closePipelineAndWatch();
             connected = false;
             connected = false;
             if (null != worker && !worker.isInterrupted()) {
             if (null != worker && !worker.isInterrupted()) {
@@ -133,7 +131,7 @@ public class FileExtractor extends AbstractExtractor {
     }
     }
 
 
     @Override
     @Override
-    protected void refreshEvent(ChangedEvent event) {
+    public void refreshEvent(ChangedEvent event) {
         if (event.getNextFileName() != null && event.getPosition() != null) {
         if (event.getNextFileName() != null && event.getPosition() != null) {
             snapshot.put(event.getNextFileName(), String.valueOf(event.getPosition()));
             snapshot.put(event.getNextFileName(), String.valueOf(event.getPosition()));
         }
         }

+ 1 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -65,7 +65,6 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 logger.error("MysqlExtractor is already started");
                 logger.error("MysqlExtractor is already started");
                 return;
                 return;
             }
             }
-            super.start();
             run();
             run();
             connected = true;
             connected = true;
         } catch (Exception e) {
         } catch (Exception e) {
@@ -81,7 +80,6 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
         try {
         try {
             connectLock.lock();
             connectLock.lock();
             connected = false;
             connected = false;
-            super.close();
             if (null != client) {
             if (null != client) {
                 client.disconnect();
                 client.disconnect();
             }
             }
@@ -93,7 +91,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
     }
     }
 
 
     @Override
     @Override
-    protected void refreshEvent(ChangedEvent event) {
+    public void refreshEvent(ChangedEvent event) {
         refreshSnapshot(event.getNextFileName(), (Long) event.getPosition());
         refreshSnapshot(event.getNextFileName(), (Long) event.getPosition());
     }
     }
 
 

+ 0 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/OracleExtractor.java

@@ -21,7 +21,6 @@ public class OracleExtractor extends AbstractDatabaseExtractor {
     @Override
     @Override
     public void start() {
     public void start() {
         try {
         try {
-            super.start();
             final DatabaseConfig config = (DatabaseConfig) connectorConfig;
             final DatabaseConfig config = (DatabaseConfig) connectorConfig;
             String username = config.getUsername();
             String username = config.getUsername();
             String password = config.getPassword();
             String password = config.getPassword();
@@ -38,7 +37,6 @@ public class OracleExtractor extends AbstractDatabaseExtractor {
 
 
     @Override
     @Override
     public void close() {
     public void close() {
-        super.close();
         if (null != client) {
         if (null != client) {
             client.close();
             client.close();
         }
         }

+ 1 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java

@@ -70,7 +70,6 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
                 return;
                 return;
             }
             }
 
 
-            super.start();
             connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(connectorConfig);
             connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(connectorConfig);
             config = connectorMapper.getConfig();
             config = connectorMapper.getConfig();
 
 
@@ -120,7 +119,6 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
     public void close() {
     public void close() {
         try {
         try {
             connected = false;
             connected = false;
-            super.close();
             if (null != worker && !worker.isInterrupted()) {
             if (null != worker && !worker.isInterrupted()) {
                 worker.interrupt();
                 worker.interrupt();
                 worker = null;
                 worker = null;
@@ -134,7 +132,7 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
     }
     }
 
 
     @Override
     @Override
-    protected void refreshEvent(ChangedEvent event) {
+    public void refreshEvent(ChangedEvent event) {
         snapshot.put(LSN_POSITION, String.valueOf(event.getPosition()));
         snapshot.put(LSN_POSITION, String.valueOf(event.getPosition()));
     }
     }
 
 

+ 0 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -64,7 +64,6 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
 
         taskKey = UUIDUtil.getUUID();
         taskKey = UUIDUtil.getUUID();
         running = true;
         running = true;
-        super.start();
 
 
         scheduledTaskService.start(taskKey, listenerConfig.getCron(), this);
         scheduledTaskService.start(taskKey, listenerConfig.getCron(), this);
         logger.info("启动定时任务:{} >> {}", taskKey, listenerConfig.getCron());
         logger.info("启动定时任务:{} >> {}", taskKey, listenerConfig.getCron());
@@ -93,7 +92,6 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
 
     @Override
     @Override
     public void close() {
     public void close() {
-        super.close();
         scheduledTaskService.stop(taskKey);
         scheduledTaskService.stop(taskKey);
         running = false;
         running = false;
     }
     }

+ 1 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -82,7 +82,6 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
                 return;
                 return;
             }
             }
             connected = true;
             connected = true;
-            super.start();
             connect();
             connect();
             readTables();
             readTables();
             Assert.notEmpty(tables, "No tables available");
             Assert.notEmpty(tables, "No tables available");
@@ -109,7 +108,6 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     @Override
     @Override
     public void close() {
     public void close() {
         if (connected) {
         if (connected) {
-            super.close();
             LsnPuller.removeExtractor(metaId);
             LsnPuller.removeExtractor(metaId);
             if (null != worker && !worker.isInterrupted()) {
             if (null != worker && !worker.isInterrupted()) {
                 worker.interrupt();
                 worker.interrupt();
@@ -120,7 +118,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     }
     }
 
 
     @Override
     @Override
-    protected void refreshEvent(ChangedEvent event) {
+    public void refreshEvent(ChangedEvent event) {
         if (event.getPosition() != null) {
         if (event.getPosition() != null) {
             snapshot.put(LSN_POSITION, event.getPosition().toString());
             snapshot.put(LSN_POSITION, event.getPosition().toString());
         }
         }

+ 3 - 8
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -2,8 +2,8 @@ package org.dbsyncer.manager.puller;
 
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.PageChangedEvent;
 import org.dbsyncer.common.event.PageChangedEvent;
-import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
@@ -44,7 +44,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 /**
 /**
@@ -173,7 +172,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             extractor.setSourceTable(sourceTable);
             extractor.setSourceTable(sourceTable);
             extractor.setSnapshot(meta.getSnapshot());
             extractor.setSnapshot(meta.getSnapshot());
             extractor.setMetaId(meta.getId());
             extractor.setMetaId(meta.getId());
-            extractor.setQueue(new LinkedBlockingQueue<>(8192));
             return extractor;
             return extractor;
         }
         }
 
 
@@ -181,7 +179,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
     }
     }
 
 
     abstract class AbstractConsumer<E extends ChangedEvent> implements Watcher {
     abstract class AbstractConsumer<E extends ChangedEvent> implements Watcher {
-        protected Mapping mapping;
         protected Meta meta;
         protected Meta meta;
 
 
         public abstract void onChange(E e);
         public abstract void onChange(E e);
@@ -217,7 +214,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
 
 
         private List<FieldPicker> tablePicker = new LinkedList<>();
         private List<FieldPicker> tablePicker = new LinkedList<>();
         public QuartzConsumer(Mapping mapping, List<TableGroup> tableGroups) {
         public QuartzConsumer(Mapping mapping, List<TableGroup> tableGroups) {
-            this.mapping = mapping;
             this.meta = manager.getMeta(mapping.getMetaId());
             this.meta = manager.getMeta(mapping.getMetaId());
             Assert.notNull(meta, "The meta is null.");
             Assert.notNull(meta, "The meta is null.");
             tableGroups.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
             tableGroups.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
@@ -230,7 +226,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             event.setSourceTableName(tableGroup.getSourceTable().getName());
             event.setSourceTableName(tableGroup.getSourceTable().getName());
 
 
             // 处理过程有异常向上抛
             // 处理过程有异常向上抛
-            parser.execute(mapping, tableGroup, event);
+            parser.execute(tableGroup, event);
         }
         }
     }
     }
 
 
@@ -239,7 +235,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         private Map<String, List<FieldPicker>> tablePicker = new LinkedHashMap<>();
         private Map<String, List<FieldPicker>> tablePicker = new LinkedHashMap<>();
 
 
         public LogConsumer(Mapping mapping, List<TableGroup> tableGroups) {
         public LogConsumer(Mapping mapping, List<TableGroup> tableGroups) {
-            this.mapping = mapping;
             this.meta = manager.getMeta(mapping.getMetaId());
             this.meta = manager.getMeta(mapping.getMetaId());
             Assert.notNull(meta, "The meta is null.");
             Assert.notNull(meta, "The meta is null.");
             tableGroups.forEach(t -> {
             tableGroups.forEach(t -> {
@@ -260,7 +255,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
                     final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
                     final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
                     if (picker.filter(changedRow)) {
                     if (picker.filter(changedRow)) {
                         event.setChangedRow(changedRow);
                         event.setChangedRow(changedRow);
-                        parser.execute(mapping, picker.getTableGroup(), event);
+                        parser.execute(picker.getTableGroup(), event);
                     }
                     }
                 });
                 });
             }
             }

+ 9 - 11
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -27,12 +27,11 @@ import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.query.Query;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
 import java.time.LocalDateTime;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Arrays;
@@ -54,20 +53,19 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
-    @Autowired
+    @Resource
     private Manager manager;
     private Manager manager;
 
 
-    @Qualifier("taskExecutor")
-    @Autowired
+    @Resource
     private Executor taskExecutor;
     private Executor taskExecutor;
 
 
-    @Autowired
-    private BufferActuator writerBufferActuator;
+    @Resource
+    private BufferActuator syncBufferActuator;
 
 
-    @Autowired
+    @Resource
     private BufferActuator storageBufferActuator;
     private BufferActuator storageBufferActuator;
 
 
-    @Autowired
+    @Resource
     private ScheduledTaskService scheduledTaskService;
     private ScheduledTaskService scheduledTaskService;
 
 
     private volatile boolean running;
     private volatile boolean running;
@@ -198,8 +196,8 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
         report.setInsert(mappingReportMetric.getInsert());
         report.setInsert(mappingReportMetric.getInsert());
         report.setUpdate(mappingReportMetric.getUpdate());
         report.setUpdate(mappingReportMetric.getUpdate());
         report.setDelete(mappingReportMetric.getDelete());
         report.setDelete(mappingReportMetric.getDelete());
-        report.setQueueUp(writerBufferActuator.getQueue().size());
-        report.setQueueCapacity(writerBufferActuator.getQueueCapacity());
+        report.setQueueUp(syncBufferActuator.getQueue().size());
+        report.setQueueCapacity(syncBufferActuator.getQueueCapacity());
         return report;
         return report;
     }
     }
 
 

+ 1 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -161,11 +161,10 @@ public interface Parser {
     /**
     /**
      * 增量同步
      * 增量同步
      *
      *
-     * @param mapping
      * @param tableGroup
      * @param tableGroup
      * @param changedEvent
      * @param changedEvent
      */
      */
-    void execute(Mapping mapping, TableGroup tableGroup, ChangedEvent changedEvent);
+    void execute(TableGroup tableGroup, ChangedEvent changedEvent);
 
 
     /**
     /**
      * 批执行
      * 批执行

+ 3 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -89,7 +89,7 @@ public class ParserFactory implements Parser {
     private ApplicationContext applicationContext;
     private ApplicationContext applicationContext;
 
 
     @Resource
     @Resource
-    private BufferActuator writerBufferActuator;
+    private BufferActuator syncBufferActuator;
 
 
     @Override
     @Override
     public ConnectorMapper connect(AbstractConnectorConfig config) {
     public ConnectorMapper connect(AbstractConnectorConfig config) {
@@ -306,9 +306,8 @@ public class ParserFactory implements Parser {
     }
     }
 
 
     @Override
     @Override
-    public void execute(Mapping mapping, TableGroup tableGroup, ChangedEvent event) {
-        logger.debug("Table[{}] {}, data:{}", event.getSourceTableName(), event.getEvent(), event.getChangedRow());
-        writerBufferActuator.offer(new WriterRequest(tableGroup.getId(), event.getEvent(), event.getChangedRow()));
+    public void execute(TableGroup tableGroup, ChangedEvent event) {
+        syncBufferActuator.offer(new WriterRequest(tableGroup.getId(), event.getEvent(), event.getChangedRow()));
     }
     }
 
 
     /**
     /**

+ 91 - 65
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -1,22 +1,22 @@
 package org.dbsyncer.parser.flush;
 package org.dbsyncer.parser.flush;
 
 
 import org.dbsyncer.common.config.BufferActuatorConfig;
 import org.dbsyncer.common.config.BufferActuatorConfig;
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
-import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.util.Assert;
 
 
-import javax.annotation.PostConstruct;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.ParameterizedType;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.Instant;
-import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantLock;
 
 
@@ -29,30 +29,41 @@ import java.util.concurrent.locks.ReentrantLock;
  * @version 1.0.0
  * @version 1.0.0
  * @date 2022/3/27 17:36
  * @date 2022/3/27 17:36
  */
  */
-public abstract class AbstractBufferActuator<Request, Response> implements BufferActuator, ScheduledTaskJob {
+public abstract class AbstractBufferActuator<Request extends BufferRequest, Response extends BufferResponse> implements BufferActuator {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    @Autowired
-    private ScheduledTaskService scheduledTaskService;
-
-    @Autowired
-    private BufferActuatorConfig bufferActuatorConfig;
-
-    private LocalDateTime lastBufferWarningTime;
-
-    private BlockingQueue<Request> buffer;
-
-    private final Lock lock = new ReentrantLock(true);
-
+    private Class<Response> responseClazz;
+    private final Lock TASK_LOCK = new ReentrantLock(true);
     private volatile boolean running;
     private volatile boolean running;
+    private final Lock BUFFER_LOCK = new ReentrantLock();
+    private final Condition IS_FULL = BUFFER_LOCK.newCondition();
+    private final Duration OFFER_INTERVAL = Duration.of(500, ChronoUnit.MILLIS);
+    private BufferActuatorConfig config;
+    private BlockingQueue<Request> queue;
+
+    public AbstractBufferActuator() {
+        int level = 5;
+        Class<?> aClass = getClass();
+        while (level > 0) {
+            if (aClass.getSuperclass() == AbstractBufferActuator.class) {
+                responseClazz = (Class<Response>) ((ParameterizedType) aClass.getGenericSuperclass()).getActualTypeArguments()[1];
+                break;
+            }
+            aClass = aClass.getSuperclass();
+            level--;
+        }
+        Assert.notNull(responseClazz, String.format("%s的父类%s泛型参数Response为空.", getClass().getName(), AbstractBufferActuator.class.getName()));
+    }
 
 
-    private final Class<Response> responseClazz = (Class<Response>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
 
 
-    @PostConstruct
-    private void init() {
-        buffer = new LinkedBlockingQueue(getQueueCapacity());
-        scheduledTaskService.start(bufferActuatorConfig.getPeriodMillisecond(), this);
+    /**
+     * 初始化配置
+     *
+     * @param config
+     */
+    public void buildConfig(BufferActuatorConfig config) {
+        this.config = config;
+        this.queue = new LinkedBlockingQueue(config.getQueueCapacity());
     }
     }
 
 
     /**
     /**
@@ -71,13 +82,6 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
      */
      */
     protected abstract void partition(Request request, Response response);
     protected abstract void partition(Request request, Response response);
 
 
-    /**
-     * 批处理
-     *
-     * @param response
-     */
-    protected abstract void pull(Response response);
-
     /**
     /**
      * 是否跳过分区处理
      * 是否跳过分区处理
      *
      *
@@ -89,66 +93,85 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
         return false;
         return false;
     }
     }
 
 
-    @Override
-    public Queue getQueue() {
-        return buffer;
-    }
-
-    @Override
-    public int getQueueCapacity() {
-        return bufferActuatorConfig.getQueueCapacity();
-    }
+    /**
+     * 批处理
+     *
+     * @param response
+     */
+    protected abstract void pull(Response response);
 
 
     @Override
     @Override
-    public boolean offer(BufferRequest request) {
-        boolean offer = buffer.offer((Request) request);
-        if (!offer) {
-            LocalDateTime now = LocalDateTime.now();
-            if (null == lastBufferWarningTime) {
-                lastBufferWarningTime = now;
+    public void offer(BufferRequest request) {
+        boolean lock = false;
+        try {
+            lock = BUFFER_LOCK.tryLock();
+            if (lock) {
+                if (!queue.offer((Request) request)) {
+                    logger.warn("[{}]缓存队列容量已达上限[{}], 正在阻塞重试.", this.getClass().getSimpleName(), getQueueCapacity());
+
+                    // 容量上限,阻塞重试
+                    while (!queue.offer((Request) request)) {
+                        try {
+                            IS_FULL.await(OFFER_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
+                        } catch (InterruptedException e) {
+                            break;
+                        }
+                    }
+                }
             }
             }
-
-            // 3s前有警告时间
-            if (now.minusSeconds(3).isAfter(lastBufferWarningTime)) {
-                logger.warn("[{}]缓存队列容量已达上限,建议修改参数[dbsyncer.parser.flush.buffer.actuator.queue-capacity={}], ", this.getClass().getSimpleName(), getQueueCapacity());
-                lastBufferWarningTime = now;
+        } finally {
+            if (lock) {
+                BUFFER_LOCK.unlock();
             }
             }
         }
         }
-        return offer;
     }
     }
 
 
     @Override
     @Override
-    public void run() {
+    public void batchExecute() {
         if (running) {
         if (running) {
             return;
             return;
         }
         }
 
 
-        final Lock bufferLock = lock;
         boolean locked = false;
         boolean locked = false;
         try {
         try {
-            locked = bufferLock.tryLock();
+            locked = TASK_LOCK.tryLock();
             if (locked) {
             if (locked) {
                 running = true;
                 running = true;
-                flush(buffer);
+                submit();
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
-            logger.error(e.getMessage());
+            logger.error(e.getMessage(), e);
         } finally {
         } finally {
             if (locked) {
             if (locked) {
                 running = false;
                 running = false;
-                bufferLock.unlock();
+                TASK_LOCK.unlock();
             }
             }
         }
         }
     }
     }
 
 
-    private void flush(Queue<Request> queue) throws IllegalAccessException, InstantiationException {
+    @Override
+    public Queue getQueue() {
+        return queue;
+    }
+
+    @Override
+    public int getQueueCapacity() {
+        return config.getQueueCapacity();
+    }
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+    private void submit() throws IllegalAccessException, InstantiationException {
         if (queue.isEmpty()) {
         if (queue.isEmpty()) {
             return;
             return;
         }
         }
 
 
         AtomicLong batchCounter = new AtomicLong();
         AtomicLong batchCounter = new AtomicLong();
         Map<String, Response> map = new LinkedHashMap<>();
         Map<String, Response> map = new LinkedHashMap<>();
-        while (!queue.isEmpty() && batchCounter.get() < bufferActuatorConfig.getBatchCount()) {
+        while (!queue.isEmpty() && batchCounter.get() < config.getBatchCount()) {
             Request poll = queue.poll();
             Request poll = queue.poll();
             String key = getPartitionKey(poll);
             String key = getPartitionKey(poll);
             if (!map.containsKey(key)) {
             if (!map.containsKey(key)) {
@@ -164,19 +187,22 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
             }
             }
         }
         }
 
 
-        map.forEach((key, flushTask) -> {
+        map.forEach((key, response) -> {
             long now = Instant.now().toEpochMilli();
             long now = Instant.now().toEpochMilli();
             try {
             try {
-                pull(flushTask);
+                pull(response);
             } catch (Exception e) {
             } catch (Exception e) {
-                logger.error("[{}]异常{}", key);
+                logger.error(e.getMessage(), e);
             }
             }
-            final BufferResponse task = (BufferResponse) flushTask;
-            logger.info("[{}{}]{}, {}ms", key, task.getSuffixName(), task.getTaskSize(), (Instant.now().toEpochMilli() - now));
+            final BufferResponse resp = (BufferResponse) response;
+            logger.info("[{}{}]{}, {}ms", key, resp.getSuffixName(), resp.getTaskSize(), (Instant.now().toEpochMilli() - now));
         });
         });
         map.clear();
         map.clear();
         map = null;
         map = null;
         batchCounter = null;
         batchCounter = null;
     }
     }
 
 
+    public BufferActuatorConfig getConfig() {
+        return config;
+    }
 }
 }

+ 13 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -7,7 +7,19 @@ import java.util.Queue;
  * @version 1.0.0
  * @version 1.0.0
  * @date 2022/3/27 17:34
  * @date 2022/3/27 17:34
  */
  */
-public interface BufferActuator {
+public interface BufferActuator extends Cloneable {
+
+    /**
+     * 提交任务
+     *
+     * @param request
+     */
+    void offer(BufferRequest request);
+
+    /**
+     * 批量执行
+     */
+    void batchExecute();
 
 
     /**
     /**
      * 获取缓存队列
      * 获取缓存队列
@@ -23,12 +35,4 @@ public interface BufferActuator {
      */
      */
     int getQueueCapacity();
     int getQueueCapacity();
 
 
-    /**
-     * 提交任务
-     *
-     * @param request
-     * @return true/false
-     */
-    boolean offer(BufferRequest request);
-
 }
 }

+ 6 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -13,9 +13,9 @@ import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.util.BinlogMessageUtil;
 import org.dbsyncer.storage.util.BinlogMessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
+import javax.annotation.Resource;
 import java.time.Instant;
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
@@ -35,16 +35,16 @@ public class FlushServiceImpl implements FlushService {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
-    @Autowired
+    @Resource
     private StorageService storageService;
     private StorageService storageService;
 
 
-    @Autowired
+    @Resource
     private SnowflakeIdWorker snowflakeIdWorker;
     private SnowflakeIdWorker snowflakeIdWorker;
 
 
-    @Autowired
+    @Resource
     private BufferActuator storageBufferActuator;
     private BufferActuator storageBufferActuator;
 
 
-    @Autowired
+    @Resource
     private IncrementDataConfig flushDataConfig;
     private IncrementDataConfig flushDataConfig;
 
 
     @Override
     @Override
@@ -76,10 +76,7 @@ public class FlushServiceImpl implements FlushService {
                 logger.warn("可能存在Blob或inputStream大文件类型, 无法序列化:{}", r);
                 logger.warn("可能存在Blob或inputStream大文件类型, 无法序列化:{}", r);
             }
             }
 
 
-            // 缓存队列满时,打印日志
-            if (!storageBufferActuator.offer(new StorageRequest(metaId, row))) {
-                logger.error("缓存队列容量已达上限, 无法持久化:{}", r);
-            }
+            storageBufferActuator.offer(new StorageRequest(metaId, row));
         });
         });
     }
     }
 
 

+ 28 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -1,24 +1,43 @@
 package org.dbsyncer.parser.flush.impl;
 package org.dbsyncer.parser.flush.impl;
 
 
+import org.dbsyncer.common.config.BufferActuatorConfig;
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.StorageRequest;
 import org.dbsyncer.parser.model.StorageRequest;
 import org.dbsyncer.parser.model.StorageResponse;
 import org.dbsyncer.parser.model.StorageResponse;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
 /**
 /**
+ * 持久化任务缓冲执行器
+ *
  * @author AE86
  * @author AE86
  * @version 1.0.0
  * @version 1.0.0
  * @date 2022/3/27 16:50
  * @date 2022/3/27 16:50
  */
  */
 @Component
 @Component
-public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest, StorageResponse> {
+public final class StorageBufferActuator extends AbstractBufferActuator<StorageRequest, StorageResponse> implements ScheduledTaskJob {
+
+    @Resource
+    private BufferActuatorConfig bufferActuatorConfig;
+
+    @Resource
+    private ScheduledTaskService scheduledTaskService;
 
 
-    @Autowired
+    @Resource
     private StorageService storageService;
     private StorageService storageService;
 
 
+    @PostConstruct
+    private void init() {
+        super.buildConfig(bufferActuatorConfig);
+        scheduledTaskService.start(bufferActuatorConfig.getPeriodMillisecond(), this);
+    }
+
     @Override
     @Override
     protected String getPartitionKey(StorageRequest request) {
     protected String getPartitionKey(StorageRequest request) {
         return request.getMetaId();
         return request.getMetaId();
@@ -34,4 +53,10 @@ public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest
     protected void pull(StorageResponse response) {
     protected void pull(StorageResponse response) {
         storageService.addBatch(StorageEnum.DATA, response.getMetaId(), response.getDataList());
         storageService.addBatch(StorageEnum.DATA, response.getMetaId(), response.getDataList());
     }
     }
+
+    @Override
+    public void run() {
+        batchExecute();
+    }
+
 }
 }

+ 37 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/SyncBufferActuator.java

@@ -0,0 +1,37 @@
+package org.dbsyncer.parser.flush.impl;
+
+import org.dbsyncer.common.config.BufferActuatorConfig;
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+/**
+ * 同步缓冲执行器
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:50
+ */
+@Component
+public final class SyncBufferActuator extends WriterBufferActuator implements ScheduledTaskJob {
+
+    @Resource
+    private BufferActuatorConfig bufferActuatorConfig;
+
+    @Resource
+    private ScheduledTaskService scheduledTaskService;
+
+    @PostConstruct
+    private void init() {
+        super.buildConfig(bufferActuatorConfig);
+        scheduledTaskService.start(bufferActuatorConfig.getPeriodMillisecond(), this);
+    }
+
+    @Override
+    public void run() {
+        batchExecute();
+    }
+}

+ 19 - 24
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.parser.flush.impl;
 package org.dbsyncer.parser.flush.impl;
 
 
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.cache.CacheService;
-import org.dbsyncer.common.config.BufferActuatorConfig;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.model.IncrementConvertContext;
 import org.dbsyncer.common.model.IncrementConvertContext;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Result;
@@ -21,39 +20,36 @@ import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.plugin.PluginFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
 
 
+import javax.annotation.Resource;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
 /**
 /**
+ * 同步任务缓冲执行器
+ *
  * @author AE86
  * @author AE86
  * @version 1.0.0
  * @version 1.0.0
  * @date 2022/3/27 16:50
  * @date 2022/3/27 16:50
  */
  */
-@Component
 public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest, WriterResponse> {
 public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest, WriterResponse> {
 
 
-    @Autowired
+    @Resource
     private ConnectorFactory connectorFactory;
     private ConnectorFactory connectorFactory;
 
 
-    @Autowired
+    @Resource
     private ParserFactory parserFactory;
     private ParserFactory parserFactory;
 
 
-    @Autowired
+    @Resource
     private PluginFactory pluginFactory;
     private PluginFactory pluginFactory;
 
 
-    @Autowired
+    @Resource
     private FlushStrategy flushStrategy;
     private FlushStrategy flushStrategy;
 
 
-    @Autowired
+    @Resource
     private CacheService cacheService;
     private CacheService cacheService;
 
 
-    @Autowired
-    private BufferActuatorConfig bufferActuatorConfig;
-
     @Override
     @Override
     protected String getPartitionKey(WriterRequest request) {
     protected String getPartitionKey(WriterRequest request) {
         return request.getTableGroupId();
         return request.getTableGroupId();
@@ -62,12 +58,17 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Override
     @Override
     protected void partition(WriterRequest request, WriterResponse response) {
     protected void partition(WriterRequest request, WriterResponse response) {
         response.getDataList().add(request.getRow());
         response.getDataList().add(request.getRow());
-        if (response.isMerged()) {
-            return;
+        if (!response.isMerged()) {
+            response.setTableGroupId(request.getTableGroupId());
+            response.setEvent(request.getEvent());
+            response.setMerged(true);
         }
         }
-        response.setTableGroupId(request.getTableGroupId());
-        response.setEvent(request.getEvent());
-        response.setMerged(true);
+    }
+
+    @Override
+    protected boolean skipPartition(WriterRequest nextRequest, WriterResponse response) {
+        // 并发场景,同一条数据可能连续触发Insert > Delete > Insert,批处理任务中出现不同事件时,跳过分区处理
+        return !StringUtil.equals(nextRequest.getEvent(), response.getEvent());
     }
     }
 
 
     @Override
     @Override
@@ -95,7 +96,7 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         pluginFactory.convert(group.getPlugin(), context);
         pluginFactory.convert(group.getPlugin(), context);
 
 
         // 5、批量执行同步
         // 5、批量执行同步
-        BatchWriter batchWriter = new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount());
+        BatchWriter batchWriter = new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, getConfig().getWriterBatchCount());
         Result result = parserFactory.writeBatch(context, batchWriter);
         Result result = parserFactory.writeBatch(context, batchWriter);
 
 
         // 6、持久化同步结果
         // 6、持久化同步结果
@@ -107,12 +108,6 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         pluginFactory.postProcessAfter(group.getPlugin(), context);
         pluginFactory.postProcessAfter(group.getPlugin(), context);
     }
     }
 
 
-    @Override
-    protected boolean skipPartition(WriterRequest nextRequest, WriterResponse response) {
-        // 并发场景,同一条数据可能连续触发Insert > Delete > Insert,批处理任务中出现不同事件时,跳过分区处理
-        return !StringUtil.equals(nextRequest.getEvent(), response.getEvent());
-    }
-
     /**
     /**
      * 获取连接器配置
      * 获取连接器配置
      *
      *

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/StorageResponse.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.parser.model;
 package org.dbsyncer.parser.model;
 
 
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.flush.BufferResponse;
 import org.dbsyncer.parser.flush.BufferResponse;
 
 
 import java.util.LinkedList;
 import java.util.LinkedList;
@@ -13,7 +14,6 @@ import java.util.Map;
  */
  */
 public class StorageResponse implements BufferResponse {
 public class StorageResponse implements BufferResponse {
 
 
-    private static final String EMPTY = "";
     private String metaId;
     private String metaId;
     private List<Map> dataList = new LinkedList<>();
     private List<Map> dataList = new LinkedList<>();
 
 
@@ -40,6 +40,6 @@ public class StorageResponse implements BufferResponse {
 
 
     @Override
     @Override
     public String getSuffixName() {
     public String getSuffixName() {
-        return EMPTY;
+        return StringUtil.EMPTY;
     }
     }
 }
 }