AE86 5 лет назад
Родитель
Сommit
16ed62f86e

+ 10 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -21,6 +21,7 @@ public abstract class AbstractExtractor implements Extractor {
     protected Map<String, String> map;
     private List<Event> watcher;
 
+    @Override
     public void addListener(Event event) {
         if (null != event) {
             if (null == watcher) {
@@ -30,6 +31,7 @@ public abstract class AbstractExtractor implements Extractor {
         }
     }
 
+    @Override
     public void clearAllListener() {
         if (null != watcher) {
             watcher.clear();
@@ -37,24 +39,28 @@ public abstract class AbstractExtractor implements Extractor {
         }
     }
 
-    public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
+    @Override
+    public void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.changedLogEvent(tableName, event, before, after));
+            watcher.forEach(w -> w.changedQuartzEvent(tableGroupIndex, event, before, after));
         }
     }
 
-    public void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
+    @Override
+    public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.changedQuartzEvent(tableGroupIndex, event, before, after));
+            watcher.forEach(w -> w.changedLogEvent(tableName, event, before, after));
         }
     }
 
+    @Override
     public void flushEvent() {
         if (!CollectionUtils.isEmpty(watcher)) {
             watcher.forEach(w -> w.flushEvent(map));
         }
     }
 
+    @Override
     public void errorEvent(Exception e) {
         if (!CollectionUtils.isEmpty(watcher)) {
             watcher.forEach(w -> w.errorEvent(e));

+ 50 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -1,9 +1,14 @@
 package org.dbsyncer.listener;
 
+import org.dbsyncer.common.event.Event;
+
+import java.util.List;
+import java.util.Map;
+
 public interface Extractor {
 
     /**
-     * 启动定时/日志等方式抽取增量数据
+     * 启动定时/日志抽取任务
      */
     void start();
 
@@ -12,4 +17,48 @@ public interface Extractor {
      */
     void close();
 
+    /**
+     * 添加监听器(获取增量数据)
+     *
+     * @param event
+     */
+    void addListener(Event event);
+
+    /**
+     * 清空监听器
+     */
+    void clearAllListener();
+
+    /**
+     * 定时模式: 监听增量事件
+     *
+     * @param tableGroupIndex
+     * @param event
+     * @param before
+     * @param after
+     */
+    void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after);
+
+    /**
+     * 日志模式: 监听增量事件
+     *
+     * @param tableName
+     * @param event
+     * @param before
+     * @param after
+     */
+    void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after);
+
+    /**
+     * 刷新增量点事件
+     */
+    void flushEvent();
+
+    /**
+     * 异常事件
+     *
+     * @param e
+     */
+    void errorEvent(Exception e);
+
 }

+ 3 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -7,6 +7,7 @@ import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.Extractor;
 import org.dbsyncer.listener.Listener;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerTypeEnum;
@@ -66,7 +67,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
 
     private String key;
 
-    private Map<String, AbstractExtractor> map = new ConcurrentHashMap<>();
+    private Map<String, Extractor> map = new ConcurrentHashMap<>();
 
     @Override
     public void asyncStart(Mapping mapping) {
@@ -99,7 +100,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
 
     @Override
     public void close(String metaId) {
-        AbstractExtractor extractor = map.get(metaId);
+        Extractor extractor = map.get(metaId);
         if (null != extractor) {
             extractor.clearAllListener();
             extractor.close();