Browse Source

merge into v2

AE86 1 year ago
parent
commit
ee08b57aca
24 changed files with 1337 additions and 49 deletions
  1. 2 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java
  2. 6 0
      dbsyncer-connector/dbsyncer-connector-oracle/pom.xml
  3. 63 20
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java
  4. 46 0
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogFile.java
  5. 321 0
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java
  6. 240 0
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMinerHelper.java
  7. 99 0
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/RedoEvent.java
  8. 107 0
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/TransactionalBuffer.java
  9. 16 4
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/AbstractConsumer.java
  10. 6 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/impl/LogConsumer.java
  11. 62 9
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java
  12. 11 6
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java
  13. 11 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java
  14. 14 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/sql/SqlParser.java
  15. 76 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/sql/impl/DeleteSql.java
  16. 59 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/sql/impl/InsertSql.java
  17. 87 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/sql/impl/UpdateSql.java
  18. 35 0
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/enums/ChangedEventTypeEnum.java
  19. 15 0
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/ChangedEvent.java
  20. 14 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/CommonChangedEvent.java
  21. 6 8
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/DDLChangedEvent.java
  22. 7 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/RowChangedEvent.java
  23. 7 0
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/ScanChangedEvent.java
  24. 27 0
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/SqlChangedEvent.java

+ 2 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java

@@ -14,6 +14,8 @@ public abstract class StringUtil {
 
 
     public static final String SINGLE_QUOTATION = "'";
     public static final String SINGLE_QUOTATION = "'";
 
 
+    public static final String DOUBLE_QUOTATION = "\"";
+
     public static final String FORWARD_SLASH = "/";
     public static final String FORWARD_SLASH = "/";
 
 
     public static boolean equals(CharSequence cs1, CharSequence cs2) {
     public static boolean equals(CharSequence cs1, CharSequence cs2) {

+ 6 - 0
dbsyncer-connector/dbsyncer-connector-oracle/pom.xml

@@ -26,6 +26,12 @@
             <artifactId>ojdbc6</artifactId>
             <artifactId>ojdbc6</artifactId>
         </dependency>
         </dependency>
 
 
+        <dependency>
+            <groupId>com.github.jsqlparser</groupId>
+            <artifactId>jsqlparser</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- sqlserver-driver -->
         <!-- sqlserver-driver -->
         <dependency>
         <dependency>
             <groupId>com.microsoft.sqlserver</groupId>
             <groupId>com.microsoft.sqlserver</groupId>

+ 63 - 20
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java

@@ -3,15 +3,26 @@
  */
  */
 package org.dbsyncer.connector.oracle.cdc;
 package org.dbsyncer.connector.oracle.cdc;
 
 
-import org.dbsyncer.common.QueueOverflowException;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.schema.Table;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.Alter;
+import net.sf.jsqlparser.statement.delete.Delete;
+import net.sf.jsqlparser.statement.insert.Insert;
+import net.sf.jsqlparser.statement.update.Update;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.oracle.OracleException;
 import org.dbsyncer.connector.oracle.OracleException;
-import org.dbsyncer.connector.oracle.dcn.DBChangeNotification;
+import org.dbsyncer.connector.oracle.logminer.LogMiner;
 import org.dbsyncer.sdk.config.DatabaseConfig;
 import org.dbsyncer.sdk.config.DatabaseConfig;
+import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.listener.AbstractDatabaseListener;
 import org.dbsyncer.sdk.listener.AbstractDatabaseListener;
+import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
+import org.dbsyncer.sdk.listener.event.SqlChangedEvent;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import java.util.concurrent.TimeUnit;
+import java.sql.SQLException;
 
 
 /**
 /**
  * @Author AE86
  * @Author AE86
@@ -21,33 +32,54 @@ import java.util.concurrent.TimeUnit;
 public class OracleListener extends AbstractDatabaseListener {
 public class OracleListener extends AbstractDatabaseListener {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
+    private final String REDO_POSITION = "position";
 
 
-    private DBChangeNotification client;
+    private LogMiner logMiner;
 
 
     @Override
     @Override
     public void start() {
     public void start() {
         try {
         try {
             final DatabaseConfig config = (DatabaseConfig) connectorConfig;
             final DatabaseConfig config = (DatabaseConfig) connectorConfig;
+            String driverClassName = config.getDriverClassName();
             String username = config.getUsername();
             String username = config.getUsername();
             String password = config.getPassword();
             String password = config.getPassword();
             String url = config.getUrl();
             String url = config.getUrl();
-            client = new DBChangeNotification(username, password, url);
-            client.setFilterTable(filterTable);
-            client.addRowEventListener((event) -> {
-                while (client.isConnected()){
-                    try {
-                        sendChangedEvent(event);
-                        break;
-                    } catch (QueueOverflowException ex) {
-                        try {
-                            TimeUnit.MILLISECONDS.sleep(1);
-                        } catch (InterruptedException exe) {
-                            logger.error(exe.getMessage(), exe);
-                        }
+            String schema = config.getSchema();
+            boolean containsPos = snapshot.containsKey(REDO_POSITION);
+            logMiner = new LogMiner(username, password, url, schema, driverClassName);
+            logMiner.setStartScn(containsPos ? Long.parseLong(snapshot.get(REDO_POSITION)) : 0);
+            logMiner.registerEventListener((event) -> {
+                try {
+                    Statement statement = CCJSqlParserUtil.parse(event.getRedoSql());
+                    if (statement instanceof Update) {
+                        Update update = (Update) statement;
+                        sendChangedEvent(new SqlChangedEvent(replaceTableName(update.getTable()), ConnectorConstant.OPERTION_UPDATE, event.getRedoSql(), null, event.getScn()));
+                        return;
                     }
                     }
+
+                    if (statement instanceof Insert) {
+                        Insert insert = (Insert) statement;
+                        sendChangedEvent(new SqlChangedEvent(replaceTableName(insert.getTable()), ConnectorConstant.OPERTION_INSERT, event.getRedoSql(), null, event.getScn()));
+                        return;
+                    }
+
+                    if (statement instanceof Delete) {
+                        Delete delete = (Delete) statement;
+                        sendChangedEvent(new SqlChangedEvent(replaceTableName(delete.getTable()), ConnectorConstant.OPERTION_DELETE, event.getRedoSql(), null, event.getScn()));
+                        return;
+                    }
+
+                    if (statement instanceof Alter) {
+                        Alter alter = (Alter) statement;
+                        sendChangedEvent(new DDLChangedEvent("", replaceTableName(alter.getTable()), ConnectorConstant.OPERTION_ALTER, event.getRedoSql(), null, event.getScn()));
+                        return;
+                    }
+                } catch (JSQLParserException e) {
+                    logger.error("parse redoSql=" + event.getRedoSql(), e);
                 }
                 }
             });
             });
-            client.start();
+            logMiner.start();
+
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error("启动失败:{}", e.getMessage());
             logger.error("启动失败:{}", e.getMessage());
             throw new OracleException(e);
             throw new OracleException(e);
@@ -56,9 +88,20 @@ public class OracleListener extends AbstractDatabaseListener {
 
 
     @Override
     @Override
     public void close() {
     public void close() {
-        if (null != client) {
-            client.close();
+        try {
+            if (logMiner != null) {
+                logMiner.close();
+            }
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private String replaceTableName(Table table) {
+        if (table == null) {
+            return StringUtil.EMPTY;
         }
         }
+        return StringUtil.replace(table.getName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY);
     }
     }
 
 
 }
 }

+ 46 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogFile.java

@@ -0,0 +1,46 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.connector.oracle.logminer;
+
+import java.math.BigInteger;
+import java.util.Objects;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-09 20:20
+ */
+public class LogFile {
+    private final String fileName;
+    private final BigInteger firstScn;
+    private final BigInteger nextScn;
+    private final boolean current;
+
+    public LogFile(String fileName, BigInteger firstScn, BigInteger nextScn, boolean current) {
+        this.fileName = fileName;
+        this.firstScn = firstScn;
+        this.nextScn = nextScn;
+        this.current = current;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public BigInteger getFirstScn() {
+        return firstScn;
+    }
+
+    public BigInteger getNextScn() {
+        return nextScn;
+    }
+
+    public boolean isCurrent() {
+        return current;
+    }
+
+    public boolean isSameRange(LogFile other) {
+        return Objects.equals(firstScn, other.getFirstScn()) && Objects.equals(nextScn, other.getNextScn());
+    }
+}

+ 321 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java

@@ -0,0 +1,321 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.connector.oracle.logminer;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.dbsyncer.sdk.util.DatabaseUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-09 20:21
+ */
+public class LogMiner {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+    private Lock lock = new ReentrantLock();
+    private String username;
+    private String password;
+    private String url;
+    private String schema;
+    private String driverClassName;
+    private String miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG";
+    private volatile boolean connected = false;
+    private final StopWatch stopWatch = StopWatch.create();
+    private Connection connection;
+    private List<BigInteger> currentRedoLogSequences;
+    private TransactionalBuffer transactionalBuffer = new TransactionalBuffer();
+    // 已提交的位点
+    private Long committedScn = 0L;
+    // 初始位点
+    private long startScn = 0;
+    private EventListener listener;
+
+    public LogMiner(String username, String password, String url, String schema, String driverClassName) {
+        this.username = username;
+        this.password = password;
+        this.url = url;
+        this.schema = schema;
+        this.driverClassName = driverClassName;
+    }
+
+    public void close() throws SQLException {
+        lock.lock();
+        if (!connected) {
+            logger.error("LogMiner is already stop");
+            lock.unlock();
+            return;
+        }
+        this.connection.close();
+        connected = false;
+        lock.unlock();
+    }
+
+    public void start() throws SQLException {
+        lock.lock();
+        if (connected) {
+            logger.error("LogMiner is already started");
+            lock.unlock();
+            return;
+        }
+        this.connection = DatabaseUtil.getConnection(driverClassName, url, username, password);
+        connected = true;
+        lock.unlock();
+        //get current scn 判断是否第一次没有存储
+        if (startScn == 0) {
+            startScn = getCurrentScn(connection);
+        }
+
+        logger.info("scn start '{}'", startScn);
+        logger.info("start LogMiner...");
+//        LogMinerHelper.resetSessionToCdbIfNecessary(connection);
+        LogMinerHelper.setSessionParameter(connection);
+
+        // 1.记录当前redoLog,用于下文判断redoLog 是否切换
+        currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
+
+        // 2.构建数据字典 && add redo / archived log
+        initializeLogMiner();
+
+        String minerViewQuery = LogMinerHelper.logMinerViewQuery(schema, username);
+        logger.debug(minerViewQuery);
+
+        try (PreparedStatement minerViewStatement = connection.prepareStatement(minerViewQuery, ResultSet.TYPE_FORWARD_ONLY,
+                ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT)) {
+            // while
+            while (connected) {
+                // 3.确定 endScn
+                BigInteger endScn = determineEndScn();
+
+                // 4.是否发生redoLog切换
+                if (redoLogSwitchOccurred()) {
+                    // 如果切换则重启logMiner会话
+                    logger.debug("restart LogMiner Session");
+                    restartLogMiner();
+                    currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
+                }
+
+                // 5.start logMiner
+                LogMinerHelper.startLogMiner(connection, BigInteger.valueOf(startScn), endScn, miningStrategy);
+
+                // 6.查询 logMiner view, 处理结果集
+                minerViewStatement.setFetchSize(2000);
+                minerViewStatement.setFetchDirection(ResultSet.FETCH_FORWARD);
+                minerViewStatement.setString(1, String.valueOf(startScn));
+                minerViewStatement.setString(2, endScn.toString());
+
+                stopWatch.start();
+
+                try (ResultSet rs = minerViewStatement.executeQuery()) {
+                    logger.trace("Query V$LOGMNR_CONTENTS spend time {} ms", stopWatch.getTime(TimeUnit.MILLISECONDS));
+                    stopWatch.reset();
+                    logMinerViewProcessor(rs);
+                }
+
+                // 7.确定新的SCN
+                startScn = Long.parseLong(endScn.toString());
+
+                try {
+                    // 避免频繁的执行导致 PGA 内存超出 PGA_AGGREGATE_LIMIT
+                    TimeUnit.SECONDS.sleep(2);
+                } catch (InterruptedException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public long getCurrentScn(Connection connection) throws SQLException {
+        try (Statement statement = connection.createStatement()) {
+            ResultSet rs = statement.executeQuery("select CURRENT_SCN from V$DATABASE");
+
+            if (!rs.next()) {
+                throw new IllegalStateException("Couldn't get SCN");
+            }
+
+            return rs.getLong(1);
+        }
+    }
+
+    private void restartLogMiner() throws SQLException {
+        LogMinerHelper.endLogMiner(connection);
+        initializeLogMiner();
+    }
+
+    private boolean redoLogSwitchOccurred() throws SQLException {
+        final List<BigInteger> newSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
+        if (!newSequences.equals(currentRedoLogSequences)) {
+            currentRedoLogSequences = newSequences;
+            return true;
+        }
+        return false;
+    }
+
+    private BigInteger determineEndScn() throws SQLException {
+        return BigInteger.valueOf(getCurrentScn(connection));
+    }
+
+    private void initializeLogMiner() throws SQLException {
+        // 默认使用在线数据字典,所以此处不做数据字典相关操作
+        LogMinerHelper.buildDataDictionary(connection, miningStrategy);
+
+        setRedoLog();
+    }
+
+    private void setRedoLog() throws SQLException {
+        LogMinerHelper.removeLogFilesFromMining(connection);
+        List<LogFile> onlineLogFiles = LogMinerHelper.getOnlineLogFilesForOffsetScn(connection, BigInteger.valueOf(startScn));
+        List<LogFile> archivedLogFiles = LogMinerHelper.getArchivedLogFilesForOffsetScn(connection, BigInteger.valueOf(startScn));
+        List<String> logFilesNames = archivedLogFiles.stream().map(LogFile::getFileName).collect(Collectors.toList());
+        for (LogFile onlineLogFile : onlineLogFiles) {
+            boolean found = false;
+            for (LogFile archivedLogFile : archivedLogFiles) {
+                if (onlineLogFile.isSameRange(archivedLogFile)) {
+                    // 如果redo 已经被归档,那么就不需要加载这个redo了
+                    found = true;
+                    break;
+                }
+            }
+            if (!found)
+                logFilesNames.add(onlineLogFile.getFileName());
+        }
+
+        // 加载所需要的redo / archived
+        for (String fileName : logFilesNames) {
+            LogMinerHelper.addLogFile(connection, fileName);
+        }
+    }
+
+    private void logMinerViewProcessor(ResultSet rs) throws SQLException {
+        while (rs.next()) {
+            BigInteger scn = rs.getBigDecimal("SCN").toBigInteger();
+            String tableName = rs.getString("TABLE_NAME");
+            String segOwner = rs.getString("SEG_OWNER");
+            int operationCode = rs.getInt("OPERATION_CODE");
+            Timestamp changeTime = rs.getTimestamp("TIMESTAMP");
+            String txId = rs.getString("XID");
+            String operation = rs.getString("OPERATION");
+            String username = rs.getString("USERNAME");
+
+            logger.trace("Capture record, SCN:{}, TABLE_NAME:{}, SEG_OWNER:{}, OPERATION_CODE:{}, TIMESTAMP:{}, XID:{}, OPERATION:{}, USERNAME:{}",
+                    scn, tableName, segOwner, operationCode, changeTime, txId, operation, username);
+
+            // Commit
+            if (operationCode == LogMinerHelper.LOG_MINER_OC_COMMIT) {
+                // 将TransactionalBuffer中当前事务的DML 转移到消费者处理
+                if (transactionalBuffer.commit(txId, scn, committedScn)) {
+                    logger.debug("txId: {} commit", txId);
+                }
+                continue;
+            }
+
+            // Rollback
+            if (operationCode == LogMinerHelper.LOG_MINER_OC_ROLLBACK) {
+                // 清空TransactionalBuffer中当前事务
+                if (transactionalBuffer.rollback(txId)) {
+                    logger.debug("txId: {} rollback", txId);
+                }
+                continue;
+            }
+
+            // MISSING_SCN
+            if (operationCode == LogMinerHelper.LOG_MINER_OC_MISSING_SCN) {
+                logger.warn("Found MISSING_SCN");
+                continue;
+            }
+
+            // DDL
+            String redoSql = getRedoSQL(rs);
+            if (operationCode == LogMinerHelper.LOG_MINER_OC_DDL) {
+                updateCommittedScn(scn.longValue());
+                listener.onEvent(new RedoEvent(scn.longValue(), operationCode, redoSql, segOwner, tableName, changeTime, txId));
+                continue;
+            }
+
+            // DML
+            if (operationCode == LogMinerHelper.LOG_MINER_OC_INSERT
+                    || operationCode == LogMinerHelper.LOG_MINER_OC_DELETE
+                    || operationCode == LogMinerHelper.LOG_MINER_OC_UPDATE) {
+                // 内部维护 TransactionalBuffer,将每条DML注册到Buffer中
+                // 根据事务提交或者回滚情况决定如何处理
+                if (redoSql != null) {
+                    final RedoEvent event = new RedoEvent(scn.longValue(), operationCode, redoSql, segOwner, tableName, changeTime, txId);
+                    // Transactional Commit Callback
+                    TransactionalBuffer.CommitCallback commitCallback = (smallestScn, commitScn, counter) -> {
+                        if (smallestScn == null || scn.compareTo(smallestScn) < 0) {
+                            // 当前SCN 事务已经提交 并且 小于事务缓冲区中所有的开始SCN,所以可以更新offsetScn
+                            startScn = scn.longValue();
+                        }
+
+                        if (counter == 0) {
+                            updateCommittedScn(commitScn.longValue());
+                        }
+
+                        event.setScn(committedScn);
+                        listener.onEvent(event);
+                    };
+                    transactionalBuffer.registerCommitCallback(txId, scn, commitCallback);
+                }
+            }
+        }
+    }
+
+    private void updateCommittedScn(long newScn) {
+        committedScn = newScn > committedScn ? newScn : committedScn;
+    }
+
+    private String getRedoSQL(ResultSet rs) throws SQLException {
+        String redoSql = rs.getString("SQL_REDO");
+        if (redoSql == null) {
+            return null;
+        }
+        StringBuilder redoBuilder = new StringBuilder(redoSql);
+
+        // https://docs.oracle.com/cd/B19306_01/server.102/b14237/dynviews_1154.htm#REFRN30132
+        // Continuation SQL flag. Possible values are:
+        // 0 = indicates SQL_REDO and SQL_UNDO is contained within the same row
+        // 1 = indicates that either SQL_REDO or SQL_UNDO is greater than 4000 bytes in size and is continued in the next row returned by the view
+        int csf = rs.getInt("CSF");
+
+        while (csf == 1) {
+            rs.next();
+            redoBuilder.append(rs.getString("SQL_REDO"));
+            csf = rs.getInt("CSF");
+        }
+
+        return redoBuilder.toString();
+    }
+
+    public void registerEventListener(EventListener listener){
+        this.listener = listener;
+    }
+
+    public void setStartScn(long startScn) {
+        this.startScn = startScn;
+    }
+
+    public interface EventListener {
+
+        void onEvent(RedoEvent redoEvent);
+    }
+
+}

+ 240 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMinerHelper.java

@@ -0,0 +1,240 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.connector.oracle.logminer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.dbsyncer.common.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-09 20:23
+ */
+public class LogMinerHelper {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerHelper.class);
+    public static final int LOG_MINER_OC_INSERT = 1;
+    public static final int LOG_MINER_OC_DELETE = 2;
+    public static final int LOG_MINER_OC_UPDATE = 3;
+    public static final int LOG_MINER_OC_DDL = 5;
+    public static final int LOG_MINER_OC_COMMIT = 7;
+    public static final int LOG_MINER_OC_MISSING_SCN = 34;
+    public static final int LOG_MINER_OC_ROLLBACK = 36;
+
+    public static void removeLogFilesFromMining(Connection conn) throws SQLException {
+        try (PreparedStatement ps = conn.prepareStatement("SELECT FILENAME AS NAME FROM V$LOGMNR_LOGS");
+             ResultSet result = ps.executeQuery()) {
+            Set<String> files = new LinkedHashSet<>();
+            while (result.next()) {
+                files.add(result.getString(1));
+            }
+            for (String fileName : files) {
+                String sql = String.format("BEGIN SYS.DBMS_LOGMNR.REMOVE_LOGFILE(LOGFILENAME => '%s');END;", fileName);
+                executeCallableStatement(conn, sql);
+                LOGGER.debug("File {} was removed from mining", fileName);
+            }
+        }
+    }
+
+    public static void resetSessionToCdbIfNecessary(Connection connection) {
+        Statement statement = null;
+        try {
+            statement = connection.createStatement();
+            statement.execute("alter session set container=cdb$root");
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (statement != null) {
+                try {
+                    statement.close();
+                } catch (SQLException e) {
+                    LOGGER.error("Couldn't close statement", e);
+                }
+            }
+        }
+    }
+
+    public static void executeCallableStatement(Connection connection, String statement) throws SQLException {
+        Objects.requireNonNull(statement);
+        try (CallableStatement s = connection.prepareCall(statement)) {
+            s.execute();
+        }
+    }
+
+    public static List<LogFile> getOnlineLogFilesForOffsetScn(Connection connection, BigInteger offsetScn) throws SQLException {
+        List<LogFile> redoLogFiles = new ArrayList<>();
+
+        String onlineLogQuery = "SELECT MIN(F.MEMBER) AS FILE_NAME, L.NEXT_CHANGE# AS NEXT_CHANGE, F.GROUP#, L.FIRST_CHANGE# AS FIRST_CHANGE, L.STATUS " +
+                " FROM V$LOG L, V$LOGFILE F " +
+                " WHERE F.GROUP# = L.GROUP# AND L.NEXT_CHANGE# > 0 " +
+                " GROUP BY F.GROUP#, L.NEXT_CHANGE#, L.FIRST_CHANGE#, L.STATUS ORDER BY 3";
+
+        try (PreparedStatement s = connection.prepareStatement(onlineLogQuery)) {
+            try (ResultSet rs = s.executeQuery()) {
+                while (rs.next()) {
+                    String fileName = rs.getString(1);
+                    BigInteger nextChangeNumber = new BigInteger(rs.getString(2));
+                    BigInteger firstChangeNumber = new BigInteger(rs.getString(4));
+                    String status = rs.getString(5);
+                    LogFile logFile = new LogFile(fileName, firstChangeNumber, nextChangeNumber, "CURRENT".equalsIgnoreCase(status));
+                    // 添加Current Redo || scn 范围符合的
+                    if (logFile.isCurrent() || logFile.getNextScn().compareTo(offsetScn) >= 0) {
+                        redoLogFiles.add(logFile);
+                    }
+                }
+            }
+        }
+        return redoLogFiles;
+    }
+
+    public static List<LogFile> getArchivedLogFilesForOffsetScn(Connection connection, BigInteger offsetScn) throws SQLException {
+        String archiveLogsQuery = String.format("SELECT NAME AS FILE_NAME, NEXT_CHANGE# AS NEXT_CHANGE, FIRST_CHANGE# AS FIRST_CHANGE FROM V$ARCHIVED_LOG " +
+                "WHERE NAME IS NOT NULL AND ARCHIVED = 'YES' " +
+                "AND STATUS = 'A' AND NEXT_CHANGE# > %s ORDER BY 2", offsetScn);
+
+        final List<LogFile> archiveLogFiles = new ArrayList<>();
+        try (PreparedStatement s = connection.prepareStatement(archiveLogsQuery)) {
+            try (ResultSet rs = s.executeQuery()) {
+                while (rs.next()) {
+                    String fileName = rs.getString(1);
+                    BigInteger firstChangeNumber = new BigInteger(rs.getString(3));
+                    BigInteger nextChangeNumber = new BigInteger(rs.getString(2));
+                    archiveLogFiles.add(new LogFile(fileName, firstChangeNumber, nextChangeNumber, false));
+                }
+            }
+        }
+        return archiveLogFiles;
+    }
+
+    public static void addLogFile(Connection connection, String fileName) throws SQLException {
+        String addLogFile = "BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '%s', OPTIONS => %s);END;";
+        String options = "DBMS_LOGMNR.ADDFILE";
+//        String options = "DBMS_LOGMNR.NEW";
+        executeCallableStatement(connection, String.format(addLogFile, fileName, options));
+    }
+
+    public static List<BigInteger> getCurrentRedoLogSequences(Connection connection) throws SQLException {
+        String currentRedoSequence = "SELECT SEQUENCE# FROM V$LOG WHERE STATUS = 'CURRENT'";
+        try (Statement statement = connection.createStatement();
+             ResultSet rs = statement.executeQuery(currentRedoSequence)) {
+            List<BigInteger> sequences = new ArrayList<>();
+            if (rs.next()) {
+                sequences.add(new BigInteger(rs.getString(1)));
+            }
+            // 如果是RAC则会返回多个SEQUENCE
+            return sequences;
+        }
+    }
+
+    public static void buildDataDictionary(Connection connection, String miningStrategy) throws SQLException {
+        if (StringUtils.isBlank(miningStrategy)) {
+            // default
+            String sql = "BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;";
+            executeCallableStatement(connection, sql);
+        }
+    }
+
+    public static void startLogMiner(Connection connection, BigInteger startScn, BigInteger endScn, String miningStrategy) throws SQLException {
+        LOGGER.debug("startLogMiner... startScn {}, endScn {}", startScn, endScn);
+        // default
+        if (StringUtils.isBlank(miningStrategy)) {
+            miningStrategy = "DBMS_LOGMNR.DICT_FROM_REDO_LOGS + DBMS_LOGMNR.DDL_DICT_TRACKING ";
+        }
+
+        String startLogMiner = "BEGIN sys.dbms_logmnr.start_logmnr(" +
+                "startScn => '" + startScn + "', " +
+                "endScn => '" + endScn + "', " +
+                "OPTIONS => " + miningStrategy +
+                " + DBMS_LOGMNR.NO_ROWID_IN_STMT);" +
+                "END;";
+
+        executeCallableStatement(connection, startLogMiner);
+    }
+
+    public static void endLogMiner(Connection connection) {
+        try {
+            executeCallableStatement(connection, "BEGIN SYS.DBMS_LOGMNR.END_LOGMNR(); END;");
+        } catch (SQLException e) {
+            if (e.getMessage().toUpperCase().contains("ORA-01307")) {
+                LOGGER.info("LogMiner session was already closed");
+            } else {
+                LOGGER.error("Cannot close LogMiner session gracefully: {}", e);
+            }
+        }
+    }
+
+    public static String logMinerViewQuery(String schema, String logMinerUser) {
+        StringBuilder query = new StringBuilder();
+//        query.append("SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME ");
+        query.append("SELECT * ");
+        query.append("FROM V$LOGMNR_CONTENTS ");
+        query.append("WHERE ");
+        // 这里由原来的 SCN > ? AND SCN <= ? 改为如下
+        // 原因:
+        // 在测试的时候发现一个情况会丢失部分数据
+        // 结论:
+        // START_SCN = X , END_SCN = Y, 此时查询条件 SCN >= X AND SCN <= Y
+        // 查询 V$LOGMNR_CONTENTS, 此时如果SQL的SCN恰好等于Y, 那么这次可能不会查出SCN=Y 的SQL(并不是百分之百)
+        // 但是当指定 SCN >= Y 时, 貌似一定能查到
+        // 这个问题很奇怪,有待研究
+        query.append("SCN >= ? AND SCN < ? ");
+        query.append("AND (");
+        // MISSING_SCN/DDL only when not performed by excluded users
+        query.append("(OPERATION_CODE IN (5,34) AND USERNAME NOT IN (").append(getExcludedUsers(logMinerUser)).append(")) ");
+        // COMMIT/ROLLBACK
+        query.append("OR (OPERATION_CODE IN (7,36)) ");
+        // INSERT/UPDATE/DELETE
+        query.append("OR ");
+        query.append("(OPERATION_CODE IN (1,2,3) ");
+        query.append(" AND SEG_OWNER NOT IN ('APPQOSSYS','AUDSYS','CTXSYS','DVSYS','DBSFWUSER','DBSNMP','GSMADMIN_INTERNAL','LBACSYS','MDSYS','OJVMSYS','OLAPSYS','ORDDATA','ORDSYS','OUTLN','SYS','SYSTEM','WMSYS','XDB') ");
+
+        if (StringUtils.isNotBlank(schema)) {
+            query.append(String.format(" AND (REGEXP_LIKE(SEG_OWNER,'^%s$','i')) ", schema));
+//            query.append(" AND ");
+//            query.append("USERNAME = '");
+//            query.append(schema);
+//            query.append("' ");
+        }
+
+        query.append(" ))");
+
+        return query.toString();
+    }
+
+    private static String getExcludedUsers(String logMinerUser) {
+        return "'SYS','SYSTEM','" + logMinerUser.toUpperCase() + "'";
+    }
+
+    public static void setSessionParameter(Connection connection) throws SQLException {
+        String sql = "ALTER SESSION SET "
+                + "  NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'"
+                + "  NLS_TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF'"
+                + "  NLS_TIMESTAMP_TZ_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'"
+                + "  NLS_NUMERIC_CHARACTERS = '.,'";
+
+        executeCallableStatement(connection, sql);
+        executeCallableStatement(connection, "ALTER SESSION SET TIME_ZONE = '00:00'");
+    }
+
+    public void executeWithoutCommitting(Connection connection, String sql) throws SQLException {
+        try (Statement statement = connection.createStatement()) {
+            statement.execute(sql);
+        }
+    }
+
+}

+ 99 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/RedoEvent.java

@@ -0,0 +1,99 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.connector.oracle.logminer;
+
+import java.sql.Timestamp;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-09 20:22
+ */
+public final class RedoEvent {
+    private long scn;
+    private int operationCode;
+    private String redoSql;
+    private String objectOwner;
+    private String objectName;
+    private Timestamp sourceTime;
+    private String transactionId;
+
+    public RedoEvent(long scn, int operationCode, String redoSql, String objectOwner, String objectName, Timestamp sourceTime, String transactionId) {
+        this.scn = scn;
+        this.operationCode = operationCode;
+        this.redoSql = redoSql;
+        this.objectOwner = objectOwner;
+        this.objectName = objectName;
+        this.sourceTime = sourceTime;
+        this.transactionId = transactionId;
+    }
+
+    public long getScn() {
+        return scn;
+    }
+
+    public void setScn(long scn) {
+        this.scn = scn;
+    }
+
+    public int getOperationCode() {
+        return operationCode;
+    }
+
+    public void setOperationCode(int operationCode) {
+        this.operationCode = operationCode;
+    }
+
+    public String getRedoSql() {
+        return redoSql;
+    }
+
+    public void setRedoSql(String redoSql) {
+        this.redoSql = redoSql;
+    }
+
+    public String getObjectOwner() {
+        return objectOwner;
+    }
+
+    public void setObjectOwner(String objectOwner) {
+        this.objectOwner = objectOwner;
+    }
+
+    public String getObjectName() {
+        return objectName;
+    }
+
+    public void setObjectName(String objectName) {
+        this.objectName = objectName;
+    }
+
+    public Timestamp getSourceTime() {
+        return sourceTime;
+    }
+
+    public void setSourceTime(Timestamp sourceTime) {
+        this.sourceTime = sourceTime;
+    }
+
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(String transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    @Override
+    public String toString() {
+        return "LogMinerDmlObject{" +
+                "redoSql='" + redoSql + '\'' +
+                ", objectOwner='" + objectOwner + '\'' +
+                ", objectName='" + objectName + '\'' +
+                ", sourceTime=" + sourceTime +
+                ", transactionId='" + transactionId + '\'' +
+                ", scn=" + scn +
+                '}';
+    }
+}

+ 107 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/TransactionalBuffer.java

@@ -0,0 +1,107 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.connector.oracle.logminer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-09 20:26
+ */
+public class TransactionalBuffer {
+    private final Logger LOGGER = LoggerFactory.getLogger(TransactionalBuffer.class);
+    private final Map<String, Transaction> transactions;
+    private BigInteger lastCommittedScn;
+
+    public TransactionalBuffer() {
+        this.transactions = new HashMap<>();
+        this.lastCommittedScn = BigInteger.ZERO;
+    }
+
+    public interface CommitCallback {
+        void execute(BigInteger smallestScn, BigInteger commitScn, int callbackNumber) throws InterruptedException;
+    }
+
+    private final class Transaction {
+        private final BigInteger firstScn;
+        private BigInteger lastScn;
+        private final List<CommitCallback> commitCallbacks;
+
+        private Transaction(BigInteger firstScn) {
+            this.firstScn = firstScn;
+            this.commitCallbacks = new ArrayList<>();
+            this.lastScn = firstScn;
+        }
+
+        @Override
+        public String toString() {
+            return "Transaction{" +
+                    "firstScn=" + firstScn +
+                    ", lastScn=" + lastScn +
+                    '}';
+        }
+    }
+
+    public boolean isEmpty() {
+        return this.transactions.isEmpty();
+    }
+
+    private BigInteger calculateSmallestScn() {
+        return transactions.isEmpty() ? null
+                : transactions.values()
+                .stream()
+                .map(transaction -> transaction.firstScn)
+                .min(BigInteger::compareTo)
+                .orElseThrow(() -> new RuntimeException("Cannot calculate smallest SCN"));
+    }
+
+    public void registerCommitCallback(String transactionId, BigInteger scn, CommitCallback callback) {
+        transactions.computeIfAbsent(transactionId, s -> new Transaction(scn));
+
+        Transaction transaction = transactions.get(transactionId);
+        if (transaction != null) {
+            transaction.commitCallbacks.add(callback);
+        }
+    }
+
+    public boolean commit(String txId, BigInteger commitScn, long committedScn) {
+        Transaction transaction = transactions.remove(txId);
+        if (transaction == null) {
+            return false;
+        }
+
+        BigInteger smallestScn = calculateSmallestScn();
+
+        if (committedScn > commitScn.longValue() || lastCommittedScn.longValue() > commitScn.longValue()) {
+            LOGGER.warn("txId {} already commit, ignore.", txId);
+            return false;
+        }
+
+        int counter = transaction.commitCallbacks.size();
+        for (CommitCallback callback : transaction.commitCallbacks) {
+            try {
+                callback.execute(smallestScn, commitScn, --counter);
+            } catch (InterruptedException e) {
+                LOGGER.error(e.getMessage(), e);
+            }
+        }
+
+        lastCommittedScn = commitScn;
+        return true;
+    }
+
+    public boolean rollback(String txId) {
+        return transactions.remove(txId) != null;
+    }
+
+}

+ 16 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/AbstractConsumer.java

@@ -3,6 +3,7 @@
  */
  */
 package org.dbsyncer.parser.consumer;
 package org.dbsyncer.parser.consumer;
 
 
+import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
 import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
 import org.dbsyncer.sdk.listener.Watcher;
 import org.dbsyncer.sdk.listener.Watcher;
@@ -13,6 +14,7 @@ import org.dbsyncer.parser.LogType;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.sdk.listener.event.SqlChangedEvent;
 
 
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
@@ -48,14 +50,24 @@ public abstract class AbstractConsumer<E extends ChangedEvent> implements Watche
     public void onDDLChanged(DDLChangedEvent event) {
     public void onDDLChanged(DDLChangedEvent event) {
     }
     }
 
 
+    public void onSqlChanged(SqlChangedEvent event) {
+    }
+
     @Override
     @Override
     public void changeEvent(ChangedEvent event) {
     public void changeEvent(ChangedEvent event) {
         event.getChangedOffset().setMetaId(metaId);
         event.getChangedOffset().setMetaId(metaId);
-        if (event instanceof DDLChangedEvent) {
-            onDDLChanged((DDLChangedEvent) event);
-            return;
+        switch (event.getType()){
+            case ROW:
+            case SCAN:
+                onChange((E) event);
+                break;
+            case SQL:
+                onSqlChanged((SqlChangedEvent) event);
+                break;
+            case DDL:
+                onDDLChanged((DDLChangedEvent) event);
+                break;
         }
         }
-        onChange((E) event);
     }
     }
 
 
     @Override
     @Override

+ 6 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/consumer/impl/LogConsumer.java

@@ -11,6 +11,7 @@ import org.dbsyncer.parser.consumer.AbstractConsumer;
 import org.dbsyncer.parser.model.FieldPicker;
 import org.dbsyncer.parser.model.FieldPicker;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.parser.util.PickerUtil;
+import org.dbsyncer.sdk.listener.event.SqlChangedEvent;
 import org.dbsyncer.sdk.model.Table;
 import org.dbsyncer.sdk.model.Table;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
@@ -59,6 +60,11 @@ public final class LogConsumer extends AbstractConsumer<RowChangedEvent> {
         process(event, picker -> execute(picker.getTableGroup().getId(), event));
         process(event, picker -> execute(picker.getTableGroup().getId(), event));
     }
     }
 
 
+    @Override
+    public void onSqlChanged(SqlChangedEvent event) {
+        process(event, picker -> execute(picker.getTableGroup().getId(), event));
+    }
+
     private void process(CommonChangedEvent event, Consumer<FieldPicker> consumer) {
     private void process(CommonChangedEvent event, Consumer<FieldPicker> consumer) {
         // 处理过程有异常向上抛
         // 处理过程有异常向上抛
         List<FieldPicker> pickers = tablePicker.get(event.getSourceTableName());
         List<FieldPicker> pickers = tablePicker.get(event.getSourceTableName());

+ 62 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -6,8 +6,6 @@ import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.sdk.config.DDLConfig;
-import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.parser.ParserComponent;
 import org.dbsyncer.parser.ParserComponent;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.ddl.DDLParser;
 import org.dbsyncer.parser.ddl.DDLParser;
@@ -21,13 +19,20 @@ import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.model.WriterResponse;
 import org.dbsyncer.parser.model.WriterResponse;
+import org.dbsyncer.parser.sql.SqlParser;
+import org.dbsyncer.parser.sql.impl.DeleteSql;
+import org.dbsyncer.parser.sql.impl.InsertSql;
+import org.dbsyncer.parser.sql.impl.UpdateSql;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.plugin.PluginFactory;
+import org.dbsyncer.plugin.impl.IncrementPluginContext;
+import org.dbsyncer.sdk.config.DDLConfig;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
+import org.dbsyncer.sdk.constant.ConnectorConstant;
+import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.model.ConnectorConfig;
 import org.dbsyncer.sdk.model.ConnectorConfig;
-import org.dbsyncer.plugin.impl.IncrementPluginContext;
 import org.dbsyncer.sdk.model.MetaInfo;
 import org.dbsyncer.sdk.model.MetaInfo;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -103,6 +108,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         if (!response.isMerged()) {
         if (!response.isMerged()) {
             response.setTableGroupId(request.getTableGroupId());
             response.setTableGroupId(request.getTableGroupId());
             response.setEvent(request.getEvent());
             response.setEvent(request.getEvent());
+            response.setTypeEnum(request.getTypeEnum());
             response.setSql(request.getSql());
             response.setSql(request.getSql());
             response.setMerged(true);
             response.setMerged(true);
         }
         }
@@ -112,7 +118,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
     protected boolean skipPartition(WriterRequest nextRequest, WriterResponse response) {
     protected boolean skipPartition(WriterRequest nextRequest, WriterResponse response) {
         // 并发场景,同一条数据可能连续触发Insert > Delete > Insert,批处理任务中出现不同事件时,跳过分区处理
         // 并发场景,同一条数据可能连续触发Insert > Delete > Insert,批处理任务中出现不同事件时,跳过分区处理
         // 跳过表结构修改事件(保证表结构修改原子性)
         // 跳过表结构修改事件(保证表结构修改原子性)
-        return !StringUtil.equals(nextRequest.getEvent(), response.getEvent()) || isDDLEvent(response.getEvent());
+        return !StringUtil.equals(nextRequest.getEvent(), response.getEvent()) || ChangedEventTypeEnum.isDDL(response.getTypeEnum());
     }
     }
 
 
     @Override
     @Override
@@ -123,10 +129,14 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
         final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
 
 
         // 1、ddl解析
         // 1、ddl解析
-        if (isDDLEvent(response.getEvent())) {
+        if (ChangedEventTypeEnum.isDDL(response.getTypeEnum())) {
             parseDDl(response, mapping, group);
             parseDDl(response, mapping, group);
             return;
             return;
         }
         }
+        if (ChangedEventTypeEnum.isSQL(response.getTypeEnum())) {
+            parseSql(response, mapping, group);
+            return;
+        }
 
 
         final String sourceTableName = group.getSourceTable().getName();
         final String sourceTableName = group.getSourceTable().getName();
         final String targetTableName = group.getTargetTable().getName();
         final String targetTableName = group.getTargetTable().getName();
@@ -171,10 +181,6 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         return generalExecutor;
         return generalExecutor;
     }
     }
 
 
-    private boolean isDDLEvent(String event) {
-        return StringUtil.equals(event, ConnectorConstant.OPERTION_ALTER);
-    }
-
     /**
     /**
      * 解析DDL
      * 解析DDL
      *
      *
@@ -227,6 +233,53 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         logger.warn("暂只支持MYSQL解析DDL");
         logger.warn("暂只支持MYSQL解析DDL");
     }
     }
 
 
+    /**
+     * 解析sql并替换到目标sql
+     *
+     * @param response
+     * @param mapping
+     * @param group
+     */
+    private void parseSql(WriterResponse response, Mapping mapping, TableGroup group) {
+        String sql = response.getSql();
+        final String sourceTableName = group.getSourceTable().getName();
+        final String targetTableName = group.getTargetTable().getName();
+        final String event = response.getEvent();
+        // 根据不同的语句进行sql替换拼接
+        switch (event) {
+            case ConnectorConstant.OPERTION_INSERT:
+                SqlParser insertSql = new InsertSql(sql, sourceTableName, targetTableName, group.getFieldMapping());
+                sql = insertSql.parse();
+                break;
+            case ConnectorConstant.OPERTION_UPDATE:
+                SqlParser updateSql = new UpdateSql(sql, sourceTableName, targetTableName, group.getFieldMapping());
+                sql = updateSql.parse();
+                break;
+            case ConnectorConstant.OPERTION_DELETE:
+                SqlParser deleteSql = new DeleteSql(sql, sourceTableName, targetTableName, group.getFieldMapping());
+                sql = deleteSql.parse();
+                break;
+            default:
+                break;
+        }
+        logger.info("execute sql:{}", sql);
+
+        ConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
+        final ConnectorInstance tConnectorInstance = connectorFactory.connect(tConnConfig);
+        // TODO life 这里需要重新设计实现方案,不支持异构同步场景
+        DDLConfig ddlConfig = new DDLConfig();
+        ddlConfig.setSql(sql);
+        Result result = connectorFactory.writerDDL(tConnectorInstance, ddlConfig);
+
+        // 6.发布刷新增量点事件
+        applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
+
+        // 7、持久化同步结果
+        result.setTableGroupId(group.getId());
+        result.setTargetTableGroupName(targetTableName);
+        flushStrategy.flushIncrementData(mapping.getMetaId(), result, event);
+    }
+
     /**
     /**
      * 获取连接器配置
      * 获取连接器配置
      *
      *

+ 11 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java

@@ -1,9 +1,9 @@
 package org.dbsyncer.parser.model;
 package org.dbsyncer.parser.model;
 
 
+import org.dbsyncer.parser.flush.BufferRequest;
+import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.dbsyncer.sdk.listener.ChangedEvent;
 import org.dbsyncer.sdk.model.ChangedOffset;
 import org.dbsyncer.sdk.model.ChangedOffset;
-import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
-import org.dbsyncer.parser.flush.BufferRequest;
 
 
 import java.util.Map;
 import java.util.Map;
 
 
@@ -14,6 +14,8 @@ import java.util.Map;
  */
  */
 public class WriterRequest extends AbstractWriter implements BufferRequest {
 public class WriterRequest extends AbstractWriter implements BufferRequest {
 
 
+    private ChangedEventTypeEnum typeEnum;
+
     private Map row;
     private Map row;
 
 
     private ChangedOffset changedOffset;
     private ChangedOffset changedOffset;
@@ -23,18 +25,21 @@ public class WriterRequest extends AbstractWriter implements BufferRequest {
     public WriterRequest(String tableGroupId, ChangedEvent event) {
     public WriterRequest(String tableGroupId, ChangedEvent event) {
         setTableGroupId(tableGroupId);
         setTableGroupId(tableGroupId);
         setEvent(event.getEvent());
         setEvent(event.getEvent());
+        this.typeEnum = event.getType();
         this.row = event.getChangedRow();
         this.row = event.getChangedRow();
         this.changedOffset = event.getChangedOffset();
         this.changedOffset = event.getChangedOffset();
-        if(event instanceof DDLChangedEvent){
-            sql = ((DDLChangedEvent) event).getSql();
-        }
+        this.sql = event.getSql();
     }
     }
 
 
     @Override
     @Override
     public String getMetaId() {
     public String getMetaId() {
         return changedOffset.getMetaId();
         return changedOffset.getMetaId();
     }
     }
-    
+
+    public ChangedEventTypeEnum getTypeEnum() {
+        return typeEnum;
+    }
+
     public Map getRow() {
     public Map getRow() {
         return row;
         return row;
     }
     }

+ 11 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.parser.model;
 package org.dbsyncer.parser.model;
 
 
+import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.model.ChangedOffset;
 import org.dbsyncer.sdk.model.ChangedOffset;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.flush.BufferResponse;
 import org.dbsyncer.parser.flush.BufferResponse;
@@ -15,6 +16,8 @@ import java.util.Map;
  */
  */
 public class WriterResponse extends AbstractWriter implements BufferResponse {
 public class WriterResponse extends AbstractWriter implements BufferResponse {
 
 
+    private ChangedEventTypeEnum typeEnum;
+
     private List<Map> dataList = new LinkedList<>();
     private List<Map> dataList = new LinkedList<>();
 
 
     private List<ChangedOffset> offsetList = new LinkedList<>();
     private List<ChangedOffset> offsetList = new LinkedList<>();
@@ -33,6 +36,14 @@ public class WriterResponse extends AbstractWriter implements BufferResponse {
         return StringUtil.SYMBOL.concat(getEvent());
         return StringUtil.SYMBOL.concat(getEvent());
     }
     }
 
 
+    public ChangedEventTypeEnum getTypeEnum() {
+        return typeEnum;
+    }
+
+    public void setTypeEnum(ChangedEventTypeEnum typeEnum) {
+        this.typeEnum = typeEnum;
+    }
+
     public List<Map> getDataList() {
     public List<Map> getDataList() {
         return dataList;
         return dataList;
     }
     }

+ 14 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/sql/SqlParser.java

@@ -0,0 +1,14 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.parser.sql;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-10 00:43
+ */
+public interface SqlParser {
+
+    public String parse();
+}

+ 76 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/sql/impl/DeleteSql.java

@@ -0,0 +1,76 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.parser.sql.impl;
+
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.expression.BinaryExpression;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.schema.Column;
+import net.sf.jsqlparser.schema.Table;
+import net.sf.jsqlparser.statement.delete.Delete;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.sql.SqlParser;
+
+import java.util.List;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-10 00:43
+ */
+public final class DeleteSql implements SqlParser {
+
+    private String sql;
+
+    private String sourceTableName;
+
+    private String targetTableName;
+
+    private List<FieldMapping> fieldMappingList;
+
+    public DeleteSql(String sql, String sourceTableName, String targetTableName, List<FieldMapping> fieldMappingList) {
+        this.sql = sql;
+        this.sourceTableName = sourceTableName;
+        this.targetTableName = targetTableName;
+        this.fieldMappingList = fieldMappingList;
+    }
+
+    @Override
+    public String parse() {
+        try {
+            Delete delete = (Delete) CCJSqlParserUtil.parse(sql);
+            //替换表名
+            Table table = new Table();
+            table.setName(targetTableName);
+            delete.setTable(table);
+            whereParse(delete.getWhere());
+            return delete.toString();
+        } catch (JSQLParserException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void whereParse(Expression expression){
+        BinaryExpression binaryExpression = (BinaryExpression) expression;
+        Expression left = binaryExpression.getLeftExpression();
+        Expression right = binaryExpression.getRightExpression();
+        findColumn((BinaryExpression) left);
+        findColumn((BinaryExpression) right);
+    }
+
+    private void findColumn(BinaryExpression binaryExpression){
+        if (binaryExpression.getLeftExpression() instanceof Column){
+            Column column = (Column) binaryExpression.getLeftExpression();
+            fieldMappingList.stream()
+                    .filter(x -> x.getSource().getName()
+                            .equals(column.getColumnName().replaceAll("\"", "")))
+                    .findFirst().ifPresent(
+                    fieldMapping -> column.setColumnName(fieldMapping.getTarget().getName()));
+            return;
+        }
+        findColumn((BinaryExpression) binaryExpression.getLeftExpression());
+        findColumn((BinaryExpression) binaryExpression.getRightExpression());
+    }
+}

+ 59 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/sql/impl/InsertSql.java

@@ -0,0 +1,59 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.parser.sql.impl;
+
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.schema.Column;
+import net.sf.jsqlparser.schema.Table;
+import net.sf.jsqlparser.statement.insert.Insert;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.sql.SqlParser;
+
+import java.util.List;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-10 00:44
+ */
+public final class InsertSql implements SqlParser {
+
+    private String sql;
+
+    private String sourceTableName;
+
+    private String targetTableName;
+
+    private List<FieldMapping> fieldMappingList;
+
+    public InsertSql(String sql, String sourceTableName, String targetTableName, List<FieldMapping> fieldMappingList) {
+        this.sql = sql;
+        this.sourceTableName = sourceTableName;
+        this.targetTableName = targetTableName;
+        this.fieldMappingList = fieldMappingList;
+    }
+
+    public String parse() {
+        try {
+            Insert insert = (Insert) CCJSqlParserUtil.parse(this.sql);
+            //替换表名
+            Table table = new Table();
+            table.setName(targetTableName);
+            insert.setTable(table);
+            //替换字段
+            List<Column> columns = insert.getColumns();
+            for (Column column : columns) {
+                fieldMappingList.stream()
+                        .filter(x -> x.getSource().getName()
+                                .equals(column.getColumnName().replaceAll("\"", "")))
+                        .findFirst().ifPresent(
+                        fieldMapping -> column.setColumnName(fieldMapping.getTarget().getName()));
+            }
+            return insert.toString();
+        } catch (JSQLParserException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

+ 87 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/sql/impl/UpdateSql.java

@@ -0,0 +1,87 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.parser.sql.impl;
+
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.expression.BinaryExpression;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.schema.Column;
+import net.sf.jsqlparser.schema.Table;
+import net.sf.jsqlparser.statement.update.Update;
+import net.sf.jsqlparser.statement.update.UpdateSet;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.sql.SqlParser;
+
+import java.util.List;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-10 00:45
+ */
+public final class UpdateSql implements SqlParser {
+
+    private String sql;
+
+    private String sourceTableName;
+
+    private String targetTableName;
+
+    private List<FieldMapping> fieldMappingList;
+
+    public UpdateSql(String sql, String sourceTableName, String targetTableName, List<FieldMapping> fieldMappingList) {
+        this.sql = sql;
+        this.sourceTableName = sourceTableName;
+        this.targetTableName = targetTableName;
+        this.fieldMappingList = fieldMappingList;
+    }
+
+    @Override
+    public String parse() {
+        try {
+            Update update = (Update) CCJSqlParserUtil.parse(sql);
+            //替换表名
+            Table table = new Table();
+            table.setName(targetTableName);
+            update.setTable(table);
+            for (UpdateSet updateSet : update.getUpdateSets()) {
+                List<Column> columns = updateSet.getColumns();
+                for (Column column : columns) {
+                    fieldMappingList.stream()
+                            .filter(x -> x.getSource().getName()
+                                    .equals(column.getColumnName().replaceAll("\"", "")))
+                            .findFirst().ifPresent(
+                            fieldMapping -> column.setColumnName(fieldMapping.getTarget().getName()));
+                }
+            }
+            whereParse(update.getWhere());
+            return update.toString();
+        } catch (JSQLParserException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void whereParse(Expression expression) {
+        BinaryExpression binaryExpression = (BinaryExpression) expression;
+        Expression left = binaryExpression.getLeftExpression();
+        Expression right = binaryExpression.getRightExpression();
+        findColumn((BinaryExpression) left);
+        findColumn((BinaryExpression) right);
+    }
+
+    private void findColumn(BinaryExpression binaryExpression) {
+        if (binaryExpression.getLeftExpression() instanceof Column) {
+            Column column = (Column) binaryExpression.getLeftExpression();
+            fieldMappingList.stream()
+                    .filter(x -> x.getSource().getName()
+                            .equals(column.getColumnName().replaceAll("\"", "")))
+                    .findFirst().ifPresent(
+                    fieldMapping -> column.setColumnName(fieldMapping.getTarget().getName()));
+            return;
+        }
+        findColumn((BinaryExpression) binaryExpression.getLeftExpression());
+        findColumn((BinaryExpression) binaryExpression.getRightExpression());
+    }
+}

+ 35 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/enums/ChangedEventTypeEnum.java

@@ -0,0 +1,35 @@
+package org.dbsyncer.sdk.enums;
+
+/**
+ * 变更事件类型枚举
+ *
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-09 20:34
+ */
+public enum ChangedEventTypeEnum {
+    /**
+     * ddl变更
+     */
+    DDL,
+    /**
+     * 表列变更,比如oracle, 会获取变动列的sql
+     */
+    SQL,
+    /**
+     * 定时变更
+     */
+    SCAN,
+    /**
+     * 表行变更,比如mysql, 会获取变动行所有列的值
+     */
+    ROW;
+
+    public static boolean isDDL(ChangedEventTypeEnum event) {
+        return event != null && DDL == event;
+    }
+
+    public static boolean isSQL(ChangedEventTypeEnum event) {
+        return event != null && SQL == event;
+    }
+}

+ 15 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/ChangedEvent.java

@@ -3,6 +3,7 @@
  */
  */
 package org.dbsyncer.sdk.listener;
 package org.dbsyncer.sdk.listener;
 
 
+import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
 import org.dbsyncer.sdk.model.ChangedOffset;
 import org.dbsyncer.sdk.model.ChangedOffset;
 
 
 import java.util.Map;
 import java.util.Map;
@@ -16,6 +17,13 @@ import java.util.Map;
  */
  */
 public interface ChangedEvent {
 public interface ChangedEvent {
 
 
+    /**
+     * 获取变更事件类型
+     *
+     * @return
+     */
+    ChangedEventTypeEnum getType();
+
     /**
     /**
      * 获取变更表名称
      * 获取变更表名称
      *
      *
@@ -30,6 +38,13 @@ public interface ChangedEvent {
      */
      */
     String getEvent();
     String getEvent();
 
 
+    /**
+     * 获取变更SQL
+     *
+     * @return
+     */
+    String getSql();
+
     /**
     /**
      * 获取变更行数据
      * 获取变更行数据
      *
      *

+ 14 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/CommonChangedEvent.java

@@ -15,7 +15,7 @@ import java.util.Map;
  * @Author AE86
  * @Author AE86
  * @Date 2023-08-20 20:00
  * @Date 2023-08-20 20:00
  */
  */
-public class CommonChangedEvent implements ChangedEvent {
+public abstract class CommonChangedEvent implements ChangedEvent {
 
 
     /**
     /**
      * 变更表名称
      * 变更表名称
@@ -25,6 +25,10 @@ public class CommonChangedEvent implements ChangedEvent {
      * 变更事件
      * 变更事件
      */
      */
     private String event;
     private String event;
+    /**
+     * 变更sql
+     */
+    private String sql;
     /**
     /**
      * 变更行数据
      * 变更行数据
      */
      */
@@ -52,6 +56,15 @@ public class CommonChangedEvent implements ChangedEvent {
         this.event = event;
         this.event = event;
     }
     }
 
 
+    @Override
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
     @Override
     @Override
     public Map<String, Object> getChangedRow() {
     public Map<String, Object> getChangedRow() {
         return changedRow;
         return changedRow;

+ 6 - 8
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/DDLChangedEvent.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.sdk.listener.event;
 package org.dbsyncer.sdk.listener.event;
 
 
+import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
+
 /**
 /**
  * DDL变更事件
  * DDL变更事件
  *
  *
@@ -14,25 +16,21 @@ public final class DDLChangedEvent extends CommonChangedEvent {
       */
       */
     private String database;
     private String database;
 
 
-    /**
-     * 变更SQL
-     */
-    private String sql;
-
     public DDLChangedEvent(String database, String sourceTableName, String event, String sql, String nextFileName, Object position) {
     public DDLChangedEvent(String database, String sourceTableName, String event, String sql, String nextFileName, Object position) {
         setSourceTableName(sourceTableName);
         setSourceTableName(sourceTableName);
         setEvent(event);
         setEvent(event);
         setNextFileName(nextFileName);
         setNextFileName(nextFileName);
         setPosition(position);
         setPosition(position);
+        setSql(sql);
         this.database = database;
         this.database = database;
-        this.sql = sql;
     }
     }
 
 
     public String getDatabase() {
     public String getDatabase() {
         return database;
         return database;
     }
     }
 
 
-    public String getSql() {
-        return sql;
+    @Override
+    public ChangedEventTypeEnum getType() {
+        return ChangedEventTypeEnum.DDL;
     }
     }
 }
 }

+ 7 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/RowChangedEvent.java

@@ -3,6 +3,8 @@
  */
  */
 package org.dbsyncer.sdk.listener.event;
 package org.dbsyncer.sdk.listener.event;
 
 
+import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
+
 import java.util.List;
 import java.util.List;
 
 
 /**
 /**
@@ -12,7 +14,7 @@ import java.util.List;
  * @Author AE86
  * @Author AE86
  * @Date 2020-06-15 20:00
  * @Date 2020-06-15 20:00
  */
  */
-public final class RowChangedEvent extends CommonChangedEvent {
+public class RowChangedEvent extends CommonChangedEvent {
     private List<Object> dataList;
     private List<Object> dataList;
 
 
     public RowChangedEvent(String sourceTableName, String event, List<Object> data) {
     public RowChangedEvent(String sourceTableName, String event, List<Object> data) {
@@ -31,4 +33,8 @@ public final class RowChangedEvent extends CommonChangedEvent {
         return dataList;
         return dataList;
     }
     }
 
 
+    @Override
+    public ChangedEventTypeEnum getType() {
+        return ChangedEventTypeEnum.ROW;
+    }
 }
 }

+ 7 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/ScanChangedEvent.java

@@ -3,6 +3,8 @@
  */
  */
 package org.dbsyncer.sdk.listener.event;
 package org.dbsyncer.sdk.listener.event;
 
 
+import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
+
 import java.util.Map;
 import java.util.Map;
 
 
 /**
 /**
@@ -25,4 +27,9 @@ public final class ScanChangedEvent extends CommonChangedEvent {
     public int getTableGroupIndex() {
     public int getTableGroupIndex() {
         return tableGroupIndex;
         return tableGroupIndex;
     }
     }
+
+    @Override
+    public ChangedEventTypeEnum getType() {
+        return ChangedEventTypeEnum.SCAN;
+    }
 }
 }

+ 27 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/SqlChangedEvent.java

@@ -0,0 +1,27 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.sdk.listener.event;
+
+import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-09 20:34
+ */
+public final class SqlChangedEvent extends CommonChangedEvent {
+
+    public SqlChangedEvent(String sourceTableName, String event, String sql, String nextFileName, Object position) {
+        setSourceTableName(sourceTableName);
+        setEvent(event);
+        setNextFileName(nextFileName);
+        setPosition(position);
+        setSql(sql);
+    }
+
+    @Override
+    public ChangedEventTypeEnum getType() {
+        return ChangedEventTypeEnum.SQL;
+    }
+}