Browse Source

优化逻辑

穿云 4 months ago
parent
commit
faa65b2277

+ 32 - 25
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java

@@ -3,6 +3,7 @@
  */
 package org.dbsyncer.connector.oracle.logminer;
 
+import org.dbsyncer.connector.oracle.OracleException;
 import org.dbsyncer.sdk.util.DatabaseUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -23,17 +24,17 @@ import java.util.stream.Collectors;
 public class LogMiner {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
-    private Lock lock = new ReentrantLock();
-    private String username;
-    private String password;
-    private String url;
-    private String schema;
-    private String driverClassName;
-    private String miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG";
+    private final Lock lock = new ReentrantLock();
+    private final String username;
+    private final String password;
+    private final String url;
+    private final String schema;
+    private final String driverClassName;
+    private final String miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG";
     private volatile boolean connected = false;
     private Connection connection;
     private List<BigInteger> currentRedoLogSequences;
-    private TransactionalBuffer transactionalBuffer = new TransactionalBuffer();
+    private final TransactionalBuffer transactionalBuffer = new TransactionalBuffer();
     // 已提交的位点
     private Long committedScn = 0L;
     // 初始位点
@@ -50,7 +51,7 @@ public class LogMiner {
     }
 
     public void close() throws SQLException {
-        lock.lock();
+        connected = false;
         if (null != worker && !worker.isInterrupted()) {
             worker.interrupt();
             worker = null;
@@ -58,20 +59,25 @@ public class LogMiner {
         if (connection != null) {
             connection.close();
         }
-        connected = false;
-        lock.unlock();
     }
 
     public void start() throws SQLException {
-        lock.lock();
-        if (connected) {
-            logger.error("LogMiner is already started");
-            lock.unlock();
-            return;
+        boolean locked = false;
+        try {
+            locked = lock.tryLock(5, TimeUnit.SECONDS);
+            if (locked && !connected) {
+                connected = true;
+                connect();
+            } else {
+                logger.error("LogMiner is already started");
+            }
+        } catch (InterruptedException e) {
+            throw new OracleException(e);
+        } finally {
+            if (locked) {
+                lock.unlock();
+            }
         }
-        connected = true;
-        lock.unlock();
-        connect();
     }
 
     private void connect() throws SQLException {
@@ -80,8 +86,7 @@ public class LogMiner {
         if (startScn == 0) {
             startScn = getCurrentScn(connection);
         }
-        logger.info("scn start '{}'", startScn);
-        logger.info("start LogMiner...");
+        logger.info("start LogMiner, scn={}", startScn);
         LogMinerHelper.setSessionParameter(connection);
         // 1.记录当前redoLog,用于下文判断redoLog 是否切换
         currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
@@ -118,7 +123,9 @@ public class LogMiner {
         try {
             TimeUnit.SECONDS.sleep(seconds);
         } catch (InterruptedException e) {
-            logger.error(e.getMessage(), e);
+            if (connected) {
+                logger.error(e.getMessage(), e);
+            }
         }
     }
 
@@ -246,7 +253,7 @@ public class LogMiner {
                         if (counter == 0) {
                             updateCommittedScn(commitScn.longValue());
                         }
-                        event.setScn(startScn < committedScn ? committedScn:startScn);
+                        event.setScn(startScn < committedScn ? committedScn : startScn);
                         listener.onEvent(event);
                     };
                     transactionalBuffer.registerCommitCallback(txId, scn, commitCallback);
@@ -281,7 +288,7 @@ public class LogMiner {
         return redoBuilder.toString();
     }
 
-    public void registerEventListener(EventListener listener){
+    public void registerEventListener(EventListener listener) {
         this.listener = listener;
     }
 
@@ -298,7 +305,7 @@ public class LogMiner {
         void onEvent(RedoEvent redoEvent);
     }
 
-    public boolean isConnected(){
+    public boolean isConnected() {
         return connected;
     }
 

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/util/DatabaseUtil.java

@@ -31,7 +31,7 @@ public abstract class DatabaseUtil {
             try {
                 rs.close();
             } catch (Exception e) {
-                e.printStackTrace();
+                throw new SdkException(e);
             }
         }
     }