AE86 5 лет назад
Родитель
Сommit
b790b78630

+ 24 - 17
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -29,8 +29,7 @@ import org.springframework.util.Assert;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * @author AE86
@@ -77,8 +76,8 @@ public class MappingChecker extends AbstractChecker implements ApplicationContex
         this.modifyConfigModel(mapping, params);
 
         // 创建meta
-        String metaId = addMeta(mapping.getId());
-        mapping.setMetaId(metaId);
+        addMeta(mapping);
+
         return mapping;
     }
 
@@ -124,7 +123,6 @@ public class MappingChecker extends AbstractChecker implements ApplicationContex
         // 更新meta
         updateMeta(mapping);
 
-        // 增量配置
         return mapping;
     }
 
@@ -144,29 +142,38 @@ public class MappingChecker extends AbstractChecker implements ApplicationContex
         }
     }
 
-    private String addMeta(String mappingId) {
-        AtomicInteger total = new AtomicInteger(0);
-        AtomicInteger success = new AtomicInteger(0);
-        AtomicInteger fail = new AtomicInteger(0);
-        Map<String, String> map = new ConcurrentHashMap<>();
-        Meta meta = new Meta(mappingId, MetaEnum.READY.getCode(), total, success, fail, map);
+    private void addMeta(Mapping mapping) {
+        Meta meta = new Meta();
+        meta.setMappingId(mapping.getId());
         meta.setType(ConfigConstant.META);
         meta.setName(ConfigConstant.META);
 
         // 修改基本配置
         this.modifyConfigModel(meta, new HashMap<>());
 
-        return manager.addMeta(meta);
+        String id = manager.addMeta(meta);
+        mapping.setMetaId(id);
     }
 
     private void updateMeta(Mapping mapping) {
-        if(StringUtils.equals(ModelEnum.FULL.getCode(), mapping.getModel())){
-            String metaId = mapping.getMetaId();
-            Meta meta = manager.getMeta(metaId);
-            Assert.notNull(meta, "驱动meta不存在.");
-            // TODO 获取驱动数据源总条数
+        Meta meta = manager.getMeta(mapping.getMetaId());
+        Assert.notNull(meta, "驱动meta不存在.");
+
+        // 清空状态
+        meta.clear();
+
+        getMetaTotal(meta, mapping.getModel());
+
+        manager.editMeta(meta);
+    }
 
+    private void getMetaTotal(Meta meta, String model) {
+        if (ModelEnum.isFull(model)) {
+            // TODO 统计tableGroup总条数
+            meta.setTotal(new AtomicLong(1000));
         }
+        meta.setSuccess(new AtomicLong(200));
+        meta.setFail(new AtomicLong(1));
     }
 
 }

+ 46 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/task/Task.java

@@ -2,9 +2,22 @@ package org.dbsyncer.common.task;
 
 public class Task {
 
+    private String id;
+
+    private int batchNum;
+
+    private int threadNum;
+
     private StateEnum state;
 
-    public Task() {
+    private long beginTime;
+
+    private long endTime;
+
+    public Task(String id, int batchNum, int threadNum) {
+        this.id = id;
+        this.batchNum = batchNum;
+        this.threadNum = threadNum;
         this.state = StateEnum.RUNNING;
     }
 
@@ -16,6 +29,38 @@ public class Task {
         return StateEnum.RUNNING == state;
     }
 
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public int getBatchNum() {
+        return batchNum;
+    }
+
+    public int getThreadNum() {
+        return threadNum;
+    }
+
+    public long getBeginTime() {
+        return beginTime;
+    }
+
+    public void setBeginTime(long beginTime) {
+        this.beginTime = beginTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+    }
+
     public enum StateEnum{
         /**
          * 运行

+ 0 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/AbstractExtractor.java

@@ -15,7 +15,6 @@ public abstract class AbstractExtractor implements Extractor {
 
     protected void publishClosedEvent(String metaId) {
         applicationContext.publishEvent(new ClosedEvent(applicationContext, metaId));
-        logger.info("结束任务:{}", metaId);
     }
 
 }

+ 30 - 22
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/impl/FullExtractor.java

@@ -1,23 +1,22 @@
 package org.dbsyncer.manager.extractor.impl;
 
-import org.dbsyncer.common.event.ClosedEvent;
 import org.dbsyncer.common.task.Task;
+import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.extractor.AbstractExtractor;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.TableGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 /**
  * 全量同步
@@ -41,22 +40,41 @@ public class FullExtractor extends AbstractExtractor {
 
     @Override
     public void asyncStart(Mapping mapping) {
-        String metaId = mapping.getMetaId();
-        map.putIfAbsent(metaId, new Task());
-        Task task = map.get(metaId);
-
-        // TODO 获取数据源连接器
-        Connector connector = manager.getConnector(mapping.getSourceConnectorId());
-        Assert.notNull(connector, "数据源配置不能为空.");
+        final String mappingId = mapping.getId();
+        final String sourceConnectorId = mapping.getSourceConnectorId();
+        final String metaId = mapping.getMetaId();
+        int batchNum = mapping.getBatchNum();
+        int threadNum = mapping.getThreadNum();
+        map.putIfAbsent(metaId, new Task(metaId, batchNum, threadNum));
 
         try {
+            Connector connector = manager.getConnector(sourceConnectorId);
+            Assert.notNull(connector, "数据源连接器不能为空.");
+            ConnectorConfig config = connector.getConfig();
+            Assert.notNull(config, "连接器配置不能为空.");
+            List<TableGroup> list = manager.getTableGroupAll(mappingId);
+            Assert.notEmpty(list, "映射关系为空");
+
             logger.info("启动任务:{}", metaId);
-            run(task);
+            Task task = map.get(metaId);
+            task.setBeginTime(System.currentTimeMillis());
+            for (TableGroup t : list) {
+                if (!task.isRunning()) {
+                    break;
+                }
+                parser.execute(task, config, t);
+            }
+            task.setEndTime(System.currentTimeMillis());
+
+            // TODO 同步运行结果
+
+
         } catch (Exception e) {
             logger.error(e.getMessage());
         } finally {
             map.remove(metaId);
             publishClosedEvent(metaId);
+            logger.info("结束任务:{}", metaId);
         }
     }
 
@@ -68,14 +86,4 @@ public class FullExtractor extends AbstractExtractor {
         }
     }
 
-    protected void run(Task task) throws InterruptedException {
-        for (; ; ) {
-            if (!task.isRunning()) {
-                break;
-            }
-            logger.info("模拟同步休眠5s");
-            TimeUnit.SECONDS.sleep(5);
-        }
-    }
-
 }

+ 9 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.parser;
 
+import org.dbsyncer.common.task.Task;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.enums.ConnectorEnum;
@@ -100,4 +101,12 @@ public interface Parser {
      */
     List<ConvertEnum> getConvertEnumAll();
 
+    /**
+     * 全量同步
+     *
+     * @param task
+     * @param config
+     * @param tableGroup
+     */
+    void execute(Task task, ConnectorConfig config, TableGroup tableGroup);
 }

+ 23 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,18 +1,21 @@
 package org.dbsyncer.parser;
 
 import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.task.Task;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
-import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.parser.enums.ConvertEnum;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.storage.SnowflakeIdWorker;
 import org.json.JSONException;
 import org.json.JSONObject;
@@ -27,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author AE86
@@ -66,7 +70,7 @@ public class ParserFactory implements Parser {
     @Override
     public Map<String, String> getCommand(String sourceConnectorId, String targetConnectorId, TableGroup tableGroup) {
         List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
-        if(CollectionUtils.isEmpty(fieldMapping)){
+        if (CollectionUtils.isEmpty(fieldMapping)) {
             return null;
         }
         String sType = getConnectorConfig(sourceConnectorId).getConnectorType();
@@ -75,7 +79,7 @@ public class ParserFactory implements Parser {
         String tTableName = tableGroup.getTargetTable().getName();
         Table sTable = new Table().setName(sTableName).setColumn(new ArrayList<>());
         Table tTable = new Table().setName(tTableName).setColumn(new ArrayList<>());
-        fieldMapping.forEach(m ->{
+        fieldMapping.forEach(m -> {
             sTable.getColumn().add(m.getSource());
             tTable.getColumn().add(m.getTarget());
         });
@@ -138,6 +142,21 @@ public class ParserFactory implements Parser {
         return Arrays.asList(ConvertEnum.values());
     }
 
+    @Override
+    public void execute(Task task, ConnectorConfig config, TableGroup tableGroup) {
+        try {
+            for (int i = 0; i < 10; i++) {
+                if (!task.isRunning()) {
+                    break;
+                }
+                logger.info("模拟迁移5s");
+                TimeUnit.SECONDS.sleep(5);
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
     /**
      * 获取连接配置
      *

+ 45 - 17
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Meta.java

@@ -2,8 +2,9 @@ package org.dbsyncer.parser.model;
 
 import org.dbsyncer.parser.enums.MetaEnum;
 
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * <p>驱动同步元信息</p>
@@ -23,21 +24,32 @@ public class Meta extends ConfigModel {
      * {@link MetaEnum}
      */
     private int state;
-    private AtomicInteger total;
-    private AtomicInteger success;
-    private AtomicInteger fail;
+    private AtomicLong total;
+    private AtomicLong success;
+    private AtomicLong fail;
     private Map<String, String> map;
+    private long beginTime;
+    private long endTime;
 
     public Meta() {
+        init();
     }
 
-    public Meta(String mappingId, int state, AtomicInteger total, AtomicInteger success, AtomicInteger fail, Map<String, String> map) {
-        this.mappingId = mappingId;
-        this.state = state;
-        this.total = total;
-        this.success = success;
-        this.fail = fail;
-        this.map = map;
+    /**
+     * 还原状态
+     */
+    public void clear() {
+        init();
+    }
+
+    private void init(){
+        this.state = MetaEnum.READY.getCode();
+        this.total = new AtomicLong(0);
+        this.success = new AtomicLong(0);
+        this.fail = new AtomicLong(0);
+        this.map = new LinkedHashMap<>();
+        this.beginTime = 0L;
+        this.endTime = 0L;
     }
 
     public String getMappingId() {
@@ -56,27 +68,27 @@ public class Meta extends ConfigModel {
         this.state = state;
     }
 
-    public AtomicInteger getTotal() {
+    public AtomicLong getTotal() {
         return total;
     }
 
-    public void setTotal(AtomicInteger total) {
+    public void setTotal(AtomicLong total) {
         this.total = total;
     }
 
-    public AtomicInteger getSuccess() {
+    public AtomicLong getSuccess() {
         return success;
     }
 
-    public void setSuccess(AtomicInteger success) {
+    public void setSuccess(AtomicLong success) {
         this.success = success;
     }
 
-    public AtomicInteger getFail() {
+    public AtomicLong getFail() {
         return fail;
     }
 
-    public void setFail(AtomicInteger fail) {
+    public void setFail(AtomicLong fail) {
         this.fail = fail;
     }
 
@@ -87,4 +99,20 @@ public class Meta extends ConfigModel {
     public void setMap(Map<String, String> map) {
         this.map = map;
     }
+
+    public long getBeginTime() {
+        return beginTime;
+    }
+
+    public void setBeginTime(long beginTime) {
+        this.beginTime = beginTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+    }
 }

+ 1 - 1
dbsyncer-web/src/main/resources/templates/index/index.html

@@ -159,7 +159,7 @@
                     <td th:text="${m?.model}"></td>
                     <td>
                         <span th:if="${m?.model eq '全量' and m?.total gt 0}">
-                            进度:[[${#numbers.formatDecimal(((m?.success + m?.fail) / m?.total * 10000 / 100.00),2 ,2)}]]%,
+                            进度:[[${#numbers.formatDecimal(((m?.success + m?.fail) / m?.total * 100.00),0 ,2)}]]%,
                             总数:[[${m?.total}]],
                         </span>
                         成功:[[${m?.success}]]