AE86 5 anni fa
parent
commit
ea9db0f4f5

+ 37 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -10,9 +10,11 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerEnum;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.parser.enums.ModelEnum;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.slf4j.Logger;
@@ -24,8 +26,11 @@ import org.springframework.context.ApplicationContextAware;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * @author AE86
@@ -70,6 +75,10 @@ public class MappingChecker extends AbstractChecker implements ApplicationContex
 
         // 修改基本配置
         this.modifyConfigModel(mapping, params);
+
+        // 创建meta
+        String metaId = addMeta(mapping.getId());
+        mapping.setMetaId(metaId);
         return mapping;
     }
 
@@ -112,6 +121,9 @@ public class MappingChecker extends AbstractChecker implements ApplicationContex
         // 更新映射关系过滤条件
         setFilterCommand(mapping);
 
+        // 更新meta
+        updateMeta(mapping);
+
         // 增量配置
         return mapping;
     }
@@ -132,4 +144,29 @@ public class MappingChecker extends AbstractChecker implements ApplicationContex
         }
     }
 
+    private String addMeta(String mappingId) {
+        AtomicInteger total = new AtomicInteger(0);
+        AtomicInteger success = new AtomicInteger(0);
+        AtomicInteger fail = new AtomicInteger(0);
+        Map<String, String> map = new ConcurrentHashMap<>();
+        Meta meta = new Meta(mappingId, MetaEnum.RUNNING.getCode(), total, success, fail, map);
+        meta.setType(ConfigConstant.META);
+        meta.setName(ConfigConstant.META);
+
+        // 修改基本配置
+        this.modifyConfigModel(meta, new HashMap<>());
+
+        return manager.addMeta(meta);
+    }
+
+    private void updateMeta(Mapping mapping) {
+        if(StringUtils.equals(ModelEnum.FULL.getCode(), mapping.getModel())){
+            String metaId = mapping.getMetaId();
+            Meta meta = manager.getMeta(metaId);
+            Assert.notNull(meta, "驱动meta不存在.");
+            // TODO 获取驱动数据源总条数
+
+        }
+    }
+
 }

+ 45 - 56
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MappingServiceImpl.java

@@ -19,10 +19,8 @@ import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
-import org.springframework.util.StringUtils;
 
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -46,9 +44,6 @@ public class MappingServiceImpl implements MappingService {
     @Autowired
     private Checker mappingChecker;
 
-    @Autowired
-    private Checker metaChecker;
-
     // 驱动启停锁
     private final static Object LOCK = new Object();
 
@@ -60,13 +55,10 @@ public class MappingServiceImpl implements MappingService {
 
     @Override
     public String edit(Map<String, String> params) {
-        String mappingId = params.get(ConfigConstant.CONFIG_MODEL_ID);
-        Mapping mapping = manager.getMapping(mappingId);
-        Assert.notNull(mapping, "驱动不存在.");
-
-        synchronized (LOCK){
-            isRunning(mapping.getMetaId());
-
+        String id = params.get(ConfigConstant.CONFIG_MODEL_ID);
+        Mapping mapping = assertMappingExist(id);
+        synchronized (LOCK) {
+            assertRunning(mapping.getMetaId());
             ConfigModel model = mappingChecker.checkEditConfigModel(params);
             return manager.editMapping(model);
         }
@@ -74,17 +66,12 @@ public class MappingServiceImpl implements MappingService {
 
     @Override
     public String remove(String id) {
-        Mapping mapping = manager.getMapping(id);
-        Assert.notNull(mapping, "驱动不存在.");
-        String metaId = mapping.getMetaId();
-
-        synchronized (LOCK){
-            isRunning(metaId);
+        Mapping mapping = assertMappingExist(id);
+        synchronized (LOCK) {
+            assertRunning(mapping.getMetaId());
 
             // 删除meta
-            if(!StringUtils.isEmpty(metaId)){
-                manager.removeMeta(metaId);
-            }
+            manager.removeMeta(mapping.getMetaId());
 
             // 删除tableGroup
             List<TableGroup> groupList = manager.getTableGroupAll(id);
@@ -114,46 +101,22 @@ public class MappingServiceImpl implements MappingService {
 
     @Override
     public String start(String id) {
-        Assert.hasText(id, "驱动ID不能为空");
-        Map<String, String> params = new HashMap<>();
-        params.put(ConfigConstant.CONFIG_MODEL_ID, id);
-        Mapping mapping = manager.getMapping(id);
-        Assert.notNull(mapping, "驱动不存在.");
-
-        synchronized (LOCK){
-            isRunning(mapping.getMetaId());
-
-            // 创建增量meta文件
-            ConfigModel model = metaChecker.checkAddConfigModel(params);
-            String metaId = manager.addMeta(model);
-
-            // 获取连接器
-            Connector connector = manager.getConnector(mapping.getSourceConnectorId());
-
-            boolean success = manager.launch(metaId, mapping.getListener(), connector.getConfig());
-            if(!success){
-                // rollback
-                manager.removeMeta(metaId);
-            }
+        Mapping mapping = assertMappingExist(id);
+        synchronized (LOCK) {
+            assertRunning(mapping.getMetaId());
+            manager.start(mapping);
         }
         return "驱动启动成功";
     }
 
     @Override
     public String stop(String id) {
-        Assert.hasText(id, "驱动ID不能为空");
-        Mapping mapping = manager.getMapping(id);
-        Assert.notNull(mapping, "驱动不存在.");
-        String metaId = mapping.getMetaId();
-
-        synchronized (LOCK){
-            if (!manager.isRunning(metaId)) {
+        Mapping mapping = assertMappingExist(id);
+        synchronized (LOCK) {
+            if (!isRunning(mapping.getMetaId())) {
                 throw new BizException("驱动已停止.");
             }
-            boolean success = manager.close(metaId);
-            if(success){
-                manager.removeMeta(metaId);
-            }
+            manager.close(mapping);
         }
         return "驱动停止成功";
     }
@@ -176,8 +139,8 @@ public class MappingServiceImpl implements MappingService {
         BeanUtils.copyProperties(s, sConn);
         ConnectorVo tConn = new ConnectorVo(monitor.alive(t.getId()));
         BeanUtils.copyProperties(t, tConn);
+        boolean isRunning = isRunning(mapping.getMetaId());
 
-        boolean isRunning = manager.isRunning(mapping.getMetaId());
         MappingVo vo = new MappingVo(isRunning, sConn, tConn);
         BeanUtils.copyProperties(mapping, vo);
         return vo;
@@ -193,9 +156,35 @@ public class MappingServiceImpl implements MappingService {
         return metaVo;
     }
 
+    /**
+     * 检查是否存在驱动
+     *
+     * @param mappingId
+     * @return
+     */
+    private Mapping assertMappingExist(String mappingId) {
+        Mapping mapping = manager.getMapping(mappingId);
+        Assert.notNull(mapping, "驱动不存在.");
+        return mapping;
+    }
+
+    /**
+     * 检查是否运行中,运行中抛出异常提示
+     *
+     * @param metaId
+     * @return
+     */
+    private void assertRunning(String metaId) {
+        Assert.isTrue(!isRunning(metaId), "驱动正在运行中, 请先停止.");
+    }
+
     private boolean isRunning(String metaId) {
-        boolean running = manager.isRunning(metaId);
-        Assert.isTrue(!running, "驱动正在运行中, 请先停止.");
+        Meta meta = manager.getMeta(metaId);
+        if (null != meta) {
+            int state = meta.getState();
+            return MetaEnum.isRunning(state);
+        }
         return false;
     }
+
 }

+ 7 - 15
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Executor.java

@@ -4,36 +4,28 @@ import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
 
 /**
- * 监听任务执行器
+ * 同步任务执行器
  *
  * @author AE86
  * @version 1.0.0
- * @date 2019/9/16 23:59
+ * @date 2020/04/26 16:32
  */
 public interface Executor {
 
     /**
-     * 启动监听任务
+     * 启动同步任务
      *
      * @param metaId
      * @param listenerConfig
      * @param connectorConfig
      */
-    boolean launch(String metaId, ListenerConfig listenerConfig, ConnectorConfig connectorConfig);
+    boolean start(String metaId, ListenerConfig listenerConfig, ConnectorConfig connectorConfig);
 
     /**
-     * 关闭监听任务
+     * 关闭同步任务
      *
      * @param metaId
      */
-    boolean close(String metaId);
+    boolean shutdown(String metaId);
 
-    /**
-     * 是否运行中
-     *
-     * @param metaId
-     * @return
-     */
-    boolean isRunning(String metaId);
-
-}
+}

+ 15 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -19,7 +19,7 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2019/9/30 20:31
  */
-public interface Manager extends Executor {
+public interface Manager {
 
     boolean alive(ConnectorConfig config);
 
@@ -86,4 +86,18 @@ public interface Manager extends Executor {
     // Plugin
     List<Plugin> getPluginAll();
 
+    /**
+     * 启动同步任务
+     *
+     * @param mapping
+     */
+    boolean start(Mapping mapping);
+
+    /**
+     * 关闭同步任务
+     *
+     * @param mapping
+     */
+    boolean close(Mapping mapping);
+
 }

+ 48 - 14
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -5,8 +5,10 @@ import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
-import org.dbsyncer.listener.config.ListenerConfig;
-import org.dbsyncer.manager.config.*;
+import org.dbsyncer.listener.Listener;
+import org.dbsyncer.manager.config.OperationConfig;
+import org.dbsyncer.manager.config.PreloadConfig;
+import org.dbsyncer.manager.config.QueryConfig;
 import org.dbsyncer.manager.enums.GroupStrategyEnum;
 import org.dbsyncer.manager.enums.HandlerEnum;
 import org.dbsyncer.manager.template.impl.OperationTemplate;
@@ -19,10 +21,14 @@ import org.dbsyncer.plugin.config.Plugin;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 import org.springframework.context.ApplicationListener;
 import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
 
 import java.util.List;
 import java.util.Map;
@@ -33,7 +39,7 @@ import java.util.Map;
  * @date 2019/9/16 23:59
  */
 @Component
-public class ManagerFactory implements Manager, ApplicationListener<ContextRefreshedEvent> {
+public class ManagerFactory implements Manager, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -43,12 +49,22 @@ public class ManagerFactory implements Manager, ApplicationListener<ContextRefre
     @Autowired
     private PluginFactory pluginFactory;
 
+    @Autowired
+    private Listener listener;
+
     @Autowired
     private PreloadTemplate preloadTemplate;
 
     @Autowired
     private OperationTemplate operationTemplate;
 
+    private Map<String, Executor> map;
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        map = applicationContext.getBeansOfType(Executor.class);
+    }
+
     @Override
     public boolean alive(ConnectorConfig config) {
         return parser.alive(config);
@@ -206,6 +222,26 @@ public class ManagerFactory implements Manager, ApplicationListener<ContextRefre
         return pluginFactory.getPluginAll();
     }
 
+    @Override
+    public boolean start(Mapping mapping) {
+        // 获取数据源连接器
+        Connector connector = getConnector(mapping.getSourceConnectorId());
+        Assert.notNull(connector, "数据源配置不能为空");
+
+        // 启动任务
+        Executor executor = getExecutor(mapping);
+        boolean start = executor.start(mapping.getMetaId(), mapping.getListener(), connector.getConfig());
+        return start;
+    }
+
+    @Override
+    public boolean close(Mapping mapping) {
+        // 关闭任务
+        Executor executor = getExecutor(mapping);
+        boolean shutdown = executor.shutdown(mapping.getMetaId());
+        return shutdown;
+    }
+
     @Override
     public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
         // Load connectors
@@ -219,18 +255,16 @@ public class ManagerFactory implements Manager, ApplicationListener<ContextRefre
 
     }
 
-    @Override
-    public boolean launch(String metaId, ListenerConfig listenerConfig, ConnectorConfig connectorConfig) {
-        return false;
-    }
+    private Executor getExecutor(Mapping mapping) {
+        Assert.notNull(mapping, "驱动不能为空");
+        String model = mapping.getModel();
+        String metaId = mapping.getMetaId();
+        Assert.hasText(model, "同步方式不能为空");
+        Assert.hasText(metaId, "任务ID不能为空");
 
-    @Override
-    public boolean close(String metaId) {
-        return false;
+        Executor executor = map.get(model.concat("Executor"));
+        Assert.notNull(executor, String.format("未知的同步方式: %s", model));
+        return executor;
     }
 
-    @Override
-    public boolean isRunning(String metaId) {
-        return false;
-    }
 }

+ 28 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/executor/FullExecutor.java

@@ -0,0 +1,28 @@
+package org.dbsyncer.manager.executor;
+
+import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.config.ListenerConfig;
+import org.dbsyncer.manager.Executor;
+import org.springframework.stereotype.Component;
+
+/**
+ * 全量同步
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/26 15:28
+ */
+@Component
+public class FullExecutor implements Executor {
+
+    @Override
+    public boolean start(String metaId, ListenerConfig listenerConfig, ConnectorConfig connectorConfig) {
+        return true;
+    }
+
+    @Override
+    public boolean shutdown(String metaId) {
+        return true;
+    }
+
+}

+ 28 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/executor/IncrementExecutor.java

@@ -0,0 +1,28 @@
+package org.dbsyncer.manager.executor;
+
+import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.config.ListenerConfig;
+import org.dbsyncer.manager.Executor;
+import org.springframework.stereotype.Component;
+
+/**
+ * 增量同步
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/26 15:28
+ */
+@Component
+public class IncrementExecutor implements Executor {
+
+    @Override
+    public boolean start(String metaId, ListenerConfig listenerConfig, ConnectorConfig connectorConfig) {
+        return true;
+    }
+
+    @Override
+    public boolean shutdown(String metaId) {
+        return true;
+    }
+
+}

+ 5 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/MetaEnum.java

@@ -41,6 +41,10 @@ public enum MetaEnum {
         throw new ParserException(String.format("Meta code \"%s\" does not exist.", code));
     }
 
+    public static boolean isRunning(int state) {
+        return RUNNING.getCode() == state || STOPPING.getCode() == state;
+    }
+
     public int getCode() {
         return code;
     }
@@ -56,4 +60,4 @@ public enum MetaEnum {
     public void setMessage(String message) {
         this.message = message;
     }
-}
+}