AE86 2 年之前
父節點
當前提交
69dc49e5ea

+ 14 - 43
dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java

@@ -19,25 +19,20 @@ public class RowChangedEvent {
 
     private int tableGroupIndex;
     private String sourceTableName;
-    private String targetTableName;
     private String event;
-    private List<Object> beforeData;
-    private List<Object> afterData;
-    private Map<String, Object> before;
-    private Map<String, Object> after;
+    private List<Object> dataList;
+    private Map<String, Object> dataMap;
 
-    public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
+    public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> data) {
         this.tableGroupIndex = tableGroupIndex;
         this.event = event;
-        this.before = before;
-        this.after = after;
+        this.dataMap = data;
     }
 
-    public RowChangedEvent(String sourceTableName, String event, List<Object> beforeData, List<Object> afterData) {
+    public RowChangedEvent(String sourceTableName, String event, List<Object> data) {
         this.sourceTableName = sourceTableName;
         this.event = event;
-        this.beforeData = beforeData;
-        this.afterData = afterData;
+        this.dataList = data;
     }
 
     public int getTableGroupIndex() {
@@ -52,48 +47,24 @@ public class RowChangedEvent {
         this.sourceTableName = sourceTableName;
     }
 
-    public String getTargetTableName() {
-        return targetTableName;
-    }
-
-    public void setTargetTableName(String targetTableName) {
-        this.targetTableName = targetTableName;
-    }
-
     public String getEvent() {
         return event;
     }
 
-    public List<Object> getBeforeData() {
-        return beforeData;
-    }
-
-    public void setBeforeData(List<Object> beforeData) {
-        this.beforeData = beforeData;
-    }
-
-    public List<Object> getAfterData() {
-        return afterData;
-    }
-
-    public void setAfterData(List<Object> afterData) {
-        this.afterData = afterData;
-    }
-
-    public Map<String, Object> getBefore() {
-        return before;
+    public List<Object> getDataList() {
+        return dataList;
     }
 
-    public void setBefore(Map<String, Object> before) {
-        this.before = before;
+    public void setDataList(List<Object> dataList) {
+        this.dataList = dataList;
     }
 
-    public Map<String, Object> getAfter() {
-        return after;
+    public Map<String, Object> getDataMap() {
+        return dataMap;
     }
 
-    public void setAfter(Map<String, Object> after) {
-        this.after = after;
+    public void setDataMap(Map<String, Object> dataMap) {
+        this.dataMap = dataMap;
     }
 
     @Override

+ 1 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractDatabaseExtractor.java

@@ -2,7 +2,6 @@ package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
@@ -39,7 +38,7 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
             switch (event.getEvent()){
                 case ConnectorConstant.OPERTION_UPDATE:
                 case ConnectorConstant.OPERTION_INSERT:
-                    event.setAfterData(queryData(event.getAfterData()));
+                    event.setDataList(queryData(event.getDataList()));
                     break;
                 default:
                     break;

+ 1 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.file.*;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -149,7 +148,7 @@ public class FileExtractor extends AbstractExtractor {
                 snapshot.put(filePosKey, String.valueOf(raf.getFilePointer()));
                 if (StringUtil.isNotBlank(line)) {
                     List<Object> row = fileResolver.parseList(pipelineResolver.fields, separator, line);
-                    changedEvent(new RowChangedEvent(fileName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, row));
+                    changedEvent(new RowChangedEvent(fileName, ConnectorConstant.OPERTION_UPDATE, row));
                 }
             }
 

+ 3 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -228,9 +228,8 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 UpdateRowsEventData data = event.getData();
                 if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
-                        List<Object> before = Stream.of(m.getKey()).collect(Collectors.toList());
                         List<Object> after = Stream.of(m.getValue()).collect(Collectors.toList());
-                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_UPDATE, before, after));
+                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_UPDATE, after));
                     });
                 }
                 return;
@@ -241,7 +240,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> after = Stream.of(m).collect(Collectors.toList());
-                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after));
+                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_INSERT, after));
                     });
                 }
                 return;
@@ -252,7 +251,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> before = Stream.of(m).collect(Collectors.toList());
-                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST));
+                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_DELETE, before));
                     });
                 }
                 return;

+ 3 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -353,15 +353,15 @@ public class DBChangeNotification {
             List<Object> data = new ArrayList<>();
             if (event.getCode() == TableChangeDescription.TableOperation.UPDATE.getCode()) {
                 read(event.getTableName(), event.getRowId(), data);
-                listeners.forEach(listener -> listener.onEvents(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, data)));
+                listeners.forEach(listener -> listener.onEvents(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, data)));
 
             } else if (event.getCode() == TableChangeDescription.TableOperation.INSERT.getCode()) {
                 read(event.getTableName(), event.getRowId(), data);
-                listeners.forEach(listener -> listener.onEvents(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, data)));
+                listeners.forEach(listener -> listener.onEvents(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, data)));
 
             } else {
                 data.add(event.getRowId());
-                listeners.forEach(listener -> listener.onEvents(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, data, Collections.EMPTY_LIST)));
+                listeners.forEach(listener -> listener.onEvents(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, data)));
             }
         }
     }

+ 1 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/PgOutputMessageDecoder.java

@@ -138,10 +138,7 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
                 case "O":
                     List<Object> data = new ArrayList<>();
                     readTupleData(tableId, buffer, data);
-                    if (MessageTypeEnum.DELETE == type) {
-                        return new RowChangedEvent(tableId.tableName, type.name(), data, Collections.EMPTY_LIST);
-                    }
-                    return new RowChangedEvent(tableId.tableName, type.name(), Collections.EMPTY_LIST, data);
+                    return new RowChangedEvent(tableId.tableName, type.name(), data);
 
                 default:
                     logger.info("N, K, O not set, got instead {}", newTuple);

+ 1 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/TestDecodingMessageDecoder.java

@@ -81,11 +81,8 @@ public class TestDecodingMessageDecoder extends AbstractMessageDecoder {
         switch (eventType) {
             case ConnectorConstant.OPERTION_UPDATE:
             case ConnectorConstant.OPERTION_INSERT:
-                event = new RowChangedEvent(table, eventType, Collections.EMPTY_LIST, data);
-                break;
-
             case ConnectorConstant.OPERTION_DELETE:
-                event = new RowChangedEvent(table, eventType, data, Collections.EMPTY_LIST);
+                event = new RowChangedEvent(table, eventType, data);
                 break;
 
             default:

+ 4 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -13,7 +13,6 @@ import org.dbsyncer.listener.AbstractExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -113,21 +112,21 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
             Object event = null;
             for (Map<String, Object> row : data) {
                 if(StringUtil.isBlank(eventFieldName)){
-                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row));
+                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
                     continue;
                 }
 
                 event = row.get(eventFieldName);
                 if (update.contains(event)) {
-                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row));
+                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, row));
                     continue;
                 }
                 if (insert.contains(event)) {
-                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_MAP, row));
+                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_INSERT, row));
                     continue;
                 }
                 if (delete.contains(event)) {
-                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_DELETE, row, Collections.EMPTY_MAP));
+                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_DELETE, row));
                     continue;
                 }
 

+ 3 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -246,17 +246,17 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
         for (CDCEvent event : list) {
             int code = event.getCode();
             if (TableOperationEnum.isUpdateAfter(code)) {
-                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, event.getRow()));
+                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, event.getRow()));
                 continue;
             }
 
             if (TableOperationEnum.isInsert(code)) {
-                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, event.getRow()));
+                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, event.getRow()));
                 continue;
             }
 
             if (TableOperationEnum.isDelete(code)) {
-                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, event.getRow(), Collections.EMPTY_LIST));
+                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, event.getRow()));
             }
         }
     }

+ 1 - 1
dbsyncer-listener/src/main/test/DBChangeNotificationTest.java

@@ -28,7 +28,7 @@ public class DBChangeNotificationTest {
 
         final DBChangeNotification dcn = new DBChangeNotification(username, password, url);
         dcn.addRowEventListener((e) ->
-            logger.info("{}触发{}, before:{}, after:{}", e.getSourceTableName(), e.getEvent(), e.getBeforeData(), e.getAfterData())
+            logger.info("{}触发{}, data:{}", e.getSourceTableName(), e.getEvent(), e.getDataList())
         );
         dcn.start();
 

+ 3 - 7
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -251,7 +251,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             final FieldPicker picker = tablePicker.get(rowChangedEvent.getTableGroupIndex());
             TableGroup tableGroup = picker.getTableGroup();
             rowChangedEvent.setSourceTableName(tableGroup.getSourceTable().getName());
-            rowChangedEvent.setTargetTableName(tableGroup.getTargetTable().getName());
 
             // 处理过程有异常向上抛
             parser.execute(mapping, tableGroup, rowChangedEvent);
@@ -310,12 +309,9 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getSourceTableName());
             if (!CollectionUtils.isEmpty(pickers)) {
                 pickers.forEach(picker -> {
-                    final Map<String, Object> before = picker.getColumns(rowChangedEvent.getBeforeData());
-                    final Map<String, Object> after = picker.getColumns(rowChangedEvent.getAfterData());
-                    if (picker.filter(StringUtil.equals(ConnectorConstant.OPERTION_DELETE, rowChangedEvent.getEvent()) ? before : after)) {
-                        rowChangedEvent.setBefore(before);
-                        rowChangedEvent.setAfter(after);
-                        rowChangedEvent.setTargetTableName(picker.getTableGroup().getTargetTable().getName());
+                    final Map<String, Object> dataMap = picker.getColumns(rowChangedEvent.getDataList());
+                    if (picker.filter(dataMap)) {
+                        rowChangedEvent.setDataMap(dataMap);
                         parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
                     }
                 });

+ 3 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -299,18 +299,17 @@ public class ParserFactory implements Parser {
 
     @Override
     public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
-        logger.debug("Table[{}] {}, before:{}, after:{}", event.getSourceTableName(), event.getEvent(), event.getBefore(), event.getAfter());
+        logger.debug("Table[{}] {}, data:{}", event.getSourceTableName(), event.getEvent(), event.getDataMap());
 
         // 1、获取映射字段
-        final Map<String, Object> data = StringUtil.equals(ConnectorConstant.OPERTION_DELETE, event.getEvent()) ? event.getBefore() : event.getAfter();
         final Picker picker = new Picker(tableGroup.getFieldMapping());
-        final Map target = picker.pickData(data);
+        final Map target = picker.pickData(event.getDataMap());
 
         // 2、参数转换
         ConvertUtil.convert(tableGroup.getConvert(), target);
 
         // 3、插件转换
-        pluginFactory.convert(tableGroup.getPlugin(), event.getEvent(), data, target);
+        pluginFactory.convert(tableGroup.getPlugin(), event.getEvent(), event.getDataMap(), target);
 
         // 4、处理数据
         parserStrategy.execute(tableGroup.getId(), event.getEvent(), target);