Selaa lähdekoodia

优化消费模型

AE86 1 vuosi sitten
vanhempi
säilyke
31968a9d80

+ 1 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java → dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java

@@ -7,7 +7,7 @@ import java.util.Map;
  * @Author AE86
  * @Date 2020-05-11 22:50
  */
-public interface Event {
+public interface Watcher {
 
     /**
      * 数据变更事件

+ 26 - 23
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.listener;
 
-import org.dbsyncer.common.event.Event;
+import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
@@ -42,14 +42,14 @@ public abstract class AbstractExtractor implements Extractor {
     protected List<Table> sourceTable;
     protected Map<String, String> snapshot;
     protected String metaId;
-    private Event consumer;
+    private Watcher watcher;
     private BlockingQueue<RowChangedEvent> queue;
-    private Thread consumerThread;
-    private volatile boolean enableConsumerThread;
+    private Thread consumer;
+    private volatile boolean enableConsumer;
     private Lock lock = new ReentrantLock();
     private Condition isFull;
     private final Duration pollInterval = Duration.of(500, ChronoUnit.MILLIS);
-    private static final int FLUSH_DELAYED_SECONDS = 30;
+    private static final int FLUSH_DELAYED_SECONDS = 20;
     private long updateTime;
 
 
@@ -57,15 +57,15 @@ public abstract class AbstractExtractor implements Extractor {
     public void start() {
         this.lock = new ReentrantLock();
         this.isFull = lock.newCondition();
-        enableConsumerThread = true;
-        consumerThread = new Thread(() -> {
-            while (enableConsumerThread) {
+        enableConsumer = true;
+        consumer = new Thread(() -> {
+            while (enableConsumer) {
                 try {
                     // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
                     RowChangedEvent event = queue.take();
                     if (null != event) {
                         // TODO 待优化多表并行模型
-                        consumer.changedEvent(event);
+                        watcher.changedEvent(event);
                         // 更新增量点
                         refreshEvent(event);
                         updateTime = Instant.now().toEpochMilli();
@@ -77,22 +77,22 @@ public abstract class AbstractExtractor implements Extractor {
                 }
             }
         });
-        consumerThread.setName(new StringBuilder("extractor-consumer-").append(metaId).toString());
-        consumerThread.setDaemon(false);
-        consumerThread.start();
+        consumer.setName(new StringBuilder("extractor-consumer-").append(metaId).toString());
+        consumer.setDaemon(false);
+        consumer.start();
     }
 
     @Override
     public void close() {
-        enableConsumerThread = false;
-        if (consumerThread != null && !enableConsumerThread) {
-            consumerThread.interrupt();
+        enableConsumer = false;
+        if (consumer != null && !enableConsumer) {
+            consumer.interrupt();
         }
     }
 
     @Override
-    public void register(Event consumer) {
-        this.consumer = consumer;
+    public void register(Watcher watcher) {
+        this.watcher = watcher;
     }
 
     @Override
@@ -119,23 +119,26 @@ public abstract class AbstractExtractor implements Extractor {
 
     @Override
     public void flushEvent() {
-        // 30s内更新,执行写入
+        // 20s内更新,执行写入
         if (updateTime > 0 && updateTime > Timestamp.valueOf(LocalDateTime.now().minusSeconds(FLUSH_DELAYED_SECONDS)).getTime()) {
-            forceFlushEvent();
+            if (!CollectionUtils.isEmpty(snapshot)) {
+                watcher.flushEvent(snapshot);
+            }
         }
     }
 
 
     @Override
     public void forceFlushEvent() {
+        logger.info("snapshot:{}", snapshot);
         if (!CollectionUtils.isEmpty(snapshot)) {
-            consumer.flushEvent(snapshot);
+            watcher.flushEvent(snapshot);
         }
     }
 
     @Override
     public void errorEvent(Exception e) {
-        consumer.errorEvent(e);
+        watcher.errorEvent(e);
     }
 
     /**
@@ -172,11 +175,11 @@ public abstract class AbstractExtractor implements Extractor {
             if (lock) {
                 if (!queue.offer(event)) {
                     // 容量上限,阻塞重试
-                    while (!queue.offer(event)) {
+                    while (!queue.offer(event) && enableConsumer) {
                         try {
                             this.isFull.await(pollInterval.toMillis(), TimeUnit.MILLISECONDS);
                         } catch (InterruptedException e) {
-                            logger.error(e.getMessage(), e);
+                            break;
                         }
                     }
                 }

+ 3 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.listener;
 
-import org.dbsyncer.common.event.Event;
+import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.event.RowChangedEvent;
 
 public interface Extractor {
@@ -18,9 +18,9 @@ public interface Extractor {
     /**
      * 注册监听事件(获取增量数据)
      *
-     * @param event
+     * @param watcher
      */
-    void register(Event event);
+    void register(Watcher watcher);
 
     /**
      * 数据变更事件

+ 3 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.manager.puller;
 
-import org.dbsyncer.common.event.Event;
+import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
@@ -171,14 +171,14 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             extractor.setSourceTable(sourceTable);
             extractor.setSnapshot(meta.getSnapshot());
             extractor.setMetaId(meta.getId());
-            extractor.setQueue(new LinkedBlockingQueue<>(8162));
+            extractor.setQueue(new LinkedBlockingQueue<>(8192));
             return extractor;
         }
 
         throw new ManagerException("未知的监听配置.");
     }
 
-    abstract class AbstractConsumer implements Event {
+    abstract class AbstractConsumer implements Watcher {
         protected Mapping mapping;
 
         @Override

+ 1 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableWriterBufferActuatorStrategy.java

@@ -20,10 +20,7 @@ public final class EnableWriterBufferActuatorStrategy extends AbstractWriterBinl
 
     @Override
     public void execute(String tableGroupId, String event, Map<String, Object> data) {
-        // 缓存队列满时转写磁盘
-        if (!writerBufferActuator.offer(new WriterRequest(tableGroupId, event, data))) {
-            super.flush(tableGroupId, event, data);
-        }
+        writerBufferActuator.offer(new WriterRequest(tableGroupId, event, data));
     }
 
     @Override