Kaynağa Gözat

优化消费模型

Signed-off-by: AE86 <836391306@qq.com>
AE86 1 yıl önce
ebeveyn
işleme
6d21805612
22 değiştirilmiş dosya ile 369 ekleme ve 222 silme
  1. 51 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/ChangedEvent.java
  2. 78 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/CommonChangedEvent.java
  3. 28 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/PageChangedEvent.java
  4. 8 68
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java
  5. 4 2
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java
  6. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  7. 8 5
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractDatabaseExtractor.java
  8. 9 11
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java
  9. 2 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java
  10. 25 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java
  11. 2 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/DqlMysqlExtractor.java
  12. 18 6
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java
  13. 2 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/DqlOracleExtractor.java
  14. 31 62
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java
  15. 2 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/DqlPostgreSQLExtractor.java
  16. 4 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java
  17. 11 14
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java
  18. 2 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/DqlSqlServerExtractor.java
  19. 49 16
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java
  20. 22 13
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  21. 8 4
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java
  22. 4 4
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

+ 51 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/ChangedEvent.java

@@ -0,0 +1,51 @@
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package org.dbsyncer.common.event;
+
+import java.util.Map;
+
+/**
+ * 变更事件
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2023-08-20 20:00
+ */
+public interface ChangedEvent {
+
+    /**
+     * 获取变更表名称
+     *
+     * @return
+     */
+    String getSourceTableName();
+
+    /**
+     * 获取变更事件
+     *
+     * @return
+     */
+    String getEvent();
+
+    /**
+     * 获取变更行数据
+     *
+     * @return
+     */
+    Map<String, Object> getChangedRow();
+
+    /**
+     * 获取增量文件名称
+     * @return
+     */
+    String getNextFileName();
+
+    /**
+     * 获取增量偏移量
+     *
+     * @return
+     */
+    Object getPosition();
+
+}

+ 78 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/CommonChangedEvent.java

@@ -0,0 +1,78 @@
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package org.dbsyncer.common.event;
+
+import java.util.Map;
+
+/**
+ * 通用变更事件
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2023-08-20 20:00
+ */
+public class CommonChangedEvent implements ChangedEvent {
+
+    /**
+     * 变更表名称
+     */
+    private String sourceTableName;
+    /**
+     * 变更事件
+     */
+    private String event;
+    /**
+     * 变更行数据
+     */
+    private Map<String, Object> changedRow;
+    /**
+     * 增量文件名称
+     */
+    private String nextFileName;
+    /**
+     * 增量偏移量
+     */
+    private Object position;
+
+    public String getSourceTableName() {
+        return sourceTableName;
+    }
+
+    public void setSourceTableName(String sourceTableName) {
+        this.sourceTableName = sourceTableName;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public void setEvent(String event) {
+        this.event = event;
+    }
+
+    public Map<String, Object> getChangedRow() {
+        return changedRow;
+    }
+
+    public void setChangedRow(Map<String, Object> changedRow) {
+        this.changedRow = changedRow;
+    }
+
+    public String getNextFileName() {
+        return nextFileName;
+    }
+
+    public void setNextFileName(String nextFileName) {
+        this.nextFileName = nextFileName;
+    }
+
+    public Object getPosition() {
+        return position;
+    }
+
+    public CommonChangedEvent setPosition(Object position) {
+        this.position = position;
+        return this;
+    }
+}

+ 28 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/PageChangedEvent.java

@@ -0,0 +1,28 @@
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package org.dbsyncer.common.event;
+
+import java.util.Map;
+
+/**
+ * 分页变更事件
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2023-08-20 20:00
+ */
+public class PageChangedEvent extends CommonChangedEvent {
+
+    private int tableGroupIndex;
+
+    public PageChangedEvent(int index, String event, Map<String, Object> changedRow) {
+        this.tableGroupIndex = index;
+        setEvent(event);
+        setChangedRow(changedRow);
+    }
+
+    public int getTableGroupIndex() {
+        return tableGroupIndex;
+    }
+}

+ 8 - 68
dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java

@@ -3,10 +3,7 @@
  */
 package org.dbsyncer.common.event;
 
-import org.dbsyncer.common.util.JsonUtil;
-
 import java.util.List;
-import java.util.Map;
 
 /**
  * 监听行变更事件
@@ -15,80 +12,23 @@ import java.util.Map;
  * @Author AE86
  * @Date 2020-06-15 20:00
  */
-public class RowChangedEvent {
-
-    private int tableGroupIndex;
-    private String sourceTableName;
-    private String event;
+public class RowChangedEvent extends CommonChangedEvent {
     private List<Object> dataList;
-    private Map<String, Object> dataMap;
-    private String nextFileName;
-    private Object position;
-
-    public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> data) {
-        this.tableGroupIndex = tableGroupIndex;
-        this.event = event;
-        this.dataMap = data;
-    }
 
     public RowChangedEvent(String sourceTableName, String event, List<Object> data) {
-        this.sourceTableName = sourceTableName;
-        this.event = event;
-        this.dataList = data;
-    }
-
-    public int getTableGroupIndex() {
-        return tableGroupIndex;
+        this(sourceTableName, event, data, null, null);
     }
 
-    public String getSourceTableName() {
-        return sourceTableName;
-    }
-
-    public void setSourceTableName(String sourceTableName) {
-        this.sourceTableName = sourceTableName;
-    }
-
-    public String getEvent() {
-        return event;
+    public RowChangedEvent(String sourceTableName, String event, List<Object> data, String nextFileName, Object position) {
+        setSourceTableName(sourceTableName);
+        setEvent(event);
+        setNextFileName(nextFileName);
+        setPosition(position);
+        this.dataList = data;
     }
 
     public List<Object> getDataList() {
         return dataList;
     }
 
-    public void setDataList(List<Object> dataList) {
-        this.dataList = dataList;
-    }
-
-    public Map<String, Object> getDataMap() {
-        return dataMap;
-    }
-
-    public void setDataMap(Map<String, Object> dataMap) {
-        this.dataMap = dataMap;
-    }
-
-    public String getNextFileName() {
-        return nextFileName;
-    }
-
-    public RowChangedEvent setNextFileName(String nextFileName) {
-        this.nextFileName = nextFileName;
-        return this;
-    }
-
-    public Object getPosition() {
-        return position;
-    }
-
-    public RowChangedEvent setPosition(Object position) {
-        this.position = position;
-        return this;
-    }
-
-    @Override
-    public String toString() {
-        return JsonUtil.objToJson(this);
-    }
 }

+ 4 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java

@@ -3,6 +3,8 @@ package org.dbsyncer.common.event;
 import java.util.Map;
 
 /**
+ * 数据变更监听器
+ *
  * @version 1.0.0
  * @Author AE86
  * @Date 2020-05-11 22:50
@@ -14,10 +16,10 @@ public interface Watcher {
      *
      * @param event
      */
-    void changedEvent(RowChangedEvent event);
+    void changeEvent(ChangedEvent event);
 
     /**
-     * 写入增量点事件
+     * 持久化增量点事件
      *
      * @param snapshot
      */

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -131,7 +131,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         Assert.hasText(querySql, "查询语句不能为空.");
 
         // 2、设置参数
-        Collections.addAll(config.getArgs(), config.isSupportedCursor() ? getPageCursorArgs(config) : getPageArgs(config));
+        Collections.addAll(config.getArgs(), supportedCursor ? getPageCursorArgs(config) : getPageArgs(config));
 
         // 3、执行SQL
         List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));

+ 8 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractDatabaseExtractor.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.listener;
 
+import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
@@ -36,8 +37,8 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
      *
      * @param event
      */
-    protected void sendChangedEvent(RowChangedEvent event) {
-        changedEvent(event);
+    protected void sendChangedEvent(ChangedEvent event) {
+        changeEvent(event);
     }
 
     /**
@@ -45,7 +46,7 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
      *
      * @param event
      */
-    protected void sendDqlChangedEvent(RowChangedEvent event) {
+    protected void sendDqlChangedEvent(ChangedEvent event) {
         if (null == event) {
             return;
         }
@@ -54,20 +55,22 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
             return;
         }
 
+        RowChangedEvent changedEvent = (RowChangedEvent) event;
         boolean processed = false;
         for (DqlMapper dqlMapper : dqlMappers) {
             if (!processed) {
                 switch (event.getEvent()) {
                     case ConnectorConstant.OPERTION_UPDATE:
                     case ConnectorConstant.OPERTION_INSERT:
-                        queryDqlData(dqlMapper, event.getDataList());
+                        queryDqlData(dqlMapper, changedEvent.getDataList());
                         break;
                     default:
                         break;
                 }
                 processed = true;
             }
-            changedEvent(new RowChangedEvent(dqlMapper.sqlName, event.getEvent(), event.getDataList()));
+            changedEvent.setSourceTableName(dqlMapper.sqlName);
+            changeEvent(changedEvent);
         }
     }
 

+ 9 - 11
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.listener;
 
+import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.Watcher;
-import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
@@ -43,7 +43,7 @@ public abstract class AbstractExtractor implements Extractor {
     protected Map<String, String> snapshot;
     protected String metaId;
     private Watcher watcher;
-    private BlockingQueue<RowChangedEvent> queue;
+    private BlockingQueue<ChangedEvent> queue;
     private Thread consumer;
     private volatile boolean enableConsumer;
     private Lock lock = new ReentrantLock();
@@ -52,7 +52,6 @@ public abstract class AbstractExtractor implements Extractor {
     private static final int FLUSH_DELAYED_SECONDS = 20;
     private long updateTime;
 
-
     @Override
     public void start() {
         this.lock = new ReentrantLock();
@@ -62,19 +61,19 @@ public abstract class AbstractExtractor implements Extractor {
             while (enableConsumer) {
                 try {
                     // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
-                    RowChangedEvent event = queue.take();
+                    ChangedEvent event = queue.take();
                     if (null != event) {
                         // TODO 待优化多表并行模型
-                        watcher.changedEvent(event);
+                        watcher.changeEvent(event);
                         // 更新增量点
                         refreshEvent(event);
-                        updateTime = Instant.now().toEpochMilli();
                     }
                 } catch (InterruptedException e) {
                     break;
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                 }
+                updateTime = Instant.now().toEpochMilli();
             }
         });
         consumer.setName(new StringBuilder("extractor-consumer-").append(metaId).toString());
@@ -96,7 +95,7 @@ public abstract class AbstractExtractor implements Extractor {
     }
 
     @Override
-    public void changedEvent(RowChangedEvent event) {
+    public void changeEvent(ChangedEvent event) {
         if (null != event) {
             switch (event.getEvent()) {
                 case ConnectorConstant.OPERTION_UPDATE:
@@ -127,7 +126,6 @@ public abstract class AbstractExtractor implements Extractor {
         }
     }
 
-
     @Override
     public void forceFlushEvent() {
         logger.info("snapshot:{}", snapshot);
@@ -146,7 +144,7 @@ public abstract class AbstractExtractor implements Extractor {
      *
      * @param event
      */
-    protected void refreshEvent(RowChangedEvent event) {
+    protected void refreshEvent(ChangedEvent event) {
         // nothing to do
     }
 
@@ -164,7 +162,7 @@ public abstract class AbstractExtractor implements Extractor {
      * @param permitEvent
      * @param event
      */
-    private void processEvent(boolean permitEvent, RowChangedEvent event) {
+    private void processEvent(boolean permitEvent, ChangedEvent event) {
         if (!permitEvent) {
             return;
         }
@@ -224,7 +222,7 @@ public abstract class AbstractExtractor implements Extractor {
         this.metaId = metaId;
     }
 
-    public void setQueue(BlockingQueue<RowChangedEvent> queue) {
+    public void setQueue(BlockingQueue<ChangedEvent> queue) {
         this.queue = queue;
     }
 }

+ 2 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.listener;
 
+import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.Watcher;
-import org.dbsyncer.common.event.RowChangedEvent;
 
 public interface Extractor {
 
@@ -27,7 +27,7 @@ public interface Extractor {
      *
      * @param event
      */
-    void changedEvent(RowChangedEvent event);
+    void changeEvent(ChangedEvent event);
 
     /**
      * 刷新增量点事件

+ 25 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -1,8 +1,10 @@
 package org.dbsyncer.listener.file;
 
 import org.apache.commons.io.IOUtils;
+import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.file.BufferedRandomAccessFile;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.FileConfig;
@@ -29,6 +31,7 @@ import java.nio.file.StandardWatchEventKinds;
 import java.nio.file.WatchEvent;
 import java.nio.file.WatchKey;
 import java.nio.file.WatchService;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -106,6 +109,8 @@ public class FileExtractor extends AbstractExtractor {
                 raf.seek(NumberUtil.toLong(snapshot.get(filePosKey), 0L));
             } else {
                 raf.seek(raf.length());
+                snapshot.put(filePosKey, String.valueOf(raf.getFilePointer()));
+                super.forceFlushEvent();
             }
 
             pipeline.put(fileName, new PipelineResolver(fileSchema.getFields(), raf));
@@ -127,6 +132,13 @@ public class FileExtractor extends AbstractExtractor {
         }
     }
 
+    @Override
+    protected void refreshEvent(ChangedEvent event) {
+        if (event.getNextFileName() != null && event.getPosition() != null) {
+            snapshot.put(event.getNextFileName(), String.valueOf(event.getPosition()));
+        }
+    }
+
     private void closePipelineAndWatch() {
         try {
             pipeline.values().forEach(pipelineResolver -> IOUtils.closeQuietly(pipelineResolver.raf));
@@ -150,15 +162,25 @@ public class FileExtractor extends AbstractExtractor {
             final RandomAccessFile raf = pipelineResolver.raf;
 
             final String filePosKey = getFilePosKey(fileName);
+            List<List> list = new ArrayList<>();
             String line;
             while (null != (line = pipelineResolver.readLine())) {
-                snapshot.put(filePosKey, String.valueOf(raf.getFilePointer()));
                 if (StringUtil.isNotBlank(line)) {
-                    List<Object> row = fileResolver.parseList(pipelineResolver.fields, separator, line);
-                    changedEvent(new RowChangedEvent(fileName, ConnectorConstant.OPERTION_UPDATE, row));
+                    list.add(fileResolver.parseList(pipelineResolver.fields, separator, line));
                 }
             }
 
+            if (!CollectionUtils.isEmpty(list)) {
+                int size = list.size();
+                for (int i = 0; i < size; i++) {
+                    RowChangedEvent event = new RowChangedEvent(fileName, ConnectorConstant.OPERTION_UPDATE, list.get(i));
+                    if (i == size - 1) {
+                        event.setNextFileName(filePosKey);
+                        event.setPosition(raf.getFilePointer());
+                    }
+                    changeEvent(event);
+                }
+            }
         }
     }
 

+ 2 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/DqlMysqlExtractor.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.listener.mysql;
 
-import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.event.ChangedEvent;
 
 /**
  * @author AE86
@@ -16,7 +16,7 @@ public class DqlMysqlExtractor extends MysqlExtractor {
     }
 
     @Override
-    public void sendChangedEvent(RowChangedEvent event) {
+    public void sendChangedEvent(ChangedEvent event) {
         super.sendDqlChangedEvent(event);
     }
 }

+ 18 - 6
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -1,7 +1,16 @@
 package org.dbsyncer.listener.mysql;
 
-import com.github.shyiko.mysql.binlog.event.*;
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventHeader;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.RotateEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
 import com.github.shyiko.mysql.binlog.network.ServerException;
+import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
@@ -14,7 +23,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -81,7 +93,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
     }
 
     @Override
-    protected void refreshEvent(RowChangedEvent event) {
+    protected void refreshEvent(ChangedEvent event) {
         refreshSnapshot(event.getNextFileName(), (Long) event.getPosition());
     }
 
@@ -251,7 +263,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> after = Stream.of(m.getValue()).collect(Collectors.toList());
-                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_UPDATE, after).setNextFileName(client.getBinlogFilename()).setPosition(client.getBinlogPosition()));
+                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_UPDATE, after, client.getBinlogFilename(), client.getBinlogPosition()));
                     });
                 }
                 return;
@@ -262,7 +274,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> after = Stream.of(m).collect(Collectors.toList());
-                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_INSERT, after).setNextFileName(client.getBinlogFilename()).setPosition(client.getBinlogPosition()));
+                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_INSERT, after, client.getBinlogFilename(), client.getBinlogPosition()));
                     });
                 }
                 return;
@@ -273,7 +285,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> before = Stream.of(m).collect(Collectors.toList());
-                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_DELETE, before).setNextFileName(client.getBinlogFilename()).setPosition(client.getBinlogPosition()));
+                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_DELETE, before, client.getBinlogFilename(), client.getBinlogPosition()));
                     });
                 }
                 return;

+ 2 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/DqlOracleExtractor.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.listener.oracle;
 
-import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.connector.model.Field;
 
 import java.util.List;
@@ -19,7 +19,7 @@ public class DqlOracleExtractor extends OracleExtractor {
     }
 
     @Override
-    public void sendChangedEvent(RowChangedEvent event) {
+    public void sendChangedEvent(ChangedEvent event) {
         super.sendDqlChangedEvent(event);
     }
 

+ 31 - 62
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -5,7 +5,11 @@ package org.dbsyncer.listener.oracle.dcn;
 
 import oracle.jdbc.OracleDriver;
 import oracle.jdbc.OracleStatement;
-import oracle.jdbc.dcn.*;
+import oracle.jdbc.dcn.DatabaseChangeEvent;
+import oracle.jdbc.dcn.DatabaseChangeListener;
+import oracle.jdbc.dcn.DatabaseChangeRegistration;
+import oracle.jdbc.dcn.RowChangeDescription;
+import oracle.jdbc.dcn.TableChangeDescription;
 import oracle.jdbc.driver.OracleConnection;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.StringUtil;
@@ -20,9 +24,12 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -53,10 +60,8 @@ public class DBChangeNotification {
     private OracleConnection conn;
     private DatabaseChangeRegistration dcr;
     private Map<Integer, String> tables;
-    private Worker worker;
     private Set<String> filterTable;
     private List<RowEventListener> listeners = new ArrayList<>();
-    private BlockingQueue<DCNEvent> queue = new LinkedBlockingQueue<>(100);
     private final Lock connectLock = new ReentrantLock();
     private volatile boolean connected;
 
@@ -101,12 +106,6 @@ public class DBChangeNotification {
             clean(statement, regId, callback);
             statement.setDatabaseChangeRegistration(dcr);
 
-            // 启动消费线程
-            worker = new Worker();
-            worker.setName(new StringBuilder("dcn-parser-").append(host).append(":").append(port).append("_").append(regId).toString());
-            worker.setDaemon(false);
-            worker.start();
-
             // 配置监听表
             for (Map.Entry<Integer, String> m : tables.entrySet()) {
                 String sql = String.format(QUERY_TABLE_SQL, m.getValue());
@@ -137,10 +136,6 @@ public class DBChangeNotification {
 
     public void close() {
         connected = false;
-        if (null != worker && !worker.isInterrupted()) {
-            worker.interrupt();
-            worker = null;
-        }
         try {
             if (null != conn) {
                 conn.unregisterDatabaseChangeNotification(dcr);
@@ -155,8 +150,6 @@ public class DBChangeNotification {
         if (null != rs) {
             try {
                 rs.close();
-            } catch (SQLException e) {
-                logger.error(e.getMessage());
             } catch (Exception e) {
                 logger.error(e.getMessage());
             }
@@ -320,6 +313,24 @@ public class DBChangeNotification {
         T apply(ResultSet rs) throws SQLException;
     }
 
+    private void parseEvent(DCNEvent event) {
+        List<Object> data = new ArrayList<>();
+        if (event.getCode() == TableChangeDescription.TableOperation.UPDATE.getCode()) {
+            read(event.getTableName(), event.getRowId(), data);
+            listeners.forEach(listener -> listener.onEvents(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, data)));
+            return;
+        }
+
+        if (event.getCode() == TableChangeDescription.TableOperation.INSERT.getCode()) {
+            read(event.getTableName(), event.getRowId(), data);
+            listeners.forEach(listener -> listener.onEvents(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, data)));
+            return;
+        }
+
+        data.add(event.getRowId());
+        listeners.forEach(listener -> listener.onEvents(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, data)));
+    }
+
     final class DCNListener implements DatabaseChangeListener {
 
         @Override
@@ -329,7 +340,7 @@ public class DBChangeNotification {
                 return;
             }
             DatabaseChangeEvent.EventType eventType = event.getEventType();
-            if(eventType == DatabaseChangeEvent.EventType.OBJCHANGE){
+            if (eventType == DatabaseChangeEvent.EventType.OBJCHANGE) {
                 for (TableChangeDescription td : event.getTableChangeDescription()) {
                     RowChangeDescription[] rds = td.getRowChangeDescription();
                     for (RowChangeDescription rd : rds) {
@@ -338,14 +349,7 @@ public class DBChangeNotification {
                             logger.info("Table[{}] {}", tableName, rd.getRowOperation().name());
                             continue;
                         }
-                        try {
-                            // 如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续
-                            queue.put(new DCNEvent(tableName, rd.getRowid().stringValue(),
-                                    rd.getRowOperation().getCode()));
-                        } catch (InterruptedException ex) {
-                            logger.error("Table[{}], RowId:{}, Code:{}, Error:{}", tableName,
-                                    rd.getRowid().stringValue(), rd.getRowOperation().getCode(), ex.getMessage());
-                        }
+                        parseEvent(new DCNEvent(tableName, rd.getRowid().stringValue(), rd.getRowOperation().getCode()));
                     }
                 }
                 return;
@@ -372,39 +376,4 @@ public class DBChangeNotification {
 
     }
 
-    final class Worker extends Thread {
-
-        @Override
-        public void run() {
-            while (!isInterrupted() && connected) {
-                try {
-                    // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
-                    DCNEvent event = queue.take();
-                    if (null != event) {
-                        parseEvent(event);
-                    }
-                } catch (InterruptedException e) {
-                    break;
-                } catch (Exception e) {
-                    logger.error(e.getMessage());
-                }
-            }
-        }
-
-        private void parseEvent(DCNEvent event) {
-            List<Object> data = new ArrayList<>();
-            if (event.getCode() == TableChangeDescription.TableOperation.UPDATE.getCode()) {
-                read(event.getTableName(), event.getRowId(), data);
-                listeners.forEach(listener -> listener.onEvents(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, data)));
-
-            } else if (event.getCode() == TableChangeDescription.TableOperation.INSERT.getCode()) {
-                read(event.getTableName(), event.getRowId(), data);
-                listeners.forEach(listener -> listener.onEvents(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, data)));
-
-            } else {
-                data.add(event.getRowId());
-                listeners.forEach(listener -> listener.onEvents(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, data)));
-            }
-        }
-    }
 }

+ 2 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/DqlPostgreSQLExtractor.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.listener.postgresql;
 
-import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.event.ChangedEvent;
 
 /**
  * @author AE86
@@ -16,7 +16,7 @@ public class DqlPostgreSQLExtractor extends PostgreSQLExtractor {
     }
 
     @Override
-    public void sendChangedEvent(RowChangedEvent event) {
+    public void sendChangedEvent(ChangedEvent event) {
         super.sendDqlChangedEvent(event);
     }
 }

+ 4 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.listener.postgresql;
 
+import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.BooleanUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
@@ -133,8 +134,8 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
     }
 
     @Override
-    protected void refreshEvent(RowChangedEvent event) {
-        snapshot.put(LSN_POSITION, LogSequenceNumber.valueOf((Long) event.getPosition()).asString());
+    protected void refreshEvent(ChangedEvent event) {
+        snapshot.put(LSN_POSITION, String.valueOf(event.getPosition()));
     }
 
     private void connect() throws SQLException {
@@ -282,7 +283,7 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
                     // process decoder
                     RowChangedEvent event = messageDecoder.processMessage(msg);
                     if(event != null){
-                        event.setPosition(lsn.asLong());
+                        event.setPosition(lsn.asString());
                     }
                     sendChangedEvent(event);
 

+ 11 - 14
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.listener.quartz;
 
-import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.event.PageChangedEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.spi.ConnectorMapper;
@@ -33,8 +33,8 @@ import java.util.stream.Stream;
 public abstract class AbstractQuartzExtractor extends AbstractExtractor implements ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
-    private static final String CURSOR = "cursor";
-    private static final int READ_NUM = 1000;
+    private final String CURSOR = "cursor";
+    private final int READ_NUM = 1000;
     private List<TableGroupQuartzCommand> commands;
     private String eventFieldName;
     private boolean forceUpdate;
@@ -119,27 +119,24 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
                 break;
             }
 
-            Object event = null;
             for (Map<String, Object> row : data) {
                 if (forceUpdate) {
-                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
+                    changeEvent(new PageChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
                     continue;
                 }
 
-                event = row.get(eventFieldName);
-                if (update.contains(event)) {
-                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
+                Object eventValue = row.get(eventFieldName);
+                if (update.contains(eventValue)) {
+                    changeEvent(new PageChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
                     continue;
                 }
-                if (insert.contains(event)) {
-                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_INSERT, row));
+                if (insert.contains(eventValue)) {
+                    changeEvent(new PageChangedEvent(index, ConnectorConstant.OPERTION_INSERT, row));
                     continue;
                 }
-                if (delete.contains(event)) {
-                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_DELETE, row));
-                    continue;
+                if (delete.contains(eventValue)) {
+                    changeEvent(new PageChangedEvent(index, ConnectorConstant.OPERTION_DELETE, row));
                 }
-
             }
             // 更新记录点
             cursors = PrimaryKeyUtil.getLastCursors(data, primaryKeys);

+ 2 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/DqlSqlServerExtractor.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.listener.sqlserver;
 
-import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.event.ChangedEvent;
 
 /**
  * @author AE86
@@ -16,7 +16,7 @@ public class DqlSqlServerExtractor extends SqlServerExtractor {
     }
 
     @Override
-    public void sendChangedEvent(RowChangedEvent event) {
+    public void sendChangedEvent(ChangedEvent event) {
         super.sendDqlChangedEvent(event);
     }
 }

+ 49 - 16
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.listener.sqlserver;
 
 import com.microsoft.sqlserver.jdbc.SQLServerException;
+import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.DatabaseConfig;
@@ -16,13 +17,17 @@ import org.springframework.util.Assert;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -62,7 +67,11 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     private Lsn lastLsn;
     private String serverName;
     private String schema;
-    private LinkedBlockingQueue<Lsn> stopLsnQueue = new LinkedBlockingQueue<>(256);
+    private final int BUFFER_CAPACITY = 256;
+    private BlockingQueue<Lsn> buffer = new LinkedBlockingQueue<>(BUFFER_CAPACITY);
+    private Lock lock = new ReentrantLock();
+    private Condition isFull = lock.newCondition();
+    private final Duration pollInterval = Duration.of(500, ChronoUnit.MILLIS);
 
     @Override
     public void start() {
@@ -111,8 +120,10 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     }
 
     @Override
-    protected void refreshEvent(RowChangedEvent event) {
-        snapshot.put(LSN_POSITION, event.getPosition().toString());
+    protected void refreshEvent(ChangedEvent event) {
+        if (event.getPosition() != null) {
+            snapshot.put(LSN_POSITION, event.getPosition().toString());
+        }
     }
 
     private void close(AutoCloseable closeable) {
@@ -256,20 +267,22 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     }
 
     private void parseEvent(List<CDCEvent> list, Lsn stopLsn) {
-        for (CDCEvent event : list) {
-            int code = event.getCode();
-            if (TableOperationEnum.isUpdateAfter(code)) {
-                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, event.getRow()).setPosition(stopLsn));
+        int size = list.size();
+        for (int i = 0; i < size; i++) {
+            boolean isEnd = i == size - 1;
+            CDCEvent event = list.get(i);
+            if (TableOperationEnum.isUpdateAfter(event.getCode())) {
+                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, event.getRow(), null, (isEnd ? stopLsn : null)));
                 continue;
             }
 
-            if (TableOperationEnum.isInsert(code)) {
-                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, event.getRow()).setPosition(stopLsn));
+            if (TableOperationEnum.isInsert(event.getCode())) {
+                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, event.getRow(), null, (isEnd ? stopLsn : null)));
                 continue;
             }
 
-            if (TableOperationEnum.isDelete(code)) {
-                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, event.getRow()).setPosition(stopLsn));
+            if (TableOperationEnum.isDelete(event.getCode())) {
+                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, event.getRow(), null, (isEnd ? stopLsn : null)));
             }
         }
     }
@@ -336,9 +349,9 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
         public void run() {
             while (!isInterrupted() && connected) {
                 try {
-                    Lsn stopLsn = stopLsnQueue.take();
+                    Lsn stopLsn = buffer.take();
                     Lsn poll;
-                    while ((poll = stopLsnQueue.poll()) != null) {
+                    while ((poll = buffer.poll()) != null) {
                         stopLsn = poll;
                     }
                     if (!stopLsn.isAvailable() || stopLsn.compareTo(lastLsn) <= 0) {
@@ -349,6 +362,8 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
 
                     lastLsn = stopLsn;
                     snapshot.put(LSN_POSITION, lastLsn.toString());
+                } catch (InterruptedException e) {
+                    break;
                 } catch (Exception e) {
                     if (connected) {
                         logger.error(e.getMessage(), e);
@@ -364,10 +379,28 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     }
 
     public void pushStopLsn(Lsn stopLsn) {
-        if (stopLsnQueue.contains(stopLsn)) {
+        if (buffer.contains(stopLsn)) {
             return;
         }
-        // TODO 优化采用阻塞写
-        stopLsnQueue.offer(stopLsn);
+        boolean lock = false;
+        try {
+            lock = this.lock.tryLock();
+            if (lock) {
+                if (!buffer.offer(stopLsn)) {
+                    logger.warn("[{}]缓存队列容量已达上限,正在重试", this.getClass().getSimpleName(), BUFFER_CAPACITY);
+                    while (!buffer.offer(stopLsn) && connected) {
+                        try {
+                            this.isFull.await(pollInterval.toMillis(), TimeUnit.MILLISECONDS);
+                        } catch (InterruptedException e) {
+                            break;
+                        }
+                    }
+                }
+            }
+        } finally {
+            if (lock) {
+                this.lock.unlock();
+            }
+        }
     }
 }

+ 22 - 13
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.manager.puller;
 
+import org.dbsyncer.common.event.ChangedEvent;
+import org.dbsyncer.common.event.PageChangedEvent;
 import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
@@ -178,9 +180,16 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         throw new ManagerException("未知的监听配置.");
     }
 
-    abstract class AbstractConsumer implements Watcher {
+    abstract class AbstractConsumer<E extends ChangedEvent> implements Watcher {
         protected Mapping mapping;
 
+        public abstract void onChange(E e);
+
+        @Override
+        public void changeEvent(ChangedEvent event) {
+            onChange((E) event);
+        }
+
         @Override
         public void flushEvent(Map<String, String> snapshot) {
             Meta meta = manager.getMeta(mapping.getMetaId());
@@ -196,7 +205,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         }
     }
 
-    final class QuartzConsumer extends AbstractConsumer {
+    final class QuartzConsumer extends AbstractConsumer<PageChangedEvent> {
 
         private List<FieldPicker> tablePicker = new LinkedList<>();
         public QuartzConsumer(Mapping mapping, List<TableGroup> tableGroups) {
@@ -205,17 +214,17 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         }
 
         @Override
-        public void changedEvent(RowChangedEvent rowChangedEvent) {
-            final FieldPicker picker = tablePicker.get(rowChangedEvent.getTableGroupIndex());
+        public void onChange(PageChangedEvent event) {
+            final FieldPicker picker = tablePicker.get(event.getTableGroupIndex());
             TableGroup tableGroup = picker.getTableGroup();
-            rowChangedEvent.setSourceTableName(tableGroup.getSourceTable().getName());
+            event.setSourceTableName(tableGroup.getSourceTable().getName());
 
             // 处理过程有异常向上抛
-            parser.execute(mapping, tableGroup, rowChangedEvent);
+            parser.execute(mapping, tableGroup, event);
         }
     }
 
-    final class LogConsumer extends AbstractConsumer {
+    final class LogConsumer extends AbstractConsumer<RowChangedEvent> {
 
         private Map<String, List<FieldPicker>> tablePicker = new LinkedHashMap<>();
 
@@ -231,15 +240,15 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         }
 
         @Override
-        public void changedEvent(RowChangedEvent rowChangedEvent) {
+        public void onChange(RowChangedEvent event) {
             // 处理过程有异常向上抛
-            List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getSourceTableName());
+            List<FieldPicker> pickers = tablePicker.get(event.getSourceTableName());
             if (!CollectionUtils.isEmpty(pickers)) {
                 pickers.forEach(picker -> {
-                    final Map<String, Object> dataMap = picker.getColumns(rowChangedEvent.getDataList());
-                    if (picker.filter(dataMap)) {
-                        rowChangedEvent.setDataMap(dataMap);
-                        parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
+                    final Map<String, Object> changedRow = picker.getColumns(event.getDataList());
+                    if (picker.filter(changedRow)) {
+                        event.setChangedRow(changedRow);
+                        parser.execute(mapping, picker.getTableGroup(), event);
                     }
                 });
             }

+ 8 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.parser;
 
-import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
@@ -12,7 +12,11 @@ import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.BatchWriter;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.Task;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 
 import java.util.List;
@@ -159,9 +163,9 @@ public interface Parser {
      *
      * @param mapping
      * @param tableGroup
-     * @param rowChangedEvent
+     * @param changedEvent
      */
-    void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent);
+    void execute(Mapping mapping, TableGroup tableGroup, ChangedEvent changedEvent);
 
     /**
      * 批执行

+ 4 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.parser;
 
 import org.dbsyncer.cache.CacheService;
-import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.model.FullConvertContext;
 import org.dbsyncer.common.model.Result;
@@ -308,9 +308,9 @@ public class ParserFactory implements Parser {
     }
 
     @Override
-    public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
-        logger.debug("Table[{}] {}, data:{}", event.getSourceTableName(), event.getEvent(), event.getDataMap());
-        parserStrategy.execute(tableGroup.getId(), event.getEvent(), event.getDataMap());
+    public void execute(Mapping mapping, TableGroup tableGroup, ChangedEvent event) {
+        logger.debug("Table[{}] {}, data:{}", event.getSourceTableName(), event.getEvent(), event.getChangedRow());
+        parserStrategy.execute(tableGroup.getId(), event.getEvent(), event.getChangedRow());
     }
 
     /**