
Optimize the consumption model

AE86 · 1 year ago · commit 702db6514f

+ 20 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java

@@ -22,6 +22,8 @@ public class RowChangedEvent {
     private String event;
     private List<Object> dataList;
     private Map<String, Object> dataMap;
+    private String nextFileName;
+    private Object position;
 
     public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> data) {
         this.tableGroupIndex = tableGroupIndex;
@@ -67,6 +69,24 @@ public class RowChangedEvent {
         this.dataMap = dataMap;
     }
 
+    public String getNextFileName() {
+        return nextFileName;
+    }
+
+    public RowChangedEvent setNextFileName(String nextFileName) {
+        this.nextFileName = nextFileName;
+        return this;
+    }
+
+    public Object getPosition() {
+        return position;
+    }
+
+    public RowChangedEvent setPosition(Object position) {
+        this.position = position;
+        return this;
+    }
+
     @Override
     public String toString() {
         return JsonUtil.objToJson(this);
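
For context, a minimal sketch of how a listener is expected to attach the new position metadata via the fluent setters when it emits a change event, mirroring the MysqlExtractor hunk further down. The class name, table name, row values, and binlog file/position below are made up for illustration:

import org.dbsyncer.common.event.RowChangedEvent;
import org.dbsyncer.connector.constant.ConnectorConstant;

import java.util.Arrays;

public class PositionedEventSketch {
    public static void main(String[] args) {
        // Carry the current binlog offset on the event itself; the consumer thread
        // later hands it back to refreshEvent() to advance the snapshot.
        RowChangedEvent event = new RowChangedEvent("user", ConnectorConstant.OPERTION_UPDATE, Arrays.asList(1, "Alice"))
                .setNextFileName("mysql-bin.000003")   // hypothetical binlog file
                .setPosition(4L);                      // hypothetical binlog position
        System.out.println(event);                     // toString() serializes the event to JSON via JsonUtil
    }
}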

+ 52 - 10
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -4,6 +4,7 @@ import org.dbsyncer.common.event.Event;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.model.Table;
@@ -11,7 +12,10 @@ import org.dbsyncer.listener.config.ListenerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.Timestamp;
 import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
@@ -45,13 +49,16 @@ public abstract class AbstractExtractor implements Extractor {
     private Lock lock = new ReentrantLock();
     private Condition isFull;
     private final Duration pollInterval = Duration.of(500, ChronoUnit.MILLIS);
+    private static final int FLUSH_DELAYED_SECONDS = 30;
+    private long updateTime;
+
 
     @Override
     public void start() {
         this.lock = new ReentrantLock();
         this.isFull = lock.newCondition();
         enableConsumerThread = true;
-        consumerThread = new Thread(()->{
+        consumerThread = new Thread(() -> {
             while (enableConsumerThread) {
                 try {
                     // Take the head element of the BlockingQueue; if the queue is empty, block and wait until a new element is added
@@ -59,6 +66,9 @@ public abstract class AbstractExtractor implements Extractor {
                     if (null != event) {
                         // TODO optimize the multi-table parallel processing model
                         consumer.changedEvent(event);
+                        // refresh the incremental offset
+                        refreshEvent(event);
+                        updateTime = Instant.now().toEpochMilli();
                     }
                 } catch (InterruptedException e) {
                     break;
@@ -109,7 +119,18 @@ public abstract class AbstractExtractor implements Extractor {
 
     @Override
     public void flushEvent() {
-        consumer.flushEvent(snapshot);
+        // persist only if the offset was refreshed within the last 30s
+        if (updateTime > 0 && updateTime > Timestamp.valueOf(LocalDateTime.now().minusSeconds(FLUSH_DELAYED_SECONDS)).getTime()) {
+            forceFlushEvent();
+        }
+    }
+
+
+    @Override
+    public void forceFlushEvent() {
+        if (!CollectionUtils.isEmpty(snapshot)) {
+            consumer.flushEvent(snapshot);
+        }
     }
 
     @Override
@@ -117,6 +138,15 @@ public abstract class AbstractExtractor implements Extractor {
         consumer.errorEvent(e);
     }
 
+    /**
+     * Refresh the incremental offset
+     *
+     * @param event
+     */
+    protected void refreshEvent(RowChangedEvent event) {
+        // nothing to do
+    }
+
     protected void sleepInMills(long timeout) {
         try {
             TimeUnit.MILLISECONDS.sleep(timeout);
@@ -132,17 +162,29 @@ public abstract class AbstractExtractor implements Extractor {
      * @param event
      */
     private void processEvent(boolean permitEvent, RowChangedEvent event) {
-        if (permitEvent) {
-            if (!queue.offer(event)) {
-                // capacity reached, block and retry
-                while (!queue.offer(event)) {
-                    try {
-                        this.isFull.await(pollInterval.toMillis(), TimeUnit.MILLISECONDS);
-                    } catch (InterruptedException e) {
-                        logger.error(e.getMessage(), e);
+        if (!permitEvent) {
+            return;
+        }
+
+        boolean lock = false;
+        try {
+            lock = this.lock.tryLock();
+            if (lock) {
+                if (!queue.offer(event)) {
+                    // capacity reached, block and retry
+                    while (!queue.offer(event)) {
+                        try {
+                            this.isFull.await(pollInterval.toMillis(), TimeUnit.MILLISECONDS);
+                        } catch (InterruptedException e) {
+                            logger.error(e.getMessage(), e);
+                        }
                     }
                 }
             }
+        } finally {
+            if (lock) {
+                this.lock.unlock();
+            }
         }
     }
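
Taken together with the scheduled caller added in IncrementPuller further down, the flush path now works as in the standalone sketch below: flushEvent() becomes a no-op unless the consumer thread refreshed the offset within the last 30 seconds, while forceFlushEvent() always writes. The class and method bodies outside the diff are illustrative only:

import java.time.Instant;

public class FlushWindowSketch {
    private static final int FLUSH_DELAYED_SECONDS = 30;
    private long updateTime;                        // set by the consumer thread after each dispatched event

    void markUpdated() {
        updateTime = Instant.now().toEpochMilli();
    }

    // mirrors AbstractExtractor#flushEvent: skip the write when the offset is stale
    void flushEvent() {
        long threshold = Instant.now().minusSeconds(FLUSH_DELAYED_SECONDS).toEpochMilli();
        if (updateTime > 0 && updateTime > threshold) {
            forceFlushEvent();
        }
    }

    // mirrors AbstractExtractor#forceFlushEvent: always writes (the real code also checks that the snapshot is non-empty)
    void forceFlushEvent() {
        System.out.println("persist snapshot");     // stands in for consumer.flushEvent(snapshot)
    }

    public static void main(String[] args) {
        FlushWindowSketch sketch = new FlushWindowSketch();
        sketch.flushEvent();                        // prints nothing: no event consumed yet
        sketch.markUpdated();
        sketch.flushEvent();                        // prints once: offset refreshed within the 30s window
    }
}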
 

+ 5 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -34,6 +34,11 @@ public interface Extractor {
      */
     void flushEvent();
 
+    /**
+     * Force flush the incremental offset event
+     */
+    void forceFlushEvent();
+
     /**
      * 异常事件
      *

+ 20 - 7
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -80,6 +80,11 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
         }
     }
 
+    @Override
+    protected void refreshEvent(RowChangedEvent event) {
+        refreshSnapshot(event.getNextFileName(), (Long) event.getPosition());
+    }
+
     private void run() throws Exception {
         final DatabaseConfig config = (DatabaseConfig) connectorConfig;
         if (StringUtil.isBlank(config.getUrl())) {
@@ -92,15 +97,20 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
         final Host host = cluster.get(MASTER);
         final String username = config.getUsername();
         final String password = config.getPassword();
-        final String pos = snapshot.get(BINLOG_POSITION);
+        boolean containsPos = snapshot.containsKey(BINLOG_POSITION);
         client = new BinaryLogRemoteClient(host.getIp(), host.getPort(), username, password);
         client.setBinlogFilename(snapshot.get(BINLOG_FILENAME));
-        client.setBinlogPosition(StringUtil.isBlank(pos) ? 0 : Long.parseLong(pos));
+        client.setBinlogPosition(containsPos ? Long.parseLong(snapshot.get(BINLOG_POSITION)) : 0);
         client.setTableMapEventByTableId(tables);
         client.registerEventListener(new MysqlEventListener());
         client.registerLifecycleListener(new MysqlLifecycleListener());
 
         client.connect();
+
+        if (!containsPos) {
+            refreshSnapshot(client.getBinlogFilename(), client.getBinlogPosition());
+            super.forceFlushEvent();
+        }
     }
 
     private List<Host> readNodes(String url) {
@@ -168,14 +178,17 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
     private void refresh(String binlogFilename, long nextPosition) {
         if (StringUtil.isNotBlank(binlogFilename)) {
             client.setBinlogFilename(binlogFilename);
-            snapshot.put(BINLOG_FILENAME, binlogFilename);
         }
         if (0 < nextPosition) {
             client.setBinlogPosition(nextPosition);
-            snapshot.put(BINLOG_POSITION, String.valueOf(nextPosition));
         }
     }
 
+    private void refreshSnapshot(String binlogFilename, long nextPosition) {
+        snapshot.put(BINLOG_FILENAME, binlogFilename);
+        snapshot.put(BINLOG_POSITION, String.valueOf(nextPosition));
+    }
+
     final class MysqlLifecycleListener implements BinaryLogRemoteClient.LifecycleListener {
 
         @Override
@@ -238,7 +251,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> after = Stream.of(m.getValue()).collect(Collectors.toList());
-                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_UPDATE, after));
+                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_UPDATE, after).setNextFileName(client.getBinlogFilename()).setPosition(client.getBinlogPosition()));
                     });
                 }
                 return;
@@ -249,7 +262,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> after = Stream.of(m).collect(Collectors.toList());
-                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_INSERT, after));
+                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_INSERT, after).setNextFileName(client.getBinlogFilename()).setPosition(client.getBinlogPosition()));
                     });
                 }
                 return;
@@ -260,7 +273,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 if (isFilterTable(data.getTableId())) {
                     data.getRows().forEach(m -> {
                         List<Object> before = Stream.of(m).collect(Collectors.toList());
-                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_DELETE, before));
+                        sendChangedEvent(new RowChangedEvent(getTableName(data.getTableId()), ConnectorConstant.OPERTION_DELETE, before).setNextFileName(client.getBinlogFilename()).setPosition(client.getBinlogPosition()));
                     });
                 }
                 return;

+ 12 - 8
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.listener.postgresql;
 
+import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.BooleanUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
@@ -131,6 +132,11 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
         }
     }
 
+    @Override
+    protected void refreshEvent(RowChangedEvent event) {
+        snapshot.put(LSN_POSITION, LogSequenceNumber.valueOf((Long) event.getPosition()).asString());
+    }
+
     private void connect() throws SQLException {
         Properties props = new Properties();
         PGProperty.USER.set(props, config.getUsername());
@@ -184,6 +190,7 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
                 throw new ListenerException("No maximum LSN recorded in the database");
             }
             snapshot.put(LSN_POSITION, lsn.asString());
+            super.forceFlushEvent();
         }
 
         this.startLsn = LogSequenceNumber.valueOf(snapshot.get(LSN_POSITION));
@@ -253,12 +260,6 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
         }
     }
 
-    private void flushLsn(LogSequenceNumber lsn) {
-        if (null != lsn && lsn.asLong() > 0) {
-            snapshot.put(LSN_POSITION, lsn.asString());
-        }
-    }
-
     final class Worker extends Thread {
 
         @Override
@@ -278,9 +279,12 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
                         continue;
                     }
 
-                    flushLsn(lsn);
                     // process decoder
-                    sendChangedEvent(messageDecoder.processMessage(msg));
+                    RowChangedEvent event = messageDecoder.processMessage(msg);
+                    if (event != null) {
+                        event.setPosition(lsn.asLong());
+                    }
+                    sendChangedEvent(event);
 
                     // feedback
                     stream.setAppliedLSN(lsn);
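
A tiny sketch of the LSN round-trip this PostgreSQL override relies on: the worker puts the raw long on the event, and refreshEvent() converts it back into the textual form stored in the snapshot. The sample LSN string is arbitrary:

import org.postgresql.replication.LogSequenceNumber;

public class LsnRoundTripSketch {
    public static void main(String[] args) {
        // long form carried on the RowChangedEvent
        long rawPosition = LogSequenceNumber.valueOf("0/16B6E50").asLong();
        // textual form kept under LSN_POSITION in the snapshot
        String persisted = LogSequenceNumber.valueOf(rawPosition).asString();
        System.out.println(persisted);
    }
}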

+ 11 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -110,6 +110,11 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
         }
     }
 
+    @Override
+    protected void refreshEvent(RowChangedEvent event) {
+        snapshot.put(LSN_POSITION, event.getPosition().toString());
+    }
+
     private void close(AutoCloseable closeable) {
         if (null != closeable) {
             try {
@@ -134,6 +139,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
             lastLsn = queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
             if (null != lastLsn && lastLsn.isAvailable()) {
                 snapshot.put(LSN_POSITION, lastLsn.toString());
+                super.forceFlushEvent();
                 return;
             }
             // Shouldn't happen if the agent is running, but it is better to guard against such situation
@@ -244,26 +250,26 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
             });
 
             if (!CollectionUtils.isEmpty(list)) {
-                parseEvent(list);
+                parseEvent(list, stopLsn);
             }
         });
     }
 
-    private void parseEvent(List<CDCEvent> list) {
+    private void parseEvent(List<CDCEvent> list, Lsn stopLsn) {
         for (CDCEvent event : list) {
             int code = event.getCode();
             if (TableOperationEnum.isUpdateAfter(code)) {
-                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, event.getRow()));
+                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, event.getRow()).setPosition(stopLsn));
                 continue;
             }
 
             if (TableOperationEnum.isInsert(code)) {
-                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, event.getRow()));
+                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, event.getRow()).setPosition(stopLsn));
                 continue;
             }
 
             if (TableOperationEnum.isDelete(code)) {
-                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, event.getRow()));
+                sendChangedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, event.getRow()).setPosition(stopLsn));
             }
         }
     }

+ 16 - 43
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -3,6 +3,7 @@ package org.dbsyncer.manager.puller;
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import java.time.Instant;
 import java.util.ArrayList;
@@ -51,7 +53,7 @@ import java.util.stream.Collectors;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementPuller extends AbstractPuller {
+public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -75,6 +77,11 @@ public class IncrementPuller extends AbstractPuller {
 
     private Map<String, Extractor> map = new ConcurrentHashMap<>();
 
+    @PostConstruct
+    private void init() {
+        scheduledTaskService.start(3000, this);
+    }
+
     @Override
     public void start(Mapping mapping) {
         final String mappingId = mapping.getId();
@@ -117,6 +124,12 @@ public class IncrementPuller extends AbstractPuller {
         logger.info("关闭成功:{}", metaId);
     }
 
+    @Override
+    public void run() {
+        // periodically persist the incremental offsets
+        map.forEach((k, v) -> v.flushEvent());
+    }
+
     private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) throws InstantiationException, IllegalAccessException {
         AbstractConnectorConfig connectorConfig = connector.getConfig();
         ListenerConfig listenerConfig = mapping.getListener();
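
A self-contained sketch of the scheduling wiring added above, using java.util.concurrent.ScheduledExecutorService purely as a stand-in for the project's ScheduledTaskService, and a nested interface as a stand-in for the real Extractor: the puller registers itself once and, every 3 seconds, asks each registered extractor to persist its incremental offset.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class OffsetFlushLoopSketch {

    interface Extractor {
        void flushEvent();
    }

    private final Map<String, Extractor> map = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void init() {
        // corresponds to scheduledTaskService.start(3000, this) in the diff
        scheduler.scheduleWithFixedDelay(this::run, 3000, 3000, TimeUnit.MILLISECONDS);
    }

    void run() {
        // periodically persist each extractor's incremental offset
        map.forEach((metaId, extractor) -> extractor.flushEvent());
    }

    public static void main(String[] args) throws InterruptedException {
        OffsetFlushLoopSketch puller = new OffsetFlushLoopSketch();
        puller.map.put("meta-1", () -> System.out.println("flush offset for meta-1"));
        puller.init();
        Thread.sleep(7000);            // let the job fire a couple of times, then exit
        puller.scheduler.shutdownNow();
    }
}
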
@@ -183,30 +196,11 @@ public class IncrementPuller extends AbstractPuller {
         }
     }
 
-    /**
-     * </p>Polling mode
-     * <ol>
-     * <li>Filter rows by the configured conditions</li>
-     * </ol>
-     * </p>Sync relations:
-     * </p>source table >> target table
-     * <ul>
-     * <li>A >> B</li>
-     * <li>A >> C</li>
-     * <li>E >> F</li>
-     * </ul>
-     * </p>PS:
-     * <ol>
-     * <li>Sync relations are executed one after another: A >> B, then A >> C ...</li>
-     * </ol>
-     */
     final class QuartzConsumer extends AbstractConsumer {
 
-        private List<FieldPicker> tablePicker;
-
+        private List<FieldPicker> tablePicker = new LinkedList<>();
         public QuartzConsumer(Mapping mapping, List<TableGroup> tableGroups) {
             this.mapping = mapping;
-            this.tablePicker = new LinkedList<>();
             tableGroups.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
         }
 
@@ -221,33 +215,12 @@ public class IncrementPuller extends AbstractPuller {
         }
     }
 
-    /**
-     * </p>Log mode
-     * <ol>
-     * <li>Listen for incremental table changes</li>
-     * <li>Filter rows by the configured conditions</li>
-     * </ol>
-     * </p>Sync relations:
-     * </p>source table >> target table
-     * <ul>
-     * <li>A >> B</li>
-     * <li>A >> C</li>
-     * <li>E >> F</li>
-     * </ul>
-     * </p>PS:
-     * <ol>
-     * <li>The listener instance is reused to reduce overhead; only one source connector is created at startup.</li>
-     * <li>Relations A >> B and A >> C reuse the data captured from A: a change detected on A is delivered to both B and C.</li>
-     * <li>In this mode, all columns of the table are listened to.</li>
-     * </ol>
-     */
     final class LogConsumer extends AbstractConsumer {
 
-        private Map<String, List<FieldPicker>> tablePicker;
+        private Map<String, List<FieldPicker>> tablePicker = new LinkedHashMap<>();
 
         public LogConsumer(Mapping mapping, List<TableGroup> tableGroups) {
             this.mapping = mapping;
-            this.tablePicker = new LinkedHashMap<>();
             tableGroups.forEach(t -> {
                 final Table table = t.getSourceTable();
                 final String tableName = table.getName();