AE86 5 ani în urmă
părinte
comite
4fe702b417

+ 8 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/MappingService.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.biz;
 
 import org.dbsyncer.biz.vo.MappingVo;
-import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.biz.vo.MetaVo;
 
 import java.util.List;
 import java.util.Map;
@@ -63,4 +63,11 @@ public interface MappingService {
      */
     boolean stop(String id);
 
+    /**
+     * 获取运行的驱动列表
+     *
+     * @return
+     */
+    List<MetaVo> getMetaAll();
+
 }

+ 3 - 3
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -10,7 +10,7 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerEnum;
 import org.dbsyncer.manager.Manager;
-import org.dbsyncer.parser.constant.ModelConstant;
+import org.dbsyncer.parser.enums.ModelEnum;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
@@ -65,7 +65,7 @@ public class MappingChecker extends AbstractChecker implements ApplicationContex
         mapping.setType(ConfigConstant.MAPPING);
         mapping.setSourceConnectorId(sourceConnectorId);
         mapping.setTargetConnectorId(targetConnectorId);
-        mapping.setModel(ModelConstant.FULL);
+        mapping.setModel(ModelEnum.FULL.getCode());
         mapping.setListener(new ListenerConfig(ListenerEnum.TIMING.getCode()));
 
         // 修改基本配置
@@ -87,7 +87,7 @@ public class MappingChecker extends AbstractChecker implements ApplicationContex
         // 同步方式(仅支持全量或增量同步方式)
         String model = params.get("model");
         if (StringUtils.isNotBlank(model)) {
-            if (StringUtils.equals(ModelConstant.FULL, model) || StringUtils.equals(ModelConstant.INCREMENT, model)) {
+            if (StringUtils.equals(ModelEnum.FULL.getCode(), model) || StringUtils.equals(ModelEnum.INCREMENT.getCode(), model)) {
                 mapping.setModel(model);
             }
         }

+ 70 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MetaChecker.java

@@ -0,0 +1,70 @@
+package org.dbsyncer.biz.checker.impl.mapping;
+
+import org.dbsyncer.biz.BizException;
+import org.dbsyncer.biz.checker.AbstractChecker;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.manager.Manager;
+import org.dbsyncer.parser.enums.MetaEnum;
+import org.dbsyncer.parser.model.ConfigModel;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.storage.constant.ConfigConstant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/1/8 15:17
+ */
+@Component
+public class MetaChecker extends AbstractChecker {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private Manager manager;
+
+    @Override
+    public ConfigModel checkAddConfigModel(Map<String, String> params) {
+        String mappingId = params.get(ConfigConstant.CONFIG_MODEL_ID);
+        Mapping mapping = manager.getMapping(mappingId);
+        Assert.notNull(mapping, "驱动不存在.");
+
+        // 驱动和元信息1对1关系
+        List<Meta> metaAll = manager.getMetaAll(mappingId);
+        if (!CollectionUtils.isEmpty(metaAll)) {
+            Meta meta = metaAll.get(0);
+            if (MetaEnum.READY.getCode() != meta.getState()) {
+                throw new BizException("驱动正在运行中.");
+            }
+        }
+
+        // TODO 获取驱动数据源总条数
+        AtomicInteger total = new AtomicInteger();
+        AtomicInteger success = new AtomicInteger();
+        AtomicInteger fail = new AtomicInteger();
+        Map<String, String> map = new ConcurrentHashMap<>();
+        Meta meta = new Meta(mappingId, MetaEnum.READY.getCode(), total, success, fail, map);
+        meta.setType(ConfigConstant.META);
+        meta.setName(ConfigConstant.META);
+
+        // 修改基本配置
+        this.modifyConfigModel(meta, params);
+        return meta;
+    }
+
+    @Override
+    public ConfigModel checkEditConfigModel(Map<String, String> params) {
+        return null;
+    }
+
+}

+ 42 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MappingServiceImpl.java

@@ -1,14 +1,19 @@
 package org.dbsyncer.biz.impl;
 
+import org.dbsyncer.biz.BizException;
 import org.dbsyncer.biz.MappingService;
 import org.dbsyncer.biz.checker.Checker;
 import org.dbsyncer.biz.vo.ConnectorVo;
 import org.dbsyncer.biz.vo.MappingVo;
+import org.dbsyncer.biz.vo.MetaVo;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.parser.enums.ModelEnum;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.storage.constant.ConfigConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
@@ -18,6 +23,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -37,6 +43,9 @@ public class MappingServiceImpl implements MappingService {
     @Autowired
     private Checker mappingChecker;
 
+    @Autowired
+    private Checker metaChecker;
+
     @Override
     public String add(Map<String, String> params) {
         ConfigModel model = mappingChecker.checkAddConfigModel(params);
@@ -74,16 +83,38 @@ public class MappingServiceImpl implements MappingService {
 
     @Override
     public boolean start(String id) {
-        manager.start(id);
+        Map<String, String> params = new HashMap<>();
+        params.put(ConfigConstant.CONFIG_MODEL_ID, id);
+        synchronized (metaChecker){
+            ConfigModel model = metaChecker.checkAddConfigModel(params);
+            manager.addMeta(model);
+        }
         return true;
     }
 
     @Override
     public boolean stop(String id) {
-        manager.stop(id);
+        synchronized (metaChecker){
+            List<Meta> metaAll = manager.getMetaAll(id);
+            if(!CollectionUtils.isEmpty(metaAll)){
+                metaAll.forEach(m -> manager.removeMeta(m.getId()));
+            }else{
+                throw new BizException("驱动已停止.");
+            }
+        }
         return true;
     }
 
+    @Override
+    public List<MetaVo> getMetaAll() {
+        List<MetaVo> temp = new ArrayList<>();
+        List<Meta> list = manager.getMetaAll();
+        if (!CollectionUtils.isEmpty(list)) {
+            list.forEach(m -> temp.add(convertMeta2Vo(m)));
+        }
+        return temp;
+    }
+
     boolean running = false;
 
     /**
@@ -107,4 +138,13 @@ public class MappingServiceImpl implements MappingService {
         return vo;
     }
 
+    private MetaVo convertMeta2Vo(Meta meta) {
+        Mapping mapping = manager.getMapping(meta.getMappingId());
+        Assert.notNull(mapping, "驱动不存在.");
+        ModelEnum modelEnum = ModelEnum.getModelEnum(mapping.getModel());
+        MetaVo metaVo = new MetaVo(mapping.getName(), modelEnum.getCode());
+        BeanUtils.copyProperties(meta, metaVo);
+        return metaVo;
+    }
+
 }

+ 37 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/MetaVo.java

@@ -0,0 +1,37 @@
+package org.dbsyncer.biz.vo;
+
+import org.dbsyncer.parser.model.Meta;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/01/03 17:20
+ */
+public class MetaVo extends Meta {
+
+    // 驱动名称
+    private String mappingName;
+    // 同步方式
+    private String model;
+
+    public MetaVo(String mappingName, String model) {
+        this.mappingName = mappingName;
+        this.model = model;
+    }
+
+    public String getMappingName() {
+        return mappingName;
+    }
+
+    public void setMappingName(String mappingName) {
+        this.mappingName = mappingName;
+    }
+
+    public String getModel() {
+        return model;
+    }
+
+    public void setModel(String model) {
+        this.model = model;
+    }
+}

+ 10 - 18
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -6,10 +6,7 @@ import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
-import org.dbsyncer.parser.model.ConfigModel;
-import org.dbsyncer.parser.model.Connector;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.*;
 import org.dbsyncer.plugin.config.Plugin;
 
 import java.util.List;
@@ -63,6 +60,15 @@ public interface Manager {
 
     Map<String, String> getCommand(String sourceConnectorId, String targetConnectorId, TableGroup tableGroup);
 
+    // Meta
+    String addMeta(ConfigModel model);
+
+    void removeMeta(String metaId);
+
+    List<Meta> getMetaAll(String mappingId);
+
+    List<Meta> getMetaAll();
+
     // ConnectorEnum
     List<ConnectorEnum> getConnectorEnumAll();
 
@@ -78,18 +84,4 @@ public interface Manager {
     // Plugin
     List<Plugin> getPluginAll();
 
-    /**
-     * 启动驱动
-     *
-     * @param mappingId
-     */
-    void start(String mappingId);
-
-    /**
-     * 停止驱动
-     *
-     * @param mappingId
-     */
-    void stop(String mappingId);
-
 }

+ 73 - 14
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -10,10 +10,7 @@ import org.dbsyncer.manager.template.impl.ConfigOperationTemplate;
 import org.dbsyncer.manager.template.impl.ConfigPreLoadTemplate;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.enums.ConvertEnum;
-import org.dbsyncer.parser.model.ConfigModel;
-import org.dbsyncer.parser.model.Connector;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.*;
 import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.plugin.config.Plugin;
 import org.dbsyncer.storage.constant.ConfigConstant;
@@ -55,6 +52,9 @@ public class ManagerFactory implements Manager, ApplicationListener<ContextRefre
     @Autowired
     private GroupStrategy tableGroupStrategy;
 
+    @Autowired
+    private GroupStrategy metaGroupStrategy;
+
     @Override
     public boolean alive(ConnectorConfig config) {
         return parser.alive(config);
@@ -292,6 +292,75 @@ public class ManagerFactory implements Manager, ApplicationListener<ContextRefre
         return parser.getCommand(sourceConnectorId, targetConnectorId, tableGroup);
     }
 
+    @Override
+    public String addMeta(ConfigModel model) {
+        return operationTemplate.execute(model, new OperationTemplate() {
+
+            @Override
+            public void handleEvent(ConfigOperationTemplate.Call call) {
+                call.add();
+            }
+
+            @Override
+            public GroupStrategy getGroupStrategy() {
+                return metaGroupStrategy;
+            }
+
+        });
+    }
+
+    @Override
+    public void removeMeta(String metaId) {
+        operationTemplate.remove(new RemoveTemplate() {
+
+            @Override
+            public GroupStrategy getGroupStrategy() {
+                return metaGroupStrategy;
+            }
+
+            @Override
+            public String getId() {
+                return metaId;
+            }
+        });
+    }
+
+    @Override
+    public List<Meta> getMetaAll(String mappingId) {
+        return operationTemplate.queryAll(new QueryTemplate<Meta>() {
+            @Override
+            public ConfigModel getConfigModel() {
+                Meta model = new Meta();
+                model.setType(ConfigConstant.META);
+                model.setMappingId(mappingId);
+                return model;
+            }
+
+            @Override
+            public GroupStrategy getGroupStrategy() {
+                return metaGroupStrategy;
+            }
+        });
+    }
+
+    @Override
+    public List<Meta> getMetaAll() {
+        return operationTemplate.queryAll(new QueryTemplate<Meta>() {
+
+            @Override
+            public ConfigModel getConfigModel() {
+                Meta model = new Meta();
+                model.setType(ConfigConstant.META);
+                return model;
+            }
+
+            @Override
+            public GroupStrategy getGroupStrategy() {
+                return defaultGroupStrategy;
+            }
+        });
+    }
+
     @Override
     public List<ConnectorEnum> getConnectorEnumAll() {
         return parser.getConnectorEnumAll();
@@ -317,16 +386,6 @@ public class ManagerFactory implements Manager, ApplicationListener<ContextRefre
         return pluginFactory.getPluginAll();
     }
 
-    @Override
-    public void start(String mappingId) {
-
-    }
-
-    @Override
-    public void stop(String mappingId) {
-
-    }
-
     @Override
     public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
         // Load connectors

+ 3 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/ConfigOperationTemplate.java

@@ -1,8 +1,8 @@
 package org.dbsyncer.manager.template.impl;
 
-import org.dbsyncer.cache.CacheException;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.manager.ManagerException;
 import org.dbsyncer.manager.template.*;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.util.ConfigModelUtil;
@@ -131,9 +131,9 @@ public class ConfigOperationTemplate {
             BeanUtils.copyProperties(o, t);
             return t;
         } catch (InstantiationException e) {
-            throw new CacheException(e.getMessage());
+            throw new ManagerException(e.getMessage());
         } catch (IllegalAccessException e) {
-            throw new CacheException(e.getMessage());
+            throw new ManagerException(e.getMessage());
         }
     }
 

+ 29 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/MetaGroupStrategy.java

@@ -0,0 +1,29 @@
+package org.dbsyncer.manager.template.impl;
+
+import org.dbsyncer.manager.ManagerException;
+import org.dbsyncer.manager.template.GroupStrategy;
+import org.dbsyncer.parser.model.ConfigModel;
+import org.dbsyncer.parser.model.Meta;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/21 21:35
+ */
+@Component
+public class MetaGroupStrategy implements GroupStrategy {
+
+    @Override
+    public String getGroupId(ConfigModel model) {
+        if (model instanceof Meta) {
+            Meta m = (Meta) model;
+            String type = m.getType();
+            String mappingId = m.getMappingId();
+            // 格式:${type} + "_" + ${mappingId}
+            return new StringBuilder(type).append("_").append(mappingId).toString();
+        }
+        throw new ManagerException(String.format("Not support config model \"%s\".", model));
+    }
+
+}

+ 0 - 20
dbsyncer-parser/src/main/java/org/dbsyncer/parser/constant/ModelConstant.java

@@ -1,20 +0,0 @@
-package org.dbsyncer.parser.constant;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2020/03/30 16:55
- */
-public class ModelConstant {
-
-    /**
-     * 增量同步
-     */
-    public static final String INCREMENT = "increment";
-
-    /**
-     * 全量同步
-     */
-    public static final String FULL = "full";
-
-}

+ 3 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/ConvertEnum.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.parser.enums;
 
 import org.apache.commons.lang.StringUtils;
-import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.parser.ParserException;
 import org.dbsyncer.parser.convert.Handler;
 import org.dbsyncer.parser.convert.handler.*;
 
@@ -87,13 +87,13 @@ public enum ConvertEnum {
         this.handler = handler;
     }
 
-    public static Handler getHandler(String code) throws ListenerException {
+    public static Handler getHandler(String code) throws ParserException {
         for (ConvertEnum e : ConvertEnum.values()) {
             if (StringUtils.equals(code, e.getCode())) {
                 return e.getHandler();
             }
         }
-        throw new ListenerException(String.format("Handler code \"%s\" does not exist.", code));
+        throw new ParserException(String.format("Handler code \"%s\" does not exist.", code));
     }
 
     public String getCode() {

+ 55 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/MetaEnum.java

@@ -0,0 +1,55 @@
+package org.dbsyncer.parser.enums;
+
+import org.dbsyncer.parser.ParserException;
+
+/**
+ * 驱动状态枚举
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/21 16:19
+ */
+public enum MetaEnum {
+
+    /**
+     * 未运行
+     */
+    READY(0, "未运行"),
+    /**
+     * 运行中
+     */
+    RUNNING(1, "运行中");
+
+    private int code;
+    private String message;
+
+    MetaEnum(int code, String message) {
+        this.code = code;
+        this.message = message;
+    }
+
+    public static MetaEnum getMetaEnum(int code) throws ParserException {
+        for (MetaEnum e : MetaEnum.values()) {
+            if (code == e.getCode()) {
+                return e;
+            }
+        }
+        throw new ParserException(String.format("Meta code \"%s\" does not exist.", code));
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+}

+ 56 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/ModelEnum.java

@@ -0,0 +1,56 @@
+package org.dbsyncer.parser.enums;
+
+import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.parser.ParserException;
+
+/**
+ * 驱动同步方式枚举
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/21 16:19
+ */
+public enum ModelEnum {
+
+    /**
+     * 全量
+     */
+    FULL("full", "全量"),
+    /**
+     * 增量
+     */
+    INCREMENT("INCREMENT", "增量");
+
+    private String code;
+    private String message;
+
+    ModelEnum(String code, String message) {
+        this.code = code;
+        this.message = message;
+    }
+
+    public static ModelEnum getModelEnum(String code) throws ParserException {
+        for (ModelEnum e : ModelEnum.values()) {
+            if (StringUtils.equals(code, e.getCode())) {
+                return e;
+            }
+        }
+        throw new ParserException(String.format("Model code \"%s\" does not exist.", code));
+    }
+
+    public String getCode() {
+        return code;
+    }
+
+    public void setCode(String code) {
+        this.code = code;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+}

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Mapping.java

@@ -2,7 +2,7 @@ package org.dbsyncer.parser.model;
 
 import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.listener.config.ListenerConfig;
-import org.dbsyncer.parser.constant.ModelConstant;
+import org.dbsyncer.parser.enums.ModelEnum;
 
 import java.util.List;
 
@@ -30,7 +30,7 @@ public class Mapping extends AbstractConfigModel {
     /**
      * 同步方式
      *
-     * @see ModelConstant
+     * @see ModelEnum
      */
     private String model;
 

+ 26 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Meta.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.parser.model;
 
+import org.dbsyncer.parser.enums.MetaEnum;
+
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -16,6 +18,11 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class Meta extends ConfigModel {
 
+    private String mappingId;
+    /**
+     * {@link MetaEnum}
+     */
+    private int state;
     private AtomicInteger total;
     private AtomicInteger success;
     private AtomicInteger fail;
@@ -24,13 +31,31 @@ public class Meta extends ConfigModel {
     public Meta() {
     }
 
-    public Meta(AtomicInteger total, AtomicInteger success, AtomicInteger fail, Map<String, String> map) {
+    public Meta(String mappingId, int state, AtomicInteger total, AtomicInteger success, AtomicInteger fail, Map<String, String> map) {
+        this.mappingId = mappingId;
+        this.state = state;
         this.total = total;
         this.success = success;
         this.fail = fail;
         this.map = map;
     }
 
+    public String getMappingId() {
+        return mappingId;
+    }
+
+    public void setMappingId(String mappingId) {
+        this.mappingId = mappingId;
+    }
+
+    public int getState() {
+        return state;
+    }
+
+    public void setState(int state) {
+        this.state = state;
+    }
+
     public AtomicInteger getTotal() {
         return total;
     }