فهرست منبع

优化消费模型

AE86 1 سال پیش
والد
کامیت
f91224c5f1

+ 4 - 5
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Watcher.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.common.event;
 
+import org.dbsyncer.common.spi.Extractor;
+
 import java.util.Map;
 
 /**
@@ -32,15 +34,12 @@ public interface Watcher {
      */
     void errorEvent(Exception e);
 
-    /**
-     * 刷新Meta更新时间
-     */
-    void refreshMetaUpdateTime();
-
     /**
      * 获取Meta更新时间
      *
      * @return
      */
     long getMetaUpdateTime();
+
+    default void setExtractor(Extractor extractor){}
 }

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java → dbsyncer-common/src/main/java/org/dbsyncer/common/spi/Extractor.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.listener;
+package org.dbsyncer.common.spi;
 
 import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.Watcher;

+ 2 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -4,6 +4,7 @@ import org.dbsyncer.common.event.ChangedEvent;
 import org.dbsyncer.common.event.Watcher;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.common.spi.Extractor;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.constant.ConnectorConstant;
@@ -41,6 +42,7 @@ public abstract class AbstractExtractor implements Extractor {
     @Override
     public void register(Watcher watcher) {
         this.watcher = watcher;
+        watcher.setExtractor(this);
     }
 
     @Override
@@ -110,8 +112,6 @@ public abstract class AbstractExtractor implements Extractor {
     private void processEvent(boolean permitEvent, ChangedEvent event) {
         if (permitEvent) {
             watcher.changeEvent(event);
-            // TODO 待优化回调
-            refreshEvent(event);
         }
     }
 

+ 8 - 7
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -11,7 +11,7 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.listener.AbstractExtractor;
-import org.dbsyncer.listener.Extractor;
+import org.dbsyncer.common.spi.Extractor;
 import org.dbsyncer.listener.Listener;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerTypeEnum;
@@ -186,6 +186,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         @Override
         public void changeEvent(ChangedEvent event) {
             onChange((E) event);
+            meta.setUpdateTime(Instant.now().toEpochMilli());
         }
 
         @Override
@@ -199,11 +200,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
         }
 
-        @Override
-        public void refreshMetaUpdateTime() {
-            meta.setUpdateTime(Instant.now().toEpochMilli());
-        }
-
         @Override
         public long getMetaUpdateTime() {
             return meta.getUpdateTime();
@@ -231,7 +227,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
     }
 
     final class LogConsumer extends AbstractConsumer<RowChangedEvent> {
-
+        private Extractor extractor;
         private Map<String, List<FieldPicker>> tablePicker = new LinkedHashMap<>();
 
         public LogConsumer(Mapping mapping, List<TableGroup> tableGroups) {
@@ -258,8 +254,13 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
                         parser.execute(picker.getTableGroup(), event);
                     }
                 });
+                extractor.refreshEvent(event);
             }
         }
+
+        public void setExtractor(Extractor extractor) {
+            this.extractor = extractor;
+        }
     }
 
 }