Browse Source

修改时间放置scn过多错误

life 1 year ago
parent
commit
4b909acd6e

+ 42 - 28
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java

@@ -54,16 +54,14 @@ public class OracleListener extends AbstractDatabaseListener {
             String schema = config.getSchema();
             boolean containsPos = snapshot.containsKey(REDO_POSITION);
             logMiner = new LogMiner(username, password, url, schema, driverClassName);
-            if (!snapshot.get(REDO_POSITION).equals("null")){
-                logMiner.setStartScn(containsPos ? Long.parseLong(snapshot.get(REDO_POSITION)) : 0);
-            }
+            logMiner.setStartScn(containsPos ? Long.parseLong(snapshot.get(REDO_POSITION)) : 0);
             logMiner.registerEventListener((event) -> {
 //                sendSql(event);
                 try {
                     parseSqlToPk(event);
                 } catch (JSQLParserException e) {
                     logger.error("不支持sql:" + event.getRedoSql());
-                }catch (Exception e){
+                } catch (Exception e) {
                     logger.error(e.getMessage());
                 }
             });
@@ -75,35 +73,43 @@ public class OracleListener extends AbstractDatabaseListener {
     }
 
     //发送sql解析时间
-    private void sendSql(RedoEvent event){
+    private void sendSql(RedoEvent event) {
         try {
             Statement statement = CCJSqlParserUtil.parse(event.getRedoSql());
             if (statement instanceof Update) {
                 Update update = (Update) statement;
-                sendChangedEvent(new SqlChangedEvent(replaceTableName(update.getTable()), ConnectorConstant.OPERTION_UPDATE, event.getRedoSql(), null, event.getScn()));
+                sendChangedEvent(new SqlChangedEvent(replaceTableName(update.getTable()),
+                        ConnectorConstant.OPERTION_UPDATE, event.getRedoSql(), null,
+                        event.getScn()));
                 return;
             }
 
             if (statement instanceof Insert) {
                 Insert insert = (Insert) statement;
-                sendChangedEvent(new SqlChangedEvent(replaceTableName(insert.getTable()), ConnectorConstant.OPERTION_INSERT, event.getRedoSql(), null, event.getScn()));
+                sendChangedEvent(new SqlChangedEvent(replaceTableName(insert.getTable()),
+                        ConnectorConstant.OPERTION_INSERT, event.getRedoSql(), null,
+                        event.getScn()));
                 return;
             }
 
             if (statement instanceof Delete) {
                 Delete delete = (Delete) statement;
-                sendChangedEvent(new SqlChangedEvent(replaceTableName(delete.getTable()), ConnectorConstant.OPERTION_DELETE, event.getRedoSql(), null, event.getScn()));
+                sendChangedEvent(new SqlChangedEvent(replaceTableName(delete.getTable()),
+                        ConnectorConstant.OPERTION_DELETE, event.getRedoSql(), null,
+                        event.getScn()));
                 return;
             }
 
             if (statement instanceof Alter) {
                 Alter alter = (Alter) statement;
-                sendChangedEvent(new DDLChangedEvent("", replaceTableName(alter.getTable()), ConnectorConstant.OPERTION_ALTER, event.getRedoSql(), null, event.getScn()));
+                sendChangedEvent(new DDLChangedEvent("", replaceTableName(alter.getTable()),
+                        ConnectorConstant.OPERTION_ALTER, event.getRedoSql(), null,
+                        event.getScn()));
                 return;
             }
         } catch (JSQLParserException e) {
             logger.error("不支持sql:" + event.getRedoSql());
-        }catch (Exception e){
+        } catch (Exception e) {
             logger.error(e.getMessage());
         }
     }
@@ -111,33 +117,41 @@ public class OracleListener extends AbstractDatabaseListener {
     //解析sql出来主键数据
     private void parseSqlToPk(RedoEvent event) throws Exception {
         Statement statement = CCJSqlParserUtil.parse(event.getRedoSql());
-        if (statement instanceof Insert){
+        if (statement instanceof Insert) {
             Insert insert = (Insert) statement;
-            org.dbsyncer.sdk.model.Table table1 =sourceTable.stream()
-                    .filter(x->x.getName().equals(replaceTableName(insert.getTable())))
+            org.dbsyncer.sdk.model.Table table1 = sourceTable.stream()
+                    .filter(x -> x.getName().equals(replaceTableName(insert.getTable())))
                     .findFirst().orElse(null);
-            if (table1 != null){
-                Parser parser = new InsertSql(insert,table1.getColumn(),(DatabaseConnectorInstance) connectorInstance);
-                sendChangedEvent(new RowChangedEvent(replaceTableName(insert.getTable()), ConnectorConstant.OPERTION_INSERT,parser.parseSql() ,null,event.getScn()));
+            if (table1 != null) {
+                Parser parser = new InsertSql(insert, table1.getColumn(),
+                        (DatabaseConnectorInstance) connectorInstance);
+                sendChangedEvent(new RowChangedEvent(replaceTableName(insert.getTable()),
+                        ConnectorConstant.OPERTION_INSERT, parser.parseSql(), null,
+                        event.getScn()));
             }
 
-        }else if (statement instanceof Update){
+        } else if (statement instanceof Update) {
             Update update = (Update) statement;
-            org.dbsyncer.sdk.model.Table table1 =sourceTable.stream()
-                    .filter(x->x.getName().equals(replaceTableName(update.getTable())))
+            org.dbsyncer.sdk.model.Table table1 = sourceTable.stream()
+                    .filter(x -> x.getName().equals(replaceTableName(update.getTable())))
                     .findFirst().orElse(null);
-            if (table1 != null){
-                Parser parser = new UpdateSql(update,table1.getColumn(),(DatabaseConnectorInstance) connectorInstance);
-                sendChangedEvent(new RowChangedEvent(replaceTableName(update.getTable()), ConnectorConstant.OPERTION_UPDATE, parser.parseSql(),null,event.getScn()));
+            if (table1 != null) {
+                Parser parser = new UpdateSql(update, table1.getColumn(),
+                        (DatabaseConnectorInstance) connectorInstance);
+                sendChangedEvent(new RowChangedEvent(replaceTableName(update.getTable()),
+                        ConnectorConstant.OPERTION_UPDATE, parser.parseSql(), null,
+                        event.getScn()));
             }
-        }else if (statement instanceof Delete){
+        } else if (statement instanceof Delete) {
             Delete delete = (Delete) statement;
-            org.dbsyncer.sdk.model.Table table1 =sourceTable.stream()
-                    .filter(x->x.getName().equals(replaceTableName(delete.getTable())))
+            org.dbsyncer.sdk.model.Table table1 = sourceTable.stream()
+                    .filter(x -> x.getName().equals(replaceTableName(delete.getTable())))
                     .findFirst().orElse(null);
-            if (table1 !=null){
-                Parser parser = new DeleteSql(delete,table1.getColumn());
-                sendChangedEvent(new RowChangedEvent(replaceTableName(delete.getTable()), ConnectorConstant.OPERTION_DELETE, parser.parseSql(),null,event.getScn()));
+            if (table1 != null) {
+                Parser parser = new DeleteSql(delete, table1.getColumn());
+                sendChangedEvent(new RowChangedEvent(replaceTableName(delete.getTable()),
+                        ConnectorConstant.OPERTION_DELETE, parser.parseSql(), null,
+                        event.getScn()));
             }
         }
 

+ 1 - 1
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java

@@ -132,7 +132,7 @@ public class LogMiner {
 
                 try {
                     // 避免频繁的执行导致 PGA 内存超出 PGA_AGGREGATE_LIMIT
-                    TimeUnit.SECONDS.sleep(2);
+                    TimeUnit.SECONDS.sleep(5);
                 } catch (InterruptedException e) {
                     logger.error(e.getMessage(), e);
                 }