AE86 5 年 前
コミット
96006a9e96

+ 0 - 2
dbsyncer-cache/src/main/java/org/dbsyncer/cache/CacheService.java

@@ -1,7 +1,5 @@
 package org.dbsyncer.cache;
 
-import java.util.Map;
-
 /**
  * @author AE86
  * @version 1.0.0

+ 24 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/RefreshEvent.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.common.event;
+
+import org.dbsyncer.common.task.Task;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.event.ApplicationContextEvent;
+
+public class RefreshEvent extends ApplicationContextEvent {
+
+    private Task task;
+
+    /**
+     * Create a new ContextStartedEvent.
+     *
+     * @param source the {@code ApplicationContext} that the event is raised for (must not be {@code null})
+     */
+    public RefreshEvent(ApplicationContext source, Task task) {
+        super(source);
+        this.task = task;
+    }
+
+    public Task getTask() {
+        return task;
+    }
+}

+ 38 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/task/Result.java

@@ -0,0 +1,38 @@
+package org.dbsyncer.common.task;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Result {
+
+    private List<Map<String, Object>> data;
+
+    private AtomicLong fail;
+
+    private String error;
+
+    public List<Map<String, Object>> getData() {
+        return data;
+    }
+
+    public void setData(List<Map<String, Object>> data) {
+        this.data = data;
+    }
+
+    public AtomicLong getFail() {
+        return fail;
+    }
+
+    public void setFail(AtomicLong fail) {
+        this.fail = fail;
+    }
+
+    public String getError() {
+        return error;
+    }
+
+    public void setError(String error) {
+        this.error = error;
+    }
+}

+ 1 - 15
dbsyncer-common/src/main/java/org/dbsyncer/common/task/Task.java

@@ -4,20 +4,14 @@ public class Task {
 
     private String id;
 
-    private int batchNum;
-
-    private int threadNum;
-
     private StateEnum state;
 
     private long beginTime;
 
     private long endTime;
 
-    public Task(String id, int batchNum, int threadNum) {
+    public Task(String id) {
         this.id = id;
-        this.batchNum = batchNum;
-        this.threadNum = threadNum;
         this.state = StateEnum.RUNNING;
     }
 
@@ -37,14 +31,6 @@ public class Task {
         this.id = id;
     }
 
-    public int getBatchNum() {
-        return batchNum;
-    }
-
-    public int getThreadNum() {
-        return threadNum;
-    }
-
     public long getBeginTime() {
         return beginTime;
     }

+ 23 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java

@@ -1,8 +1,8 @@
 package org.dbsyncer.connector;
 
+import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
-import org.dbsyncer.connector.config.CommandConfig;
 
 import java.util.List;
 import java.util.Map;
@@ -57,4 +57,26 @@ public interface Connector {
      */
     Map<String, String> getTargetCommand(CommandConfig commandConfig);
 
+    /**
+     * 分页获取数据源数据
+     *
+     * @param config 数据源配置
+     * @param command 执行命令
+     * @param pageIndex 页数
+     * @param pageSize 页大小
+     * @return
+     */
+    //Result reader(ConnectorConfig config, Map<String, String> command, int pageIndex, int pageSize);
+
+    /**
+     * 批量写入目标源数据
+     *
+     * @param config 数据源配置
+     * @param command 执行命令
+     * @param threadSize 线程数
+     * @param data 数据
+     * @return
+     */
+    //Result writer(ConnectorConfig config, Map<String, String> command, int threadSize, List<Map<String, Object>> data);
+
 }

+ 11 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -1,6 +1,8 @@
 package org.dbsyncer.connector;
 
+import org.dbsyncer.common.task.Result;
 import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.config.CommandConfig;
@@ -72,13 +74,21 @@ public class ConnectorFactory {
         return map;
     }
 
+    public Result reader(ConnectorConfig config, Map<String, String> command, int pageIndex, int pageSize){
+        return new Result();
+    }
+
+    public Result writer(ConnectorConfig config, Map<String,String> command, int threadSize, List<Field> fields, List<Map<String,Object>> data) {
+        return new Result();
+    }
+
     /**
      * 获取连接器
      *
      * @param connectorType
      * @return
      */
-    public Connector getConnector(String connectorType) {
+    private Connector getConnector(String connectorType) {
         // 获取连接器类型
         Assert.hasText(connectorType, "ConnectorType can not be empty.");
         return ConnectorEnum.getConnector(connectorType);

+ 13 - 8
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/impl/FullExtractor.java

@@ -1,17 +1,17 @@
 package org.dbsyncer.manager.extractor.impl;
 
+import org.dbsyncer.common.event.RefreshEvent;
 import org.dbsyncer.common.task.Task;
-import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.extractor.AbstractExtractor;
 import org.dbsyncer.parser.Parser;
-import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @date 2020/04/26 15:28
  */
 @Component
-public class FullExtractor extends AbstractExtractor {
+public class FullExtractor extends AbstractExtractor implements ApplicationListener<RefreshEvent> {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -43,9 +43,7 @@ public class FullExtractor extends AbstractExtractor {
     public void asyncStart(Mapping mapping) {
         final String mappingId = mapping.getId();
         final String metaId = mapping.getMetaId();
-        int batchNum = mapping.getBatchNum();
-        int threadNum = mapping.getThreadNum();
-        map.putIfAbsent(metaId, new Task(metaId, batchNum, threadNum));
+        map.putIfAbsent(metaId, new Task(metaId));
 
         try {
             List<TableGroup> list = manager.getTableGroupAll(mappingId);
@@ -57,6 +55,7 @@ public class FullExtractor extends AbstractExtractor {
             doTask(task, mapping, list);
 
         } catch (Exception e) {
+            // TODO 记录错误日志
             logger.error(e.getMessage());
         } finally {
             map.remove(metaId);
@@ -73,6 +72,12 @@ public class FullExtractor extends AbstractExtractor {
         }
     }
 
+    @Override
+    public void onApplicationEvent(RefreshEvent refreshEvent) {
+        // 异步监听任务刷新事件
+        flush(refreshEvent.getTask());
+    }
+
     private void doTask(Task task, Mapping mapping, List<TableGroup> list) {
         // 记录开始时间
         task.setBeginTime(System.currentTimeMillis());
@@ -91,12 +96,12 @@ public class FullExtractor extends AbstractExtractor {
     }
 
     private void flush(Task task) {
-        String id = task.getId();
-        Meta meta = manager.getMeta(id);
+        Meta meta = manager.getMeta(task.getId());
         Assert.notNull(meta, "检查meta为空.");
 
         meta.setBeginTime(task.getBeginTime());
         meta.setEndTime(task.getEndTime());
         manager.editMeta(meta);
     }
+
 }

+ 112 - 21
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,6 +1,9 @@
 package org.dbsyncer.parser;
 
+import org.apache.commons.lang.math.NumberUtils;
 import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.event.RefreshEvent;
+import org.dbsyncer.common.task.Result;
 import org.dbsyncer.common.task.Task;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
@@ -12,17 +15,21 @@ import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
+import org.dbsyncer.parser.convert.Convert;
 import org.dbsyncer.parser.enums.ConvertEnum;
-import org.dbsyncer.parser.model.Connector;
-import org.dbsyncer.parser.model.FieldMapping;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.enums.ParserEnum;
+import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.util.ConvertUtil;
+import org.dbsyncer.parser.util.PickerUtil;
+import org.dbsyncer.plugin.PluginFactory;
+import org.dbsyncer.plugin.config.Plugin;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
@@ -45,9 +52,15 @@ public class ParserFactory implements Parser {
     @Autowired
     private ConnectorFactory connectorFactory;
 
+    @Autowired
+    private PluginFactory pluginFactory;
+
     @Autowired
     private CacheService cacheService;
 
+    @Autowired
+    private ApplicationContext applicationContext;
+
     @Override
     public boolean alive(ConnectorConfig config) {
         return connectorFactory.isAlive(config);
@@ -143,32 +156,110 @@ public class ParserFactory implements Parser {
 
     @Override
     public void execute(Task task, Mapping mapping, TableGroup tableGroup) {
+        final String metaId = task.getId();
         final String sourceConnectorId = mapping.getSourceConnectorId();
         final String targetConnectorId = mapping.getTargetConnectorId();
+
         ConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
-        ConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
         Assert.notNull(sConfig, "数据源配置不能为空.");
+        ConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
         Assert.notNull(tConfig, "目标源配置不能为空.");
+        Map<String, String> command = tableGroup.getCommand();
+        Assert.notEmpty(command, "执行命令不能为空.");
+        List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
+        String sTableName = tableGroup.getSourceTable().getName();
+        String tTableName = tableGroup.getTargetTable().getName();
+        Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
+        // 获取同步字段
+        Picker picker = new Picker();
+        PickerUtil.pickFields(picker, fieldMapping);
 
-        try {
-            for (int i = 0; i < 10; i++) {
-                if (!task.isRunning()) {
-                    break;
-                }
-
-                // TODO 全量同步任务
-                // 1、获取数据源数据
-                // 2、值映射
-                // 3、参数转换
-                // 4、插件转换
-                // 5、写入目标源
-
-                logger.info("模拟迁移5s");
+        // 转换配置(默认使用全局)
+        List<Convert> convert = CollectionUtils.isEmpty(tableGroup.getConvert()) ? mapping.getConvert() : tableGroup.getConvert();
+        // 插件配置(默认使用全局)
+        Plugin plugin = null == tableGroup.getPlugin() ? mapping.getPlugin() : tableGroup.getPlugin();
+
+        // 检查分页参数
+        Map<String, String> params = getMeta(metaId).getMap();
+        params.putIfAbsent(ParserEnum.PAGE_INDEX.getCode(), ParserEnum.PAGE_INDEX.getDefaultValue());
+        int pageSize = mapping.getBatchNum();
+        int threadSize = mapping.getThreadNum();
+
+        for (; ; ) {
+            // TODO 模拟测试
+            logger.info("模拟迁移5s");
+            try {
                 TimeUnit.SECONDS.sleep(5);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
             }
-        } catch (InterruptedException e) {
-            e.printStackTrace();
+
+            if (!task.isRunning()) {
+                break;
+            }
+
+            // 1、获取数据源数据
+            int pageIndex = NumberUtils.toInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
+            Result reader = connectorFactory.reader(sConfig, command, pageIndex, pageSize);
+            List<Map<String, Object>> data = reader.getData();
+            if (CollectionUtils.isEmpty(data)) {
+                break;
+            }
+
+            // 2、映射字段
+            PickerUtil.pickData(picker, data);
+
+            // 3、参数转换
+            List<Map<String, Object>> target = picker.getTarget();
+            ConvertUtil.convert(convert, target);
+
+            // 4、插件转换
+            pluginFactory.convert(plugin, data, target);
+
+            // 5、写入目标源
+            Result writer = connectorFactory.writer(tConfig, command, threadSize, picker.getTargetFields(), target);
+
+            // 6、更新结果
+            flush(task, writer, target.size());
+
+            // 7、更新分页数
+            params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(pageIndex++));
         }
+        logger.info("完成任务:{}, 表[%s]写入到表[%s]", metaId, sTableName, tTableName);
+    }
+
+    /**
+     * 更新缓存
+     *
+     * @param task
+     * @param writer
+     * @param total
+     */
+    private void flush(Task task, Result writer, int total) {
+        // 引用传递
+        long fail = writer.getFail().get();
+        long success = total - fail;
+        Meta meta = getMeta(task.getId());
+        meta.getFail().getAndAdd(fail);
+        meta.getSuccess().getAndAdd(success);
+
+        // TODO 记录错误日志
+
+        // 发布刷新事件给FullExtractor
+        applicationContext.publishEvent(new RefreshEvent(applicationContext, task));
+    }
+
+    /**
+     * 获取Meta(注: 没有bean拷贝, 便于直接更新缓存)
+     *
+     * @param metaId
+     * @return
+     */
+    private Meta getMeta(String metaId) {
+        Assert.hasText(metaId, "Meta id can not be empty.");
+        Meta meta = cacheService.get(metaId, Meta.class);
+        Assert.notNull(meta, "Meta can not be null.");
+        return meta;
     }
 
     /**

+ 46 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/ParserEnum.java

@@ -0,0 +1,46 @@
+package org.dbsyncer.parser.enums;
+
+/**
+ * 解析器参数枚举
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/29 10:19
+ */
+public enum ParserEnum {
+
+    /**
+     * 页数
+     */
+    PAGE_INDEX("pageIndex", "1");
+
+    /**
+     * 编码
+     */
+    private String code;
+    /**
+     * 默认值
+     */
+    private String defaultValue;
+
+    ParserEnum(String code, String defaultValue) {
+        this.code = code;
+        this.defaultValue = defaultValue;
+    }
+
+    public String getCode() {
+        return code;
+    }
+
+    public void setCode(String code) {
+        this.code = code;
+    }
+
+    public String getDefaultValue() {
+        return defaultValue;
+    }
+
+    public void setDefaultValue(String defaultValue) {
+        this.defaultValue = defaultValue;
+    }
+}

+ 37 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -0,0 +1,37 @@
+package org.dbsyncer.parser.model;
+
+import org.dbsyncer.connector.config.Field;
+
+import java.util.List;
+import java.util.Map;
+
+public class Picker {
+
+    private List<Field> sourceFields;
+    private List<Field> targetFields;
+    private List<Map<String, Object>> target;
+
+    public List<Field> getSourceFields() {
+        return sourceFields;
+    }
+
+    public void setSourceFields(List<Field> sourceFields) {
+        this.sourceFields = sourceFields;
+    }
+
+    public List<Field> getTargetFields() {
+        return targetFields;
+    }
+
+    public void setTargetFields(List<Field> targetFields) {
+        this.targetFields = targetFields;
+    }
+
+    public List<Map<String, Object>> getTarget() {
+        return target;
+    }
+
+    public void setTarget(List<Map<String, Object>> target) {
+        this.target = target;
+    }
+}

+ 26 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/ConvertUtil.java

@@ -0,0 +1,26 @@
+package org.dbsyncer.parser.util;
+
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.parser.convert.Convert;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class ConvertUtil {
+
+    private ConvertUtil() {
+    }
+
+    /**
+     * 转换参数
+     *
+     * @param convert
+     * @param data
+     */
+    public static void convert(List<Convert> convert, List<Map<String, Object>> data) {
+        if (!CollectionUtils.isEmpty(convert) && !CollectionUtils.isEmpty(data)) {
+            // TODO 参数转换
+        }
+    }
+
+}

+ 63 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/PickerUtil.java

@@ -0,0 +1,63 @@
+package org.dbsyncer.parser.util;
+
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.parser.convert.Convert;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.model.Picker;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class PickerUtil {
+
+    private PickerUtil() {
+    }
+
+    public static void pickFields(Picker picker, List<FieldMapping> fieldMapping) {
+        if(!CollectionUtils.isEmpty(fieldMapping)){
+            List<Field> sFields = new ArrayList<>();
+            List<Field> tFields = new ArrayList<>();
+            fieldMapping.forEach(m -> {
+                sFields.add(m.getSource());
+                tFields.add(m.getTarget());
+            });
+            picker.setSourceFields(sFields);
+            picker.setTargetFields(tFields);
+        }
+    }
+
+    public static void pickData(Picker picker, List<Map<String, Object>> data) {
+        if(!CollectionUtils.isEmpty(data)){
+            List<Map<String, Object>> target = new ArrayList<>();
+            List<Field> sFields = picker.getSourceFields();
+            List<Field> tFields = picker.getTargetFields();
+
+            final int kSize = sFields.size();
+            final int size = data.size();
+            Map<String, Object> row = null;
+            Map<String, Object> r = null;
+            String sName = null;
+            String tName = null;
+            Object v = null;
+            for (int i = 0; i < size; i++) {
+                row = data.get(i);
+                r = new HashMap<>();
+
+                for (int k = 0; k < kSize; k++) {
+                    sName = sFields.get(k).getName();
+                    v = row.get(sName);
+
+                    tName = tFields.get(k).getName();
+                    r.put(tName, v);
+                }
+                target.add(r);
+            }
+
+            picker.setTarget(target);
+        }
+    }
+
+}

+ 6 - 0
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java

@@ -5,6 +5,7 @@ import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * @author AE86
@@ -23,4 +24,9 @@ public class PluginFactory {
         return list;
     }
 
+    public void convert(Plugin plugin, List<Map<String, Object>> source, List<Map<String, Object>> target) {
+        if (null != plugin) {
+            // TODO 插件转换
+        }
+    }
 }