Ver Fonte

fix bug

穿云 há 3 meses atrás
pai
commit
99b0f48cbe

+ 2 - 7
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/cdc/OracleListener.java

@@ -30,7 +30,6 @@ import org.dbsyncer.sdk.model.Field;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -150,12 +149,8 @@ public class OracleListener extends AbstractDatabaseListener {
 
     @Override
     public void close() {
-        try {
-            if (logMiner != null) {
-                logMiner.close();
-            }
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
+        if (logMiner != null) {
+            logMiner.close();
         }
     }
 

+ 75 - 46
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java

@@ -9,7 +9,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -31,6 +36,10 @@ public class LogMiner {
     private final String schema;
     private final String driverClassName;
     private final String miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG";
+    /** LogMiner执行查询SQL的超时参数,单位秒 */
+    private final int queryTimeout = 300;
+    /** LogMiner从v$logmnr_contents视图中批量拉取条数,值越大,消费存量数据越快 */
+    private final int fetchSize = 1000;
     private volatile boolean connected = false;
     private Connection connection;
     private List<BigInteger> currentRedoLogSequences;
@@ -50,15 +59,17 @@ public class LogMiner {
         this.driverClassName = driverClassName;
     }
 
-    public void close() throws SQLException {
+    public void close() {
         connected = false;
+        closeQuietly();
+    }
+
+    private void closeQuietly() {
         if (null != worker && !worker.isInterrupted()) {
             worker.interrupt();
             worker = null;
         }
-        if (connection != null) {
-            connection.close();
-        }
+        close(connection);
     }
 
     public void start() throws SQLException {
@@ -80,8 +91,12 @@ public class LogMiner {
         }
     }
 
+    private Connection createConnection() throws SQLException {
+        return DatabaseUtil.getConnection(driverClassName, url, username, password);
+    }
+
     private void connect() throws SQLException {
-        this.connection = DatabaseUtil.getConnection(driverClassName, url, username, password);
+        this.connection = createConnection();
         // 判断是否第一次读取
         if (startScn == 0) {
             startScn = getCurrentScn(connection);
@@ -93,7 +108,7 @@ public class LogMiner {
         // 2.构建数据字典 && add redo / archived log
         initializeLogMiner();
         worker = new Worker();
-        worker.setName(new StringBuilder("log-miner-parser-").append(url).append("_").append(worker.hashCode()).toString());
+        worker.setName("log-miner-parser-" + url + "_" + worker.hashCode());
         worker.setDaemon(false);
         worker.start();
     }
@@ -102,13 +117,7 @@ public class LogMiner {
         logger.error("Connection interrupted, attempting to reconnect");
         while (connected) {
             try {
-                if (null != worker && !worker.isInterrupted()) {
-                    worker.interrupt();
-                    worker = null;
-                }
-                if (connection != null) {
-                    connection.close();
-                }
+                closeQuietly();
                 connect();
                 logger.info("Reconnect successfully");
                 break;
@@ -198,12 +207,6 @@ public class LogMiner {
             int operationCode = rs.getInt("OPERATION_CODE");
             Timestamp changeTime = rs.getTimestamp("TIMESTAMP");
             String txId = rs.getString("XID");
-            String operation = rs.getString("OPERATION");
-            String username = rs.getString("USERNAME");
-
-            logger.trace("Capture record, SCN:{}, TABLE_NAME:{}, SEG_OWNER:{}, OPERATION_CODE:{}, TIMESTAMP:{}, XID:{}, OPERATION:{}, USERNAME:{}",
-                    scn, tableName, segOwner, operationCode, changeTime, txId, operation, username);
-
             // Commit
             if (operationCode == LogMinerHelper.LOG_MINER_OC_COMMIT) {
                 // 将TransactionalBuffer中当前事务的DML 转移到消费者处理
@@ -309,14 +312,48 @@ public class LogMiner {
         return connected;
     }
 
+    // 判断连接是否正常
+    private boolean isValid() {
+        try {
+            return connection != null && connection.isValid(queryTimeout);
+        } catch (SQLException e) {
+            return false;
+        }
+    }
+
+    private void close(AutoCloseable closeable) {
+        if (null != closeable) {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                logger.error(e.getMessage());
+            }
+        }
+    }
+
+    /** 关闭数据库连接资源 */
+    private void closeResources(ResultSet rs, Statement stmt) {
+        close(rs);
+        close(stmt);
+    }
+
     final class Worker extends Thread {
 
         @Override
         public void run() {
             String minerViewQuery = LogMinerHelper.logMinerViewQuery(schema, username);
-            try (PreparedStatement minerViewStatement = connection.prepareStatement(minerViewQuery, ResultSet.TYPE_FORWARD_ONLY,
-                    ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT)) {
+            PreparedStatement statement = null;
+            ResultSet rs = null;
+            try {
                 while (!isInterrupted() && connected) {
+                    if (!isValid()) {
+                        connection = createConnection();
+                    }
+                    closeResources(rs, statement);
+                    statement = connection.prepareStatement(minerViewQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
+                    statement.setFetchSize(fetchSize);
+                    statement.setFetchDirection(ResultSet.FETCH_FORWARD);
+                    statement.setQueryTimeout(queryTimeout);
                     // 1.确定 endScn
                     BigInteger endScn = determineEndScn();
 
@@ -331,39 +368,31 @@ public class LogMiner {
                     LogMinerHelper.startLogMiner(connection, BigInteger.valueOf(startScn), endScn, miningStrategy);
 
                     // 4.查询 logMiner view, 处理结果集
-                    minerViewStatement.setFetchSize(2000);
-                    minerViewStatement.setFetchDirection(ResultSet.FETCH_FORWARD);
-                    minerViewStatement.setString(1, String.valueOf(startScn));
-                    minerViewStatement.setString(2, endScn.toString());
-                    try (ResultSet rs = minerViewStatement.executeQuery()) {
-                        try {
-                            logMinerViewProcessor(rs);
-                        } catch (SQLException e) {
-                            if (e.getMessage().contains("ORA-00310")) {
-                                logger.error("ORA-00310 try continue");
-                                restartLogMiner();
-                                currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
-                                continue;
-                            }
-                            throw e;
+                    statement.setString(1, String.valueOf(startScn));
+                    statement.setString(2, endScn.toString());
+                    try {
+                        rs = statement.executeQuery();
+                        logMinerViewProcessor(rs);
+                    } catch (SQLException e) {
+                        if (e.getMessage().contains("ORA-00310")) {
+                            logger.error("ORA-00310 try continue");
+                            restartLogMiner();
+                            currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
+                            continue;
                         }
+                        throw e;
                     }
-
                     // 5.确定新的SCN
                     startScn = Long.parseLong(endScn.toString());
                     sleepSeconds(3);
                 }
             } catch (Exception e) {
-                if (e instanceof SQLRecoverableException) {
+                if (connected) {
+                    logger.error(e.getMessage(), e);
                     recover();
-                    return;
-                }
-                logger.error(e.getMessage(), e);
-                try {
-                    close();
-                } catch (SQLException ex) {
-                    logger.error(ex.getMessage(), ex);
                 }
+            } finally {
+                closeResources(rs, statement);
             }
         }
     }

+ 2 - 4
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMinerHelper.java

@@ -118,10 +118,8 @@ public class LogMinerHelper {
     }
 
     public static void addLogFile(Connection connection, String fileName) throws SQLException {
-        String addLogFile = "BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '%s', OPTIONS => %s);END;";
-        String options = "DBMS_LOGMNR.ADDFILE";
-//        String options = "DBMS_LOGMNR.NEW";
-        executeCallableStatement(connection, String.format(addLogFile, fileName, options));
+        String addLogFile = "BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '%s', OPTIONS => DBMS_LOGMNR.ADDFILE);END;";
+        executeCallableStatement(connection, String.format(addLogFile, fileName));
     }
 
     public static List<BigInteger> getCurrentRedoLogSequences(Connection connection) throws SQLException {