浏览代码

Merge remote-tracking branch 'origin/v_2.0' into v_2.0

AE86 1 年之前
父节点
当前提交
f1b6f37df4

+ 32 - 4
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java

@@ -3,6 +3,8 @@
  */
 package org.dbsyncer.connector.oracle.cdc;
 
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.parser.CCJSqlParserUtil;
 import net.sf.jsqlparser.schema.Table;
@@ -10,6 +12,7 @@ import net.sf.jsqlparser.statement.Statement;
 import net.sf.jsqlparser.statement.delete.Delete;
 import net.sf.jsqlparser.statement.insert.Insert;
 import net.sf.jsqlparser.statement.update.Update;
+import org.dbsyncer.common.QueueOverflowException;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.oracle.OracleException;
 import org.dbsyncer.connector.oracle.logminer.parser.AbstractParser;
@@ -78,6 +81,32 @@ public class OracleListener extends AbstractDatabaseListener {
         }
     }
 
+    private void trySendEvent(RowChangedEvent event){
+        try {
+            // 如果消费事件失败,重试
+            long now = Instant.now().toEpochMilli();
+            boolean isReTry = false;
+            while (logMiner.isConnected()){
+                try {
+                    sendChangedEvent(event);
+                    break;
+                } catch (QueueOverflowException e) {
+                    isReTry = true;
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(1);
+                    } catch (InterruptedException ex) {
+                        logger.error(ex.getMessage(), ex);
+                    }
+                }
+            }
+            if (isReTry) {
+                logger.info("重试耗时:{}ms", Instant.now().toEpochMilli() - now);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
     /**
      * 解析事件
      *
@@ -91,7 +120,7 @@ public class OracleListener extends AbstractDatabaseListener {
             Update update = (Update) statement;
             UpdateSql parser = new UpdateSql(update);
             setTable(parser, update.getTable());
-            sendChangedEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_UPDATE, parser.parseColumns(), null, event.getScn()));
+            trySendEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_UPDATE, parser.parseColumns(), null, event.getScn()));
             return;
         }
 
@@ -99,7 +128,7 @@ public class OracleListener extends AbstractDatabaseListener {
             Insert insert = (Insert) statement;
             InsertSql parser = new InsertSql(insert);
             setTable(parser, insert.getTable());
-            sendChangedEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_INSERT, parser.parseColumns(), null, event.getScn()));
+            trySendEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_INSERT, parser.parseColumns(), null, event.getScn()));
             return;
         }
 
@@ -107,7 +136,7 @@ public class OracleListener extends AbstractDatabaseListener {
             Delete delete = (Delete) statement;
             DeleteSql parser = new DeleteSql(delete);
             setTable(parser, delete.getTable());
-            sendChangedEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_DELETE, parser.parseColumns(), null, event.getScn()));
+            trySendEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_DELETE, parser.parseColumns(), null, event.getScn()));
         }
 
         // TODO ddl
@@ -132,7 +161,6 @@ public class OracleListener extends AbstractDatabaseListener {
     private AbstractParser setTable(AbstractParser parser, Table table) {
         parser.setTableName(table == null ? StringUtil.EMPTY : StringUtil.replace(table.getName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY));
         parser.setFields(tableFiledMap.get(parser.getTableName()));
-        parser.setInstance(getConnectorInstance());
         return parser;
     }
 

+ 16 - 4
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java

@@ -124,7 +124,17 @@ public class LogMiner {
                 try (ResultSet rs = minerViewStatement.executeQuery()) {
                     logger.trace("Query V$LOGMNR_CONTENTS spend time {} ms", stopWatch.getTime(TimeUnit.MILLISECONDS));
                     stopWatch.reset();
-                    logMinerViewProcessor(rs);
+                    try{
+                        logMinerViewProcessor(rs);
+                    }catch (SQLException e){
+                        if (e.getMessage().contains("ORA-00310")){
+                            logger.error("ORA-00310 try continue");
+                            restartLogMiner();
+                            currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
+                            continue;
+                        }
+                        throw e;
+                    }
                 }
 
                 // 7.确定新的SCN
@@ -264,12 +274,10 @@ public class LogMiner {
                             // 当前SCN 事务已经提交 并且 小于事务缓冲区中所有的开始SCN,所以可以更新offsetScn
                             startScn = scn.longValue();
                         }
-
                         if (counter == 0) {
                             updateCommittedScn(commitScn.longValue());
                         }
-
-                        event.setScn(committedScn);
+                        event.setScn(startScn < committedScn ? committedScn:startScn);
                         listener.onEvent(event);
                     };
                     transactionalBuffer.registerCommitCallback(txId, scn, commitCallback);
@@ -321,4 +329,8 @@ public class LogMiner {
         void onEvent(RedoEvent redoEvent);
     }
 
+    public boolean isConnected(){
+        return connected;
+    }
+
 }

+ 39 - 41
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/AbstractParser.java

@@ -3,20 +3,19 @@
  */
 package org.dbsyncer.connector.oracle.logminer.parser;
 
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import net.sf.jsqlparser.expression.BinaryExpression;
 import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import net.sf.jsqlparser.expression.NullValue;
+import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
 import net.sf.jsqlparser.schema.Column;
-import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
 import org.dbsyncer.sdk.model.Field;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * @Author AE86
  * @Version 1.0.0
@@ -27,51 +26,53 @@ public abstract class AbstractParser implements Parser {
     protected Map<String, String> columnMap = new HashMap<>();
     protected String tableName;
     protected List<Field> fields;
-    protected DatabaseConnectorInstance instance;
 
     public void findColumn(Expression expression) {
+        if (expression instanceof IsNullExpression){
+            IsNullExpression isNullExpression = (IsNullExpression) expression;
+            Column column = (Column) isNullExpression.getLeftExpression();
+            columnMap.put(StringUtil.replace(column.getColumnName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY),
+                    null);
+            return;
+        }
+
         BinaryExpression binaryExpression = (BinaryExpression) expression;
         if (binaryExpression.getLeftExpression() instanceof Column) {
             Column column = (Column) binaryExpression.getLeftExpression();
-            String value = binaryExpression.getRightExpression().toString();
-            columnMap.put(StringUtil.replace(column.getColumnName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY), value);
+            columnMap.put(StringUtil.replace(column.getColumnName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY),
+                    parserValue(binaryExpression.getRightExpression()));
             return;
         }
         findColumn(binaryExpression.getLeftExpression());
         findColumn(binaryExpression.getRightExpression());
     }
 
-    // 从主键解析出来的map装载成sql并运行sql找出对应的数据
-    public List<Object> getColumnsFromDB() {
-        List<Map<String, Object>> rows = instance.execute(databaseTemplate -> databaseTemplate.queryForList(getRowByPk()));
-        List<Object> list = new LinkedList<>();
-        if (!CollectionUtils.isEmpty(rows)) {
-            rows.forEach(map -> {
-                for (String key : map.keySet()) {
-                    list.add(map.get(key));
-                }
-            });
-            return list;
+    public String parserValue(Expression expression){
+        if (expression instanceof Function){
+            return parserFunction((Function) expression);
         }
-        return Collections.EMPTY_LIST;
+        if (expression instanceof NullValue){
+            return null;
+        }
+        return expression.toString();
     }
 
-    private String getRowByPk() {
-        StringBuilder sql = new StringBuilder("SELECT * FROM ").append(getTableName()).append(" WHERE ");
-        int pkCount = 0;
-        for (Field field : fields) {
-            if (field.isPk()) {
-                String value = columnMap.get(field.getName());
-                if (StringUtil.isNotBlank(value)) {
-                    if (pkCount > 0) {
-                        sql.append(" AND ");
-                    }
-                    sql.append(field.getName()).append("=").append(value);
-                    pkCount++;
-                }
-            }
+    //解析sql的function,只取到关键的字符串
+    public String parserFunction(Function function){
+        if (function.getMultipartName().get(0).equals("TO_DATE")){
+            return StringUtil.replace(function.getParameters().get(0).toString(),StringUtil.SINGLE_QUOTATION,StringUtil.EMPTY);
         }
-        return sql.toString();
+        return "";
+    }
+
+
+    public List<Object> columnMapToData(){
+        List<Object> data = new LinkedList<>();
+        for (Field field: fields) {
+            Object value = OracleTypeParser.convertToJavaType(field,columnMap.get(field.getName()));
+            data.add(value);
+        }
+        return data;
     }
 
     @Override
@@ -87,7 +88,4 @@ public abstract class AbstractParser implements Parser {
         this.fields = fields;
     }
 
-    public void setInstance(DatabaseConnectorInstance instance) {
-        this.instance = instance;
-    }
 }

+ 34 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/OracleTypeParser.java

@@ -0,0 +1,34 @@
+package org.dbsyncer.connector.oracle.logminer.parser;
+
+import java.math.BigInteger;
+import java.sql.Types;
+import org.dbsyncer.common.util.DateFormatUtil;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.sdk.model.Field;
+
+/**
+ * @Description :oracle解析到java类
+ * @Param :
+ * @return :
+ * @author : life
+ * @date : 2023/12/15  11:25
+ */
+public class OracleTypeParser {
+    public static Object convertToJavaType(Field field,String value){
+        if (value == null){
+            return null;
+        }
+        switch (field.getType()) {
+            case Types.DECIMAL:
+                return new BigInteger(
+                        StringUtil.replace(value, StringUtil.SINGLE_QUOTATION, StringUtil.EMPTY));
+            case Types.TIMESTAMP:
+                return DateFormatUtil.stringToTimestamp(StringUtil.replace(value,StringUtil.SINGLE_QUOTATION,StringUtil.EMPTY));
+            default:
+                return StringUtil.replace(value,StringUtil.SINGLE_QUOTATION,StringUtil.EMPTY);
+        }
+    }
+
+
+
+}

+ 6 - 5
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/impl/InsertSql.java

@@ -3,14 +3,14 @@
  */
 package org.dbsyncer.connector.oracle.logminer.parser.impl;
 
+import java.util.List;
+import net.sf.jsqlparser.expression.Expression;
 import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
 import net.sf.jsqlparser.schema.Column;
 import net.sf.jsqlparser.statement.insert.Insert;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.oracle.logminer.parser.AbstractParser;
 
-import java.util.List;
-
 /**
  * @Author AE86
  * @Version 1.0.0
@@ -27,11 +27,12 @@ public class InsertSql extends AbstractParser {
     @Override
     public List<Object> parseColumns() {
         List<Column> columns = insert.getColumns();
-        ExpressionList values = insert.getSelect().getValues().getExpressions();
+        ExpressionList<Expression> values = (ExpressionList<Expression>) insert.getSelect().getValues().getExpressions();
         for (int i = 0; i < columns.size(); i++) {
-            columnMap.put(StringUtil.replace(columns.get(i).getColumnName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY), values.get(i).toString());
+            columnMap.put(StringUtil.replace(columns.get(i).getColumnName(), StringUtil.DOUBLE_QUOTATION, StringUtil.EMPTY),
+                    parserValue(values.get(i)));
         }
-        return getColumnsFromDB();
+        return columnMapToData();
     }
 
 }

+ 14 - 1
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/impl/UpdateSql.java

@@ -4,6 +4,8 @@
 package org.dbsyncer.connector.oracle.logminer.parser.impl;
 
 import net.sf.jsqlparser.statement.update.Update;
+import net.sf.jsqlparser.statement.update.UpdateSet;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.oracle.logminer.parser.AbstractParser;
 
 import java.util.List;
@@ -24,7 +26,18 @@ public class UpdateSql extends AbstractParser {
     @Override
     public List<Object> parseColumns() {
         findColumn(update.getWhere());
-        return getColumnsFromDB();
+        passerSet(update.getUpdateSets());
+        return columnMapToData();
+    }
+
+    private void passerSet(List<UpdateSet> updateSets){
+        //解析替换
+        for (UpdateSet updateSet:updateSets) {
+            String columnName = StringUtil.replace(updateSet.getColumn(0).getColumnName(),
+                    StringUtil.DOUBLE_QUOTATION,StringUtil.EMPTY);
+            String value = parserValue(updateSet.getValue(0));
+            columnMap.put(columnName,value);
+        }
     }
 
 }