AE86 1 год назад
Родитель
Сommit
2adacf6ffa

+ 10 - 6
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java

@@ -19,6 +19,7 @@ import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.listener.AbstractDatabaseListener;
 import org.dbsyncer.sdk.listener.event.DDLChangedEvent;
 import org.dbsyncer.sdk.listener.event.SqlChangedEvent;
+import org.dbsyncer.sdk.model.ChangedOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,12 +50,6 @@ public class OracleListener extends AbstractDatabaseListener {
             logMiner = new LogMiner(username, password, url, schema, driverClassName);
             logMiner.setStartScn(containsPos ? Long.parseLong(snapshot.get(REDO_POSITION)) : 0);
             logMiner.registerEventListener((event) -> {
-                if (snapshot.containsKey(REDO_POSITION)){
-                    snapshot.replace(REDO_POSITION,String.valueOf(event.getScn()));
-                }else{
-                    snapshot.putIfAbsent(REDO_POSITION,String.valueOf(event.getScn()));
-                }
-
                 try {
                     Statement statement = CCJSqlParserUtil.parse(event.getRedoSql());
                     if (statement instanceof Update) {
@@ -86,6 +81,10 @@ public class OracleListener extends AbstractDatabaseListener {
             });
             logMiner.start();
 
+            if (!containsPos) {
+                snapshot.put(REDO_POSITION, String.valueOf(logMiner.getStartScn()));
+                super.forceFlushEvent();
+            }
         } catch (Exception e) {
             logger.error("启动失败:{}", e.getMessage());
             throw new OracleException(e);
@@ -103,6 +102,11 @@ public class OracleListener extends AbstractDatabaseListener {
         }
     }
 
+    @Override
+    public void refreshEvent(ChangedOffset offset) {
+        snapshot.put(REDO_POSITION, String.valueOf(offset.getPosition()));
+    }
+
     private String replaceTableName(Table table) {
         if (table == null) {
             return StringUtil.EMPTY;

+ 4 - 1
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java

@@ -84,7 +84,6 @@ public class LogMiner {
 
         logger.info("scn start '{}'", startScn);
         logger.info("start LogMiner...");
-//        LogMinerHelper.resetSessionToCdbIfNecessary(connection);
         LogMinerHelper.setSessionParameter(connection);
 
         // 1.记录当前redoLog,用于下文判断redoLog 是否切换
@@ -309,6 +308,10 @@ public class LogMiner {
         this.listener = listener;
     }
 
+    public long getStartScn() {
+        return startScn;
+    }
+
     public void setStartScn(long startScn) {
         this.startScn = startScn;
     }

+ 3 - 28
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMinerHelper.java

@@ -3,7 +3,6 @@
  */
 package org.dbsyncer.connector.oracle.logminer;
 
-import org.apache.commons.lang3.StringUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,24 +50,6 @@ public class LogMinerHelper {
         }
     }
 
-    public static void resetSessionToCdbIfNecessary(Connection connection) {
-        Statement statement = null;
-        try {
-            statement = connection.createStatement();
-            statement.execute("alter session set container=cdb$root");
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        } finally {
-            if (statement != null) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                    LOGGER.error("Couldn't close statement", e);
-                }
-            }
-        }
-    }
-
     public static void executeCallableStatement(Connection connection, String statement) throws SQLException {
         Objects.requireNonNull(statement);
         try (CallableStatement s = connection.prepareCall(statement)) {
@@ -142,7 +123,7 @@ public class LogMinerHelper {
     }
 
     public static void buildDataDictionary(Connection connection, String miningStrategy) throws SQLException {
-        if (StringUtils.isBlank(miningStrategy)) {
+        if (StringUtil.isBlank(miningStrategy)) {
             // default
             String sql = "BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;";
             executeCallableStatement(connection, sql);
@@ -152,7 +133,7 @@ public class LogMinerHelper {
     public static void startLogMiner(Connection connection, BigInteger startScn, BigInteger endScn, String miningStrategy) throws SQLException {
         LOGGER.debug("startLogMiner... startScn {}, endScn {}", startScn, endScn);
         // default
-        if (StringUtils.isBlank(miningStrategy)) {
+        if (StringUtil.isBlank(miningStrategy)) {
             miningStrategy = "DBMS_LOGMNR.DICT_FROM_REDO_LOGS + DBMS_LOGMNR.DDL_DICT_TRACKING ";
         }
 
@@ -203,7 +184,7 @@ public class LogMinerHelper {
         query.append("(OPERATION_CODE IN (1,2,3) ");
         query.append(" AND SEG_OWNER NOT IN ('APPQOSSYS','AUDSYS','CTXSYS','DVSYS','DBSFWUSER','DBSNMP','GSMADMIN_INTERNAL','LBACSYS','MDSYS','OJVMSYS','OLAPSYS','ORDDATA','ORDSYS','OUTLN','SYS','SYSTEM','WMSYS','XDB') ");
 
-        if (StringUtils.isNotBlank(schema)) {
+        if (StringUtil.isNotBlank(schema)) {
             query.append(String.format(" AND (REGEXP_LIKE(SEG_OWNER,'^%s$','i')) ", schema));
 //            query.append(" AND ");
 //            query.append("USERNAME = '");
@@ -231,10 +212,4 @@ public class LogMinerHelper {
         executeCallableStatement(connection, "ALTER SESSION SET TIME_ZONE = '00:00'");
     }
 
-    public void executeWithoutCommitting(Connection connection, String sql) throws SQLException {
-        try (Statement statement = connection.createStatement()) {
-            statement.execute(sql);
-        }
-    }
-
 }

+ 2 - 2
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDatabaseConnector.java

@@ -671,8 +671,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
                 databaseTemplate.execute(DatabaseConstant.DBS_UNIQUE_CODE.concat(config.getSql()));
                 return true;
             });
-            Map<String,String> successMap = new HashMap<>();
-            successMap.put("sql",config.getSql());
+            Map<String, String> successMap = new HashMap<>();
+            successMap.put("sql", config.getSql());
             result.addSuccessData(Collections.singletonList(successMap));
         } catch (Exception e) {
             result.getError().append(String.format("执行ddl: %s, 异常:%s", config.getSql(), e.getMessage()));