瀏覽代碼

修复Oracle增量同步,解析rowid映射

AE86 4 年之前
父節點
當前提交
0edd0c008c
共有 17 個文件被更改,包括 375 次插入136 次删除
  1. 8 11
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java
  2. 129 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java
  3. 5 4
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java
  4. 5 13
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java
  5. 4 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java
  6. 2 24
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/OracleExtractor.java
  7. 32 20
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java
  8. 3 1
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/RowEventListener.java
  9. 4 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java
  10. 13 9
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/FieldPicker.java
  11. 46 36
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java
  12. 5 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java
  13. 14 8
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  14. 56 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/PrimaryKeyMappingEnum.java
  15. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java
  16. 21 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/PrimaryKeyMappingStrategy.java
  17. 27 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/OraclePrimaryKeyMappingStrategy.java

+ 8 - 11
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.common.event;
 
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -13,22 +12,20 @@ public interface Event {
     /**
      * 日志数据变更事件
      *
-     * @param tableName 表名
-     * @param event     事件
-     * @param before    变化前
-     * @param after     变化后
+     * @param rowChangedEvent
      */
-    void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after);
+    default void changedLogEvent(RowChangedEvent rowChangedEvent) {
+        // nothing to do
+    }
 
     /**
      * 定时数据变更事件
      *
-     * @param tableGroupIndex
-     * @param event
-     * @param before
-     * @param after
+     * @param rowChangedEvent
      */
-    void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after);
+    default void changedQuartzEvent(RowChangedEvent rowChangedEvent){
+        // nothing to do
+    }
 
     /**
      * 写入增量点事件

+ 129 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java

@@ -0,0 +1,129 @@
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package org.dbsyncer.common.event;
+
+import org.dbsyncer.common.util.JsonUtil;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 监听行变更事件
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-06-15 20:00
+ */
+public class RowChangedEvent {
+
+    private int                 tableGroupIndex;
+    private String              tableName;
+    private String              event;
+    private List<Object>        beforeData;
+    private List<Object>        afterData;
+    private String              rowId;
+    private String              pk;
+    private Map<String, Object> before;
+    private Map<String, Object> after;
+
+    /**
+     * 定时模式
+     *
+     * @param tableGroupIndex
+     * @param event
+     * @param before
+     * @param after
+     */
+    public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
+        this.tableGroupIndex = tableGroupIndex;
+        this.event = event;
+        this.before = before;
+        this.after = after;
+    }
+
+    /**
+     * Mysql
+     *
+     * @param tableName
+     * @param event
+     * @param beforeData
+     * @param afterData
+     */
+    public RowChangedEvent(String tableName, String event, List<Object> beforeData, List<Object> afterData) {
+        this.tableName = tableName;
+        this.event = event;
+        this.beforeData = beforeData;
+        this.afterData = afterData;
+    }
+
+    /**
+     * Oracle
+     *
+     * @param tableName
+     * @param event
+     * @param beforeData
+     * @param afterData
+     * @param rowId
+     */
+    public RowChangedEvent(String tableName, String event, List<Object> beforeData, List<Object> afterData, String rowId) {
+        this.tableName = tableName;
+        this.event = event;
+        this.beforeData = beforeData;
+        this.afterData = afterData;
+        this.rowId = rowId;
+    }
+
+    public int getTableGroupIndex() {
+        return tableGroupIndex;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public List<Object> getBeforeData() {
+        return beforeData;
+    }
+
+    public List<Object> getAfterData() {
+        return afterData;
+    }
+
+    public String getRowId() {
+        return rowId;
+    }
+
+    public String getPk() {
+        return pk;
+    }
+
+    public void setPk(String pk) {
+        this.pk = pk;
+    }
+
+    public Map<String, Object> getBefore() {
+        return before;
+    }
+
+    public void setBefore(Map<String, Object> before) {
+        this.before = before;
+    }
+
+    public Map<String, Object> getAfter() {
+        return after;
+    }
+
+    public void setAfter(Map<String, Object> after) {
+        this.after = after;
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.objToJson(this);
+    }
+}

+ 5 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.Event;
+import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
@@ -40,16 +41,16 @@ public abstract class AbstractExtractor implements Extractor {
     }
 
     @Override
-    public void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
+    public void changedQuartzEvent(RowChangedEvent rowChangedEvent) {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.changedQuartzEvent(tableGroupIndex, event, before, after));
+            watcher.forEach(w -> w.changedQuartzEvent(rowChangedEvent));
         }
     }
 
     @Override
-    public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
+    public void changedLogEvent(RowChangedEvent rowChangedEvent) {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.changedLogEvent(tableName, event, before, after));
+            watcher.forEach(w -> w.changedLogEvent(rowChangedEvent));
         }
     }
 

+ 5 - 13
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -1,9 +1,7 @@
 package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.Event;
-
-import java.util.List;
-import java.util.Map;
+import org.dbsyncer.common.event.RowChangedEvent;
 
 public interface Extractor {
 
@@ -32,22 +30,16 @@ public interface Extractor {
     /**
      * 定时模式: 监听增量事件
      *
-     * @param tableGroupIndex
-     * @param event
-     * @param before
-     * @param after
+     * @param rowChangedEvent
      */
-    void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after);
+    void changedQuartzEvent(RowChangedEvent rowChangedEvent);
 
     /**
      * 日志模式: 监听增量事件
      *
-     * @param tableName
-     * @param event
-     * @param before
-     * @param after
+     * @param rowChangedEvent
      */
-    void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after);
+    void changedLogEvent(RowChangedEvent rowChangedEvent);
 
     /**
      * 刷新增量点事件

+ 4 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -2,6 +2,7 @@ package org.dbsyncer.listener.mysql;
 
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
@@ -185,7 +186,7 @@ public class MysqlExtractor extends AbstractExtractor {
                     List<Object> after = new ArrayList<>();
                     addAll(before, p.getBefore().getColumns());
                     addAll(after, p.getAfter().getColumns());
-                    changedLogEvent(tableName, ConnectorConstant.OPERTION_UPDATE, before, after);
+                    changedLogEvent(new RowChangedEvent(tableName, ConnectorConstant.OPERTION_UPDATE, before, after));
                     break;
                 }
                 return;
@@ -198,7 +199,7 @@ public class MysqlExtractor extends AbstractExtractor {
                 for (Row row : rows) {
                     List<Object> after = new ArrayList<>();
                     addAll(after, row.getColumns());
-                    changedLogEvent(tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after);
+                    changedLogEvent(new RowChangedEvent(tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after));
                     break;
                 }
                 return;
@@ -211,7 +212,7 @@ public class MysqlExtractor extends AbstractExtractor {
                 for (Row row : rows) {
                     List<Object> before = new ArrayList<>();
                     addAll(before, row.getColumns());
-                    changedLogEvent(tableName, ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST);
+                    changedLogEvent(new RowChangedEvent(tableName, ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST));
                     break;
                 }
                 return;

+ 2 - 24
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/OracleExtractor.java

@@ -1,17 +1,12 @@
 package org.dbsyncer.listener.oracle;
 
-import oracle.jdbc.dcn.TableChangeDescription;
 import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.oracle.dcn.DBChangeNotification;
-import org.dbsyncer.listener.oracle.dcn.RowChangeEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
-
 /**
  * @version 1.0.0
  * @Author AE86
@@ -31,7 +26,7 @@ public class OracleExtractor extends AbstractExtractor {
             String password = config.getPassword();
             String url = config.getUrl();
             client = new DBChangeNotification(username, password, url);
-            client.addRowEventListener((e) -> onEvent(e));
+            client.addRowEventListener((e) -> changedLogEvent(e));
             client.start();
         } catch (Exception e) {
             logger.error("启动失败:{}", e.getMessage());
@@ -41,26 +36,9 @@ public class OracleExtractor extends AbstractExtractor {
 
     @Override
     public void close() {
-        if(null != client){
+        if (null != client) {
             client.close();
         }
     }
 
-    private void onEvent(RowChangeEvent event){
-        if(event.getEvent() == TableChangeDescription.TableOperation.UPDATE.getCode()){
-            changedLogEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, event.getData());
-            return;
-        }
-
-        if(event.getEvent() == TableChangeDescription.TableOperation.INSERT.getCode()){
-            changedLogEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, event.getData());
-            return;
-        }
-
-        if(event.getEvent() == TableChangeDescription.TableOperation.DELETE.getCode()){
-            changedLogEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, event.getData(), Collections.EMPTY_LIST);
-            return;
-        }
-    }
-
 }

+ 32 - 20
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -7,6 +7,8 @@ import oracle.jdbc.OracleDriver;
 import oracle.jdbc.OracleStatement;
 import oracle.jdbc.dcn.*;
 import oracle.jdbc.driver.OracleConnection;
+import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,7 +110,7 @@ public class DBChangeNotification {
         }
     }
 
-    private void close(ResultSet rs){
+    private void close(ResultSet rs) {
         if (null != rs) {
             try {
                 rs.close();
@@ -204,35 +206,45 @@ public class DBChangeNotification {
             for (TableChangeDescription td : tds) {
                 RowChangeDescription[] rds = td.getRowChangeDescription();
                 for (RowChangeDescription rd : rds) {
-                    RowChangeDescription.RowOperation opr = rd.getRowOperation();
-                    parseEvent(tables.get(td.getObjectNumber()), rd.getRowid().stringValue(), opr);
+                    parseEvent(tables.get(td.getObjectNumber()), rd.getRowid().stringValue(), rd.getRowOperation());
                 }
             }
         }
 
         private void parseEvent(String tableName, String rowId, RowChangeDescription.RowOperation event) {
+            if (event.getCode() == TableChangeDescription.TableOperation.UPDATE.getCode()) {
+                RowChangedEvent rowChangedEvent = new RowChangedEvent(tableName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, read(tableName, rowId), rowId);
+                listeners.forEach(e -> e.onEvents(rowChangedEvent));
+
+            }else if(event.getCode() == TableChangeDescription.TableOperation.INSERT.getCode()){
+                RowChangedEvent rowChangedEvent = new RowChangedEvent(tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, read(tableName, rowId), rowId);
+                listeners.forEach(e -> e.onEvents(rowChangedEvent));
+
+            }else{
+                RowChangedEvent rowChangedEvent = new RowChangedEvent(tableName, ConnectorConstant.OPERTION_DELETE, Collections.EMPTY_LIST, Collections.EMPTY_LIST, rowId);
+                listeners.forEach(e -> e.onEvents(rowChangedEvent));
+            }
+        }
+
+        private List<Object> read(String tableName, String rowId) {
             List<Object> data = new ArrayList<>();
-            data.add(rowId);
-
-            if(event.getCode() != TableChangeDescription.TableOperation.DELETE.getCode()){
-                ResultSet rs = null;
-                try {
-                    rs = statement.executeQuery(String.format(QUERY_ROW_DATA_SQL, tableName, rowId));
-                    final int size = rs.getMetaData().getColumnCount();
-                    while (rs.next()) {
-                        for (int i = 1; i <= size; i++) {
-                            data.add(rs.getObject(i));
-                        }
+            ResultSet rs = null;
+            try {
+                rs = statement.executeQuery(String.format(QUERY_ROW_DATA_SQL, tableName, rowId));
+                final int size = rs.getMetaData().getColumnCount();
+                while (rs.next()) {
+                    for (int i = 1; i <= size; i++) {
+                        data.add(rs.getObject(i));
                     }
-                } catch (SQLException e) {
-                    logger.error(e.getMessage());
-                } finally {
-                    close(rs);
                 }
+            } catch (SQLException e) {
+                logger.error(e.getMessage());
+            } finally {
+                close(rs);
             }
-
-            listeners.forEach(e -> e.onEvents(new RowChangeEvent(tableName, event.getCode(), data)));
+            return data;
         }
+
     }
 
 }

+ 3 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/RowEventListener.java

@@ -3,6 +3,8 @@
  */
 package org.dbsyncer.listener.oracle.dcn;
 
+import org.dbsyncer.common.event.RowChangedEvent;
+
 /**
  * 行变更监听器
  *
@@ -12,6 +14,6 @@ package org.dbsyncer.listener.oracle.dcn;
  */
 public interface RowEventListener {
 
-    void onEvents(RowChangeEvent event);
+    void onEvents(RowChangedEvent rowChangedEvent);
 
 }

+ 4 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.listener.quartz;
 
 import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.UUIDUtil;
@@ -89,15 +90,15 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
             for (Map<String, Object> row : data) {
                 event = row.get(eventFieldName);
                 if (update.contains(event)) {
-                    changedQuartzEvent(index, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row);
+                    changedQuartzEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row));
                     continue;
                 }
                 if (insert.contains(event)) {
-                    changedQuartzEvent(index, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_MAP, row);
+                    changedQuartzEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_MAP, row));
                     continue;
                 }
                 if (delete.contains(event)) {
-                    changedQuartzEvent(index, ConnectorConstant.OPERTION_DELETE, row, Collections.EMPTY_MAP);
+                    changedQuartzEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_DELETE, row, Collections.EMPTY_MAP));
                     continue;
                 }
 

+ 13 - 9
dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/FieldPicker.java

@@ -7,7 +7,6 @@ import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.Filter;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
-import org.dbsyncer.parser.model.DataEvent;
 import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.TableGroup;
 import org.springframework.util.Assert;
@@ -18,6 +17,7 @@ import java.util.stream.Collectors;
 public class FieldPicker {
 
     private TableGroup tableGroup;
+    private List<Field> pkList;
     private List<Node> index;
     private int indexSize;
     private boolean filterSwitch;
@@ -28,8 +28,9 @@ public class FieldPicker {
         this.tableGroup = tableGroup;
     }
 
-    public FieldPicker(TableGroup tableGroup, List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
+    public FieldPicker(TableGroup tableGroup, List<Field> pkList, List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
         this.tableGroup = tableGroup;
+        this.pkList = pkList;
         init(filter, column, fieldMapping);
     }
 
@@ -47,21 +48,16 @@ public class FieldPicker {
         return Collections.EMPTY_MAP;
     }
 
-    public TableGroup getTableGroup() {
-        return tableGroup;
-    }
-
     /**
      * 根据过滤条件过滤
      *
-     * @param data
+     * @param row
      * @return
      */
-    public boolean filter(DataEvent data) {
+    public boolean filter(Map<String, Object> row) {
         if (!filterSwitch) {
             return true;
         }
-        final Map<String, Object> row = data.getData();
         // where (id > 1 and id < 100) or (id = 100 or id =101)
         // 或 关系(成立任意条件)
         CompareFilter filter = null;
@@ -123,6 +119,14 @@ public class FieldPicker {
         this.indexSize = index.size();
     }
 
+    public TableGroup getTableGroup() {
+        return tableGroup;
+    }
+
+    public String getPk() {
+        return pkList.get(0).getName();
+    }
+
     final class Node {
         // 属性
         String name;

+ 46 - 36
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -1,11 +1,15 @@
 package org.dbsyncer.manager.puller.impl;
 
+import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.common.event.Event;
+import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.Extractor;
 import org.dbsyncer.listener.Listener;
@@ -15,13 +19,17 @@ import org.dbsyncer.listener.quartz.QuartzExtractor;
 import org.dbsyncer.listener.quartz.ScheduledTaskJob;
 import org.dbsyncer.listener.quartz.ScheduledTaskService;
 import org.dbsyncer.manager.Manager;
-import org.dbsyncer.manager.config.ExtractorConfig;
 import org.dbsyncer.manager.config.FieldPicker;
 import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.enums.PrimaryKeyMappingEnum;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.strategy.PrimaryKeyMappingStrategy;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -143,8 +151,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             QuartzExtractor extractor = listener.getExtractor(listenerType, QuartzExtractor.class);
             List<Map<String, String>> commands = list.stream().map(t -> t.getCommand()).collect(Collectors.toList());
 
-            ExtractorConfig config = new ExtractorConfig(connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list));
-            setExtractorConfig(extractor, config);
+            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list));
             extractor.setConnectorFactory(connectorFactory);
             extractor.setScheduledTaskService(scheduledTaskService);
             extractor.setCommands(commands);
@@ -155,36 +162,27 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         if (ListenerTypeEnum.isLog(listenerType)) {
             final String connectorType = connectorConfig.getConnectorType();
             AbstractExtractor extractor = listener.getExtractor(connectorType, AbstractExtractor.class);
+            PrimaryKeyMappingStrategy strategy = PrimaryKeyMappingEnum.getPrimaryKeyMappingStrategy(connectorType);
 
-            ExtractorConfig config = new ExtractorConfig(connectorConfig, listenerConfig, meta.getMap(), new LogListener(mapping, list));
-            setExtractorConfig(extractor, config);
+            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new LogListener(mapping, list, strategy));
             return extractor;
         }
         return null;
     }
 
-    private void setExtractorConfig(AbstractExtractor extractor, ExtractorConfig config) {
-        extractor.setConnectorConfig(config.getConnectorConfig());
-        extractor.setListenerConfig(config.getListenerConfig());
-        extractor.setMap(config.getMap());
-        extractor.addListener(config.getEvent());
+    private void setExtractorConfig(AbstractExtractor extractor, ConnectorConfig connector, ListenerConfig listener,
+                                    Map<String, String> map, Event event) {
+        extractor.setConnectorConfig(connector);
+        extractor.setListenerConfig(listener);
+        extractor.setMap(map);
+        extractor.addListener(event);
     }
 
     abstract class AbstractListener implements Event {
-        protected Mapping mapping;
-        protected String metaId;
+        protected Mapping       mapping;
+        protected String        metaId;
         protected AtomicBoolean changed = new AtomicBoolean();
 
-        @Override
-        public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
-            // nothing to do
-        }
-
-        @Override
-        public void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
-            // nothing to do
-        }
-
         @Override
         public void flushEvent(Map<String, String> map) {
             // 如果有变更,执行更新
@@ -238,13 +236,15 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         }
 
         @Override
-        public void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
-            final FieldPicker picker = tablePicker.get(tableGroupIndex);
-            logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", picker.getTableGroup().getSourceTable().getName(), event, before, after);
+        public void changedQuartzEvent(RowChangedEvent rowChangedEvent) {
+            final FieldPicker picker = tablePicker.get(rowChangedEvent.getTableGroupIndex());
+            logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", picker.getTableGroup().getSourceTable().getName(),
+                    rowChangedEvent.getEvent(),
+                    rowChangedEvent.getBefore(),
+                    rowChangedEvent.getAfter());
 
             // 处理过程有异常向上抛
-            DataEvent data = new DataEvent(event, before, after);
-            parser.execute(mapping, picker.getTableGroup(), data);
+            parser.execute(mapping, picker.getTableGroup(), rowChangedEvent, null);
 
             // 标记有变更记录
             changed.compareAndSet(false, true);
@@ -275,30 +275,40 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
 
         private Map<String, List<FieldPicker>> tablePicker;
 
-        public LogListener(Mapping mapping, List<TableGroup> list) {
+        private PrimaryKeyMappingStrategy strategy;
+
+        public LogListener(Mapping mapping, List<TableGroup> list, PrimaryKeyMappingStrategy strategy) {
             this.mapping = mapping;
             this.metaId = mapping.getMetaId();
             this.tablePicker = new LinkedHashMap<>();
+            this.strategy = strategy;
             list.forEach(t -> {
                 final Table table = t.getSourceTable();
                 final String tableName = table.getName();
+                List<Field> pkList = t.getTargetTable().getColumn().stream().filter(field -> field.isPk()).collect(Collectors.toList());
                 tablePicker.putIfAbsent(tableName, new ArrayList<>());
                 TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
-                tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
+                tablePicker.get(tableName).add(new FieldPicker(group, pkList, group.getFilter(), table.getColumn(), group.getFieldMapping()));
             });
         }
 
         @Override
-        public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
-            logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", tableName, event, before, after);
+        public void changedLogEvent(RowChangedEvent rowChangedEvent) {
+            logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}, rowId:{}", rowChangedEvent.getTableName(),
+                    rowChangedEvent.getEvent(),
+                    rowChangedEvent.getBefore(), rowChangedEvent.getAfter(), rowChangedEvent.getRowId());
 
             // 处理过程有异常向上抛
-            List<FieldPicker> pickers = tablePicker.get(tableName);
+            List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getTableName());
             if (!CollectionUtils.isEmpty(pickers)) {
                 pickers.parallelStream().forEach(picker -> {
-                    DataEvent data = new DataEvent(event, picker.getColumns(before), picker.getColumns(after));
-                    if (picker.filter(data)) {
-                        parser.execute(mapping, picker.getTableGroup(), data);
+                    final Map<String, Object> before = picker.getColumns(rowChangedEvent.getBeforeData());
+                    final Map<String, Object> after = picker.getColumns(rowChangedEvent.getAfterData());
+                    if (picker.filter(StringUtils.equals(ConnectorConstant.OPERTION_DELETE, rowChangedEvent.getEvent()) ? before : after)) {
+                        rowChangedEvent.setBefore(before);
+                        rowChangedEvent.setAfter(after);
+                        rowChangedEvent.setPk(picker.getPk());
+                        parser.execute(mapping, picker.getTableGroup(), rowChangedEvent, strategy);
                     }
                 });
             }

+ 5 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.parser;
 
+import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Task;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
@@ -9,9 +10,9 @@ import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.model.Connector;
-import org.dbsyncer.parser.model.DataEvent;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.strategy.PrimaryKeyMappingStrategy;
 
 import java.util.List;
 import java.util.Map;
@@ -133,7 +134,8 @@ public interface Parser {
      *
      * @param mapping
      * @param tableGroup
-     * @param dataEvent
+     * @param rowChangedEvent
+     * @param strategy
      */
-    void execute(Mapping mapping, TableGroup tableGroup, DataEvent dataEvent);
+    void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent, PrimaryKeyMappingStrategy strategy);
 }

+ 14 - 8
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,7 +1,9 @@
 package org.dbsyncer.parser;
 
+import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.FullRefreshEvent;
+import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Task;
 import org.dbsyncer.common.util.CollectionUtils;
@@ -17,6 +19,7 @@ import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.enums.ParserEnum;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.strategy.PrimaryKeyMappingStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
@@ -236,8 +239,8 @@ public class ParserFactory implements Parser {
     }
 
     @Override
-    public void execute(Mapping mapping, TableGroup tableGroup, DataEvent dataEvent) {
-        logger.info("{}", dataEvent);
+    public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent, PrimaryKeyMappingStrategy strategy) {
+        logger.info("{}", rowChangedEvent);
         final String metaId = mapping.getMetaId();
 
         ConnectorConfig tConfig = getConnectorConfig(mapping.getTargetConnectorId());
@@ -246,21 +249,24 @@ public class ParserFactory implements Parser {
         PickerUtil.pickFields(picker, tableGroup.getFieldMapping());
 
         // 1、映射字段
-        String event = dataEvent.getEvent();
-        Map<String, Object> data = dataEvent.getData();
+        final String event = rowChangedEvent.getEvent();
+        Map<String, Object> data = StringUtils.equals(ConnectorConstant.OPERTION_DELETE, event) ? rowChangedEvent.getBefore() : rowChangedEvent.getAfter();
         PickerUtil.pickData(picker, data);
 
-        // 2、参数转换
+        // 2、主键映射策略,Oracle需要替换主键为rowId
         Map<String, Object> target = picker.getTarget();
+        strategy.handle(target, rowChangedEvent);
+
+        // 3、参数转换
         ConvertUtil.convert(tableGroup.getConvert(), target);
 
-        // 3、插件转换
+        // 4、插件转换
         pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
 
-        // 4、写入目标源
+        // 5、写入目标源
         Result writer = connectorFactory.writer(tConfig, picker.getTargetFields(), tableGroup.getCommand(), event, target);
 
-        // 5、更新结果
+        // 6、更新结果
         List<Map<String, Object>> list = new ArrayList<>(1);
         list.add(target);
         flush(metaId, writer, event, list);

+ 56 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/PrimaryKeyMappingEnum.java

@@ -0,0 +1,56 @@
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package org.dbsyncer.parser.enums;
+
+import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.parser.strategy.PrimaryKeyMappingStrategy;
+import org.dbsyncer.parser.strategy.impl.OraclePrimaryKeyMappingStrategy;
+
+/**
+ * 主键映射策略枚举
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/10/19 14:19
+ */
+public enum PrimaryKeyMappingEnum {
+
+    /**
+     * Oracle
+     */
+    ORACLE(ConnectorEnum.ORACLE.getType(), new OraclePrimaryKeyMappingStrategy());
+
+    private String type;
+
+    private PrimaryKeyMappingStrategy primaryKeyMappingStrategy;
+
+    PrimaryKeyMappingEnum(String type, PrimaryKeyMappingStrategy primaryKeyMappingStrategy) {
+        this.type = type;
+        this.primaryKeyMappingStrategy = primaryKeyMappingStrategy;
+    }
+
+    /**
+     * 主键映射策略
+     *
+     * @param type
+     * @return
+     */
+    public static PrimaryKeyMappingStrategy getPrimaryKeyMappingStrategy(String type) {
+        for (PrimaryKeyMappingEnum e : PrimaryKeyMappingEnum.values()) {
+            if (StringUtils.equals(type, e.getType())) {
+                return e.getPrimaryKeyMappingStrategy();
+            }
+        }
+        return new PrimaryKeyMappingStrategy() {};
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public PrimaryKeyMappingStrategy getPrimaryKeyMappingStrategy() {
+        return primaryKeyMappingStrategy;
+    }
+}

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -10,7 +10,7 @@ public interface FlushService {
     /**
      * 记录错误日志
      *
-     * @param metaId
+     * @param type
      * @param error
      */
     @Async("taskExecutor")

+ 21 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/PrimaryKeyMappingStrategy.java

@@ -0,0 +1,21 @@
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package org.dbsyncer.parser.strategy;
+
+import org.dbsyncer.common.event.RowChangedEvent;
+
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/10/19 11:53
+ */
+public interface PrimaryKeyMappingStrategy {
+
+    default void handle(Map<String, Object> row, RowChangedEvent rowChangedEvent) {
+        // nothing to do
+    }
+
+}

+ 27 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/OraclePrimaryKeyMappingStrategy.java

@@ -0,0 +1,27 @@
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package org.dbsyncer.parser.strategy.impl;
+
+import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.parser.strategy.PrimaryKeyMappingStrategy;
+
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/10/19 11:53
+ */
+public class OraclePrimaryKeyMappingStrategy implements PrimaryKeyMappingStrategy {
+
+    /**
+     * 替换主键
+     *
+     * @param row
+     */
+    @Override
+    public void handle(Map<String, Object> row, RowChangedEvent rowChangedEvent) {
+        row.put(rowChangedEvent.getPk(), rowChangedEvent.getRowId());
+    }
+}