소스 검색

add Extractor

AE86 5 년 전
부모
커밋
89b1514e02

+ 31 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/ClosedEvent.java

@@ -0,0 +1,31 @@
+package org.dbsyncer.common.event;
+
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.event.ApplicationContextEvent;
+
+/**
+ * 任务关闭事件
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-04-26 22:45
+ */
+public class ClosedEvent extends ApplicationContextEvent {
+
+    private String id;
+
+    /**
+     * Create a new ContextStartedEvent.
+     *
+     * @param source the {@code ApplicationContext} that the event is raised for
+     *               (must not be {@code null})
+     */
+    public ClosedEvent(ApplicationContext source, String id) {
+        super(source);
+        this.id = id;
+    }
+
+    public String getId() {
+        return id;
+    }
+}

+ 5 - 8
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Executor.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.manager;
 
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.listener.config.ListenerConfig;
+import org.dbsyncer.parser.model.Mapping;
 
 /**
  * 同步任务执行器
@@ -15,17 +14,15 @@ public interface Executor {
     /**
      * 启动同步任务
      *
-     * @param metaId
-     * @param listenerConfig
-     * @param connectorConfig
+     * @param mapping
      */
-    boolean start(String metaId, ListenerConfig listenerConfig, ConnectorConfig connectorConfig);
+    void start(Mapping mapping);
 
     /**
      * 关闭同步任务
      *
-     * @param metaId
+     * @param mapping
      */
-    boolean shutdown(String metaId);
+    void close(Mapping mapping);
 
 }

+ 89 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ExecutorImpl.java

@@ -0,0 +1,89 @@
+package org.dbsyncer.manager;
+
+import org.dbsyncer.common.event.ClosedEvent;
+import org.dbsyncer.manager.extractor.Extractor;
+import org.dbsyncer.parser.enums.MetaEnum;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import java.util.Map;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-04-26 23:40
+ */
+@Component
+public class ExecutorImpl implements Executor, ApplicationContextAware, ApplicationListener<ClosedEvent> {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private Manager manager;
+
+    private Map<String, Extractor> map;
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        map = applicationContext.getBeansOfType(Extractor.class);
+    }
+
+    @Override
+    public void start(Mapping mapping) {
+        // 获取数据源连接器
+        Connector connector = manager.getConnector(mapping.getSourceConnectorId());
+        Assert.notNull(connector, "数据源配置不能为空.");
+
+        Extractor extractor = getExtractor(mapping);
+
+        // 标记运行中
+        changeMetaState(mapping.getMetaId(), MetaEnum.RUNNING);
+
+        extractor.start(mapping);
+    }
+
+    @Override
+    public void close(Mapping mapping) {
+        Extractor extractor = getExtractor(mapping);
+
+        String metaId = mapping.getMetaId();
+        changeMetaState(metaId, MetaEnum.STOPPING);
+
+        extractor.close(metaId);
+    }
+
+    @Override
+    public void onApplicationEvent(ClosedEvent event) {
+        // 异步监听任务关闭事件
+        changeMetaState(event.getId(), MetaEnum.READY);
+    }
+
+    private Extractor getExtractor(Mapping mapping) {
+        Assert.notNull(mapping, "驱动不能为空");
+        String model = mapping.getModel();
+        String metaId = mapping.getMetaId();
+        Assert.hasText(model, "同步方式不能为空");
+        Assert.hasText(metaId, "任务ID不能为空");
+
+        Extractor extractor = map.get(model.concat("Extractor"));
+        Assert.notNull(extractor, String.format("未知的同步方式: %s", model));
+        return extractor;
+    }
+
+    private void changeMetaState(String metaId, MetaEnum metaEnum){
+        Meta meta = manager.getMeta(metaId);
+        meta.setState(metaEnum.getCode());
+        manager.editMeta(meta);
+    }
+
+}

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -93,13 +93,13 @@ public interface Manager {
      *
      * @param mapping
      */
-    boolean start(Mapping mapping);
+    void start(Mapping mapping);
 
     /**
      * 关闭同步任务
      *
      * @param mapping
      */
-    boolean close(Mapping mapping);
+    void close(Mapping mapping);
 
 }

+ 8 - 59
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -7,7 +7,6 @@ import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.listener.Listener;
 import org.dbsyncer.manager.config.OperationConfig;
-import org.dbsyncer.manager.config.PreloadConfig;
 import org.dbsyncer.manager.config.QueryConfig;
 import org.dbsyncer.manager.enums.GroupStrategyEnum;
 import org.dbsyncer.manager.enums.HandlerEnum;
@@ -15,21 +14,14 @@ import org.dbsyncer.manager.template.impl.OperationTemplate;
 import org.dbsyncer.manager.template.impl.PreloadTemplate;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.enums.ConvertEnum;
-import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.plugin.config.Plugin;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.context.ApplicationListener;
-import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.stereotype.Component;
-import org.springframework.util.Assert;
 
 import java.util.List;
 import java.util.Map;
@@ -40,7 +32,7 @@ import java.util.Map;
  * @date 2019/9/16 23:59
  */
 @Component
-public class ManagerFactory implements Manager, ApplicationContextAware {
+public class ManagerFactory implements Manager {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -53,19 +45,15 @@ public class ManagerFactory implements Manager, ApplicationContextAware {
     @Autowired
     private Listener listener;
 
+    @Autowired
+    private Executor executor;
+
     @Autowired
     private PreloadTemplate preloadTemplate;
 
     @Autowired
     private OperationTemplate operationTemplate;
 
-    private Map<String, Executor> map;
-
-    @Override
-    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-        map = applicationContext.getBeansOfType(Executor.class);
-    }
-
     @Override
     public boolean alive(ConnectorConfig config) {
         return parser.alive(config);
@@ -229,54 +217,15 @@ public class ManagerFactory implements Manager, ApplicationContextAware {
     }
 
     @Override
-    public boolean start(Mapping mapping) {
-        // 获取数据源连接器
-        Connector connector = getConnector(mapping.getSourceConnectorId());
-        Assert.notNull(connector, "数据源配置不能为空");
-
-        // 标记运行中
-        String metaId = mapping.getMetaId();
-        changeMetaState(metaId, MetaEnum.RUNNING);
-
+    public void start(Mapping mapping) {
         // 启动任务
-        Executor executor = getExecutor(mapping);
-        boolean start = executor.start(metaId, mapping.getListener(), connector.getConfig());
-
-        // rollback
-        if(!start){
-            logger.warn("启动失败:{}", metaId);
-            changeMetaState(metaId, MetaEnum.READY);
-        }
-        return start;
+        executor.start(mapping);
     }
 
     @Override
-    public boolean close(Mapping mapping) {
-        String metaId = mapping.getMetaId();
-        changeMetaState(metaId, MetaEnum.STOPPING);
-
+    public void close(Mapping mapping) {
         // 关闭任务
-        Executor executor = getExecutor(mapping);
-        boolean shutdown = executor.shutdown(metaId);
-        return shutdown;
-    }
-
-    private Executor getExecutor(Mapping mapping) {
-        Assert.notNull(mapping, "驱动不能为空");
-        String model = mapping.getModel();
-        String metaId = mapping.getMetaId();
-        Assert.hasText(model, "同步方式不能为空");
-        Assert.hasText(metaId, "任务ID不能为空");
-
-        Executor executor = map.get(model.concat("Executor"));
-        Assert.notNull(executor, String.format("未知的同步方式: %s", model));
-        return executor;
-    }
-
-    private void changeMetaState(String metaId, MetaEnum metaEnum){
-        Meta meta = getMeta(metaId);
-        meta.setState(metaEnum.getCode());
-        editMeta(meta);
+        executor.close(mapping);
     }
 
 }

+ 0 - 28
dbsyncer-manager/src/main/java/org/dbsyncer/manager/executor/FullExecutor.java

@@ -1,28 +0,0 @@
-package org.dbsyncer.manager.executor;
-
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.listener.config.ListenerConfig;
-import org.dbsyncer.manager.Executor;
-import org.springframework.stereotype.Component;
-
-/**
- * 全量同步
- *
- * @author AE86
- * @version 1.0.0
- * @date 2020/04/26 15:28
- */
-@Component
-public class FullExecutor implements Executor {
-
-    @Override
-    public boolean start(String metaId, ListenerConfig listenerConfig, ConnectorConfig connectorConfig) {
-        return true;
-    }
-
-    @Override
-    public boolean shutdown(String metaId) {
-        return true;
-    }
-
-}

+ 0 - 28
dbsyncer-manager/src/main/java/org/dbsyncer/manager/executor/IncrementExecutor.java

@@ -1,28 +0,0 @@
-package org.dbsyncer.manager.executor;
-
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.listener.config.ListenerConfig;
-import org.dbsyncer.manager.Executor;
-import org.springframework.stereotype.Component;
-
-/**
- * 增量同步
- *
- * @author AE86
- * @version 1.0.0
- * @date 2020/04/26 15:28
- */
-@Component
-public class IncrementExecutor implements Executor {
-
-    @Override
-    public boolean start(String metaId, ListenerConfig listenerConfig, ConnectorConfig connectorConfig) {
-        return true;
-    }
-
-    @Override
-    public boolean shutdown(String metaId) {
-        return true;
-    }
-
-}

+ 11 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/Extractor.java

@@ -0,0 +1,11 @@
+package org.dbsyncer.manager.extractor;
+
+import org.dbsyncer.parser.model.Mapping;
+
+public interface Extractor {
+
+    void start(Mapping mapping);
+
+    void close(String metaId);
+
+}

+ 54 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/FullExtractor.java

@@ -0,0 +1,54 @@
+package org.dbsyncer.manager.extractor;
+
+import org.dbsyncer.common.event.ClosedEvent;
+import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.config.ListenerConfig;
+import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.model.Mapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 全量同步
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/26 15:28
+ */
+@Component
+public class FullExtractor implements Extractor {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private Parser parser;
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    @Override
+    public void start(Mapping mapping) {
+        new Thread(()->{
+            String metaId = mapping.getMetaId();
+            logger.info("模拟同步...等待5s");
+            try {
+                TimeUnit.SECONDS.sleep(5);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            logger.info("同步结束");
+            applicationContext.publishEvent(new ClosedEvent(applicationContext, metaId));
+        }).start();
+    }
+
+    @Override
+    public void close(String metaId) {
+
+    }
+
+}

+ 39 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/IncrementExtractor.java

@@ -0,0 +1,39 @@
+package org.dbsyncer.manager.extractor;
+
+import org.dbsyncer.common.event.ClosedEvent;
+import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.Listener;
+import org.dbsyncer.listener.config.ListenerConfig;
+import org.dbsyncer.parser.model.Mapping;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+/**
+ * 增量同步
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/26 15:28
+ */
+@Component
+public class IncrementExtractor implements Extractor {
+
+    @Autowired
+    private Listener listener;
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    @Override
+    public void start(Mapping mapping) {
+        final String metaId = mapping.getMetaId();
+        applicationContext.publishEvent(new ClosedEvent(applicationContext, metaId));
+    }
+
+    @Override
+    public void close(String metaId) {
+
+    }
+
+}

+ 1 - 1
dbsyncer-web/src/main/resources/templates/index/index.html

@@ -145,7 +145,7 @@
                 <thead>
                 <tr>
                     <th>序号</th>
-                    <th>驱动名称</th>
+                    <th>任务名称</th>
                     <th>同步方式</th>
                     <th>结果</th>
                     <th>状态</th>