浏览代码

修复Oracle重连问题

穿云 5 月之前
父节点
当前提交
e1c55b04c7

+ 93 - 74
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java

@@ -3,7 +3,6 @@
  */
 package org.dbsyncer.connector.oracle.logminer;
 
-import org.apache.commons.lang3.time.StopWatch;
 import org.dbsyncer.sdk.util.DatabaseUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +31,6 @@ public class LogMiner {
     private String driverClassName;
     private String miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG";
     private volatile boolean connected = false;
-    private final StopWatch stopWatch = StopWatch.create();
     private Connection connection;
     private List<BigInteger> currentRedoLogSequences;
     private TransactionalBuffer transactionalBuffer = new TransactionalBuffer();
@@ -41,6 +39,7 @@ public class LogMiner {
     // 初始位点
     private long startScn = 0;
     private EventListener listener;
+    private Worker worker;
 
     public LogMiner(String username, String password, String url, String schema, String driverClassName) {
         this.username = username;
@@ -52,12 +51,13 @@ public class LogMiner {
 
     public void close() throws SQLException {
         lock.lock();
-        if (!connected) {
-            logger.error("LogMiner is already stop");
-            lock.unlock();
-            return;
+        if (null != worker && !worker.isInterrupted()) {
+            worker.interrupt();
+            worker = null;
+        }
+        if (connection != null) {
+            connection.close();
         }
-        this.connection.close();
         connected = false;
         lock.unlock();
     }
@@ -69,93 +69,54 @@ public class LogMiner {
             lock.unlock();
             return;
         }
-        this.connection = DatabaseUtil.getConnection(driverClassName, url, username, password);
         connected = true;
         lock.unlock();
-        //get current scn 判断是否第一次没有存储
+        connect();
+    }
+
+    private void connect() throws SQLException {
+        this.connection = DatabaseUtil.getConnection(driverClassName, url, username, password);
+        // 判断是否第一次读取
         if (startScn == 0) {
             startScn = getCurrentScn(connection);
         }
-
         logger.info("scn start '{}'", startScn);
         logger.info("start LogMiner...");
         LogMinerHelper.setSessionParameter(connection);
-
         // 1.记录当前redoLog,用于下文判断redoLog 是否切换
         currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
-
         // 2.构建数据字典 && add redo / archived log
         initializeLogMiner();
+        worker = new Worker();
+        worker.setName(new StringBuilder("log-miner-parser-").append(url).append("_").append(worker.hashCode()).toString());
+        worker.setDaemon(false);
+        worker.start();
+    }
 
-        String minerViewQuery = LogMinerHelper.logMinerViewQuery(schema, username);
-        logger.debug(minerViewQuery);
-
-        try (PreparedStatement minerViewStatement = connection.prepareStatement(minerViewQuery, ResultSet.TYPE_FORWARD_ONLY,
-                ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT)) {
-            // while
-            while (connected) {
-                // 3.确定 endScn
-                BigInteger endScn = determineEndScn();
-
-                // 4.是否发生redoLog切换
-                if (redoLogSwitchOccurred()) {
-                    // 如果切换则重启logMiner会话
-                    logger.debug("restart LogMiner Session");
-                    restartLogMiner();
-                    currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
+    private void recover() {
+        logger.error("Connection interrupted, attempting to reconnect");
+        while (connected) {
+            try {
+                if (null != worker && !worker.isInterrupted()) {
+                    worker.interrupt();
+                    worker = null;
                 }
-
-                // 5.start logMiner
-                LogMinerHelper.startLogMiner(connection, BigInteger.valueOf(startScn), endScn, miningStrategy);
-
-                // 6.查询 logMiner view, 处理结果集
-                minerViewStatement.setFetchSize(2000);
-                minerViewStatement.setFetchDirection(ResultSet.FETCH_FORWARD);
-                minerViewStatement.setString(1, String.valueOf(startScn));
-                minerViewStatement.setString(2, endScn.toString());
-
-                stopWatch.start();
-
-                try (ResultSet rs = minerViewStatement.executeQuery()) {
-                    logger.trace("Query V$LOGMNR_CONTENTS spend time {} ms", stopWatch.getTime(TimeUnit.MILLISECONDS));
-                    stopWatch.reset();
-                    try{
-                        logMinerViewProcessor(rs);
-                    }catch (SQLException e){
-                        if (e.getMessage().contains("ORA-00310")){
-                            logger.error("ORA-00310 try continue");
-                            restartLogMiner();
-                            currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
-                            continue;
-                        }
-                        throw e;
-                    }
+                if (connection != null) {
+                    connection.close();
                 }
-
-                // 7.确定新的SCN
-                startScn = Long.parseLong(endScn.toString());
-                sleepFiveSeconds();
-            }
-
-        } catch (Exception e) {
-            if (e instanceof SQLRecoverableException) {
-                logger.error("Connection timed out, attempting to reconnect in 5 seconds");
-                reConnect();
-                return;
+                connect();
+                logger.info("Reconnect successfully");
+                break;
+            } catch (Exception e) {
+                logger.error(url, e);
+                sleepSeconds(5);
             }
-            logger.error(e.getMessage(), e);
         }
     }
 
-    private void reConnect() throws SQLException {
-        connected = false;
-        sleepFiveSeconds();
-        start();
-    }
-
-    private void sleepFiveSeconds() {
+    private void sleepSeconds(int seconds) {
         try {
-            TimeUnit.SECONDS.sleep(5);
+            TimeUnit.SECONDS.sleep(seconds);
         } catch (InterruptedException e) {
             logger.error(e.getMessage(), e);
         }
@@ -341,4 +302,62 @@ public class LogMiner {
         return connected;
     }
 
+    final class Worker extends Thread {
+
+        @Override
+        public void run() {
+            String minerViewQuery = LogMinerHelper.logMinerViewQuery(schema, username);
+            try (PreparedStatement minerViewStatement = connection.prepareStatement(minerViewQuery, ResultSet.TYPE_FORWARD_ONLY,
+                    ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT)) {
+                while (!isInterrupted() && connected) {
+                    // 1.确定 endScn
+                    BigInteger endScn = determineEndScn();
+
+                    // 2.是否发生redoLog切换
+                    if (redoLogSwitchOccurred()) {
+                        // 如果切换则重启logMiner会话
+                        restartLogMiner();
+                        currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
+                    }
+
+                    // 3.start logMiner
+                    LogMinerHelper.startLogMiner(connection, BigInteger.valueOf(startScn), endScn, miningStrategy);
+
+                    // 4.查询 logMiner view, 处理结果集
+                    minerViewStatement.setFetchSize(2000);
+                    minerViewStatement.setFetchDirection(ResultSet.FETCH_FORWARD);
+                    minerViewStatement.setString(1, String.valueOf(startScn));
+                    minerViewStatement.setString(2, endScn.toString());
+                    try (ResultSet rs = minerViewStatement.executeQuery()) {
+                        try {
+                            logMinerViewProcessor(rs);
+                        } catch (SQLException e) {
+                            if (e.getMessage().contains("ORA-00310")) {
+                                logger.error("ORA-00310 try continue");
+                                restartLogMiner();
+                                currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
+                                continue;
+                            }
+                            throw e;
+                        }
+                    }
+
+                    // 5.确定新的SCN
+                    startScn = Long.parseLong(endScn.toString());
+                    sleepSeconds(3);
+                }
+            } catch (Exception e) {
+                if (e instanceof SQLRecoverableException) {
+                    recover();
+                    return;
+                }
+                logger.error(e.getMessage(), e);
+                try {
+                    close();
+                } catch (SQLException ex) {
+                    logger.error(ex.getMessage(), ex);
+                }
+            }
+        }
+    }
 }