فهرست منبع

update package

AE86 5 سال پیش
والد
کامیت
a6e60ec951

+ 2 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/LogConfigChecker.java

@@ -4,8 +4,8 @@ import org.dbsyncer.biz.checker.MappingConfigChecker;
 import org.dbsyncer.biz.checker.MappingLogConfigChecker;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.enums.ListenerTypeEnum;
 import org.dbsyncer.manager.Manager;
-import org.dbsyncer.manager.enums.IncrementEnum;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.parser.model.Mapping;
@@ -51,7 +51,7 @@ public class LogConfigChecker implements MappingConfigChecker, ApplicationContex
         ListenerConfig listener = mapping.getListener();
         Assert.notNull(listener, "ListenerConfig can not be null.");
 
-        listener.setListenerType(IncrementEnum.LOG.getType());
+        listener.setListenerType(ListenerTypeEnum.LOG.getType());
     }
 
 }

+ 2 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -8,8 +8,8 @@ import org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.listener.config.ListenerConfig;
+import org.dbsyncer.listener.enums.ListenerTypeEnum;
 import org.dbsyncer.manager.Manager;
-import org.dbsyncer.manager.enums.IncrementEnum;
 import org.dbsyncer.parser.enums.ModelEnum;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.storage.constant.ConfigConstant;
@@ -66,7 +66,7 @@ public class MappingChecker extends AbstractChecker implements ApplicationContex
         mapping.setSourceConnectorId(sourceConnectorId);
         mapping.setTargetConnectorId(targetConnectorId);
         mapping.setModel(ModelEnum.FULL.getCode());
-        mapping.setListener(new ListenerConfig(IncrementEnum.TIMING.getType()));
+        mapping.setListener(new ListenerConfig(ListenerTypeEnum.TIMING.getType()));
 
         // 修改基本配置
         this.modifyConfigModel(mapping, params);

+ 2 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/TimingConfigChecker.java

@@ -2,8 +2,8 @@ package org.dbsyncer.biz.checker.impl.mapping;
 
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.biz.checker.MappingConfigChecker;
-import org.dbsyncer.manager.enums.IncrementEnum;
 import org.dbsyncer.listener.config.ListenerConfig;
+import org.dbsyncer.listener.enums.ListenerTypeEnum;
 import org.dbsyncer.parser.model.Mapping;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
@@ -47,7 +47,7 @@ public class TimingConfigChecker implements MappingConfigChecker {
             config.setDelete(delete);
         }
 
-        config.setListenerType(IncrementEnum.TIMING.getType());
+        config.setListenerType(ListenerTypeEnum.TIMING.getType());
         mapping.setListener(config);
     }
 

+ 6 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Action.java

@@ -0,0 +1,6 @@
+package org.dbsyncer.listener;
+
+public interface Action {
+
+    void execute(Extractor extractor);
+}

+ 9 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/DefaultExtractor.java

@@ -20,6 +20,11 @@ public abstract class DefaultExtractor implements Extractor {
     private ListenerConfig listenerConfig;
     private Map<String, String> map;
     private List<Event> watcher;
+    private Action action;
+
+    public void run() {
+        action.execute(this);
+    }
 
     public void addListener(Event event) {
         if (null != event) {
@@ -59,4 +64,8 @@ public abstract class DefaultExtractor implements Extractor {
     public void setMap(Map<String, String> map) {
         this.map = map;
     }
+
+    public void setAction(Action action) {
+        this.action = action;
+    }
 }

+ 3 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -4,6 +4,7 @@ package org.dbsyncer.listener;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerEnum;
+import org.dbsyncer.listener.enums.ListenerTypeEnum;
 import org.springframework.stereotype.Component;
 
 import java.util.Map;
@@ -14,6 +15,8 @@ public class ListenerFactory implements Listener {
     @Override
     public DefaultExtractor createExtractor(ConnectorConfig config, ListenerConfig listenerConfig, Map<String, String> map) {
         DefaultExtractor extractor = ListenerEnum.getExtractor(config.getConnectorType());
+        // log/timing
+        extractor.setAction(ListenerTypeEnum.getAction(listenerConfig.getListenerType()));
         extractor.setConnectorConfig(config);
         extractor.setListenerConfig(listenerConfig);
         return extractor;

+ 47 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerTypeEnum.java

@@ -0,0 +1,47 @@
+package org.dbsyncer.listener.enums;
+
+import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.listener.Action;
+import org.dbsyncer.listener.ListenerException;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/24 14:19
+ */
+public enum ListenerTypeEnum {
+
+    /**
+     * 日志
+     */
+    LOG("log", extractor -> extractor.extract()),
+    /**
+     * 定时
+     */
+    TIMING("timing", extractor -> extractor.extractTiming());
+
+    private String type;
+    private Action action;
+
+    ListenerTypeEnum(String type, Action action) {
+        this.type = type;
+        this.action = action;
+    }
+
+    public static Action getAction(String type) throws ListenerException {
+        for (ListenerTypeEnum e : ListenerTypeEnum.values()) {
+            if (StringUtils.equals(type, e.getType())) {
+                return e.getAction();
+            }
+        }
+        throw new ListenerException(String.format("Action type \"%s\" does not exist.", type));
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public Action getAction() {
+        return action;
+    }
+}

+ 0 - 60
dbsyncer-manager/src/main/java/org/dbsyncer/manager/enums/IncrementEnum.java

@@ -1,60 +0,0 @@
-package org.dbsyncer.manager.enums;
-
-import org.apache.commons.lang.StringUtils;
-import org.dbsyncer.listener.Extractor;
-import org.dbsyncer.manager.ManagerException;
-import org.dbsyncer.manager.puller.Increment;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2020/04/24 14:19
- */
-public enum IncrementEnum {
-
-    /**
-     * 日志
-     */
-    LOG("log", new Increment() {
-
-        @Override
-        public void execute(Extractor extractor) {
-            extractor.extract();
-        }
-    }),
-    /**
-     * 定时
-     */
-    TIMING("timing", new Increment() {
-
-        @Override
-        public void execute(Extractor extractor) {
-            extractor.extractTiming();
-        }
-    });
-
-    private String    type;
-    private Increment increment;
-
-    IncrementEnum(String type, Increment increment) {
-        this.type = type;
-        this.increment = increment;
-    }
-
-    public static Increment getIncrement(String type) throws ManagerException {
-        for (IncrementEnum e : IncrementEnum.values()) {
-            if (StringUtils.equals(type, e.getType())) {
-                return e.getIncrement();
-            }
-        }
-        throw new ManagerException(String.format("Increment type \"%s\" does not exist.", type));
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public Increment getIncrement() {
-        return increment;
-    }
-}

+ 2 - 8
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -1,13 +1,11 @@
 package org.dbsyncer.manager.puller.impl;
 
 import org.dbsyncer.common.event.Event;
-import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.DefaultExtractor;
 import org.dbsyncer.listener.Extractor;
 import org.dbsyncer.listener.Listener;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.manager.Manager;
-import org.dbsyncer.manager.enums.IncrementEnum;
 import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.manager.puller.Increment;
 import org.dbsyncer.parser.Parser;
@@ -53,17 +51,13 @@ public class IncrementPuller extends AbstractPuller {
         final String mappingId = mapping.getId();
         final String metaId = mapping.getMetaId();
         try {
-            ListenerConfig listenerConfig = mapping.getListener();
-            // log/timing
-            Increment increment = IncrementEnum.getIncrement(listenerConfig.getListenerType());
-            Assert.notNull(increment, "未知的增量同步方式.");
             Connector connector = manager.getConnector(mapping.getSourceConnectorId());
             Assert.notNull(connector, "连接器不能为空.");
             List<TableGroup> list = manager.getTableGroupAll(mappingId);
             Assert.notEmpty(list, "映射关系不能为空");
             Meta meta = manager.getMeta(metaId);
             Assert.notNull(meta, "Meta不能为空.");
-            DefaultExtractor extractor = listener.createExtractor(connector.getConfig(), listenerConfig, meta.getMap());
+            DefaultExtractor extractor = listener.createExtractor(connector.getConfig(), mapping.getListener(), meta.getMap());
             Assert.notNull(extractor, "未知的监听配置.");
 
             // 监听数据变更事件
@@ -72,7 +66,7 @@ public class IncrementPuller extends AbstractPuller {
 
             // 执行任务
             logger.info("启动成功:{}", metaId);
-            increment.execute(map.get(metaId));
+            map.get(metaId).run();
         } catch (Exception e) {
             logger.error("任务:{} 运行异常:{}", metaId, e.getMessage());
         } finally {