Browse Source

优化代码

AE86 1 year ago
parent
commit
1b8ee933c2
16 changed files with 256 additions and 312 deletions
  1. 1 1
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/cdc/ESQuartzListener.java
  2. 1 1
      dbsyncer-connector/dbsyncer-connector-file/src/main/java/org/dbsyncer/connector/file/cdc/FileListener.java
  3. 49 95
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java
  4. 55 55
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/AbstractParser.java
  5. 0 75
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/DeleteSql.java
  6. 0 79
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/InsertSql.java
  7. 6 1
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/Parser.java
  8. 49 0
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/impl/DeleteSql.java
  9. 37 0
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/impl/InsertSql.java
  10. 30 0
      dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/impl/UpdateSql.java
  11. 1 0
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/IncrementPuller.java
  12. 4 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractDatabaseListener.java
  13. 9 2
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractListener.java
  14. 4 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractQuartzListener.java
  15. 4 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/DatabaseQuartzListener.java
  16. 6 0
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/Listener.java

+ 1 - 1
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/cdc/ESQuartzListener.java

@@ -67,7 +67,7 @@ public final class ESQuartzListener extends AbstractQuartzListener {
                     }
 
                     // 读取历史增量点
-                    f.setValue(snapshot.get(key));
+                    f.setValue((String) snapshot.get(key));
                     point.setBeginKey(key);
                     point.setBeginValue(quartzFilter.toString(quartzFilter.getObject()));
                     continue;

+ 1 - 1
dbsyncer-connector/dbsyncer-connector-file/src/main/java/org/dbsyncer/connector/file/cdc/FileListener.java

@@ -107,7 +107,7 @@ public class FileListener extends AbstractListener {
             final RandomAccessFile raf = new BufferedRandomAccessFile(file, "r");
             final String filePosKey = getFilePosKey(fileName);
             if (snapshot.containsKey(filePosKey)) {
-                raf.seek(NumberUtil.toLong(snapshot.get(filePosKey), 0L));
+                raf.seek(NumberUtil.toLong((String) snapshot.get(filePosKey), 0L));
             } else {
                 raf.seek(raf.length());
                 snapshot.put(filePosKey, String.valueOf(raf.getFilePointer()));

+ 49 - 95
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java

@@ -3,34 +3,35 @@
  */
 package org.dbsyncer.connector.oracle.cdc;
 
-import java.sql.SQLException;
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.parser.CCJSqlParserUtil;
 import net.sf.jsqlparser.schema.Table;
 import net.sf.jsqlparser.statement.Statement;
-import net.sf.jsqlparser.statement.alter.Alter;
 import net.sf.jsqlparser.statement.delete.Delete;
 import net.sf.jsqlparser.statement.insert.Insert;
 import net.sf.jsqlparser.statement.update.Update;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.oracle.OracleException;
+import org.dbsyncer.connector.oracle.logminer.parser.AbstractParser;
 import org.dbsyncer.connector.oracle.logminer.LogMiner;
 import org.dbsyncer.connector.oracle.logminer.RedoEvent;
-import org.dbsyncer.connector.oracle.logminer.parser.DeleteSql;
-import org.dbsyncer.connector.oracle.logminer.parser.InsertSql;
-import org.dbsyncer.connector.oracle.logminer.parser.Parser;
-import org.dbsyncer.connector.oracle.logminer.parser.UpdateSql;
+import org.dbsyncer.connector.oracle.logminer.parser.impl.DeleteSql;
+import org.dbsyncer.connector.oracle.logminer.parser.impl.InsertSql;
+import org.dbsyncer.connector.oracle.logminer.parser.impl.UpdateSql;
 import org.dbsyncer.sdk.config.DatabaseConfig;
-import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.listener.AbstractDatabaseListener;
-import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
 import org.dbsyncer.sdk.listener.event.RowChangedEvent;
-import org.dbsyncer.sdk.listener.event.SqlChangedEvent;
 import org.dbsyncer.sdk.model.ChangedOffset;
+import org.dbsyncer.sdk.model.Field;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * @Author AE86
  * @Version 1.0.0
@@ -40,9 +41,14 @@ public class OracleListener extends AbstractDatabaseListener {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final String REDO_POSITION = "position";
-
+    private final Map<String, List<Field>> tableFiledMap = new ConcurrentHashMap<>();
     private LogMiner logMiner;
 
+    @Override
+    public void init() {
+        sourceTable.forEach(table -> tableFiledMap.put(table.getName(), table.getColumn()));
+    }
+
     @Override
     public void start() {
         try {
@@ -56,13 +62,12 @@ public class OracleListener extends AbstractDatabaseListener {
             logMiner = new LogMiner(username, password, url, schema, driverClassName);
             logMiner.setStartScn(containsPos ? Long.parseLong(snapshot.get(REDO_POSITION)) : 0);
             logMiner.registerEventListener((event) -> {
-//                sendSql(event);
                 try {
-                    parseSqlToPk(event);
+                    parseEvent(event);
                 } catch (JSQLParserException e) {
-                    logger.error("不支持sql:" + event.getRedoSql());
+                    logger.error("不支持sql:{}", event.getRedoSql());
                 } catch (Exception e) {
-                    logger.error(e.getMessage());
+                    logger.error(e.getMessage(), e);
                 }
             });
             logMiner.start();
@@ -72,89 +77,39 @@ public class OracleListener extends AbstractDatabaseListener {
         }
     }
 
-    //发送sql解析时间
-    private void sendSql(RedoEvent event) {
-        try {
-            Statement statement = CCJSqlParserUtil.parse(event.getRedoSql());
-            if (statement instanceof Update) {
-                Update update = (Update) statement;
-                sendChangedEvent(new SqlChangedEvent(replaceTableName(update.getTable()),
-                        ConnectorConstant.OPERTION_UPDATE, event.getRedoSql(), null,
-                        event.getScn()));
-                return;
-            }
-
-            if (statement instanceof Insert) {
-                Insert insert = (Insert) statement;
-                sendChangedEvent(new SqlChangedEvent(replaceTableName(insert.getTable()),
-                        ConnectorConstant.OPERTION_INSERT, event.getRedoSql(), null,
-                        event.getScn()));
-                return;
-            }
-
-            if (statement instanceof Delete) {
-                Delete delete = (Delete) statement;
-                sendChangedEvent(new SqlChangedEvent(replaceTableName(delete.getTable()),
-                        ConnectorConstant.OPERTION_DELETE, event.getRedoSql(), null,
-                        event.getScn()));
-                return;
-            }
-
-            if (statement instanceof Alter) {
-                Alter alter = (Alter) statement;
-                sendChangedEvent(new DDLChangedEvent("", replaceTableName(alter.getTable()),
-                        ConnectorConstant.OPERTION_ALTER, event.getRedoSql(), null,
-                        event.getScn()));
-                return;
-            }
-        } catch (JSQLParserException e) {
-            logger.error("不支持sql:" + event.getRedoSql());
-        } catch (Exception e) {
-            logger.error(e.getMessage());
+    /**
+     * 解析时间
+     *
+     * @param event
+     * @throws Exception
+     */
+    private void parseEvent(RedoEvent event) throws Exception {
+        // TODO life 注意拦截子查询, 或修改主键值情况
+        Statement statement = CCJSqlParserUtil.parse(event.getRedoSql());
+        if (statement instanceof Update) {
+            Update update = (Update) statement;
+            UpdateSql parser = new UpdateSql(update);
+            setTable(parser, update.getTable());
+            sendChangedEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_UPDATE, parser.parseColumns(), null, event.getScn()));
+            return;
         }
-    }
 
-    //解析sql出来主键数据
-    private void parseSqlToPk(RedoEvent event) throws Exception {
-        Statement statement = CCJSqlParserUtil.parse(event.getRedoSql());
         if (statement instanceof Insert) {
             Insert insert = (Insert) statement;
-            org.dbsyncer.sdk.model.Table table1 = sourceTable.stream()
-                    .filter(x -> x.getName().equals(replaceTableName(insert.getTable())))
-                    .findFirst().orElse(null);
-            if (table1 != null) {
-                Parser parser = new InsertSql(insert, table1.getColumn(),
-                        (DatabaseConnectorInstance) connectorInstance);
-                sendChangedEvent(new RowChangedEvent(replaceTableName(insert.getTable()),
-                        ConnectorConstant.OPERTION_INSERT, parser.parseSql(), null,
-                        event.getScn()));
-            }
+            InsertSql parser = new InsertSql(insert);
+            setTable(parser, insert.getTable());
+            sendChangedEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_INSERT, parser.parseColumns(), null, event.getScn()));
+            return;
+        }
 
-        } else if (statement instanceof Update) {
-            Update update = (Update) statement;
-            org.dbsyncer.sdk.model.Table table1 = sourceTable.stream()
-                    .filter(x -> x.getName().equals(replaceTableName(update.getTable())))
-                    .findFirst().orElse(null);
-            if (table1 != null) {
-                Parser parser = new UpdateSql(update, table1.getColumn(),
-                        (DatabaseConnectorInstance) connectorInstance);
-                sendChangedEvent(new RowChangedEvent(replaceTableName(update.getTable()),
-                        ConnectorConstant.OPERTION_UPDATE, parser.parseSql(), null,
-                        event.getScn()));
-            }
-        } else if (statement instanceof Delete) {
+        if (statement instanceof Delete) {
             Delete delete = (Delete) statement;
-            org.dbsyncer.sdk.model.Table table1 = sourceTable.stream()
-                    .filter(x -> x.getName().equals(replaceTableName(delete.getTable())))
-                    .findFirst().orElse(null);
-            if (table1 != null) {
-                Parser parser = new DeleteSql(delete, table1.getColumn());
-                sendChangedEvent(new RowChangedEvent(replaceTableName(delete.getTable()),
-                        ConnectorConstant.OPERTION_DELETE, parser.parseSql(), null,
-                        event.getScn()));
-            }
+            DeleteSql parser = new DeleteSql(delete);
+            setTable(parser, delete.getTable());
+            sendChangedEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_DELETE, parser.parseColumns(), null, event.getScn()));
         }
 
+        // TODO ddl
     }
 
     @Override
@@ -173,12 +128,11 @@ public class OracleListener extends AbstractDatabaseListener {
         snapshot.put(REDO_POSITION, String.valueOf(offset.getPosition()));
     }
 
-    private String replaceTableName(Table table) {
-        if (table == null) {
-            return StringUtil.EMPTY;
-        }
-        return StringUtil.replace(table.getName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY);
+    private AbstractParser setTable(AbstractParser parser, Table table) {
+        parser.setTableName(table == null ? StringUtil.EMPTY : StringUtil.replace(table.getName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY));
+        parser.setFields(tableFiledMap.get(parser.getTableName()));
+        parser.setInstance(getConnectorInstance());
+        return parser;
     }
 
-
 }

+ 55 - 55
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/UpdateSql.java → dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/AbstractParser.java

@@ -1,68 +1,67 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.connector.oracle.logminer.parser;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import net.sf.jsqlparser.expression.BinaryExpression;
 import net.sf.jsqlparser.expression.Expression;
 import net.sf.jsqlparser.schema.Column;
-import net.sf.jsqlparser.statement.update.Update;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
 import org.dbsyncer.sdk.model.Field;
 
-public class UpdateSql implements Parser {
-
-    Update update;
-
-    Map<String, String> cloumnMap;
-
-    List<Field> columns;
-
-    DatabaseConnectorInstance instance;
-
-    public UpdateSql(Update update, List<Field> columns,
-            DatabaseConnectorInstance instance) {
-        this.update = update;
-        this.columns = columns;
-        this.instance = instance;
-        cloumnMap = new HashMap<>();
-    }
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
-    @Override
-    public List<Object> parseSql() {
-        findColumn(update.getWhere());
-        String toSqlString = toSql(
-                StringUtil.replace(update.getTable().getName(), StringUtil.DOUBLE_QUOTATION,
-                        StringUtil.EMPTY));
-        List<Object> data = ToResult(toSqlString);
-        return data;
-    }
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-14 22:39
+ */
+public abstract class AbstractParser implements Parser {
 
+    protected Map<String, String> columnMap = new HashMap<>();
+    protected String tableName;
+    protected List<Field> fields;
+    protected DatabaseConnectorInstance instance;
 
-    private void findColumn(Expression expression) {
+    public void findColumn(Expression expression) {
         BinaryExpression binaryExpression = (BinaryExpression) expression;
         if (binaryExpression.getLeftExpression() instanceof Column) {
             Column column = (Column) binaryExpression.getLeftExpression();
             String value = binaryExpression.getRightExpression().toString();
-            cloumnMap.put(StringUtil.replace(column.getColumnName(), StringUtil.DOUBLE_QUOTATION,
-                    StringUtil.EMPTY), value);
+            columnMap.put(StringUtil.replace(column.getColumnName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY), value);
             return;
         }
         findColumn(binaryExpression.getLeftExpression());
         findColumn(binaryExpression.getRightExpression());
     }
 
-    private String toSql(String tableName) {
-        StringBuilder sql = new StringBuilder("SELECT *");
-        sql.append(" FROM ").append(tableName).append(" WHERE ");
+    // 从主键解析出来的map装载成sql并运行sql找出对应的数据
+    public List<Object> getColumnsFromDB() {
+        List<Map<String, Object>> rows = instance.execute(databaseTemplate -> databaseTemplate.queryForList(getRowByPk()));
+        List<Object> list = new LinkedList<>();
+        if (!CollectionUtils.isEmpty(rows)) {
+            rows.forEach(map -> {
+                for (String key : map.keySet()) {
+                    list.add(map.get(key));
+                }
+            });
+            return list;
+        }
+        return Collections.EMPTY_LIST;
+    }
+
+    private String getRowByPk() {
+        StringBuilder sql = new StringBuilder("SELECT * FROM ").append(getTableName()).append(" WHERE ");
         int pkCount = 0;
-        for (Field field : columns) {
+        for (Field field : fields) {
             if (field.isPk()) {
-                String value = cloumnMap.get(field.getName());
+                String value = columnMap.get(field.getName());
                 if (StringUtil.isNotBlank(value)) {
                     if (pkCount > 0) {
                         sql.append(" AND ");
@@ -75,19 +74,20 @@ public class UpdateSql implements Parser {
         return sql.toString();
     }
 
-    //从主键解析出来的map装载成sql并运行sql找出对应的数据
-    private List<Object> ToResult(String sql) {
-        List<Map<String, Object>> results = instance.execute(
-                databaseTemplate -> databaseTemplate.queryForList(sql));
-        List<Object> list = new LinkedList<>();
-        if (!CollectionUtils.isEmpty(results)) {
-            results.forEach(map -> {
-                for (String key : map.keySet()) {
-                    list.add(map.get(key));
-                }
-            });
-            return list;
-        }
-        return Collections.EMPTY_LIST;
+    @Override
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public void setFields(List<Field> fields) {
+        this.fields = fields;
+    }
+
+    public void setInstance(DatabaseConnectorInstance instance) {
+        this.instance = instance;
     }
-}
+}

+ 0 - 75
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/DeleteSql.java

@@ -1,75 +0,0 @@
-package org.dbsyncer.connector.oracle.logminer.parser;
-
-import java.math.BigInteger;
-import java.sql.Types;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import net.sf.jsqlparser.expression.BinaryExpression;
-import net.sf.jsqlparser.expression.Expression;
-import net.sf.jsqlparser.schema.Column;
-import net.sf.jsqlparser.statement.delete.Delete;
-import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.sdk.model.Field;
-
-/**
- * @author : life
- * @Description :
- * @Param :
- * @return :
- * @date : 2023/12/14  14:58
- */
-public class DeleteSql implements Parser {
-
-    Delete delete;
-
-    Map<String, String> cloumnMap;
-
-    List<Field> columns;
-
-    public DeleteSql(Delete delete,  List<Field> columns) {
-        this.delete = delete;
-        this.columns = columns;
-        cloumnMap = new HashMap<>();
-    }
-
-    @Override
-    public List<Object> parseSql() {
-        findColumn(delete.getWhere());
-        return columnMapToData();
-    }
-
-    private void findColumn(Expression expression) {
-        BinaryExpression binaryExpression = (BinaryExpression) expression;
-        if (binaryExpression.getLeftExpression() instanceof Column) {
-            Column column = (Column) binaryExpression.getLeftExpression();
-            String value = binaryExpression.getRightExpression().toString();
-            cloumnMap.put(StringUtil.replace(column.getColumnName(), StringUtil.DOUBLE_QUOTATION,
-                    StringUtil.EMPTY), value);
-            return;
-        }
-        findColumn(binaryExpression.getLeftExpression());
-        findColumn(binaryExpression.getRightExpression());
-    }
-
-    private List<Object> columnMapToData(){
-        List<Object> data = new LinkedList<>();
-        for (Field field : columns) {
-            if (field.isPk()) {
-                Object value = cloumnMap.get(field.getName());
-                switch (field.getType()) {
-                    case Types.DECIMAL:
-                        value = new BigInteger(
-                                StringUtil.replace(value.toString(), StringUtil.SINGLE_QUOTATION,
-                                        StringUtil.EMPTY));
-                }
-                data.add(value);
-            } else {
-                data.add(null);
-            }
-        }
-        return data;
-    }
-
-}

+ 0 - 79
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/InsertSql.java

@@ -1,79 +0,0 @@
-package org.dbsyncer.connector.oracle.logminer.parser;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
-import net.sf.jsqlparser.schema.Column;
-import net.sf.jsqlparser.statement.insert.Insert;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
-import org.dbsyncer.sdk.model.Field;
-
-public class InsertSql implements Parser{
-
-    Insert insert;
-
-    Map<String,String> cloumnMap;
-
-    List<Field> columns;
-
-    DatabaseConnectorInstance instance;
-
-    public InsertSql(Insert insert, List<Field> columns,
-            DatabaseConnectorInstance instance) {
-        this.insert = insert;
-        this.columns = columns;
-        this.instance = instance;
-        cloumnMap = new HashMap<>();
-    }
-
-    @Override
-    public List<Object> parseSql() {
-        List<Column> columns= insert.getColumns();
-        ExpressionList values = insert.getSelect().getValues().getExpressions();
-        for (int i = 0; i < columns.size(); i++) {
-            cloumnMap.put(StringUtil.replace(columns.get(i).getColumnName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY),values.get(i).toString());
-        }
-        String toSqlString = toSql(StringUtil.replace(insert.getTable().getName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY));
-        List<Object> data= ToResult(toSqlString);
-        return data;
-    }
-
-    private String toSql(String tableName){
-        StringBuilder sql = new StringBuilder("SELECT *");
-        sql.append(" FROM ").append(tableName).append(" WHERE ");
-        int pkCount = 0;
-        for (Field field: columns) {
-            if (field.isPk()){
-                String value = cloumnMap.get(field.getName());
-                if (StringUtil.isNotBlank(value)){
-                    if (pkCount > 0){
-                        sql.append(" AND ");
-                    }
-                    sql.append(field.getName()).append("=").append(value);
-                    pkCount ++;
-                }
-            }
-        }
-        return sql.toString();
-    }
-
-    //从主键解析出来的map装载成sql并运行sql找出对应的数据
-    private List<Object> ToResult(String sql){
-        List<Map<String,Object>> results = instance.execute(databaseTemplate -> databaseTemplate.queryForList(sql));
-        List<Object> list = new LinkedList<>();
-        if (!CollectionUtils.isEmpty(results)){
-            results.forEach(map->{
-                for (String key:map.keySet()) {
-                    list.add(map.get(key));
-                }
-            });
-            return list;
-        }
-        return Collections.EMPTY_LIST;
-    }
-}

+ 6 - 1
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/Parser.java

@@ -1,8 +1,13 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.connector.oracle.logminer.parser;
 
 import java.util.List;
 
 public interface Parser {
 
-    List<Object> parseSql();
+    String getTableName();
+
+    List<Object> parseColumns();
 }

+ 49 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/impl/DeleteSql.java

@@ -0,0 +1,49 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.connector.oracle.logminer.parser.impl;
+
+import net.sf.jsqlparser.statement.delete.Delete;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.oracle.logminer.parser.AbstractParser;
+import org.dbsyncer.sdk.model.Field;
+
+import java.math.BigInteger;
+import java.sql.Types;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * @Author life
+ * @Version 1.0.0
+ * @Date 2023-12-14 14:58
+ */
+public class DeleteSql extends AbstractParser {
+
+    private Delete delete;
+
+    public DeleteSql(Delete delete) {
+        this.delete = delete;
+    }
+
+    @Override
+    public List<Object> parseColumns() {
+        findColumn(delete.getWhere());
+        List<Object> data = new LinkedList<>();
+        for (Field field : fields) {
+            if (field.isPk()) {
+                Object value = columnMap.get(field.getName());
+                switch (field.getType()) {
+                    case Types.DECIMAL:
+                        value = new BigInteger(StringUtil.replace(value.toString(), StringUtil.SINGLE_QUOTATION, StringUtil.EMPTY));
+                        break;
+                }
+                data.add(value);
+            } else {
+                data.add(null);
+            }
+        }
+        return data;
+    }
+
+}

+ 37 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/impl/InsertSql.java

@@ -0,0 +1,37 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.connector.oracle.logminer.parser.impl;
+
+import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
+import net.sf.jsqlparser.schema.Column;
+import net.sf.jsqlparser.statement.insert.Insert;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.oracle.logminer.parser.AbstractParser;
+
+import java.util.List;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-14 14:58
+ */
+public class InsertSql extends AbstractParser {
+
+    private Insert insert;
+
+    public InsertSql(Insert insert) {
+        this.insert = insert;
+    }
+
+    @Override
+    public List<Object> parseColumns() {
+        List<Column> columns = insert.getColumns();
+        ExpressionList values = insert.getSelect().getValues().getExpressions();
+        for (int i = 0; i < columns.size(); i++) {
+            columnMap.put(StringUtil.replace(columns.get(i).getColumnName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY), values.get(i).toString());
+        }
+        return getColumnsFromDB();
+    }
+
+}

+ 30 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/impl/UpdateSql.java

@@ -0,0 +1,30 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.connector.oracle.logminer.parser.impl;
+
+import net.sf.jsqlparser.statement.update.Update;
+import org.dbsyncer.connector.oracle.logminer.parser.AbstractParser;
+
+import java.util.List;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2023-12-14 14:58
+ */
+public class UpdateSql extends AbstractParser {
+
+    private Update update;
+
+    public UpdateSql(Update update) {
+        this.update = update;
+    }
+
+    @Override
+    public List<Object> parseColumns() {
+        findColumn(update.getWhere());
+        return getColumnsFromDB();
+    }
+
+}

+ 1 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/IncrementPuller.java

@@ -184,6 +184,7 @@ public final class IncrementPuller extends AbstractPuller implements Application
             abstractListener.setMetaId(meta.getId());
         }
 
+        listener.init();
         return listener;
     }
 

+ 4 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractDatabaseListener.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.sdk.listener;
 
 import org.dbsyncer.common.util.CollectionUtils;
@@ -23,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @version 1.0.0
  * @date 2022/5/29 21:46
  */
-public abstract class AbstractDatabaseListener extends AbstractListener {
+public abstract class AbstractDatabaseListener extends AbstractListener<DatabaseConnectorInstance> {
 
     /**
      * 自定义SQL,支持1对多

+ 9 - 2
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractListener.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.sdk.listener;
 
 import org.dbsyncer.common.util.CollectionUtils;
@@ -24,7 +27,7 @@ import java.util.concurrent.TimeUnit;
  * @Author AE86
  * @Date 2020-05-25 22:35
  */
-public abstract class AbstractListener implements Listener {
+public abstract class AbstractListener<C extends ConnectorInstance> implements Listener {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final int FLUSH_DELAYED_SECONDS = 20;
@@ -54,7 +57,7 @@ public abstract class AbstractListener implements Listener {
                     break;
                 case ConnectorConstant.OPERTION_INSERT:
                     // 是否支持监听新增事件
-                    processEvent(!listenerConfig.isBanInsert(), event);
+                     processEvent(!listenerConfig.isBanInsert(), event);
                     break;
                 case ConnectorConstant.OPERTION_DELETE:
                     // 是否支持监听删除事件
@@ -127,6 +130,10 @@ public abstract class AbstractListener implements Listener {
         this.connectorService = connectorService;
     }
 
+    public C getConnectorInstance() {
+        return (C) connectorInstance;
+    }
+
     public void setScheduledTaskService(ScheduledTaskService scheduledTaskService) {
         this.scheduledTaskService = scheduledTaskService;
     }

+ 4 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractQuartzListener.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.sdk.listener;
 
 import org.dbsyncer.common.model.Result;
@@ -105,7 +108,7 @@ public abstract class AbstractQuartzListener extends AbstractListener implements
         // 检查增量点
         Point point = checkLastPoint(command, index);
         int pageIndex = 1;
-        Object[] cursors = PrimaryKeyUtil.getLastCursors(snapshot.get(index + CURSOR));
+        Object[] cursors = PrimaryKeyUtil.getLastCursors((String) snapshot.get(index + CURSOR));
 
         while (running) {
             ReaderConfig readerConfig = new ReaderConfig(table, point.getCommand(), point.getArgs(), supportedCursor, cursors, pageIndex++, READ_NUM);

+ 4 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/DatabaseQuartzListener.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.sdk.listener;
 
 import org.dbsyncer.common.util.CollectionUtils;
@@ -79,7 +82,7 @@ public final class DatabaseQuartzListener extends AbstractQuartzListener {
                 }
 
                 // 读取历史增量点
-                Object val = f.getObject(snapshot.get(key));
+                Object val = f.getObject((String) snapshot.get(key));
                 point.addArg(val);
                 point.setBeginKey(key);
                 point.setBeginValue(f.toString(f.getObject()));

+ 6 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/Listener.java

@@ -13,6 +13,12 @@ import org.dbsyncer.sdk.model.ChangedOffset;
  * @Date 2023-11-21 22:48
  */
 public interface Listener {
+
+    /**
+     * 初始化
+     */
+    default void init(){};
+
     /**
      * 启动定时/日志抽取任务
      */