Sfoglia il codice sorgente

增加一个专门拉取lsn的进程,每个库每个周期只拉取一次 & 修改缓存逻辑

xinpeng.fu 2 anni fa
parent
commit
16f2bc2eff

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/event/ClosedEvent.java → dbsyncer-common/src/main/java/org/dbsyncer/common/event/ClosedEvent.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.manager.event;
+package org.dbsyncer.common.event;
 
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.event.ApplicationContextEvent;

+ 9 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -50,7 +50,14 @@ public class ConnectorFactory implements DisposableBean {
                 }
             }
         }
-        return connectorCache.get(cacheKey);
+        ConnectorMapper connectorMapper = connectorCache.get(cacheKey);
+        try {
+            ConnectorMapper clone = (ConnectorMapper)connectorMapper.clone();
+            clone.setConfig(config);
+            return clone;
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /**
@@ -189,4 +196,4 @@ public class ConnectorFactory implements DisposableBean {
         getConnector(connectorMapper).disconnect(connectorMapper);
     }
 
-}
+}

+ 6 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorMapper.java

@@ -11,7 +11,7 @@ import org.dbsyncer.connector.config.ConnectorConfig;
  * @version 1.0.0
  * @date 2022/3/20 23:00
  */
-public interface ConnectorMapper<K, V> {
+public interface ConnectorMapper<K, V> extends Cloneable {
 
     default ConnectorConfig getOriginalConfig() {
         return (ConnectorConfig) getConfig();
@@ -23,7 +23,11 @@ public interface ConnectorMapper<K, V> {
 
     K getConfig();
 
+    void setConfig(K k);
+
     V getConnection() throws Exception;
 
     void close();
-}
+
+    Object clone() throws CloneNotSupportedException ;
+}

+ 11 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseConnectorMapper.java

@@ -41,6 +41,11 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
         return config;
     }
 
+    @Override
+    public void setConfig(DatabaseConfig config) {
+        this.config = config;
+    }
+
     @Override
     public Connection getConnection() throws Exception {
         return dataSource.getConnection();
@@ -51,4 +56,9 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
         dataSource.close();
     }
 
-}
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+}

+ 11 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnectorMapper.java

@@ -19,6 +19,11 @@ public final class ESConnectorMapper implements ConnectorMapper<ESConfig, RestHi
         return config;
     }
 
+    @Override
+    public void setConfig(ESConfig config) {
+        this.config = config;
+    }
+
     @Override
     public RestHighLevelClient getConnection() {
         return client;
@@ -28,4 +33,9 @@ public final class ESConnectorMapper implements ConnectorMapper<ESConfig, RestHi
     public void close() {
         ESUtil.close(client);
     }
-}
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+}

+ 11 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnectorMapper.java

@@ -41,6 +41,11 @@ public final class FileConnectorMapper implements ConnectorMapper<FileConfig, St
         return config;
     }
 
+    @Override
+    public void setConfig(FileConfig config) {
+        this.config = config;
+    }
+
     @Override
     public String getConnection() {
         return config.getFileDir();
@@ -51,6 +56,11 @@ public final class FileConnectorMapper implements ConnectorMapper<FileConfig, St
         fileSchemaMap.clear();
     }
 
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
     public List<FileSchema> getFileSchemaList() {
         return fileSchemaList;
     }
@@ -78,4 +88,4 @@ public final class FileConnectorMapper implements ConnectorMapper<FileConfig, St
             Assert.isTrue(file.exists(), String.format("found not file '%s'", filePath));
         }
     }
-}
+}

+ 11 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnectorMapper.java

@@ -18,6 +18,11 @@ public final class KafkaConnectorMapper implements ConnectorMapper<KafkaConfig,
         return config;
     }
 
+    @Override
+    public void setConfig(KafkaConfig config) {
+        this.config = config;
+    }
+
     @Override
     public KafkaClient getConnection() {
         return client;
@@ -27,4 +32,9 @@ public final class KafkaConnectorMapper implements ConnectorMapper<KafkaConfig,
     public void close() {
         KafkaUtil.close(client);
     }
-}
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+}

+ 108 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/LsnPuller.java

@@ -0,0 +1,108 @@
+package org.dbsyncer.listener.sqlserver;
+
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.LinkedCaseInsensitiveMap;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Xinpeng.Fu
+ * @version V1.0
+ * @description
+ * @date 2022/8/30 10:04
+ */
+public class LsnPuller {
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private static final long DEFAULT_POLL_INTERVAL_MILLIS = 100;
+    private static final long WAIT_INTERVAL_MILLIS = 1000;
+    private Worker worker;
+    private static volatile LsnPuller instance = null;
+
+    private Map<String, SqlServerExtractor> map = new ConcurrentHashMap<>();
+
+    private LsnPuller(){
+        initWorker();
+    }
+
+    private static LsnPuller getInstance(){
+        if(instance == null){
+            synchronized (LsnPuller.class){
+                if (instance == null) {
+                    instance = new LsnPuller();
+                }
+            }
+        }
+        return instance;
+    }
+
+
+    private void initWorker() {
+        worker = new Worker();
+        worker.setName("cdc-LsnPuller");
+        worker.setDaemon(false);
+        worker.start();
+    }
+
+    public static void addExtractor(String metaId, SqlServerExtractor extractor){
+        getInstance().map.put(metaId, extractor);
+    }
+
+
+    public static void removeExtractor(String metaId) {
+        getInstance().map.remove(metaId);
+    }
+
+    final class Worker extends Thread {
+
+        @Override
+        public void run() {
+            while (!isInterrupted()) {
+                try {
+                    if(map.isEmpty()){
+                        TimeUnit.MILLISECONDS.sleep(WAIT_INTERVAL_MILLIS);
+                        continue;
+                    }
+                    Map<String, Lsn> dataBaseMaxLsn = new LinkedCaseInsensitiveMap<>();
+                    for (SqlServerExtractor extractor : map.values()) {
+                        if(extractor.getLastLsn() == null){
+                            continue;
+                        }
+                        DatabaseConfig connectorConfig = extractor.getConnectorConfig();
+                        String url = connectorConfig.getUrl();
+                        Lsn maxLsn = null;
+                        if(dataBaseMaxLsn.containsKey(url)){
+                            maxLsn = dataBaseMaxLsn.get(url);
+                        }else{
+                            try{
+                                maxLsn = extractor.getMaxLsn();
+                                dataBaseMaxLsn.put(url, maxLsn);
+                            }catch (Exception e) {
+                                logger.error("获取maxLsn异常:", e);
+                            }
+                        }
+                        if (null != maxLsn && maxLsn.isAvailable() && maxLsn.compareTo(extractor.getLastLsn()) > 0) {
+                            extractor.pushStopLsn(maxLsn);
+                        }
+                    }
+                    TimeUnit.MILLISECONDS.sleep(DEFAULT_POLL_INTERVAL_MILLIS);
+                }catch (InterruptedException ex){
+                    logger.warn(ex.getMessage());
+                }catch (Exception e) {
+                    logger.error("异常", e);
+                    try {
+                        TimeUnit.SECONDS.sleep(1);
+                    } catch (InterruptedException ex) {
+                        logger.warn(ex.getMessage());
+                    }
+                }
+            }
+        }
+
+    }
+
+}

+ 28 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -18,6 +18,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -55,6 +56,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     private Lsn lastLsn;
     private String serverName;
     private String schema;
+    private LinkedBlockingQueue<Lsn> stopLsnQueue = new LinkedBlockingQueue<>();
 
     @Override
     public void start() {
@@ -78,6 +80,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
             worker.setName(new StringBuilder("cdc-parser-").append(serverName).append("_").append(RandomUtil.nextInt(1, 100)).toString());
             worker.setDaemon(false);
             worker.start();
+            LsnPuller.addExtractor(metaId, this);
         } catch (Exception e) {
             close();
             logger.error("启动失败:{}", e.getMessage());
@@ -90,6 +93,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     @Override
     public void close() {
         if (connected) {
+            LsnPuller.removeExtractor(metaId);
             if (null != worker && !worker.isInterrupted()) {
                 worker.interrupt();
                 worker = null;
@@ -308,15 +312,22 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
         return (T) execute;
     }
 
+    public Lsn getMaxLsn(){
+        return queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
+    }
+
     final class Worker extends Thread {
 
         @Override
         public void run() {
             while (!isInterrupted() && connected) {
                 try {
-                    Lsn stopLsn = queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
-                    if (null == stopLsn || !stopLsn.isAvailable() || stopLsn.compareTo(lastLsn) <= 0) {
-                        sleepInMills(500L);
+                    Lsn stopLsn = stopLsnQueue.take();
+                    Lsn poll;
+                    while((poll = stopLsnQueue.poll()) != null){
+                        stopLsn = poll;
+                    }
+                    if (!stopLsn.isAvailable() || stopLsn.compareTo(lastLsn) <= 0) {
                         continue;
                     }
 
@@ -332,5 +343,18 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
         }
 
     }
+    public DatabaseConfig getConnectorConfig(){
+        return (DatabaseConfig) connectorConfig;
+    }
+
+    public Lsn getLastLsn() {
+        return lastLsn;
+    }
 
-}
+    public void pushStopLsn(Lsn stopLsn) {
+        if(stopLsnQueue.contains(stopLsn)){
+            return;
+        }
+        stopLsnQueue.offer(stopLsn);
+    }
+}

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/AbstractPuller.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.manager.puller;
 
-import org.dbsyncer.manager.event.ClosedEvent;
+import org.dbsyncer.common.event.ClosedEvent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 
@@ -13,4 +13,4 @@ public abstract class AbstractPuller implements Puller {
         applicationContext.publishEvent(new ClosedEvent(applicationContext, metaId));
     }
 
-}
+}