AE86 3 年之前
父节点
当前提交
10d1a1894a

+ 6 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -5,6 +5,7 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
+import org.dbsyncer.listener.quartz.ScheduledTaskService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,6 +27,7 @@ public abstract class AbstractExtractor implements Extractor {
     private final Logger logger = LoggerFactory.getLogger(getClass());
     protected BlockingQueue queue = new LinkedBlockingQueue<>(100);
     protected Executor taskExecutor;
+    protected ScheduledTaskService scheduledTaskService;
     protected ConnectorConfig connectorConfig;
     protected ListenerConfig listenerConfig;
     protected Map<String, String> map;
@@ -101,6 +103,10 @@ public abstract class AbstractExtractor implements Extractor {
         this.taskExecutor = taskExecutor;
     }
 
+    public void setScheduledTaskService(ScheduledTaskService scheduledTaskService) {
+        this.scheduledTaskService = scheduledTaskService;
+    }
+
     public void setConnectorConfig(ConnectorConfig connectorConfig) {
         this.connectorConfig = connectorConfig;
     }

+ 1 - 6
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java

@@ -32,7 +32,6 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     private ConnectorFactory connectorFactory;
-    private ScheduledTaskService scheduledTaskService;
     private List<Map<String, String>> commands;
     private int commandSize;
 
@@ -62,8 +61,8 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
                 for (int i = 0; i < commandSize; i++) {
                     execute(commands.get(i), i);
                 }
+                running.compareAndSet(true, false);
             }
-            running.compareAndSet(true, false);
         } catch (Exception e) {
             running.compareAndSet(true, false);
             errorEvent(e);
@@ -189,10 +188,6 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
         this.connectorFactory = connectorFactory;
     }
 
-    public void setScheduledTaskService(ScheduledTaskService scheduledTaskService) {
-        this.scheduledTaskService = scheduledTaskService;
-    }
-
     public void setCommands(List<Map<String, String>> commands) {
         this.commands = commands;
     }

+ 31 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/CDCEvent.java

@@ -0,0 +1,31 @@
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package org.dbsyncer.listener.sqlserver;
+
+import java.util.List;
+
+public final class CDCEvent {
+
+    private String tableName;
+    private int code;
+    private List<Object> row;
+
+    public CDCEvent(String tableName, int code, List<Object> row) {
+        this.tableName = tableName;
+        this.code = code;
+        this.row = row;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public List<Object> getRow() {
+        return row;
+    }
+}

+ 0 - 10
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/Lsn.java

@@ -146,14 +146,4 @@ public class Lsn implements Comparable<Lsn> {
         return 0;
     }
 
-    /**
-     * Verifies whether the LSN falls into a LSN interval
-     *
-     * @param from start of the interval (included)
-     * @param to   end of the interval (excluded)
-     * @return true if the LSN falls into the interval
-     */
-    public boolean isBetween(Lsn from, Lsn to) {
-        return this.compareTo(from) >= 0 && this.compareTo(to) < 0;
-    }
 }

+ 73 - 57
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -5,10 +5,12 @@ import com.microsoft.sqlserver.jdbc.SQLServerException;
 import org.apache.commons.lang.math.RandomUtils;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.listener.quartz.ScheduledTaskJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
@@ -26,7 +28,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * @Author AE86
  * @Date 2021-06-18 01:20
  */
-public class SqlServerExtractor extends AbstractExtractor {
+public class SqlServerExtractor extends AbstractExtractor implements ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -46,13 +48,14 @@ public class SqlServerExtractor extends AbstractExtractor {
     private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT * FROM cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
 
     private static final String LSN_POSITION = "position";
-    private static final long DEFAULT_POLL_INTERVAL_MILLIS = 360;
     private static final int OFFSET_COLUMNS = 4;
     private final Lock connectLock = new ReentrantLock();
     private volatile boolean connected;
     private static Set<String> tables;
     private static Set<SqlServerChangeTable> changeTables;
     private Connection connection;
+    private String taskKey;
+    private String cron = "*/1 * * * * ?";
     private Worker worker;
     private Lsn lastLsn;
 
@@ -82,6 +85,8 @@ public class SqlServerExtractor extends AbstractExtractor {
             worker.setDaemon(false);
             worker.start();
 
+            taskKey = UUIDUtil.getUUID();
+            scheduledTaskService.start(taskKey, cron, this);
             connected = true;
         } catch (Exception e) {
             close();
@@ -97,6 +102,7 @@ public class SqlServerExtractor extends AbstractExtractor {
         if (connected) {
             try {
                 connectLock.lock();
+                scheduledTaskService.stop(taskKey);
                 if (null != worker && !worker.isInterrupted()) {
                     worker.interrupt();
                     worker = null;
@@ -112,6 +118,29 @@ public class SqlServerExtractor extends AbstractExtractor {
         }
     }
 
+    @Override
+    public void run() {
+        if (connected) {
+            try {
+                Lsn stopLsn = queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
+                if (!stopLsn.isAvailable()) {
+                    return;
+                }
+
+                if (stopLsn.compareTo(lastLsn) <= 0) {
+                    return;
+                }
+
+                pull(stopLsn);
+
+                lastLsn = stopLsn;
+                map.put(LSN_POSITION, lastLsn.toString());
+            } catch (Exception e) {
+                logger.error(e.getMessage());
+            }
+        }
+    }
+
     private void close(AutoCloseable closeable) {
         if (null != closeable) {
             try {
@@ -241,55 +270,35 @@ public class SqlServerExtractor extends AbstractExtractor {
         Lsn startLsn = queryAndMap(GET_INCREMENT_LSN, statement -> statement.setBytes(1, lastLsn.getBinary()), rs -> Lsn.valueOf(rs.getBytes(1)));
         changeTables.forEach(changeTable -> {
             final String query = GET_ALL_CHANGES_FOR_TABLE.replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance());
-            List<List<Object>> list = queryAndMapList(query, statement -> {
+            queryAndMapList(query, statement -> {
                 statement.setBytes(1, startLsn.getBinary());
                 statement.setBytes(2, stopLsn.getBinary());
             }, rs -> {
                 int columnCount = rs.getMetaData().getColumnCount();
-                List<List<Object>> data = new ArrayList<>(columnCount);
                 List<Object> row = null;
                 while (rs.next()) {
-                    row = new ArrayList<>(columnCount);
-                    for (int i = 1; i <= columnCount; i++) {
+                    // skip update before
+                    final int operation = rs.getInt(3);
+                    if (TableOperation.isUpdateBefore(operation)) {
+                        continue;
+                    }
+                    row = new ArrayList<>(columnCount - OFFSET_COLUMNS);
+                    for (int i = OFFSET_COLUMNS + 1; i <= columnCount; i++) {
                         row.add(rs.getObject(i));
                     }
-                    data.add(row);
+                    try {
+                        // 如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续
+                        queue.put(new CDCEvent(changeTable.getTableName(), operation, row));
+                    } catch (InterruptedException ex) {
+                        logger.error("Table[{}], Data:{}, Error:{}", changeTable.getTableName(), row, ex.getMessage());
+                    }
                 }
-                return data;
+                return null;
             });
 
-            parseEvent(changeTable.getTableName(), list);
         });
     }
 
-    private void parseEvent(String table, List<List<Object>> list) {
-        if (CollectionUtils.isEmpty(list)) {
-            return;
-        }
-        for (List<Object> row : list) {
-            if (!CollectionUtils.isEmpty(row) && row.size() > OFFSET_COLUMNS) {
-                final int operation = (int) row.get(2);
-                if (TableOperation.isUpdate(operation)) {
-                    asynSendRowChangedEvent(new RowChangedEvent(table, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, getData(row)));
-                    continue;
-                }
-
-                if (TableOperation.isInsert(operation)) {
-                    asynSendRowChangedEvent(new RowChangedEvent(table, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, getData(row)));
-                    continue;
-                }
-
-                if (TableOperation.isDelete(operation)) {
-                    asynSendRowChangedEvent(new RowChangedEvent(table, ConnectorConstant.OPERTION_DELETE, getData(row), Collections.EMPTY_LIST));
-                }
-            }
-        }
-    }
-
-    private List<Object> getData(List<Object> row) {
-        return new ArrayList<>(row.subList(OFFSET_COLUMNS, row.size()));
-    }
-
     private interface ResultSetMapper<T> {
         T apply(ResultSet rs) throws SQLException;
     }
@@ -367,7 +376,11 @@ public class SqlServerExtractor extends AbstractExtractor {
             return INSERT.getCode() == code;
         }
 
-        public static boolean isUpdate(int code) {
+        public static boolean isUpdateBefore(int code) {
+            return UPDATE_BEFORE.getCode() == code;
+        }
+
+        public static boolean isUpdateAfter(int code) {
             return UPDATE_AFTER.getCode() == code;
         }
 
@@ -386,33 +399,36 @@ public class SqlServerExtractor extends AbstractExtractor {
         public void run() {
             while (!isInterrupted()) {
                 try {
-                    Lsn stopLsn = queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
-                    if (!stopLsn.isAvailable()) {
-                        pause();
-                        continue;
-                    }
-
-                    if (stopLsn.compareTo(lastLsn) <= 0) {
-                        pause();
-                        continue;
+                    // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
+                    CDCEvent event = (CDCEvent) queue.take();
+                    if (null != event) {
+                        parseEvent(event);
                     }
-
-                    pull(stopLsn);
-
-                    lastLsn = stopLsn;
-                    map.put(LSN_POSITION, lastLsn.toString());
                 } catch (InterruptedException e) {
                     break;
-                } catch (Exception e) {
-                    logger.error(e.getMessage());
                 }
             }
         }
 
-        private void pause() throws InterruptedException {
-            TimeUnit.MICROSECONDS.sleep(DEFAULT_POLL_INTERVAL_MILLIS);
-        }
+        private void parseEvent(CDCEvent event) {
+            final List<Object> row = event.getRow();
+            if (!CollectionUtils.isEmpty(row)) {
+                int code = event.getCode();
+                if (TableOperation.isUpdateAfter(code)) {
+                    asynSendRowChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, event.getRow()));
+                    return;
+                }
 
+                if (TableOperation.isInsert(code)) {
+                    asynSendRowChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, event.getRow()));
+                    return;
+                }
+
+                if (TableOperation.isDelete(code)) {
+                    asynSendRowChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, event.getRow(), Collections.EMPTY_LIST));
+                }
+            }
+        }
     }
 
 }

+ 0 - 42
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/TableOperation.java

@@ -1,42 +0,0 @@
-package org.dbsyncer.listener.sqlserver;
-
-public enum TableOperation {
-    /**
-     * 插入
-     */
-    INSERT(2),
-    /**
-     * 更新(旧值)
-     */
-    UPDATE_BEFORE(3),
-    /**
-     * 更新(新值)
-     */
-    UPDATE_AFTER(4),
-    /**
-     * 删除
-     */
-    DELETE(1);
-
-    private final int code;
-
-    TableOperation(int code) {
-        this.code = code;
-    }
-
-    public static boolean isInsert(int code) {
-        return INSERT.getCode() == code;
-    }
-
-    public static boolean isUpdate(int code) {
-        return UPDATE_AFTER.getCode() == code;
-    }
-
-    public static boolean isDelete(int code) {
-        return DELETE.getCode() == code;
-    }
-
-    public int getCode() {
-        return code;
-    }
-}

+ 1 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -179,6 +179,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
     private void setExtractorConfig(AbstractExtractor extractor, ConnectorConfig connector, ListenerConfig listener,
                                     Map<String, String> map, Event event) {
         extractor.setTaskExecutor(taskExecutor);
+        extractor.setScheduledTaskService(scheduledTaskService);
         extractor.setConnectorConfig(connector);
         extractor.setListenerConfig(listener);
         extractor.setMap(map);