AE86 5 年之前
父節點
當前提交
b0672fde99

+ 3 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/model/Task.java

@@ -10,6 +10,9 @@ public class Task {
 
     private long endTime;
 
+    public Task() {
+    }
+
     public Task(String id) {
         this.id = id;
         this.state = StateEnum.RUNNING;

+ 9 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Listener.java

@@ -0,0 +1,9 @@
+package org.dbsyncer.listener;
+
+import org.dbsyncer.common.model.Task;
+import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.config.ListenerConfig;
+
+public interface Listener {
+    void execute(Task t, ListenerConfig listenerConfig, ConnectorConfig config);
+}

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -7,7 +7,7 @@ import org.dbsyncer.listener.config.ListenerConfig;
 import org.springframework.stereotype.Component;
 
 @Component
-public class ListenerFactory {
+public class ListenerFactory implements Listener{
 
     public void execute(Task task, ListenerConfig listenerConfig, ConnectorConfig connectorConfig) {
         // extract

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Task.java → dbsyncer-manager/src/main/java/org/dbsyncer/manager/Executor.java

@@ -10,7 +10,7 @@ import org.dbsyncer.parser.model.Mapping;
  * @version 1.0.0
  * @date 2020/04/26 16:32
  */
-public interface Task {
+public interface Executor {
 
     /**
      * 启动同步任务

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -19,7 +19,7 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2019/9/30 20:31
  */
-public interface Manager extends Task {
+public interface Manager extends Executor {
 
     boolean alive(ConnectorConfig config);
 

+ 56 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/enums/TaskEnum.java

@@ -0,0 +1,56 @@
+package org.dbsyncer.manager.enums;
+
+import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.common.model.Task;
+import org.dbsyncer.manager.ManagerException;
+import org.dbsyncer.manager.extractor.task.LogTask;
+import org.dbsyncer.manager.extractor.task.TimingTask;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/24 14:19
+ */
+public enum TaskEnum {
+
+    /**
+     * 定时
+     */
+    TIMING("timing", new TimingTask()),
+    /**
+     * 监听日志
+     */
+    LOG("log", new LogTask());
+
+    private String type;
+    private Task task;
+
+    TaskEnum(String type, Task task) {
+        this.type = type;
+        this.task = task;
+    }
+
+    /**
+     * 获取同步任务类型
+     *
+     * @param type
+     * @return
+     * @throws ManagerException
+     */
+    public static Task getIncrementTask(String type) throws ManagerException {
+        for (TaskEnum e : TaskEnum.values()) {
+            if (StringUtils.equals(type, e.getType())) {
+                return e.getTask();
+            }
+        }
+        throw new ManagerException(String.format("Task type \"%s\" does not exist.", type));
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public Task getTask() {
+        return task;
+    }
+}

+ 16 - 11
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/impl/IncrementExtractor.java

@@ -1,11 +1,11 @@
 package org.dbsyncer.manager.extractor.impl;
 
-import org.dbsyncer.common.event.FullRefreshEvent;
 import org.dbsyncer.common.event.IncrementRefreshEvent;
 import org.dbsyncer.common.model.Task;
-import org.dbsyncer.listener.ListenerFactory;
+import org.dbsyncer.listener.Listener;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.manager.enums.TaskEnum;
 import org.dbsyncer.manager.extractor.AbstractExtractor;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
@@ -33,7 +33,7 @@ public class IncrementExtractor extends AbstractExtractor implements Application
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Autowired
-    private ListenerFactory listenerFactory;
+    private Listener listener;
 
     @Autowired
     private Manager manager;
@@ -42,18 +42,23 @@ public class IncrementExtractor extends AbstractExtractor implements Application
 
     @Override
     public void asyncStart(Mapping mapping) {
+        ListenerConfig listenerConfig = mapping.getListener();
+        Connector connector = manager.getConnector(mapping.getSourceConnectorId());
+        Assert.notNull(connector, "连接器不能为空.");
+        // log/timing
+        String type = listenerConfig.getListenerType();
+        Task task = TaskEnum.getIncrementTask(type);
+        Assert.notNull(task, "未知的增量同步方式.");
+
         final String metaId = mapping.getMetaId();
-        map.putIfAbsent(metaId, new Task(metaId));
+        task.setId(metaId);
+        map.putIfAbsent(metaId, task);
 
         try {
-            ListenerConfig listenerConfig = mapping.getListener();
-            Connector connector = manager.getConnector(mapping.getSourceConnectorId());
-            Assert.notNull(connector, "连接器不能为空.");
-
             // 执行任务
             logger.info("启动任务:{}", metaId);
-            Task task = map.get(metaId);
-            listenerFactory.execute(task, listenerConfig, connector.getConfig());
+            Task t = map.get(metaId);
+            listener.execute(t, listenerConfig, connector.getConfig());
 
         } catch (Exception e) {
             // TODO 记录错误日志
@@ -82,7 +87,7 @@ public class IncrementExtractor extends AbstractExtractor implements Application
     private void flush(Task task) {
         Meta meta = manager.getMeta(task.getId());
         Assert.notNull(meta, "检查meta为空.");
-
         manager.editMeta(meta);
     }
+
 }

+ 0 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/increment/Listener.java

@@ -1,4 +0,0 @@
-package org.dbsyncer.manager.extractor.increment;
-
-public interface Listener {
-}

+ 0 - 15
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/increment/LogListener.java

@@ -1,15 +0,0 @@
-package org.dbsyncer.manager.extractor.increment;
-
-import org.springframework.stereotype.Component;
-
-/**
- * 监听日志同步
- *
- * @author AE86
- * @version 1.0.0
- * @date 2020/05/05 15:28
- */
-@Component
-public class LogListener implements Listener{
-
-}

+ 0 - 15
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/increment/TimingListener.java

@@ -1,15 +0,0 @@
-package org.dbsyncer.manager.extractor.increment;
-
-import org.springframework.stereotype.Component;
-
-/**
- * 定时同步
- *
- * @author AE86
- * @version 1.0.0
- * @date 2020/05/05 15:28
- */
-@Component
-public class TimingListener implements Listener{
-
-}

+ 11 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/task/LogTask.java

@@ -0,0 +1,11 @@
+package org.dbsyncer.manager.extractor.task;
+
+import org.dbsyncer.common.model.Task;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-08 00:31
+ */
+public class LogTask extends Task {
+}

+ 11 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/task/TimingTask.java

@@ -0,0 +1,11 @@
+package org.dbsyncer.manager.extractor.task;
+
+import org.dbsyncer.common.model.Task;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-08 00:32
+ */
+public class TimingTask extends Task {
+}