Explorar el Código

发布刷新增量点事件

Signed-off-by: AE86 <836391306@qq.com>
AE86 hace 1 año
padre
commit
43ccbf28ec
Se han modificado 20 ficheros con 195 adiciones y 74 borrados
  1. 5 1
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java
  2. 1 7
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/ChangedEvent.java
  3. 59 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/ChangedOffset.java
  4. 7 7
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/ClosedEvent.java
  5. 7 15
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/CommonChangedEvent.java
  6. 33 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/RefreshOffsetEvent.java
  7. 3 2
      dbsyncer-common/src/main/java/org/dbsyncer/common/spi/Extractor.java
  8. 3 1
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java
  9. 4 4
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java
  10. 3 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java
  11. 4 4
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java
  12. 4 4
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java
  13. 1 1
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java
  14. 29 16
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  15. 3 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java
  16. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  17. 0 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  18. 9 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java
  19. 11 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java
  20. 7 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java

+ 5 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -4,6 +4,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import org.dbsyncer.biz.DataSyncService;
 import org.dbsyncer.biz.vo.BinlogColumnVo;
 import org.dbsyncer.biz.vo.MessageVo;
+import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.common.util.JsonUtil;
@@ -149,7 +150,10 @@ public class DataSyncServiceImpl implements DataSyncService {
             if (StringUtil.isNotBlank(retryDataParams)) {
                 JsonUtil.parseMap(retryDataParams).forEach((k, v) -> binlogData.put(k, convertValue(binlogData.get(k), (String) v)));
             }
-            syncBufferActuator.offer(new WriterRequest(tableGroupId, event, binlogData));
+            // TODO 待获取源表名称
+            RowChangedEvent changedEvent = new RowChangedEvent(StringUtil.EMPTY, event, Collections.EMPTY_LIST);
+            changedEvent.setChangedRow(binlogData);
+            syncBufferActuator.offer(new WriterRequest(tableGroupId, changedEvent));
             monitor.removeData(metaId, messageId);
             // 更新失败数
             Meta meta = manager.getMeta(metaId);

+ 1 - 7
dbsyncer-common/src/main/java/org/dbsyncer/common/event/ChangedEvent.java

@@ -35,17 +35,11 @@ public interface ChangedEvent {
      */
     Map<String, Object> getChangedRow();
 
-    /**
-     * 获取增量文件名称
-     * @return
-     */
-    String getNextFileName();
-
     /**
      * 获取增量偏移量
      *
      * @return
      */
-    Object getPosition();
+    ChangedOffset getChangedOffset();
 
 }

+ 59 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/ChangedOffset.java

@@ -0,0 +1,59 @@
+package org.dbsyncer.common.event;
+
+/**
+ * 增量偏移量
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2023-08-23 20:00
+ */
+public final class ChangedOffset {
+    /**
+     * 驱动ID
+     */
+    private String metaId;
+    /**
+     * 增量文件名称
+     */
+    private String nextFileName;
+    /**
+     * 增量偏移量
+     */
+    private Object position;
+    /**
+     * 是否触发刷新增量点事件
+     */
+    private boolean refreshOffset;
+
+    public String getMetaId() {
+        return metaId;
+    }
+
+    public void setMetaId(String metaId) {
+        this.metaId = metaId;
+    }
+
+    public String getNextFileName() {
+        return nextFileName;
+    }
+
+    public void setNextFileName(String nextFileName) {
+        this.nextFileName = nextFileName;
+    }
+
+    public Object getPosition() {
+        return position;
+    }
+
+    public void setPosition(Object position) {
+        this.position = position;
+    }
+
+    public boolean isRefreshOffset() {
+        return refreshOffset;
+    }
+
+    public void setRefreshOffset(boolean refreshOffset) {
+        this.refreshOffset = refreshOffset;
+    }
+}

+ 7 - 7
dbsyncer-common/src/main/java/org/dbsyncer/common/event/ClosedEvent.java

@@ -10,9 +10,9 @@ import org.springframework.context.event.ApplicationContextEvent;
  * @Author AE86
  * @Date 2020-04-26 22:45
  */
-public class ClosedEvent extends ApplicationContextEvent {
+public final class ClosedEvent extends ApplicationContextEvent {
 
-    private String id;
+    private String metaId;
 
     /**
      * Create a new ContextStartedEvent.
@@ -20,12 +20,12 @@ public class ClosedEvent extends ApplicationContextEvent {
      * @param source the {@code ApplicationContext} that the event is raised for
      *               (must not be {@code null})
      */
-    public ClosedEvent(ApplicationContext source, String id) {
+    public ClosedEvent(ApplicationContext source, String metaId) {
         super(source);
-        this.id = id;
+        this.metaId = metaId;
     }
 
-    public String getId() {
-        return id;
+    public String getMetaId() {
+        return metaId;
     }
-}
+}

+ 7 - 15
dbsyncer-common/src/main/java/org/dbsyncer/common/event/CommonChangedEvent.java

@@ -26,14 +26,10 @@ public class CommonChangedEvent implements ChangedEvent {
      * 变更行数据
      */
     private Map<String, Object> changedRow;
-    /**
-     * 增量文件名称
-     */
-    private String nextFileName;
     /**
      * 增量偏移量
      */
-    private Object position;
+    private ChangedOffset changedOffset = new ChangedOffset();
 
     public String getSourceTableName() {
         return sourceTableName;
@@ -59,20 +55,16 @@ public class CommonChangedEvent implements ChangedEvent {
         this.changedRow = changedRow;
     }
 
-    public String getNextFileName() {
-        return nextFileName;
+    @Override
+    public ChangedOffset getChangedOffset() {
+        return changedOffset;
     }
 
     public void setNextFileName(String nextFileName) {
-        this.nextFileName = nextFileName;
-    }
-
-    public Object getPosition() {
-        return position;
+        changedOffset.setNextFileName(nextFileName);
     }
 
-    public CommonChangedEvent setPosition(Object position) {
-        this.position = position;
-        return this;
+    public void setPosition(Object position) {
+        changedOffset.setPosition(position);
     }
 }

+ 33 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/RefreshOffsetEvent.java

@@ -0,0 +1,33 @@
+package org.dbsyncer.common.event;
+
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.event.ApplicationContextEvent;
+
+import java.util.List;
+
+/**
+ * 刷新偏移量事件
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2023-08-23 22:45
+ */
+public final class RefreshOffsetEvent extends ApplicationContextEvent {
+
+    private List<ChangedOffset> offsetList;
+
+    /**
+     * Create a new ContextStartedEvent.
+     *
+     * @param source the {@code ApplicationContext} that the event is raised for
+     *               (must not be {@code null})
+     */
+    public RefreshOffsetEvent(ApplicationContext source, List<ChangedOffset> offsetList) {
+        super(source);
+        this.offsetList = offsetList;
+    }
+
+    public List<ChangedOffset> getOffsetList() {
+        return offsetList;
+    }
+}

+ 3 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/spi/Extractor.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.common.spi;
 
 import org.dbsyncer.common.event.ChangedEvent;
+import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.common.event.Watcher;
 
 public interface Extractor {
@@ -32,9 +33,9 @@ public interface Extractor {
     /**
      * 更新增量点
      *
-     * @param event
+     * @param offset
      */
-    void refreshEvent(ChangedEvent event);
+    void refreshEvent(ChangedOffset offset);
 
     /**
      * 刷新增量点事件

+ 3 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.ChangedEvent;
+import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
@@ -68,7 +69,7 @@ public abstract class AbstractExtractor implements Extractor {
     }
 
     @Override
-    public void refreshEvent(ChangedEvent event) {
+    public void refreshEvent(ChangedOffset offset) {
         // nothing to do
     }
 
@@ -76,6 +77,7 @@ public abstract class AbstractExtractor implements Extractor {
     public void flushEvent() {
         // 20s内更新,执行写入
         if (watcher.getMetaUpdateTime() > Timestamp.valueOf(LocalDateTime.now().minusSeconds(FLUSH_DELAYED_SECONDS)).getTime()) {
+            logger.info("snapshot:{}", snapshot);
             if (!CollectionUtils.isEmpty(snapshot)) {
                 watcher.flushEvent(snapshot);
             }

+ 4 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.listener.file;
 
 import org.apache.commons.io.IOUtils;
-import org.dbsyncer.common.event.ChangedEvent;
+import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.file.BufferedRandomAccessFile;
 import org.dbsyncer.common.util.CollectionUtils;
@@ -131,9 +131,9 @@ public class FileExtractor extends AbstractExtractor {
     }
 
     @Override
-    public void refreshEvent(ChangedEvent event) {
-        if (event.getNextFileName() != null && event.getPosition() != null) {
-            snapshot.put(event.getNextFileName(), String.valueOf(event.getPosition()));
+    public void refreshEvent(ChangedOffset offset) {
+        if (offset.getNextFileName() != null && offset.getPosition() != null) {
+            snapshot.put(offset.getNextFileName(), String.valueOf(offset.getPosition()));
         }
     }
 

+ 3 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -10,7 +10,7 @@ import com.github.shyiko.mysql.binlog.event.TableMapEventData;
 import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
 import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
 import com.github.shyiko.mysql.binlog.network.ServerException;
-import org.dbsyncer.common.event.ChangedEvent;
+import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
@@ -91,8 +91,8 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
     }
 
     @Override
-    public void refreshEvent(ChangedEvent event) {
-        refreshSnapshot(event.getNextFileName(), (Long) event.getPosition());
+    public void refreshEvent(ChangedOffset offset) {
+        refreshSnapshot(offset.getNextFileName(), (Long) offset.getPosition());
     }
 
     private void run() throws Exception {

+ 4 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.listener.postgresql;
 
-import org.dbsyncer.common.event.ChangedEvent;
+import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.BooleanUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
@@ -132,8 +132,8 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
     }
 
     @Override
-    public void refreshEvent(ChangedEvent event) {
-        snapshot.put(LSN_POSITION, String.valueOf(event.getPosition()));
+    public void refreshEvent(ChangedOffset offset) {
+        snapshot.put(LSN_POSITION, String.valueOf(offset.getPosition()));
     }
 
     private void connect() throws SQLException {
@@ -282,8 +282,8 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
                     RowChangedEvent event = messageDecoder.processMessage(msg);
                     if(event != null){
                         event.setPosition(lsn.asString());
+                        sendChangedEvent(event);
                     }
-                    sendChangedEvent(event);
 
                     // feedback
                     stream.setAppliedLSN(lsn);

+ 4 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.listener.sqlserver;
 
 import com.microsoft.sqlserver.jdbc.SQLServerException;
-import org.dbsyncer.common.event.ChangedEvent;
+import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.DatabaseConfig;
@@ -118,9 +118,9 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     }
 
     @Override
-    public void refreshEvent(ChangedEvent event) {
-        if (event.getPosition() != null) {
-            snapshot.put(LSN_POSITION, event.getPosition().toString());
+    public void refreshEvent(ChangedOffset offset) {
+        if (offset.getPosition() != null) {
+            snapshot.put(LSN_POSITION, offset.getPosition().toString());
         }
     }
 

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -341,7 +341,7 @@ public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent>
     @Override
     public void onApplicationEvent(ClosedEvent event) {
         // 异步监听任务关闭事件
-        changeMetaState(event.getId(), MetaEnum.READY);
+        changeMetaState(event.getMetaId(), MetaEnum.READY);
     }
 
     private Puller getPuller(Mapping mapping) {

+ 29 - 16
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -1,17 +1,19 @@
 package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.common.event.ChangedEvent;
+import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.common.event.PageChangedEvent;
+import org.dbsyncer.common.event.RefreshOffsetEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.common.spi.Extractor;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.listener.AbstractExtractor;
-import org.dbsyncer.common.spi.Extractor;
 import org.dbsyncer.listener.Listener;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerTypeEnum;
@@ -30,6 +32,7 @@ import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
@@ -54,7 +57,7 @@ import java.util.stream.Collectors;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob {
+public class IncrementPuller extends AbstractPuller implements ApplicationListener<RefreshOffsetEvent>, ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -125,6 +128,18 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         logger.info("关闭成功:{}", metaId);
     }
 
+    @Override
+    public void onApplicationEvent(RefreshOffsetEvent event) {
+        List<ChangedOffset> offsetList = event.getOffsetList();
+        if (!CollectionUtils.isEmpty(offsetList)) {
+            offsetList.forEach(offset -> {
+                if (offset.isRefreshOffset() && map.containsKey(offset.getMetaId())) {
+                    map.get(offset.getMetaId()).refreshEvent(offset);
+                }
+            });
+        }
+    }
+
     @Override
     public void run() {
         // 定时同步增量信息
@@ -134,7 +149,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
     private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) throws InstantiationException, IllegalAccessException {
         AbstractConnectorConfig connectorConfig = connector.getConfig();
         ListenerConfig listenerConfig = mapping.getListener();
-
         // timing/log
         final String listenerType = listenerConfig.getListenerType();
 
@@ -143,14 +157,14 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         if (ListenerTypeEnum.isTiming(listenerType)) {
             AbstractQuartzExtractor quartzExtractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
             quartzExtractor.setCommands(list.stream().map(t -> new TableGroupQuartzCommand(t.getSourceTable(), t.getCommand())).collect(Collectors.toList()));
-            quartzExtractor.register(new QuartzConsumer(mapping, list));
+            quartzExtractor.register(new QuartzConsumer(meta, mapping, list));
             extractor = quartzExtractor;
         }
 
         // 基于日志抽取
         if (ListenerTypeEnum.isLog(listenerType)) {
             extractor = listener.getExtractor(ListenerTypeEnum.LOG, connectorConfig.getConnectorType(), AbstractExtractor.class);
-            extractor.register(new LogConsumer(mapping, list));
+            extractor.register(new LogConsumer(meta, mapping, list));
         }
 
         if (null != extractor) {
@@ -185,8 +199,8 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
 
         @Override
         public void changeEvent(ChangedEvent event) {
+            event.getChangedOffset().setMetaId(meta.getId());
             onChange((E) event);
-            meta.setUpdateTime(Instant.now().toEpochMilli());
         }
 
         @Override
@@ -209,9 +223,8 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
     final class QuartzConsumer extends AbstractConsumer<PageChangedEvent> {
 
         private List<FieldPicker> tablePicker = new LinkedList<>();
-        public QuartzConsumer(Mapping mapping, List<TableGroup> tableGroups) {
-            this.meta = manager.getMeta(mapping.getMetaId());
-            Assert.notNull(meta, "The meta is null.");
+        public QuartzConsumer(Meta meta, Mapping mapping, List<TableGroup> tableGroups) {
+            this.meta = meta;
             tableGroups.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
         }
 
@@ -221,8 +234,8 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             TableGroup tableGroup = picker.getTableGroup();
             event.setSourceTableName(tableGroup.getSourceTable().getName());
 
-            // 处理过程有异常向上抛
-            parser.execute(tableGroup, event);
+            // 定时暂不支持触发刷新增量点事件
+            parser.execute(tableGroup.getId(), event);
         }
     }
 
@@ -230,9 +243,8 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         private Extractor extractor;
         private Map<String, List<FieldPicker>> tablePicker = new LinkedHashMap<>();
 
-        public LogConsumer(Mapping mapping, List<TableGroup> tableGroups) {
-            this.meta = manager.getMeta(mapping.getMetaId());
-            Assert.notNull(meta, "The meta is null.");
+        public LogConsumer(Meta meta, Mapping mapping, List<TableGroup> tableGroups) {
+            this.meta = meta;
             tableGroups.forEach(t -> {
                 final Table table = t.getSourceTable();
                 final String tableName = table.getName();
@@ -247,14 +259,15 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             // 处理过程有异常向上抛
             List<FieldPicker> pickers = tablePicker.get(event.getSourceTableName());
             if (!CollectionUtils.isEmpty(pickers)) {
+                // 触发刷新增量点事件
+                event.getChangedOffset().setRefreshOffset(true);
                 pickers.forEach(picker -> {
                     final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
                     if (picker.filter(changedRow)) {
                         event.setChangedRow(changedRow);
-                        parser.execute(picker.getTableGroup(), event);
+                        parser.execute(picker.getTableGroup().getId(), event);
                     }
                 });
-                extractor.refreshEvent(event);
             }
         }
 

+ 3 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -161,10 +161,10 @@ public interface Parser {
     /**
      * 增量同步
      *
-     * @param tableGroup
-     * @param changedEvent
+     * @param tableGroupId 表关系ID
+     * @param changedEvent 增量事件
      */
-    void execute(TableGroup tableGroup, ChangedEvent changedEvent);
+    void execute(String tableGroupId, ChangedEvent changedEvent);
 
     /**
      * 批执行

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -306,8 +306,8 @@ public class ParserFactory implements Parser {
     }
 
     @Override
-    public void execute(TableGroup tableGroup, ChangedEvent event) {
-        syncBufferActuator.offer(new WriterRequest(tableGroup.getId(), event.getEvent(), event.getChangedRow()));
+    public void execute(String tableGroupId, ChangedEvent event) {
+        syncBufferActuator.offer(new WriterRequest(tableGroupId, event));
     }
 
     /**

+ 0 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -55,7 +55,6 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
         Assert.notNull(responseClazz, String.format("%s的父类%s泛型参数Response为空.", getClass().getName(), AbstractBufferActuator.class.getName()));
     }
 
-
     /**
      * 初始化配置
      *

+ 9 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.event.RefreshOffsetEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.model.IncrementConvertContext;
 import org.dbsyncer.common.model.Result;
@@ -20,6 +21,7 @@ import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
+import org.springframework.context.ApplicationContext;
 import org.springframework.util.Assert;
 
 import javax.annotation.Resource;
@@ -50,6 +52,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Resource
     private CacheService cacheService;
 
+    @Resource
+    private ApplicationContext applicationContext;
+
     @Override
     protected String getPartitionKey(WriterRequest request) {
         return request.getTableGroupId();
@@ -58,6 +63,7 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Override
     protected void partition(WriterRequest request, WriterResponse response) {
         response.getDataList().add(request.getRow());
+        response.getOffsetList().add(request.getChangedOffset());
         if (!response.isMerged()) {
             response.setTableGroupId(request.getTableGroupId());
             response.setEvent(request.getEvent());
@@ -106,6 +112,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
 
         // 7、执行批量处理后的
         pluginFactory.postProcessAfter(group.getPlugin(), context);
+
+        // 8.发布刷新增量点事件
+        applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
     }
 
     /**

+ 11 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.parser.model;
 
+import org.dbsyncer.common.event.ChangedEvent;
+import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.parser.flush.BufferRequest;
 
 import java.util.Map;
@@ -13,14 +15,20 @@ public class WriterRequest extends AbstractWriter implements BufferRequest {
 
     private Map row;
 
-    public WriterRequest(String tableGroupId, String event, Map row) {
+    private ChangedOffset changedOffset;
+
+    public WriterRequest(String tableGroupId, ChangedEvent event) {
         setTableGroupId(tableGroupId);
-        setEvent(event);
-        this.row = row;
+        setEvent(event.getEvent());
+        this.row = event.getChangedRow();
+        this.changedOffset = event.getChangedOffset();
     }
 
     public Map getRow() {
         return row;
     }
 
+    public ChangedOffset getChangedOffset() {
+        return changedOffset;
+    }
 }

+ 7 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.parser.model;
 
+import org.dbsyncer.common.event.ChangedOffset;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.flush.BufferResponse;
 
@@ -16,6 +17,8 @@ public class WriterResponse extends AbstractWriter implements BufferResponse {
 
     private List<Map> dataList = new LinkedList<>();
 
+    private List<ChangedOffset> offsetList = new LinkedList<>();
+
     private boolean isMerged;
 
     @Override
@@ -32,6 +35,10 @@ public class WriterResponse extends AbstractWriter implements BufferResponse {
         return dataList;
     }
 
+    public List<ChangedOffset> getOffsetList() {
+        return offsetList;
+    }
+
     public boolean isMerged() {
         return isMerged;
     }