AE86 пре 5 година
родитељ
комит
2c454b49ee

+ 1 - 7
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MetaChecker.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.biz.checker.impl.mapping;
 package org.dbsyncer.biz.checker.impl.mapping;
 
 
-import org.dbsyncer.biz.BizException;
 import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.parser.enums.MetaEnum;
@@ -38,17 +37,12 @@ public class MetaChecker extends AbstractChecker {
         Mapping mapping = manager.getMapping(mappingId);
         Mapping mapping = manager.getMapping(mappingId);
         Assert.notNull(mapping, "驱动不存在.");
         Assert.notNull(mapping, "驱动不存在.");
 
 
-        Meta meta = manager.getMeta(mapping.getMetaId());
-        if (null != meta) {
-            throw new BizException("驱动正在运行中.");
-        }
-
         // TODO 获取驱动数据源总条数
         // TODO 获取驱动数据源总条数
         AtomicInteger total = new AtomicInteger(1000);
         AtomicInteger total = new AtomicInteger(1000);
         AtomicInteger success = new AtomicInteger(500);
         AtomicInteger success = new AtomicInteger(500);
         AtomicInteger fail = new AtomicInteger(0);
         AtomicInteger fail = new AtomicInteger(0);
         Map<String, String> map = new ConcurrentHashMap<>();
         Map<String, String> map = new ConcurrentHashMap<>();
-        meta = new Meta(mappingId, MetaEnum.RUNNING.getCode(), total, success, fail, map);
+        Meta meta = new Meta(mappingId, MetaEnum.RUNNING.getCode(), total, success, fail, map);
         meta.setType(ConfigConstant.META);
         meta.setType(ConfigConstant.META);
         meta.setName(ConfigConstant.META);
         meta.setName(ConfigConstant.META);
 
 

+ 33 - 22
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MappingServiceImpl.java

@@ -61,8 +61,11 @@ public class MappingServiceImpl implements MappingService {
     @Override
     @Override
     public String edit(Map<String, String> params) {
     public String edit(Map<String, String> params) {
         String mappingId = params.get(ConfigConstant.CONFIG_MODEL_ID);
         String mappingId = params.get(ConfigConstant.CONFIG_MODEL_ID);
+        Mapping mapping = manager.getMapping(mappingId);
+        Assert.notNull(mapping, "驱动不存在.");
+
         synchronized (LOCK){
         synchronized (LOCK){
-            checkRunning(mappingId);
+            isRunning(mapping.getMetaId());
 
 
             ConfigModel model = mappingChecker.checkEditConfigModel(params);
             ConfigModel model = mappingChecker.checkEditConfigModel(params);
             return manager.editMapping(model);
             return manager.editMapping(model);
@@ -71,14 +74,14 @@ public class MappingServiceImpl implements MappingService {
 
 
     @Override
     @Override
     public String remove(String id) {
     public String remove(String id) {
-        Assert.hasText(id, "驱动ID不能为空");
+        Mapping mapping = manager.getMapping(id);
+        Assert.notNull(mapping, "驱动不存在.");
+        String metaId = mapping.getMetaId();
 
 
         synchronized (LOCK){
         synchronized (LOCK){
-            checkRunning(id);
+            isRunning(metaId);
 
 
             // 删除meta
             // 删除meta
-            Mapping mapping = manager.getMapping(id);
-            String metaId = mapping.getMetaId();
             if(!StringUtils.isEmpty(metaId)){
             if(!StringUtils.isEmpty(metaId)){
                 manager.removeMeta(metaId);
                 manager.removeMeta(metaId);
             }
             }
@@ -114,12 +117,24 @@ public class MappingServiceImpl implements MappingService {
         Assert.hasText(id, "驱动ID不能为空");
         Assert.hasText(id, "驱动ID不能为空");
         Map<String, String> params = new HashMap<>();
         Map<String, String> params = new HashMap<>();
         params.put(ConfigConstant.CONFIG_MODEL_ID, id);
         params.put(ConfigConstant.CONFIG_MODEL_ID, id);
+        Mapping mapping = manager.getMapping(id);
+        Assert.notNull(mapping, "驱动不存在.");
 
 
         synchronized (LOCK){
         synchronized (LOCK){
-            checkRunning(id);
+            isRunning(mapping.getMetaId());
 
 
+            // 创建增量meta文件
             ConfigModel model = metaChecker.checkAddConfigModel(params);
             ConfigModel model = metaChecker.checkAddConfigModel(params);
-            manager.addMeta(model);
+            String metaId = manager.addMeta(model);
+
+            // 获取连接器
+            Connector connector = manager.getConnector(mapping.getSourceConnectorId());
+
+            boolean success = manager.launch(metaId, mapping.getListener(), connector.getConfig());
+            if(!success){
+                // rollback
+                manager.removeMeta(metaId);
+            }
         }
         }
         return "驱动启动成功";
         return "驱动启动成功";
     }
     }
@@ -129,13 +144,16 @@ public class MappingServiceImpl implements MappingService {
         Assert.hasText(id, "驱动ID不能为空");
         Assert.hasText(id, "驱动ID不能为空");
         Mapping mapping = manager.getMapping(id);
         Mapping mapping = manager.getMapping(id);
         Assert.notNull(mapping, "驱动不存在.");
         Assert.notNull(mapping, "驱动不存在.");
+        String metaId = mapping.getMetaId();
 
 
         synchronized (LOCK){
         synchronized (LOCK){
-            String metaId = mapping.getMetaId();
-            if (null == manager.getMeta(metaId)) {
+            if (!manager.isRunning(metaId)) {
                 throw new BizException("驱动已停止.");
                 throw new BizException("驱动已停止.");
             }
             }
-            manager.removeMeta(metaId);
+            boolean success = manager.close(metaId);
+            if(success){
+                manager.removeMeta(metaId);
+            }
         }
         }
         return "驱动停止成功";
         return "驱动停止成功";
     }
     }
@@ -159,7 +177,7 @@ public class MappingServiceImpl implements MappingService {
         ConnectorVo tConn = new ConnectorVo(monitor.alive(t.getId()));
         ConnectorVo tConn = new ConnectorVo(monitor.alive(t.getId()));
         BeanUtils.copyProperties(t, tConn);
         BeanUtils.copyProperties(t, tConn);
 
 
-        boolean isRunning = null != manager.getMeta(mapping.getMetaId());
+        boolean isRunning = manager.isRunning(mapping.getMetaId());
         MappingVo vo = new MappingVo(isRunning, sConn, tConn);
         MappingVo vo = new MappingVo(isRunning, sConn, tConn);
         BeanUtils.copyProperties(mapping, vo);
         BeanUtils.copyProperties(mapping, vo);
         return vo;
         return vo;
@@ -175,16 +193,9 @@ public class MappingServiceImpl implements MappingService {
         return metaVo;
         return metaVo;
     }
     }
 
 
-    /**
-     * 检查是否运行中
-     *
-     * @param mappingId
-     */
-    private void checkRunning(String mappingId) {
-        Mapping mapping = manager.getMapping(mappingId);
-        Assert.notNull(mapping, "驱动不存在.");
-
-        Meta meta = manager.getMeta(mapping.getMetaId());
-        Assert.isNull(meta, "驱动正在运行, 请先停止.");
+    private boolean isRunning(String metaId) {
+        boolean running = manager.isRunning(metaId);
+        Assert.isTrue(!running, "驱动正在运行中, 请先停止.");
+        return false;
     }
     }
 }
 }

+ 4 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Listener.java

@@ -0,0 +1,4 @@
+package org.dbsyncer.listener;
+
+public interface Listener {
+}

+ 4 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -1,7 +1,10 @@
 package org.dbsyncer.listener;
 package org.dbsyncer.listener;
 
 
 
 
-public class ListenerFactory {
+import org.springframework.stereotype.Component;
+
+@Component
+public class ListenerFactory implements Listener {
 
 
 
 
 }
 }

+ 39 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Executor.java

@@ -0,0 +1,39 @@
+package org.dbsyncer.manager;
+
+import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.config.ListenerConfig;
+
+/**
+ * 监听任务执行器
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/9/16 23:59
+ */
+public interface Executor {
+
+    /**
+     * 启动监听任务
+     *
+     * @param metaId
+     * @param listenerConfig
+     * @param connectorConfig
+     */
+    boolean launch(String metaId, ListenerConfig listenerConfig, ConnectorConfig connectorConfig);
+
+    /**
+     * 关闭监听任务
+     *
+     * @param metaId
+     */
+    boolean close(String metaId);
+
+    /**
+     * 是否运行中
+     *
+     * @param metaId
+     * @return
+     */
+    boolean isRunning(String metaId);
+
+}

+ 3 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -13,11 +13,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
 /**
 /**
+ * 驱动配置
+ *
  * @author AE86
  * @author AE86
  * @version 1.0.0
  * @version 1.0.0
  * @date 2019/9/30 20:31
  * @date 2019/9/30 20:31
  */
  */
-public interface Manager {
+public interface Manager extends Executor {
 
 
     boolean alive(ConnectorConfig config);
     boolean alive(ConnectorConfig config);
 
 

+ 15 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -5,6 +5,7 @@ import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
+import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.manager.config.*;
 import org.dbsyncer.manager.config.*;
 import org.dbsyncer.manager.enums.GroupStrategyEnum;
 import org.dbsyncer.manager.enums.GroupStrategyEnum;
 import org.dbsyncer.manager.enums.HandlerEnum;
 import org.dbsyncer.manager.enums.HandlerEnum;
@@ -218,4 +219,18 @@ public class ManagerFactory implements Manager, ApplicationListener<ContextRefre
 
 
     }
     }
 
 
+    @Override
+    public boolean launch(String metaId, ListenerConfig listenerConfig, ConnectorConfig connectorConfig) {
+        return false;
+    }
+
+    @Override
+    public boolean close(String metaId) {
+        return false;
+    }
+
+    @Override
+    public boolean isRunning(String metaId) {
+        return false;
+    }
 }
 }