AE86 2 年之前
父节点
当前提交
04d80d3ee2

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -52,11 +52,11 @@ public class ConnectorFactory implements DisposableBean {
         }
         ConnectorMapper connectorMapper = connectorCache.get(cacheKey);
         try {
-            ConnectorMapper clone = (ConnectorMapper)connectorMapper.clone();
+            ConnectorMapper clone = (ConnectorMapper) connectorMapper.clone();
             clone.setConfig(config);
             return clone;
         } catch (CloneNotSupportedException e) {
-            throw new RuntimeException(e);
+            throw new ConnectorException(e);
         }
     }
 
@@ -153,7 +153,7 @@ public class ConnectorFactory implements DisposableBean {
 
     public Result writer(ConnectorMapper connectorMapper, WriterBatchConfig config) {
         Connector connector = getConnector(connectorMapper);
-        if(connector instanceof AbstractConnector){
+        if (connector instanceof AbstractConnector) {
             AbstractConnector conn = (AbstractConnector) connector;
             try {
                 conn.convertProcessBeforeWriter(connectorMapper, config);

+ 36 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorMapper.java

@@ -13,21 +13,55 @@ import org.dbsyncer.connector.config.ConnectorConfig;
  */
 public interface ConnectorMapper<K, V> extends Cloneable {
 
+    /**
+     * 获取连接配置
+     *
+     * @return
+     */
     default ConnectorConfig getOriginalConfig() {
         return (ConnectorConfig) getConfig();
     }
 
+    /**
+     * 获取连接器类型
+     *
+     * @return
+     */
     default String getConnectorType() {
         return getOriginalConfig().getConnectorType();
     }
 
+    /**
+     * 获取连接配置
+     *
+     * @return
+     */
     K getConfig();
 
+    /**
+     * 设置
+     * @param k
+     */
     void setConfig(K k);
 
+    /**
+     * 获取连接通道实例
+     *
+     * @return
+     * @throws Exception
+     */
     V getConnection() throws Exception;
 
+    /**
+     * 关闭连接器
+     */
     void close();
 
-    Object clone() throws CloneNotSupportedException ;
-}
+    /**
+     * 浅拷贝连接器
+     *
+     * @return
+     * @throws CloneNotSupportedException
+     */
+    Object clone() throws CloneNotSupportedException;
+}

+ 29 - 34
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/LsnPuller.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.listener.sqlserver;
 
-import org.dbsyncer.connector.config.DatabaseConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.LinkedCaseInsensitiveMap;
@@ -18,20 +17,21 @@ import java.util.concurrent.TimeUnit;
 public class LsnPuller {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    /**
+     * 间隔拉取最新LSN时间(毫秒)
+     */
     private static final long DEFAULT_POLL_INTERVAL_MILLIS = 100;
-    private static final long WAIT_INTERVAL_MILLIS = 1000;
-    private Worker worker;
     private static volatile LsnPuller instance = null;
+    private final Map<String, SqlServerExtractor> map = new ConcurrentHashMap<>();
+    private Worker worker;
 
-    private Map<String, SqlServerExtractor> map = new ConcurrentHashMap<>();
-
-    private LsnPuller(){
+    private LsnPuller() {
         initWorker();
     }
 
-    private static LsnPuller getInstance(){
-        if(instance == null){
-            synchronized (LsnPuller.class){
+    private static LsnPuller getInstance() {
+        if (instance == null) {
+            synchronized (LsnPuller.class) {
                 if (instance == null) {
                     instance = new LsnPuller();
                 }
@@ -40,7 +40,6 @@ public class LsnPuller {
         return instance;
     }
 
-
     private void initWorker() {
         worker = new Worker();
         worker.setName("cdc-LsnPuller");
@@ -48,51 +47,36 @@ public class LsnPuller {
         worker.start();
     }
 
-    public static void addExtractor(String metaId, SqlServerExtractor extractor){
+    public static void addExtractor(String metaId, SqlServerExtractor extractor) {
         getInstance().map.put(metaId, extractor);
     }
 
-
     public static void removeExtractor(String metaId) {
         getInstance().map.remove(metaId);
     }
 
     final class Worker extends Thread {
 
+        private final Map<String, Lsn> maxLsnSnapshot = new LinkedCaseInsensitiveMap<>();
+
         @Override
         public void run() {
             while (!isInterrupted()) {
                 try {
-                    if(map.isEmpty()){
-                        TimeUnit.MILLISECONDS.sleep(WAIT_INTERVAL_MILLIS);
+                    if (map.isEmpty()) {
+                        TimeUnit.SECONDS.sleep(1);
                         continue;
                     }
-                    Map<String, Lsn> dataBaseMaxLsn = new LinkedCaseInsensitiveMap<>();
+                    maxLsnSnapshot.clear();
+                    Lsn maxLsn = null;
                     for (SqlServerExtractor extractor : map.values()) {
-                        if(extractor.getLastLsn() == null){
-                            continue;
-                        }
-                        DatabaseConfig connectorConfig = extractor.getConnectorConfig();
-                        String url = connectorConfig.getUrl();
-                        Lsn maxLsn = null;
-                        if(dataBaseMaxLsn.containsKey(url)){
-                            maxLsn = dataBaseMaxLsn.get(url);
-                        }else{
-                            try{
-                                maxLsn = extractor.getMaxLsn();
-                                dataBaseMaxLsn.put(url, maxLsn);
-                            }catch (Exception e) {
-                                logger.error("获取maxLsn异常:", e);
-                            }
-                        }
+                        maxLsn = getMaxLsn(maxLsnSnapshot, extractor);
                         if (null != maxLsn && maxLsn.isAvailable() && maxLsn.compareTo(extractor.getLastLsn()) > 0) {
                             extractor.pushStopLsn(maxLsn);
                         }
                     }
                     TimeUnit.MILLISECONDS.sleep(DEFAULT_POLL_INTERVAL_MILLIS);
-                }catch (InterruptedException ex){
-                    logger.warn(ex.getMessage());
-                }catch (Exception e) {
+                } catch (Exception e) {
                     logger.error("异常", e);
                     try {
                         TimeUnit.SECONDS.sleep(1);
@@ -105,4 +89,15 @@ public class LsnPuller {
 
     }
 
+    private Lsn getMaxLsn(Map<String, Lsn> maxLsnSnapshot, SqlServerExtractor extractor) {
+        final String url = extractor.getDatabaseConfigUrl();
+        if (maxLsnSnapshot.containsKey(url)) {
+            return maxLsnSnapshot.get(url);
+        }
+
+        Lsn maxLsn = extractor.getMaxLsn();
+        maxLsnSnapshot.put(url, maxLsn);
+        return maxLsn;
+    }
+
 }

+ 4 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -341,10 +341,11 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
                 }
             }
         }
-
     }
-    public DatabaseConfig getConnectorConfig(){
-        return (DatabaseConfig) connectorConfig;
+
+    public String getDatabaseConfigUrl(){
+        DatabaseConfig config = (DatabaseConfig) connectorConfig;
+        return config.getUrl();
     }
 
     public Lsn getLastLsn() {

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/AbstractPuller.java

@@ -13,4 +13,4 @@ public abstract class AbstractPuller implements Puller {
         applicationContext.publishEvent(new ClosedEvent(applicationContext, metaId));
     }
 
-}
+}