Переглянути джерело

重构消费模型 https://gitee.com/ghi/dbsyncer/issues/I7UB5M

Signed-off-by: AE86 <836391306@qq.com>
AE86 1 рік тому
батько
коміт
f69426c011

+ 0 - 14
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -23,13 +23,6 @@ public interface Event {
      */
     void flushEvent(Map<String, String> snapshot);
 
-    /**
-     * 强制写入增量点事件
-     *
-     * @param snapshot
-     */
-    void forceFlushEvent(Map<String,String> snapshot);
-
     /**
      * 异常事件
      *
@@ -37,11 +30,4 @@ public interface Event {
      */
     void errorEvent(Exception e);
 
-    /**
-     * 中断异常
-     *
-     * @param e
-     */
-    void interruptException(Exception e);
-
 }

+ 65 - 17
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -11,10 +11,16 @@ import org.dbsyncer.listener.config.ListenerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * @version 1.0.0
@@ -32,11 +38,51 @@ public abstract class AbstractExtractor implements Extractor {
     protected List<Table> sourceTable;
     protected Map<String, String> snapshot;
     protected String metaId;
-    protected Event watcher;
+    private Event consumer;
+    private BlockingQueue<RowChangedEvent> queue;
+    private Thread consumerThread;
+    private volatile boolean enableConsumerThread;
+    private Lock lock = new ReentrantLock();
+    private Condition isFull;
+    private final Duration pollInterval = Duration.of(500, ChronoUnit.MILLIS);
 
     @Override
-    public void register(Event event) {
-        watcher = event;
+    public void start() {
+        this.lock = new ReentrantLock();
+        this.isFull = lock.newCondition();
+        enableConsumerThread = true;
+        consumerThread = new Thread(()->{
+            while (enableConsumerThread) {
+                try {
+                    // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
+                    RowChangedEvent event = queue.take();
+                    if (null != event) {
+                        // TODO 待优化多表并行模型
+                        consumer.changedEvent(event);
+                    }
+                } catch (InterruptedException e) {
+                    break;
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        });
+        consumerThread.setName(new StringBuilder("extractor-consumer-").append(metaId).toString());
+        consumerThread.setDaemon(false);
+        consumerThread.start();
+    }
+
+    @Override
+    public void close() {
+        enableConsumerThread = false;
+        if (consumerThread != null && !enableConsumerThread) {
+            consumerThread.interrupt();
+        }
+    }
+
+    @Override
+    public void register(Event consumer) {
+        this.consumer = consumer;
     }
 
     @Override
@@ -63,23 +109,12 @@ public abstract class AbstractExtractor implements Extractor {
 
     @Override
     public void flushEvent() {
-        watcher.flushEvent(snapshot);
-    }
-
-    @Override
-    public void forceFlushEvent() {
-        logger.info("Force flush:{}", snapshot);
-        watcher.forceFlushEvent(snapshot);
+        consumer.flushEvent(snapshot);
     }
 
     @Override
     public void errorEvent(Exception e) {
-        watcher.errorEvent(e);
-    }
-
-    @Override
-    public void interruptException(Exception e) {
-        watcher.interruptException(e);
+        consumer.errorEvent(e);
     }
 
     protected void sleepInMills(long timeout) {
@@ -98,7 +133,16 @@ public abstract class AbstractExtractor implements Extractor {
      */
     private void processEvent(boolean permitEvent, RowChangedEvent event) {
         if (permitEvent) {
-            watcher.changedEvent(event);
+            if (!queue.offer(event)) {
+                // 容量上限,阻塞重试
+                while (!queue.offer(event)) {
+                    try {
+                        this.isFull.await(pollInterval.toMillis(), TimeUnit.MILLISECONDS);
+                    } catch (InterruptedException e) {
+                        logger.error(e.getMessage(), e);
+                    }
+                }
+            }
         }
     }
 
@@ -134,4 +178,8 @@ public abstract class AbstractExtractor implements Extractor {
     public void setMetaId(String metaId) {
         this.metaId = metaId;
     }
+
+    public void setQueue(BlockingQueue<RowChangedEvent> queue) {
+        this.queue = queue;
+    }
 }

+ 0 - 12
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -34,11 +34,6 @@ public interface Extractor {
      */
     void flushEvent();
 
-    /**
-     * 强制刷新增量点事件
-     */
-    void forceFlushEvent();
-
     /**
      * 异常事件
      *
@@ -46,11 +41,4 @@ public interface Extractor {
      */
     void errorEvent(Exception e);
 
-    /**
-     * 中断异常
-     *
-     * @param e
-     */
-    void interruptException(Exception e);
-
 }

+ 2 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -69,6 +69,7 @@ public class FileExtractor extends AbstractExtractor {
             final FileConfig config = connectorMapper.getConfig();
             final String mapperCacheKey = connectorFactory.getConnector(connectorMapper).getConnectorMapperCacheKey(connectorConfig);
             connected = true;
+            super.start();
 
             separator = config.getSeparator();
             initPipeline(config.getFileDir());
@@ -79,7 +80,6 @@ public class FileExtractor extends AbstractExtractor {
             for (String fileName : pipeline.keySet()) {
                 parseEvent(fileName);
             }
-            forceFlushEvent();
 
             worker = new Worker();
             worker.setName(new StringBuilder("file-parser-").append(mapperCacheKey).append("_").append(worker.hashCode()).toString());
@@ -115,6 +115,7 @@ public class FileExtractor extends AbstractExtractor {
     @Override
     public void close() {
         try {
+            super.close();
             closePipelineAndWatch();
             connected = false;
             if (null != worker && !worker.isInterrupted()) {

+ 4 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -53,6 +53,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 logger.error("MysqlExtractor is already started");
                 return;
             }
+            super.start();
             run();
             connected = true;
         } catch (Exception e) {
@@ -68,6 +69,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
         try {
             connectLock.lock();
             connected = false;
+            super.close();
             if (null != client) {
                 client.disconnect();
             }
@@ -147,7 +149,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                 logger.error("第{}次重启异常, ThreadName:{}, {}", i, client.getWorkerThreadName(), e.getMessage());
                 // 无法连接,关闭任务
                 if (i == RETRY_TIMES) {
-                    interruptException(new ListenerException(String.format("重启异常, %s, %s", client.getWorkerThreadName(), e.getMessage())));
+                    errorEvent(new ListenerException(String.format("重启异常, %s, %s", client.getWorkerThreadName(), e.getMessage())));
                 }
             }
             try {
@@ -201,7 +203,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
                     String log = String.format("线程[%s]执行异常。由于MySQL配置了过期binlog文件自动删除机制,已无法找到原binlog文件%s。建议先保存驱动(加载最新的binlog文件),再启动驱动。",
                             client.getWorkerThreadName(),
                             client.getBinlogFilename());
-                    interruptException(new ListenerException(log));
+                    errorEvent(new ListenerException(log));
                     return;
                 }
             }
@@ -268,8 +270,6 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
             if (header.getEventType() == EventType.ROTATE) {
                 RotateEventData data = event.getData();
                 refresh(data.getBinlogFilename(), data.getBinlogPosition());
-                forceFlushEvent();
-                return;
             }
         }
 

+ 2 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/OracleExtractor.java

@@ -21,6 +21,7 @@ public class OracleExtractor extends AbstractDatabaseExtractor {
     @Override
     public void start() {
         try {
+            super.start();
             final DatabaseConfig config = (DatabaseConfig) connectorConfig;
             String username = config.getUsername();
             String password = config.getPassword();
@@ -37,6 +38,7 @@ public class OracleExtractor extends AbstractDatabaseExtractor {
 
     @Override
     public void close() {
+        super.close();
         if (null != client) {
             client.close();
         }

+ 2 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java

@@ -68,6 +68,7 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
                 return;
             }
 
+            super.start();
             connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(connectorConfig);
             config = connectorMapper.getConfig();
 
@@ -117,6 +118,7 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
     public void close() {
         try {
             connected = false;
+            super.close();
             if (null != worker && !worker.isInterrupted()) {
                 worker.interrupt();
                 worker = null;

+ 2 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -64,6 +64,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
         taskKey = UUIDUtil.getUUID();
         running = true;
+        super.start();
 
         scheduledTaskService.start(taskKey, listenerConfig.getCron(), this);
         logger.info("启动定时任务:{} >> {}", taskKey, listenerConfig.getCron());
@@ -92,6 +93,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
     @Override
     public void close() {
+        super.close();
         scheduledTaskService.stop(taskKey);
         running = false;
     }

+ 4 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -62,7 +62,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     private Lsn lastLsn;
     private String serverName;
     private String schema;
-    private LinkedBlockingQueue<Lsn> stopLsnQueue = new LinkedBlockingQueue<>();
+    private LinkedBlockingQueue<Lsn> stopLsnQueue = new LinkedBlockingQueue<>(256);
 
     @Override
     public void start() {
@@ -73,6 +73,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
                 return;
             }
             connected = true;
+            super.start();
             connect();
             readTables();
             Assert.notEmpty(tables, "No tables available");
@@ -99,6 +100,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
     @Override
     public void close() {
         if (connected) {
+            super.close();
             LsnPuller.removeExtractor(metaId);
             if (null != worker && !worker.isInterrupted()) {
                 worker.interrupt();
@@ -359,6 +361,7 @@ public class SqlServerExtractor extends AbstractDatabaseExtractor {
         if (stopLsnQueue.contains(stopLsn)) {
             return;
         }
+        // TODO 优化采用阻塞写
         stopLsnQueue.offer(stopLsn);
     }
 }

+ 11 - 61
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -3,7 +3,6 @@ package org.dbsyncer.manager.puller;
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
@@ -31,11 +30,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
-import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
-import java.sql.Timestamp;
 import java.time.Instant;
-import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -44,7 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
 /**
@@ -55,7 +51,7 @@ import java.util.stream.Collectors;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob {
+public class IncrementPuller extends AbstractPuller {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -79,11 +75,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
 
     private Map<String, Extractor> map = new ConcurrentHashMap<>();
 
-    @PostConstruct
-    private void init() {
-        scheduledTaskService.start(3000, this);
-    }
-
     @Override
     public void start(Mapping mapping) {
         final String mappingId = mapping.getId();
@@ -126,12 +117,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         logger.info("关闭成功:{}", metaId);
     }
 
-    @Override
-    public void run() {
-        // 定时同步增量信息
-        map.forEach((k, v) -> v.flushEvent());
-    }
-
     private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) throws InstantiationException, IllegalAccessException {
         AbstractConnectorConfig connectorConfig = connector.getConfig();
         ListenerConfig listenerConfig = mapping.getListener();
@@ -144,14 +129,14 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         if (ListenerTypeEnum.isTiming(listenerType)) {
             AbstractQuartzExtractor quartzExtractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
             quartzExtractor.setCommands(list.stream().map(t -> new TableGroupQuartzCommand(t.getSourceTable(), t.getCommand())).collect(Collectors.toList()));
-            quartzExtractor.register(new QuartzListener(mapping, list));
+            quartzExtractor.register(new QuartzConsumer(mapping, list));
             extractor = quartzExtractor;
         }
 
         // 基于日志抽取
         if (ListenerTypeEnum.isLog(listenerType)) {
             extractor = listener.getExtractor(ListenerTypeEnum.LOG, connectorConfig.getConnectorType(), AbstractExtractor.class);
-            extractor.register(new LogListener(mapping, list, extractor));
+            extractor.register(new LogConsumer(mapping, list));
         }
 
         if (null != extractor) {
@@ -173,33 +158,19 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             extractor.setSourceTable(sourceTable);
             extractor.setSnapshot(meta.getSnapshot());
             extractor.setMetaId(meta.getId());
+            extractor.setQueue(new LinkedBlockingQueue<>(8162));
             return extractor;
         }
 
         throw new ManagerException("未知的监听配置.");
     }
 
-    abstract class AbstractListener implements Event {
-        private static final int FLUSH_DELAYED_SECONDS = 30;
+    abstract class AbstractConsumer implements Event {
         protected Mapping mapping;
-        protected String metaId;
 
         @Override
         public void flushEvent(Map<String, String> snapshot) {
-            // 30s内更新,执行写入
-            Meta meta = manager.getMeta(metaId);
-            LocalDateTime lastSeconds = LocalDateTime.now().minusSeconds(FLUSH_DELAYED_SECONDS);
-            if (meta.getUpdateTime() > Timestamp.valueOf(lastSeconds).getTime()) {
-                if (!CollectionUtils.isEmpty(snapshot)) {
-                    logger.debug("{}", snapshot);
-                }
-                forceFlushEvent(snapshot);
-            }
-        }
-
-        @Override
-        public void forceFlushEvent(Map<String, String> snapshot) {
-            Meta meta = manager.getMeta(metaId);
+            Meta meta = manager.getMeta(mapping.getMetaId());
             if (null != meta) {
                 meta.setSnapshot(snapshot);
                 manager.editConfigModel(meta);
@@ -210,12 +181,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         public void errorEvent(Exception e) {
             logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
         }
-
-        @Override
-        public void interruptException(Exception e) {
-            errorEvent(e);
-            close(metaId);
-        }
     }
 
     /**
@@ -235,13 +200,12 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
      * <li>依次执行同步关系A >> B 然后 A >> C ...</li>
      * </ol>
      */
-    final class QuartzListener extends AbstractListener {
+    final class QuartzConsumer extends AbstractConsumer {
 
         private List<FieldPicker> tablePicker;
 
-        public QuartzListener(Mapping mapping, List<TableGroup> tableGroups) {
+        public QuartzConsumer(Mapping mapping, List<TableGroup> tableGroups) {
             this.mapping = mapping;
-            this.metaId = mapping.getMetaId();
             this.tablePicker = new LinkedList<>();
             tableGroups.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
         }
@@ -277,19 +241,13 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
      * <li>该模式下,会监听表所有字段.</li>
      * </ol>
      */
-    final class LogListener extends AbstractListener {
+    final class LogConsumer extends AbstractConsumer {
 
-        private Extractor extractor;
         private Map<String, List<FieldPicker>> tablePicker;
-        private AtomicInteger eventCounter;
-        private static final int MAX_LOG_CACHE_SIZE = 128;
 
-        public LogListener(Mapping mapping, List<TableGroup> tableGroups, Extractor extractor) {
+        public LogConsumer(Mapping mapping, List<TableGroup> tableGroups) {
             this.mapping = mapping;
-            this.metaId = mapping.getMetaId();
-            this.extractor = extractor;
             this.tablePicker = new LinkedHashMap<>();
-            this.eventCounter = new AtomicInteger();
             tableGroups.forEach(t -> {
                 final Table table = t.getSourceTable();
                 final String tableName = table.getName();
@@ -311,14 +269,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
                         parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
                     }
                 });
-                eventCounter.set(0);
-                return;
-            }
-
-            // 防止挤压无效的增量数据,刷新最新的有效记录点
-            if (eventCounter.incrementAndGet() >= MAX_LOG_CACHE_SIZE) {
-                extractor.forceFlushEvent();
-                eventCounter.set(0);
             }
         }
     }