life 1 rok temu
rodzic
commit
8f05f755eb

+ 32 - 3
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java

@@ -3,6 +3,8 @@
  */
 package org.dbsyncer.connector.oracle.cdc;
 
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.parser.CCJSqlParserUtil;
 import net.sf.jsqlparser.schema.Table;
@@ -10,6 +12,7 @@ import net.sf.jsqlparser.statement.Statement;
 import net.sf.jsqlparser.statement.delete.Delete;
 import net.sf.jsqlparser.statement.insert.Insert;
 import net.sf.jsqlparser.statement.update.Update;
+import org.dbsyncer.common.QueueOverflowException;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.oracle.OracleException;
 import org.dbsyncer.connector.oracle.logminer.parser.AbstractParser;
@@ -78,6 +81,32 @@ public class OracleListener extends AbstractDatabaseListener {
         }
     }
 
+    private void trySendEvent(RowChangedEvent event){
+        try {
+            // 如果消费事件失败,重试
+            long now = Instant.now().toEpochMilli();
+            boolean isReTry = false;
+            while (logMiner.isConnected()){
+                try {
+                    sendChangedEvent(event);
+                    break;
+                } catch (QueueOverflowException e) {
+                    isReTry = true;
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(1);
+                    } catch (InterruptedException ex) {
+                        logger.error(ex.getMessage(), ex);
+                    }
+                }
+            }
+            if (isReTry) {
+                logger.info("重试耗时:{}ms", Instant.now().toEpochMilli() - now);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
     /**
      * 解析事件
      *
@@ -91,7 +120,7 @@ public class OracleListener extends AbstractDatabaseListener {
             Update update = (Update) statement;
             UpdateSql parser = new UpdateSql(update);
             setTable(parser, update.getTable());
-            sendChangedEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_UPDATE, parser.parseColumns(), null, event.getScn()));
+            trySendEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_UPDATE, parser.parseColumns(), null, event.getScn()));
             return;
         }
 
@@ -99,7 +128,7 @@ public class OracleListener extends AbstractDatabaseListener {
             Insert insert = (Insert) statement;
             InsertSql parser = new InsertSql(insert);
             setTable(parser, insert.getTable());
-            sendChangedEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_INSERT, parser.parseColumns(), null, event.getScn()));
+            trySendEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_INSERT, parser.parseColumns(), null, event.getScn()));
             return;
         }
 
@@ -107,7 +136,7 @@ public class OracleListener extends AbstractDatabaseListener {
             Delete delete = (Delete) statement;
             DeleteSql parser = new DeleteSql(delete);
             setTable(parser, delete.getTable());
-            sendChangedEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_DELETE, parser.parseColumns(), null, event.getScn()));
+            trySendEvent(new RowChangedEvent(parser.getTableName(), ConnectorConstant.OPERTION_DELETE, parser.parseColumns(), null, event.getScn()));
         }
 
         // TODO ddl

+ 4 - 0
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java

@@ -321,4 +321,8 @@ public class LogMiner {
         void onEvent(RedoEvent redoEvent);
     }
 
+    public boolean isConnected(){
+        return connected;
+    }
+
 }

+ 6 - 6
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/parser/AbstractParser.java

@@ -10,10 +10,10 @@ import java.util.Map;
 import net.sf.jsqlparser.expression.BinaryExpression;
 import net.sf.jsqlparser.expression.Expression;
 import net.sf.jsqlparser.expression.Function;
+import net.sf.jsqlparser.expression.NullValue;
 import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
 import net.sf.jsqlparser.schema.Column;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
 import org.dbsyncer.sdk.model.Field;
 
 /**
@@ -48,13 +48,13 @@ public abstract class AbstractParser implements Parser {
     }
 
     public String parserValue(Expression expression){
-        String value = "";
         if (expression instanceof Function){
-            value = parserFunction((Function) expression);
-        }else{
-            value = expression.toString();
+            return parserFunction((Function) expression);
         }
-        return value;
+        if (expression instanceof NullValue){
+            return null;
+        }
+        return expression.toString();
     }
 
     //解析sql的function,只取到关键的字符串