AE86 5 jaren geleden
bovenliggende
commit
9e03e8b2c5

+ 23 - 23
dbsyncer-common/src/main/java/org/dbsyncer/common/event/RefreshEvent.java → dbsyncer-common/src/main/java/org/dbsyncer/common/event/FullRefreshEvent.java

@@ -1,24 +1,24 @@
-package org.dbsyncer.common.event;
-
-import org.dbsyncer.common.model.Task;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.event.ApplicationContextEvent;
-
-public class RefreshEvent extends ApplicationContextEvent {
-
-    private Task task;
-
-    /**
-     * Create a new ContextStartedEvent.
-     *
-     * @param source the {@code ApplicationContext} that the event is raised for (must not be {@code null})
-     */
-    public RefreshEvent(ApplicationContext source, Task task) {
-        super(source);
-        this.task = task;
-    }
-
-    public Task getTask() {
-        return task;
-    }
+package org.dbsyncer.common.event;
+
+import org.dbsyncer.common.model.Task;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.event.ApplicationContextEvent;
+
+public class FullRefreshEvent extends ApplicationContextEvent {
+
+    private Task task;
+
+    /**
+     * Create a new ContextStartedEvent.
+     *
+     * @param source the {@code ApplicationContext} that the event is raised for (must not be {@code null})
+     */
+    public FullRefreshEvent(ApplicationContext source, Task task) {
+        super(source);
+        this.task = task;
+    }
+
+    public Task getTask() {
+        return task;
+    }
 }
 }

+ 24 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/IncrementRefreshEvent.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.common.event;
+
+import org.dbsyncer.common.model.Task;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.event.ApplicationContextEvent;
+
+public class IncrementRefreshEvent extends ApplicationContextEvent {
+
+    private Task task;
+
+    /**
+     * Create a new ContextStartedEvent.
+     *
+     * @param source the {@code ApplicationContext} that the event is raised for (must not be {@code null})
+     */
+    public IncrementRefreshEvent(ApplicationContext source, Task task) {
+        super(source);
+        this.task = task;
+    }
+
+    public Task getTask() {
+        return task;
+    }
+}

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -19,7 +19,7 @@ import java.util.Map;
  * @version 1.0.0
  * @version 1.0.0
  * @date 2019/9/30 20:31
  * @date 2019/9/30 20:31
  */
  */
-public interface Manager extends TaskExecutor{
+public interface Manager extends Task {
 
 
     boolean alive(ConnectorConfig config);
     boolean alive(ConnectorConfig config);
 
 

+ 36 - 36
dbsyncer-manager/src/main/java/org/dbsyncer/manager/TaskExecutor.java → dbsyncer-manager/src/main/java/org/dbsyncer/manager/Task.java

@@ -1,37 +1,37 @@
-package org.dbsyncer.manager;
-
-import org.dbsyncer.parser.enums.MetaEnum;
-import org.dbsyncer.parser.model.Mapping;
-
-/**
- * 同步任务执行器
- *
- * @author AE86
- * @version 1.0.0
- * @date 2020/04/26 16:32
- */
-public interface TaskExecutor {
-
-    /**
-     * 启动同步任务
-     *
-     * @param mapping
-     */
-    void start(Mapping mapping);
-
-    /**
-     * 关闭同步任务
-     *
-     * @param mapping
-     */
-    void close(Mapping mapping);
-
-    /**
-     * 切换meta状态
-     *
-     * @param metaId
-     * @param metaEnum
-     */
-    void changeMetaState(String metaId, MetaEnum metaEnum);
-
+package org.dbsyncer.manager;
+
+import org.dbsyncer.parser.enums.MetaEnum;
+import org.dbsyncer.parser.model.Mapping;
+
+/**
+ * 同步任务执行器
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/26 16:32
+ */
+public interface Task {
+
+    /**
+     * 启动同步任务
+     *
+     * @param mapping
+     */
+    void start(Mapping mapping);
+
+    /**
+     * 关闭同步任务
+     *
+     * @param mapping
+     */
+    void close(Mapping mapping);
+
+    /**
+     * 切换meta状态
+     *
+     * @param metaId
+     * @param metaEnum
+     */
+    void changeMetaState(String metaId, MetaEnum metaEnum);
+
 }
 }

+ 4 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/impl/FullExtractor.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.manager.extractor.impl;
 package org.dbsyncer.manager.extractor.impl;
 
 
-import org.dbsyncer.common.event.RefreshEvent;
+import org.dbsyncer.common.event.FullRefreshEvent;
 import org.dbsyncer.common.model.Task;
 import org.dbsyncer.common.model.Task;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.extractor.AbstractExtractor;
 import org.dbsyncer.manager.extractor.AbstractExtractor;
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @date 2020/04/26 15:28
  * @date 2020/04/26 15:28
  */
  */
 @Component
 @Component
-public class FullExtractor extends AbstractExtractor implements ApplicationListener<RefreshEvent> {
+public class FullExtractor extends AbstractExtractor implements ApplicationListener<FullRefreshEvent> {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
@@ -73,9 +73,9 @@ public class FullExtractor extends AbstractExtractor implements ApplicationListe
     }
     }
 
 
     @Override
     @Override
-    public void onApplicationEvent(RefreshEvent refreshEvent) {
+    public void onApplicationEvent(FullRefreshEvent event) {
         // 异步监听任务刷新事件
         // 异步监听任务刷新事件
-        flush(refreshEvent.getTask());
+        flush(event.getTask());
     }
     }
 
 
     private void doTask(Task task, Mapping mapping, List<TableGroup> list) {
     private void doTask(Task task, Mapping mapping, List<TableGroup> list) {

+ 17 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/impl/IncrementExtractor.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.manager.extractor.impl;
 package org.dbsyncer.manager.extractor.impl;
 
 
+import org.dbsyncer.common.event.FullRefreshEvent;
+import org.dbsyncer.common.event.IncrementRefreshEvent;
 import org.dbsyncer.common.model.Task;
 import org.dbsyncer.common.model.Task;
 import org.dbsyncer.listener.ListenerFactory;
 import org.dbsyncer.listener.ListenerFactory;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
@@ -7,9 +9,11 @@ import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.extractor.AbstractExtractor;
 import org.dbsyncer.manager.extractor.AbstractExtractor;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
 
 
@@ -24,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @date 2020/04/26 15:28
  * @date 2020/04/26 15:28
  */
  */
 @Component
 @Component
-public class IncrementExtractor extends AbstractExtractor {
+public class IncrementExtractor extends AbstractExtractor implements ApplicationListener<IncrementRefreshEvent> {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
@@ -69,4 +73,16 @@ public class IncrementExtractor extends AbstractExtractor {
         }
         }
     }
     }
 
 
+    @Override
+    public void onApplicationEvent(IncrementRefreshEvent event) {
+        // 异步监听任务刷新事件
+        flush(event.getTask());
+    }
+
+    private void flush(Task task) {
+        Meta meta = manager.getMeta(task.getId());
+        Assert.notNull(meta, "检查meta为空.");
+
+        manager.editMeta(meta);
+    }
 }
 }

+ 0 - 6
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/increment/AbstractListener.java

@@ -1,6 +0,0 @@
-package org.dbsyncer.manager.extractor.increment;
-
-
-public abstract class AbstractListener implements Listener {
-
-}

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/increment/LogListener.java

@@ -10,6 +10,6 @@ import org.springframework.stereotype.Component;
  * @date 2020/05/05 15:28
  * @date 2020/05/05 15:28
  */
  */
 @Component
 @Component
-public class LogListener extends AbstractListener {
+public class LogListener implements Listener{
 
 
 }
 }

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/increment/TimingListener.java

@@ -10,6 +10,6 @@ import org.springframework.stereotype.Component;
  * @date 2020/05/05 15:28
  * @date 2020/05/05 15:28
  */
  */
 @Component
 @Component
-public class TimingListener extends AbstractListener {
+public class TimingListener implements Listener{
 
 
 }
 }

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.parser;
 package org.dbsyncer.parser;
 
 
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.cache.CacheService;
-import org.dbsyncer.common.event.RefreshEvent;
+import org.dbsyncer.common.event.FullRefreshEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Task;
 import org.dbsyncer.common.model.Task;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
@@ -245,7 +245,7 @@ public class ParserFactory implements Parser {
 
 
         // 发布刷新事件给FullExtractor
         // 发布刷新事件给FullExtractor
         task.setEndTime(System.currentTimeMillis());
         task.setEndTime(System.currentTimeMillis());
-        applicationContext.publishEvent(new RefreshEvent(applicationContext, task));
+        applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
     }
     }
 
 
     /**
     /**