AE86 5 роки тому
батько
коміт
d06965cdd5

+ 12 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -0,0 +1,12 @@
+package org.dbsyncer.common.event;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-11 22:50
+ */
+public interface Event {
+
+    void closedEvent();
+
+}

+ 23 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/model/Task.java

@@ -1,5 +1,10 @@
 package org.dbsyncer.common.model;
 
+import org.dbsyncer.common.event.Event;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
 public class Task {
 
     private String id;
@@ -10,18 +15,35 @@ public class Task {
 
     private long endTime;
 
+    private List<Event> watcher;
+
     public Task() {
     }
 
     public Task(String id) {
         this.id = id;
         this.state = StateEnum.RUNNING;
+        watcher = new CopyOnWriteArrayList<>();
     }
 
     public void stop() {
         this.state = StateEnum.STOP;
     }
 
+    /**
+     * 订阅事件
+     */
+    public void attachClosedEvent(Event event) {
+        watcher.add(event);
+    }
+
+    /**
+     * 通知关闭事件
+     */
+    public void notifyClosedEvent() {
+        watcher.forEach(w -> w.closedEvent());
+    }
+
     public boolean isRunning() {
         return StateEnum.RUNNING == state;
     }
@@ -50,7 +72,7 @@ public class Task {
         this.endTime = endTime;
     }
 
-    public enum StateEnum{
+    public enum StateEnum {
         /**
          * 运行
          */

+ 1 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ldap/LdapListener.java

@@ -4,7 +4,6 @@ import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.config.LdapConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.LdapConstant;
-import org.dbsyncer.listener.AbstractListener;
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
@@ -29,7 +28,7 @@ import java.util.Hashtable;
  * @Description: 监听ldap新增、修改、刪除操作
  * @date: 2017年8月22日 下午6:39:05
  */
-public final class LdapListener extends AbstractListener {
+public final class LdapListener {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -41,7 +40,6 @@ public final class LdapListener extends AbstractListener {
     // 修改事件监听
     private ObjectChangeListener objectChangeListener;
 
-    @Override
     public void onClose() {
         synchronized (context) {
             try {
@@ -57,7 +55,6 @@ public final class LdapListener extends AbstractListener {
         }
     }
 
-    @Override
     public void run() {
 //        // 启动监听线程
 //        // 1、获取驱动

+ 0 - 56
dbsyncer-manager/src/main/java/org/dbsyncer/manager/enums/TaskEnum.java

@@ -1,56 +0,0 @@
-package org.dbsyncer.manager.enums;
-
-import org.apache.commons.lang.StringUtils;
-import org.dbsyncer.common.model.Task;
-import org.dbsyncer.manager.ManagerException;
-import org.dbsyncer.manager.extractor.increment.LogIncrement;
-import org.dbsyncer.manager.extractor.increment.TimingIncrement;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2020/04/24 14:19
- */
-public enum TaskEnum {
-
-    /**
-     * 定时
-     */
-    TIMING("timing", new TimingIncrement()),
-    /**
-     * 监听日志
-     */
-    LOG("log", new LogIncrement());
-
-    private String type;
-    private Task task;
-
-    TaskEnum(String type, Task task) {
-        this.type = type;
-        this.task = task;
-    }
-
-    /**
-     * 获取同步任务类型
-     *
-     * @param type
-     * @return
-     * @throws ManagerException
-     */
-    public static Task getIncrementTask(String type) throws ManagerException {
-        for (TaskEnum e : TaskEnum.values()) {
-            if (StringUtils.equals(type, e.getType())) {
-                return e.getTask();
-            }
-        }
-        throw new ManagerException(String.format("Task type \"%s\" does not exist.", type));
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public Task getTask() {
-        return task;
-    }
-}

+ 35 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/AbstractIncrement.java

@@ -0,0 +1,35 @@
+package org.dbsyncer.manager.extractor;
+
+import org.dbsyncer.common.event.Event;
+import org.dbsyncer.common.model.Task;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.ListenerConfig;
+import org.springframework.scheduling.annotation.Async;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-11 22:44
+ */
+public abstract class AbstractIncrement implements Increment {
+
+    /**
+     * 启动
+     */
+    protected abstract void run(ListenerConfig listenerConfig, Connector connector);
+
+    /**
+     * 关闭
+     */
+    protected abstract void close();
+
+    @Override
+    public void execute(Task task, ListenerConfig listenerConfig, Connector connector) {
+        // 注册关闭监听事件
+        task.attachClosedEvent(() -> close());
+
+        // 启动
+        run(listenerConfig, connector);
+    }
+
+}

+ 7 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/Increment.java

@@ -1,4 +1,11 @@
 package org.dbsyncer.manager.extractor;
 
+import org.dbsyncer.common.model.Task;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.ListenerConfig;
+
 public interface Increment {
+
+    void execute(Task task, ListenerConfig listenerConfig, Connector connector);
+
 }

+ 4 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/impl/IncrementExtractor.java

@@ -67,14 +67,15 @@ public class IncrementExtractor extends AbstractExtractor implements Application
         try {
             // 执行任务
             logger.info("启动任务:{}", metaId);
-            // TODO increment
+            Task task = map.get(metaId);
+            increment.execute(task, listenerConfig, connector);
         } catch (Exception e) {
             // TODO 记录错误日志
             logger.error(e.getMessage());
         } finally {
             map.remove(metaId);
             publishClosedEvent(metaId);
-            logger.info("结束任务:{}", metaId);
+            logger.info("启动成功:{}", metaId);
         }
     }
 
@@ -82,7 +83,7 @@ public class IncrementExtractor extends AbstractExtractor implements Application
     public void close(String metaId) {
         Task task = map.get(metaId);
         if (null != task) {
-            task.stop();
+            task.notifyClosedEvent();
         }
     }
 

+ 30 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/increment/LogIncrement.java

@@ -1,11 +1,39 @@
 package org.dbsyncer.manager.extractor.increment;
 
-import org.dbsyncer.manager.extractor.Increment;
+import org.dbsyncer.manager.extractor.AbstractIncrement;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.ListenerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
 
 /**
  * @version 1.0.0
  * @Author AE86
  * @Date 2020-05-08 00:31
  */
-public class LogIncrement implements Increment {
+@Component
+public class LogIncrement extends AbstractIncrement {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Override
+    protected void run(ListenerConfig listenerConfig, Connector connector) {
+        for (int i = 0; i < 10; i++) {
+            try {
+                logger.info("模拟监听任务");
+                TimeUnit.SECONDS.sleep(3);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Override
+    protected void close() {
+        logger.info("关闭模拟监听任务");
+
+    }
 }

+ 17 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/increment/TimingIncrement.java

@@ -1,11 +1,25 @@
 package org.dbsyncer.manager.extractor.increment;
 
-import org.dbsyncer.manager.extractor.Increment;
+import org.dbsyncer.manager.extractor.AbstractIncrement;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.ListenerConfig;
+import org.springframework.stereotype.Component;
 
 /**
  * @version 1.0.0
  * @Author AE86
  * @Date 2020-05-08 00:32
  */
-public class TimingIncrement implements Increment {
-}
+@Component
+public class TimingIncrement extends AbstractIncrement {
+
+    @Override
+    protected void run(ListenerConfig listenerConfig, Connector connector) {
+
+    }
+
+    @Override
+    protected void close() {
+
+    }
+}