AE86 5 years ago
parent
commit
4ac13e97fb

+ 35 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/task/Task.java

@@ -0,0 +1,35 @@
+package org.dbsyncer.common.task;
+
+public class Task {
+
+    private TaskCallBack taskCallBack;
+
+    // 0: 停止;1: 运行
+    private int state;
+    public static final int STOP = 0;
+    public static final int RUNNING = 1;
+
+    public TaskCallBack getTaskCallBack() {
+        return taskCallBack;
+    }
+
+    public void setTaskCallBack(TaskCallBack taskCallBack) {
+        this.taskCallBack = taskCallBack;
+    }
+
+    public int getState() {
+        return state;
+    }
+
+    public void setState(int state) {
+        this.state = state;
+    }
+
+    public boolean isRunning() {
+        return state == RUNNING;
+    }
+
+    public void close() {
+        taskCallBack.cancel();
+    }
+}

+ 10 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/task/TaskCallBack.java

@@ -0,0 +1,10 @@
+package org.dbsyncer.common.task;
+
+public interface TaskCallBack {
+
+    /**
+     * 关闭任务
+     */
+    void cancel();
+
+}

+ 12 - 14
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -28,7 +28,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
 import org.springframework.context.ApplicationListener;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
@@ -63,6 +62,13 @@ public class ManagerFactory implements Manager, ApplicationContextAware, Applica
     @Autowired
     private OperationTemplate operationTemplate;
 
+    private Map<String, Extractor> map;
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        map = applicationContext.getBeansOfType(Extractor.class);
+    }
+
     @Override
     public boolean alive(ConnectorConfig config) {
         return parser.alive(config);
@@ -225,19 +231,8 @@ public class ManagerFactory implements Manager, ApplicationContextAware, Applica
         return pluginFactory.getPluginAll();
     }
 
-    private Map<String, Extractor> map;
-
-    @Override
-    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-        map = applicationContext.getBeansOfType(Extractor.class);
-    }
-
     @Override
     public void start(Mapping mapping) {
-        // 获取数据源连接器
-        Connector connector = getConnector(mapping.getSourceConnectorId());
-        Assert.notNull(connector, "数据源配置不能为空.");
-
         Extractor extractor = getExtractor(mapping);
 
         // 标记运行中
@@ -277,8 +272,11 @@ public class ManagerFactory implements Manager, ApplicationContextAware, Applica
 
     private void changeMetaState(String metaId, MetaEnum metaEnum){
         Meta meta = getMeta(metaId);
-        meta.setState(metaEnum.getCode());
-        editMeta(meta);
+        int code = metaEnum.getCode();
+        if(meta.getState() != code){
+            meta.setState(code);
+            editMeta(meta);
+        }
     }
 
 }

+ 0 - 34
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/AbstractExtractor.java

@@ -1,39 +1,5 @@
 package org.dbsyncer.manager.extractor;
 
-import org.dbsyncer.common.event.ClosedEvent;
-import org.dbsyncer.parser.model.Mapping;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
-
 public abstract class AbstractExtractor implements Extractor {
 
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    @Autowired
-    private ApplicationContext applicationContext;
-
-    protected abstract void doTask(Mapping mapping);
-
-    @Override
-    public void asyncStart(Mapping mapping) {
-        String metaId = mapping.getMetaId();
-        logger.info("启动任务:{}", metaId);
-
-        this.doTask(mapping);
-
-        close(metaId);
-    }
-
-    @Override
-    public void asyncClose(String metaId) {
-        close(metaId);
-    }
-
-    private void close(String metaId) {
-        logger.info("结束任务:{}", metaId);
-        applicationContext.publishEvent(new ClosedEvent(applicationContext, metaId));
-    }
-
 }

+ 65 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/FullExtractor.java

@@ -1,9 +1,22 @@
 package org.dbsyncer.manager.extractor;
 
+import org.dbsyncer.common.event.ClosedEvent;
+import org.dbsyncer.common.task.Task;
+import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 全量同步
@@ -15,14 +28,63 @@ import org.springframework.stereotype.Component;
 @Component
 public class FullExtractor extends AbstractExtractor {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
     @Autowired
     private Parser parser;
 
+    @Autowired
+    private Manager manager;
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    protected Map<String, Task> map = new ConcurrentHashMap<>();
+
     @Override
-    protected void doTask(Mapping mapping) {
-        // 获取数据源连接配置
+    public void asyncStart(Mapping mapping) {
+        String metaId = mapping.getMetaId();
+        logger.info("启动任务:{}", metaId);
+        // TODO 获取数据源连接器
+        Connector connector = manager.getConnector(mapping.getSourceConnectorId());
+        Assert.notNull(connector, "数据源配置不能为空.");
+
+        Task task = new Task();
+        task.setState(Task.RUNNING);
+        task.setTaskCallBack(() -> publishClosedEvent(metaId));
+        map.putIfAbsent(metaId, task);
+
+        run(task);
+    }
+
+    @Override
+    public void asyncClose(String metaId) {
+        Task task = map.get(metaId);
+        if (null != task) {
+            task.setState(Task.STOP);
+            logger.info("关闭中...");
+        }
+    }
+
+    protected void run(Task task) {
+        for(;;){
+            if(task.isRunning()){
+                try {
+                    logger.info("模拟同步休眠5s");
+                    TimeUnit.SECONDS.sleep(5);
+                } catch (InterruptedException e) {
+                    logger.error(e.getMessage());
+                }
+                continue;
+            }
+            task.close();
+            break;
+        }
+    }
 
-        // 获取执行命令
+    protected void publishClosedEvent(String metaId) {
+        applicationContext.publishEvent(new ClosedEvent(applicationContext, metaId));
+        logger.info("结束任务:{}", metaId);
     }
 
 }

+ 6 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/IncrementExtractor.java

@@ -19,8 +19,12 @@ public class IncrementExtractor extends AbstractExtractor {
     private Listener listener;
 
     @Override
-    protected void doTask(Mapping mapping) {
-        // 获取数据源连接配置
+    public void asyncStart(Mapping mapping) {
+
     }
 
+    @Override
+    public void asyncClose(String metaId) {
+
+    }
 }

+ 21 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/PreloadTemplate.java

@@ -1,14 +1,19 @@
 package org.dbsyncer.manager.template.impl;
 
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.config.PreloadCallBack;
 import org.dbsyncer.manager.config.PreloadConfig;
+import org.dbsyncer.manager.config.QueryConfig;
 import org.dbsyncer.manager.enums.GroupStrategyEnum;
 import org.dbsyncer.manager.enums.HandlerEnum;
 import org.dbsyncer.manager.template.AbstractTemplate;
 import org.dbsyncer.manager.template.Handler;
 import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.parser.model.ConfigModel;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.query.Query;
@@ -37,6 +42,9 @@ public final class PreloadTemplate extends AbstractTemplate implements Applicati
     @Autowired
     private Parser parser;
 
+    @Autowired
+    private Manager manager;
+
     @Autowired
     private StorageService storageService;
 
@@ -74,6 +82,19 @@ public final class PreloadTemplate extends AbstractTemplate implements Applicati
         // Load metas
         execute(new PreloadConfig(ConfigConstant.META, HandlerEnum.PRELOAD_META.getHandler()));
 
+        // 启动驱动
+        Meta meta = new Meta();
+        meta.setType(ConfigConstant.META);
+        QueryConfig<Meta> queryConfig = new QueryConfig<>(meta);
+        List<Meta> metas = operationTemplate.queryAll(queryConfig);
+        if (!CollectionUtils.isEmpty(metas)) {
+            metas.forEach(m -> {
+                if (MetaEnum.isRunning(m.getState())) {
+                    Mapping mapping = manager.getMapping(m.getMappingId());
+                    manager.start(mapping);
+                }
+            });
+        }
     }
 
 }