Prechádzať zdrojové kódy

logminer cdc使用回查

life 1 rok pred
rodič
commit
a3ccb9613e

+ 85 - 27
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java

@@ -3,6 +3,7 @@
  */
  */
 package org.dbsyncer.connector.oracle.cdc;
 package org.dbsyncer.connector.oracle.cdc;
 
 
+import java.sql.SQLException;
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.parser.CCJSqlParserUtil;
 import net.sf.jsqlparser.parser.CCJSqlParserUtil;
 import net.sf.jsqlparser.schema.Table;
 import net.sf.jsqlparser.schema.Table;
@@ -14,17 +15,22 @@ import net.sf.jsqlparser.statement.update.Update;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.oracle.OracleException;
 import org.dbsyncer.connector.oracle.OracleException;
 import org.dbsyncer.connector.oracle.logminer.LogMiner;
 import org.dbsyncer.connector.oracle.logminer.LogMiner;
+import org.dbsyncer.connector.oracle.logminer.RedoEvent;
+import org.dbsyncer.connector.oracle.logminer.parser.DeleteSql;
+import org.dbsyncer.connector.oracle.logminer.parser.InsertSql;
+import org.dbsyncer.connector.oracle.logminer.parser.Parser;
+import org.dbsyncer.connector.oracle.logminer.parser.UpdateSql;
 import org.dbsyncer.sdk.config.DatabaseConfig;
 import org.dbsyncer.sdk.config.DatabaseConfig;
+import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.listener.AbstractDatabaseListener;
 import org.dbsyncer.sdk.listener.AbstractDatabaseListener;
 import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
 import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
+import org.dbsyncer.sdk.listener.event.RowChangedEvent;
 import org.dbsyncer.sdk.listener.event.SqlChangedEvent;
 import org.dbsyncer.sdk.listener.event.SqlChangedEvent;
 import org.dbsyncer.sdk.model.ChangedOffset;
 import org.dbsyncer.sdk.model.ChangedOffset;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import java.sql.SQLException;
-
 /**
 /**
  * @Author AE86
  * @Author AE86
  * @Version 1.0.0
  * @Version 1.0.0
@@ -48,35 +54,17 @@ public class OracleListener extends AbstractDatabaseListener {
             String schema = config.getSchema();
             String schema = config.getSchema();
             boolean containsPos = snapshot.containsKey(REDO_POSITION);
             boolean containsPos = snapshot.containsKey(REDO_POSITION);
             logMiner = new LogMiner(username, password, url, schema, driverClassName);
             logMiner = new LogMiner(username, password, url, schema, driverClassName);
-            logMiner.setStartScn(containsPos ? Long.parseLong(snapshot.get(REDO_POSITION)) : 0);
+            if (!snapshot.get(REDO_POSITION).equals("null")){
+                logMiner.setStartScn(containsPos ? Long.parseLong(snapshot.get(REDO_POSITION)) : 0);
+            }
             logMiner.registerEventListener((event) -> {
             logMiner.registerEventListener((event) -> {
+//                sendSql(event);
                 try {
                 try {
-                    Statement statement = CCJSqlParserUtil.parse(event.getRedoSql());
-                    if (statement instanceof Update) {
-                        Update update = (Update) statement;
-                        sendChangedEvent(new SqlChangedEvent(replaceTableName(update.getTable()), ConnectorConstant.OPERTION_UPDATE, event.getRedoSql(), null, event.getScn()));
-                        return;
-                    }
-
-                    if (statement instanceof Insert) {
-                        Insert insert = (Insert) statement;
-                        sendChangedEvent(new SqlChangedEvent(replaceTableName(insert.getTable()), ConnectorConstant.OPERTION_INSERT, event.getRedoSql(), null, event.getScn()));
-                        return;
-                    }
-
-                    if (statement instanceof Delete) {
-                        Delete delete = (Delete) statement;
-                        sendChangedEvent(new SqlChangedEvent(replaceTableName(delete.getTable()), ConnectorConstant.OPERTION_DELETE, event.getRedoSql(), null, event.getScn()));
-                        return;
-                    }
-
-                    if (statement instanceof Alter) {
-                        Alter alter = (Alter) statement;
-                        sendChangedEvent(new DDLChangedEvent("", replaceTableName(alter.getTable()), ConnectorConstant.OPERTION_ALTER, event.getRedoSql(), null, event.getScn()));
-                        return;
-                    }
+                    parseSqlToPk(event);
                 } catch (JSQLParserException e) {
                 } catch (JSQLParserException e) {
                     logger.error("不支持sql:" + event.getRedoSql());
                     logger.error("不支持sql:" + event.getRedoSql());
+                }catch (Exception e){
+                    logger.error(e.getMessage());
                 }
                 }
             });
             });
             logMiner.start();
             logMiner.start();
@@ -86,6 +74,75 @@ public class OracleListener extends AbstractDatabaseListener {
         }
         }
     }
     }
 
 
+    //发送sql解析时间
+    private void sendSql(RedoEvent event){
+        try {
+            Statement statement = CCJSqlParserUtil.parse(event.getRedoSql());
+            if (statement instanceof Update) {
+                Update update = (Update) statement;
+                sendChangedEvent(new SqlChangedEvent(replaceTableName(update.getTable()), ConnectorConstant.OPERTION_UPDATE, event.getRedoSql(), null, event.getScn()));
+                return;
+            }
+
+            if (statement instanceof Insert) {
+                Insert insert = (Insert) statement;
+                sendChangedEvent(new SqlChangedEvent(replaceTableName(insert.getTable()), ConnectorConstant.OPERTION_INSERT, event.getRedoSql(), null, event.getScn()));
+                return;
+            }
+
+            if (statement instanceof Delete) {
+                Delete delete = (Delete) statement;
+                sendChangedEvent(new SqlChangedEvent(replaceTableName(delete.getTable()), ConnectorConstant.OPERTION_DELETE, event.getRedoSql(), null, event.getScn()));
+                return;
+            }
+
+            if (statement instanceof Alter) {
+                Alter alter = (Alter) statement;
+                sendChangedEvent(new DDLChangedEvent("", replaceTableName(alter.getTable()), ConnectorConstant.OPERTION_ALTER, event.getRedoSql(), null, event.getScn()));
+                return;
+            }
+        } catch (JSQLParserException e) {
+            logger.error("不支持sql:" + event.getRedoSql());
+        }catch (Exception e){
+            logger.error(e.getMessage());
+        }
+    }
+
+    //解析sql出来主键数据
+    private void parseSqlToPk(RedoEvent event) throws Exception {
+        Statement statement = CCJSqlParserUtil.parse(event.getRedoSql());
+        if (statement instanceof Insert){
+            Insert insert = (Insert) statement;
+            org.dbsyncer.sdk.model.Table table1 =sourceTable.stream()
+                    .filter(x->x.getName().equals(replaceTableName(insert.getTable())))
+                    .findFirst().orElse(null);
+            if (table1 != null){
+                Parser parser = new InsertSql(insert,table1.getColumn(),(DatabaseConnectorInstance) connectorInstance);
+                sendChangedEvent(new RowChangedEvent(replaceTableName(insert.getTable()), ConnectorConstant.OPERTION_INSERT,parser.parseSql() ,null,event.getScn()));
+            }
+
+        }else if (statement instanceof Update){
+            Update update = (Update) statement;
+            org.dbsyncer.sdk.model.Table table1 =sourceTable.stream()
+                    .filter(x->x.getName().equals(replaceTableName(update.getTable())))
+                    .findFirst().orElse(null);
+            if (table1 != null){
+                Parser parser = new UpdateSql(update,table1.getColumn(),(DatabaseConnectorInstance) connectorInstance);
+                sendChangedEvent(new RowChangedEvent(replaceTableName(update.getTable()), ConnectorConstant.OPERTION_UPDATE, parser.parseSql(),null,event.getScn()));
+            }
+        }else if (statement instanceof Delete){
+            Delete delete = (Delete) statement;
+            org.dbsyncer.sdk.model.Table table1 =sourceTable.stream()
+                    .filter(x->x.getName().equals(replaceTableName(delete.getTable())))
+                    .findFirst().orElse(null);
+            if (table1 !=null){
+                Parser parser = new DeleteSql(delete,table1.getColumn());
+                sendChangedEvent(new RowChangedEvent(replaceTableName(delete.getTable()), ConnectorConstant.OPERTION_DELETE, parser.parseSql(),null,event.getScn()));
+            }
+        }
+
+    }
+
     @Override
     @Override
     public void close() {
     public void close() {
         try {
         try {
@@ -109,4 +166,5 @@ public class OracleListener extends AbstractDatabaseListener {
         return StringUtil.replace(table.getName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY);
         return StringUtil.replace(table.getName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY);
     }
     }
 
 
+
 }
 }

+ 75 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/DeleteSql.java

@@ -0,0 +1,75 @@
+package org.dbsyncer.connector.oracle.logminer.parser;
+
+import java.math.BigInteger;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import net.sf.jsqlparser.expression.BinaryExpression;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.schema.Column;
+import net.sf.jsqlparser.statement.delete.Delete;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.sdk.model.Field;
+
+/**
+ * @author : life
+ * @Description :
+ * @Param :
+ * @return :
+ * @date : 2023/12/14  14:58
+ */
+public class DeleteSql implements Parser {
+
+    Delete delete;
+
+    Map<String, String> cloumnMap;
+
+    List<Field> columns;
+
+    public DeleteSql(Delete delete,  List<Field> columns) {
+        this.delete = delete;
+        this.columns = columns;
+        cloumnMap = new HashMap<>();
+    }
+
+    @Override
+    public List<Object> parseSql() {
+        findColumn(delete.getWhere());
+        return columnMapToData();
+    }
+
+    private void findColumn(Expression expression) {
+        BinaryExpression binaryExpression = (BinaryExpression) expression;
+        if (binaryExpression.getLeftExpression() instanceof Column) {
+            Column column = (Column) binaryExpression.getLeftExpression();
+            String value = binaryExpression.getRightExpression().toString();
+            cloumnMap.put(StringUtil.replace(column.getColumnName(), StringUtil.DOUBLE_QUOTATION,
+                    StringUtil.EMPTY), value);
+            return;
+        }
+        findColumn(binaryExpression.getLeftExpression());
+        findColumn(binaryExpression.getRightExpression());
+    }
+
+    private List<Object> columnMapToData(){
+        List<Object> data = new LinkedList<>();
+        for (Field field : columns) {
+            if (field.isPk()) {
+                Object value = cloumnMap.get(field.getName());
+                switch (field.getType()) {
+                    case Types.DECIMAL:
+                        value = new BigInteger(
+                                StringUtil.replace(value.toString(), StringUtil.SINGLE_QUOTATION,
+                                        StringUtil.EMPTY));
+                }
+                data.add(value);
+            } else {
+                data.add(null);
+            }
+        }
+        return data;
+    }
+
+}

+ 79 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/InsertSql.java

@@ -0,0 +1,79 @@
+package org.dbsyncer.connector.oracle.logminer.parser;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
+import net.sf.jsqlparser.schema.Column;
+import net.sf.jsqlparser.statement.insert.Insert;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
+import org.dbsyncer.sdk.model.Field;
+
+public class InsertSql implements Parser{
+
+    Insert insert;
+
+    Map<String,String> cloumnMap;
+
+    List<Field> columns;
+
+    DatabaseConnectorInstance instance;
+
+    public InsertSql(Insert insert, List<Field> columns,
+            DatabaseConnectorInstance instance) {
+        this.insert = insert;
+        this.columns = columns;
+        this.instance = instance;
+        cloumnMap = new HashMap<>();
+    }
+
+    @Override
+    public List<Object> parseSql() {
+        List<Column> columns= insert.getColumns();
+        ExpressionList values = insert.getSelect().getValues().getExpressions();
+        for (int i = 0; i < columns.size(); i++) {
+            cloumnMap.put(StringUtil.replace(columns.get(i).getColumnName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY),values.get(i).toString());
+        }
+        String toSqlString = toSql(StringUtil.replace(insert.getTable().getName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY));
+        List<Object> data= ToResult(toSqlString);
+        return data;
+    }
+
+    private String toSql(String tableName){
+        StringBuilder sql = new StringBuilder("SELECT *");
+        sql.append(" FROM ").append(tableName).append(" WHERE ");
+        int pkCount = 0;
+        for (Field field: columns) {
+            if (field.isPk()){
+                String value = cloumnMap.get(field.getName());
+                if (StringUtil.isNotBlank(value)){
+                    if (pkCount > 0){
+                        sql.append(" AND ");
+                    }
+                    sql.append(field.getName()).append("=").append(value);
+                    pkCount ++;
+                }
+            }
+        }
+        return sql.toString();
+    }
+
+    //从主键解析出来的map装载成sql并运行sql找出对应的数据
+    private List<Object> ToResult(String sql){
+        List<Map<String,Object>> results = instance.execute(databaseTemplate -> databaseTemplate.queryForList(sql));
+        List<Object> list = new LinkedList<>();
+        if (!CollectionUtils.isEmpty(results)){
+            results.forEach(map->{
+                for (String key:map.keySet()) {
+                    list.add(map.get(key));
+                }
+            });
+            return list;
+        }
+        return Collections.EMPTY_LIST;
+    }
+}

+ 8 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/Parser.java

@@ -0,0 +1,8 @@
+package org.dbsyncer.connector.oracle.logminer.parser;
+
+import java.util.List;
+
+public interface Parser {
+
+    List<Object> parseSql();
+}

+ 93 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/UpdateSql.java

@@ -0,0 +1,93 @@
+package org.dbsyncer.connector.oracle.logminer.parser;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import net.sf.jsqlparser.expression.BinaryExpression;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.schema.Column;
+import net.sf.jsqlparser.statement.update.Update;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
+import org.dbsyncer.sdk.model.Field;
+
+public class UpdateSql implements Parser {
+
+    Update update;
+
+    Map<String, String> cloumnMap;
+
+    List<Field> columns;
+
+    DatabaseConnectorInstance instance;
+
+    public UpdateSql(Update update, List<Field> columns,
+            DatabaseConnectorInstance instance) {
+        this.update = update;
+        this.columns = columns;
+        this.instance = instance;
+        cloumnMap = new HashMap<>();
+    }
+
+    @Override
+    public List<Object> parseSql() {
+        findColumn(update.getWhere());
+        String toSqlString = toSql(
+                StringUtil.replace(update.getTable().getName(), StringUtil.DOUBLE_QUOTATION,
+                        StringUtil.EMPTY));
+        List<Object> data = ToResult(toSqlString);
+        return data;
+    }
+
+
+    private void findColumn(Expression expression) {
+        BinaryExpression binaryExpression = (BinaryExpression) expression;
+        if (binaryExpression.getLeftExpression() instanceof Column) {
+            Column column = (Column) binaryExpression.getLeftExpression();
+            String value = binaryExpression.getRightExpression().toString();
+            cloumnMap.put(StringUtil.replace(column.getColumnName(), StringUtil.DOUBLE_QUOTATION,
+                    StringUtil.EMPTY), value);
+            return;
+        }
+        findColumn(binaryExpression.getLeftExpression());
+        findColumn(binaryExpression.getRightExpression());
+    }
+
+    private String toSql(String tableName) {
+        StringBuilder sql = new StringBuilder("SELECT *");
+        sql.append(" FROM ").append(tableName).append(" WHERE ");
+        int pkCount = 0;
+        for (Field field : columns) {
+            if (field.isPk()) {
+                String value = cloumnMap.get(field.getName());
+                if (StringUtil.isNotBlank(value)) {
+                    if (pkCount > 0) {
+                        sql.append(" AND ");
+                    }
+                    sql.append(field.getName()).append("=").append(value);
+                    pkCount++;
+                }
+            }
+        }
+        return sql.toString();
+    }
+
+    //从主键解析出来的map装载成sql并运行sql找出对应的数据
+    private List<Object> ToResult(String sql) {
+        List<Map<String, Object>> results = instance.execute(
+                databaseTemplate -> databaseTemplate.queryForList(sql));
+        List<Object> list = new LinkedList<>();
+        if (!CollectionUtils.isEmpty(results)) {
+            results.forEach(map -> {
+                for (String key : map.keySet()) {
+                    list.add(map.get(key));
+                }
+            });
+            return list;
+        }
+        return Collections.EMPTY_LIST;
+    }
+}