AE86 2 vuotta sitten
vanhempi
säilyke
64e8be94e4

+ 1 - 16
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/LsnPuller.java

@@ -2,7 +2,6 @@ package org.dbsyncer.listener.sqlserver;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.util.LinkedCaseInsensitiveMap;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -57,8 +56,6 @@ public class LsnPuller {
 
     final class Worker extends Thread {
 
-        private final Map<String, Lsn> maxLsnSnapshot = new LinkedCaseInsensitiveMap<>();
-
         @Override
         public void run() {
             while (!isInterrupted()) {
@@ -67,10 +64,9 @@ public class LsnPuller {
                         TimeUnit.SECONDS.sleep(1);
                         continue;
                     }
-                    maxLsnSnapshot.clear();
                     Lsn maxLsn = null;
                     for (SqlServerExtractor extractor : map.values()) {
-                        maxLsn = getMaxLsn(maxLsnSnapshot, extractor);
+                        maxLsn = extractor.getMaxLsn();
                         if (null != maxLsn && maxLsn.isAvailable() && maxLsn.compareTo(extractor.getLastLsn()) > 0) {
                             extractor.pushStopLsn(maxLsn);
                         }
@@ -89,15 +85,4 @@ public class LsnPuller {
 
     }
 
-    private Lsn getMaxLsn(Map<String, Lsn> maxLsnSnapshot, SqlServerExtractor extractor) {
-        final String url = extractor.getDatabaseConfigUrl();
-        if (maxLsnSnapshot.containsKey(url)) {
-            return maxLsnSnapshot.get(url);
-        }
-
-        Lsn maxLsn = extractor.getMaxLsn();
-        maxLsnSnapshot.put(url, maxLsn);
-        return maxLsn;
-    }
-
 }

+ 7 - 10
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -315,7 +315,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
         return (T) execute;
     }
 
-    public Lsn getMaxLsn(){
+    public Lsn getMaxLsn() {
         return queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
     }
 
@@ -327,7 +327,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
                 try {
                     Lsn stopLsn = stopLsnQueue.take();
                     Lsn poll;
-                    while((poll = stopLsnQueue.poll()) != null){
+                    while ((poll = stopLsnQueue.poll()) != null) {
                         stopLsn = poll;
                     }
                     if (!stopLsn.isAvailable() || stopLsn.compareTo(lastLsn) <= 0) {
@@ -339,24 +339,21 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
                     lastLsn = stopLsn;
                     snapshot.put(LSN_POSITION, lastLsn.toString());
                 } catch (Exception e) {
-                    logger.error(e.getMessage());
-                    sleepInMills(1000L);
+                    if (connected) {
+                        logger.error(e.getMessage(), e);
+                        sleepInMills(1000L);
+                    }
                 }
             }
         }
     }
 
-    public String getDatabaseConfigUrl(){
-        DatabaseConfig config = (DatabaseConfig) connectorConfig;
-        return config.getUrl();
-    }
-
     public Lsn getLastLsn() {
         return lastLsn;
     }
 
     public void pushStopLsn(Lsn stopLsn) {
-        if(stopLsnQueue.contains(stopLsn)){
+        if (stopLsnQueue.contains(stopLsn)) {
             return;
         }
         stopLsnQueue.offer(stopLsn);