
impl write

AE86, 5 years ago
commit dab7649b13

+ 23 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/IncrementRefreshEvent.java

@@ -0,0 +1,23 @@
+package org.dbsyncer.common.event;
+
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.event.ApplicationContextEvent;
+
+public class IncrementRefreshEvent extends ApplicationContextEvent {
+
+    private String metaId;
+
+    /**
+     * Create a new IncrementRefreshEvent.
+     *
+     * @param source the {@code ApplicationContext} that the event is raised for (must not be {@code null})
+     */
+    public IncrementRefreshEvent(ApplicationContext source, String metaId) {
+        super(source);
+        this.metaId = metaId;
+    }
+
+    public String getMetaId() {
+        return metaId;
+    }
+}
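
A minimal sketch of how the new event is meant to be wired (the listener class name here is made up; the real publisher and listener added by this commit are ParserFactory and IncrementPuller further down):

    // Publisher side: announce that the increment state of a meta record changed.
    applicationContext.publishEvent(new IncrementRefreshEvent(applicationContext, metaId));

    // Listener side: any Spring bean implementing ApplicationListener<IncrementRefreshEvent>
    // receives the event and can read the meta id it carries.
    @Component
    public class RefreshLogger implements ApplicationListener<IncrementRefreshEvent> {
        @Override
        public void onApplicationEvent(IncrementRefreshEvent event) {
            System.out.println("refresh meta: " + event.getMetaId());
        }
    }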

+ 14 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java

@@ -71,7 +71,7 @@ public interface Connector {
     /**
      * Fetch source data by page
      *
-     * @param config    data source configuration
+     * @param config    connector configuration
      * @param command   execution command
      * @param pageIndex page number
      * @param pageSize  page size
@@ -82,11 +82,23 @@ public interface Connector {
     /**
      * Batch-write data to the target source
      *
-     * @param config  data source configuration
+     * @param config  connector configuration
      * @param command execution command
+     * @param fields  field metadata
      * @param data    data rows
      * @return
      */
     Result writer(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> data);
 
 
+    /**
+     * Write a single row of data to the target source
+     *
+     * @param config  connector configuration
+     * @param fields  field metadata
+     * @param command execution command
+     * @param event   change event type
+     * @param data    data row
+     * @return
+     */
+    Result writer(ConnectorConfig config, List<Field> fields, Map<String, String> command, String event, Map<String, Object> data);
 }

+ 7 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -100,6 +100,13 @@ public class ConnectorFactory {
         return result;
     }
 
 
+    public Result writer(ConnectorConfig config, List<Field> fields, Map<String, String> command, String event, Map<String, Object> data) {
+        Connector connector = getConnector(config.getConnectorType());
+        Result result = connector.writer(config, fields, command, event, data);
+        Assert.notNull(result, "Connector writer result can not null");
+        return result;
+    }
+
     /**
      * Get the connector
      *
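
A hedged usage sketch of the new single-row entry point (variable names are illustrative; the call itself mirrors what ParserFactory.execute does further down):

    // "event" is the change-event key carried by the listener, and "command" maps that key
    // to the statement prepared for the target table.
    Result result = connectorFactory.writer(targetConfig, targetFields, command, event, row);
    if (result.getFail().get() > 0) {
        logger.warn("single-row write failed: {}", result.getError());
    }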

+ 48 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -5,6 +5,7 @@ import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.enums.SetterEnum;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
@@ -221,6 +222,53 @@ public abstract class AbstractDatabaseConnector implements Database {
         return result;
     }
 
 
+    @Override
+    public Result writer(ConnectorConfig config, List<Field> fields, Map<String, String> command, String event, Map<String, Object> data) {
+        // 1. Get the SQL statement for this event
+        String sql = command.get(event);
+        Assert.hasText(sql, "执行语句不能为空.");
+        if (CollectionUtils.isEmpty(data) || CollectionUtils.isEmpty(fields)) {
+            logger.error("writer data can not be empty.");
+            throw new ConnectorException("writer data can not be empty.");
+        }
+        List<Object> args = new ArrayList<>();
+        fields.forEach(f -> args.add(data.get(f.getName())));
+        if (!StringUtils.equals(ConnectorConstant.OPERTION_INSERT, event)) {
+            // set pk
+            List<Field> pkList = fields.stream().filter(f -> f.isPk()).collect(Collectors.toList());
+            if (CollectionUtils.isEmpty(pkList)) {
+                logger.error("writer pk can not be empty.");
+                throw new ConnectorException("writer pk can not be empty.");
+            }
+            String pk = pkList.get(0).getName();
+            args.add(data.get(pk));
+        }
+
+        DatabaseConfig cfg = (DatabaseConfig) config;
+        JdbcTemplate jdbcTemplate = null;
+        Result result = new Result();
+        try {
+            // 2. Get the connection
+            jdbcTemplate = getJdbcTemplate(cfg);
+
+            // 3. Bind the parameters and execute
+            int update = jdbcTemplate.update(sql, args.toArray());
+            if (0 == update) {
+                throw new ConnectorException("写入失败");
+            }
+        } catch (Exception e) {
+            // Record the failed row
+            result.getFailData().add(data);
+            result.getFail().set(1);
+            result.getError().append(e.getMessage()).append("\r\n");
+            logger.error(e.getMessage());
+        } finally {
+            // Release the connection
+            this.close(jdbcTemplate);
+        }
+        return result;
+    }
+
     @Override
     public JdbcTemplate getJdbcTemplate(DatabaseConfig config) {
         return DatabaseUtil.getJdbcTemplate(config);
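
A note on the binding convention in the new writer above: the statement is looked up with command.get(event), every mapped field value is bound in field order, and for non-insert events the primary-key value is appended last. A hedged illustration of command entries that fit that ordering (table name, columns and the literal keys are invented for the example; in the project the keys come from ConnectorConstant):

    Map<String, String> command = new HashMap<>();
    // insert: one placeholder per mapped field, in field order
    command.put("INSERT", "INSERT INTO t_user (id, name) VALUES (?, ?)");
    // update: mapped fields first (SET ...), primary key appended last (WHERE ...)
    command.put("UPDATE", "UPDATE t_user SET id = ?, name = ? WHERE id = ?");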

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ldap/LdapConnector.java

@@ -68,6 +68,11 @@ public final class LdapConnector implements Ldap {
 		return null;
 	}
 
 
+	@Override
+	public Result writer(ConnectorConfig config, List<Field> fields, Map<String, String> command, String event, Map<String, Object> data) {
+		return null;
+	}
+
 	@Override
 	public LdapTemplate getLdapTemplate(LdapConfig config) throws AuthenticationException, CommunicationException, javax.naming.NamingException {
 		return LdapUtil.getLdapTemplate(config);

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/RedisConnector.java

@@ -72,6 +72,11 @@ public final class RedisConnector implements Redis {
         return null;
     }
 
 
+    @Override
+    public Result writer(ConnectorConfig config, List<Field> fields, Map<String, String> command, String event, Map<String, Object> data) {
+        return null;
+    }
+
     @Override
     public RedisTemplate getRedisTemplate(RedisConfig config) {
         return this.getRedisTemplate(config, null, null, null);

+ 22 - 12
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.manager.puller.impl;
 
 import org.dbsyncer.common.event.Event;
+import org.dbsyncer.common.event.IncrementRefreshEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.listener.DefaultExtractor;
@@ -13,6 +14,7 @@ import org.dbsyncer.parser.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 
@@ -21,7 +23,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 
 
 /**
  * Incremental synchronization
@@ -31,7 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementPuller extends AbstractPuller {
+public class IncrementPuller extends AbstractPuller implements ApplicationListener<IncrementRefreshEvent> {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
@@ -88,11 +89,29 @@ public class IncrementPuller extends AbstractPuller {
         }
     }
 
 
+    @Override
+    public void onApplicationEvent(IncrementRefreshEvent event) {
+        flush(event.getMetaId());
+    }
+
     private void finished(String metaId) {
         map.remove(metaId);
         publishClosedEvent(metaId);
     }
 
 
+    private void flush(String metaId) {
+        // TODO to be optimized: potential performance issue
+        logger.info("flushEvent");
+        DefaultExtractor extractor = map.get(metaId);
+        if (null != extractor) {
+            Meta meta = manager.getMeta(metaId);
+            if (null != meta) {
+                meta.setMap(extractor.getMap());
+                manager.editMeta(meta);
+            }
+        }
+    }
+
     final class DefaultListener implements Event {
 
         private Mapping mapping;
@@ -130,16 +149,7 @@ public class IncrementPuller extends AbstractPuller {
 
 
         @Override
         public void flushEvent() {
-            // TODO to be optimized: potential performance issue
-            logger.info("flushEvent");
-            DefaultExtractor extractor = map.get(metaId);
-            if (null != extractor) {
-                Meta meta = manager.getMeta(metaId);
-                if (null != meta) {
-                    meta.setMap(extractor.getMap());
-                    manager.editMeta(meta);
-                }
-            }
+            flush(metaId);
         }
 
     }

+ 49 - 15
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -2,6 +2,7 @@ package org.dbsyncer.parser;
 
 
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.FullRefreshEvent;
+import org.dbsyncer.common.event.IncrementRefreshEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Task;
 import org.dbsyncer.common.util.CollectionUtils;
@@ -171,14 +172,13 @@ public class ParserFactory implements Parser {
         String sTableName = tableGroup.getSourceTable().getName();
         String tTableName = tableGroup.getTargetTable().getName();
         Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
-        // Pick the fields to sync
-        Picker picker = new Picker();
-        PickerUtil.pickFields(picker, fieldMapping);
-
         // Conversion configuration (defaults to global)
         List<Convert> convert = CollectionUtils.isEmpty(tableGroup.getConvert()) ? mapping.getConvert() : tableGroup.getConvert();
         // Plugin configuration (defaults to global)
         Plugin plugin = null == tableGroup.getPlugin() ? mapping.getPlugin() : tableGroup.getPlugin();
+        // Pick the fields to sync
+        Picker picker = new Picker();
+        PickerUtil.pickFields(picker, fieldMapping);
 
 
         // Check the paging parameters
         Map<String, String> params = getMeta(metaId).getMap();
@@ -207,14 +207,14 @@ public class ParserFactory implements Parser {
             PickerUtil.pickData(picker, data);
 
             // 3. Convert parameters
-            List<Map<String, Object>> target = picker.getTarget();
+            List<Map<String, Object>> target = picker.getTargetList();
             ConvertUtil.convert(convert, target);
 
             // 4. Plugin conversion
             pluginFactory.convert(plugin, data, target);
 
             // 5. Write to the target source
-            Result writer = executeBatch(tConfig, command, picker.getTargetFields(), target, threadSize, batchSize);
+            Result writer = writeBatch(tConfig, command, picker.getTargetFields(), target, threadSize, batchSize);
 
 
             // 6. Update the result
             flush(task, writer, target.size());
@@ -227,6 +227,37 @@ public class ParserFactory implements Parser {
     @Override
     public void execute(Mapping mapping, TableGroup tableGroup, DataEvent dataEvent) {
         logger.info("同步数据=> dataEvent:{}", dataEvent);
+        final String metaId = mapping.getMetaId();
+
+        ConnectorConfig tConfig = getConnectorConfig(mapping.getTargetConnectorId());
+        Map<String, String> command = tableGroup.getCommand();
+        List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
+        // Conversion configuration (defaults to global)
+        List<Convert> convert = CollectionUtils.isEmpty(tableGroup.getConvert()) ? mapping.getConvert() : tableGroup.getConvert();
+        // Plugin configuration (defaults to global)
+        Plugin plugin = null == tableGroup.getPlugin() ? mapping.getPlugin() : tableGroup.getPlugin();
+        // Pick the fields to sync
+        Picker picker = new Picker();
+        PickerUtil.pickFields(picker, fieldMapping);
+
+        // 1. Map the fields
+        String event = dataEvent.getEvent();
+        Map<String, Object> data = dataEvent.getData();
+        PickerUtil.pickData(picker, data);
+
+        // 2. Convert parameters
+        Map<String, Object> target = picker.getTarget();
+        ConvertUtil.convert(convert, target);
+
+        // 3. Plugin conversion
+        pluginFactory.convert(plugin, event, data, target);
+
+        // 4. Write to the target source
+        Result writer = connectorFactory.writer(tConfig, picker.getTargetFields(), command, event, target);
+
+        // 5. Update the result
+        flush(metaId, writer, 1);
+        applicationContext.publishEvent(new IncrementRefreshEvent(applicationContext, metaId));
     }
 
     /**
@@ -237,20 +268,23 @@ public class ParserFactory implements Parser {
      * @param total
      */
     private void flush(Task task, Result writer, long total) {
+        flush(task.getId(), writer, total);
+
+        // Publish a refresh event to the FullExtractor
+        task.setEndTime(System.currentTimeMillis());
+        applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
+    }
+
+    private void flush(String metaId, Result writer, long total) {
         // Passed by reference
         long fail = writer.getFail().get();
-        long success = total - fail;
-        Meta meta = getMeta(task.getId());
+        Meta meta = getMeta(metaId);
         meta.getFail().getAndAdd(fail);
-        meta.getSuccess().getAndAdd(success);
+        meta.getSuccess().getAndAdd(total - fail);
         // print process
-        logger.info("任务:{}, 成功:{}, 失败:{}", task.getId(), meta.getSuccess(), meta.getFail());
+        logger.info("任务:{}, 成功:{}, 失败:{}", metaId, meta.getSuccess(), meta.getFail());
 
 
         // TODO record error logs
-
-        // Publish a refresh event to the FullExtractor
-        task.setEndTime(System.currentTimeMillis());
-        applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
     }
 
     /**
@@ -292,7 +326,7 @@ public class ParserFactory implements Parser {
      * @param batchSize
      * @return
      */
-    private Result executeBatch(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> target,
+    private Result writeBatch(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> target,
                                 int threadSize, int batchSize) {
         // Total count
         int total = target.size();

+ 7 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/DataEvent.java

@@ -1,6 +1,8 @@
 package org.dbsyncer.parser.model;
 
 
+import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 
 
 import java.util.Map;
 
 
@@ -16,6 +18,10 @@ public final class DataEvent {
         this.after = after;
     }
 
 
+    public Map<String, Object> getData() {
+        return StringUtils.equals(ConnectorConstant.OPERTION_DELETE, event) ? before : after;
+    }
+
     public String getEvent() {
         return event;
     }
@@ -44,4 +50,5 @@ public final class DataEvent {
     public String toString() {
         return JsonUtil.objToJson(this);
     }
+
 }
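
A short illustration of the new accessor (the three-argument constructor shape is assumed from the fields above, so treat this as a sketch): for a delete the row image of interest is the "before" snapshot, otherwise the "after" snapshot.

    // Hypothetical construction; this diff only shows getData()/getEvent().
    DataEvent e = new DataEvent(ConnectorConstant.OPERTION_DELETE, before, null);
    Map<String, Object> row = e.getData();   // returns the "before" map for a delete event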

+ 12 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -9,7 +9,8 @@ public class Picker {
 
 
     private List<Field> sourceFields;
     private List<Field> targetFields;
-    private List<Map<String, Object>> target;
+    private List<Map<String, Object>> targetList;
+    private Map<String, Object> target;
 
 
     public List<Field> getSourceFields() {
         return sourceFields;
@@ -27,11 +28,19 @@ public class Picker {
         this.targetFields = targetFields;
     }
 
 
-    public List<Map<String, Object>> getTarget() {
+    public List<Map<String, Object>> getTargetList() {
+        return targetList;
+    }
+
+    public void setTargetList(List<Map<String, Object>> targetList) {
+        this.targetList = targetList;
+    }
+
+    public Map<String, Object> getTarget() {
         return target;
     }
 
 
-    public void setTarget(List<Map<String, Object>> target) {
+    public void setTarget(Map<String, Object> target) {
         this.target = target;
     }
 }

+ 28 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/ConvertUtil.java

@@ -21,25 +21,37 @@ public abstract class ConvertUtil {
     public static void convert(List<Convert> convert, List<Map<String, Object>> data) {
         if (!CollectionUtils.isEmpty(convert) && !CollectionUtils.isEmpty(data)) {
             // Process with a parallel stream
-            final int size = convert.size();
             data.parallelStream().forEach(row -> {
-                // Replace values in row, reusing local variables to reduce overhead
-                Convert c = null;
-                String name = null;
-                String code = null;
-                String args = null;
-                Object value = null;
-                for (int i = 0; i < size; i++) {
-                    c = convert.get(i);
-                    name = c.getName();
-                    code = c.getConvertCode();
-                    args = c.getArgs();
-                    value = ConvertEnum.getHandler(code).handle(args, row.get(name));
-
-                    row.put(name, value);
-                }
+                convert(convert, row);
             });
         }
     }
 
 
+    /**
+     * Convert a single row's parameters
+     *
+     * @param convert
+     * @param row
+     */
+    public static void convert(List<Convert> convert, Map<String, Object> row) {
+        if (!CollectionUtils.isEmpty(convert) && !CollectionUtils.isEmpty(row)) {
+            // Replace values in row, reusing local variables to reduce overhead
+            final int size = convert.size();
+            Convert c = null;
+            String name = null;
+            String code = null;
+            String args = null;
+            Object value = null;
+            for (int i = 0; i < size; i++) {
+                c = convert.get(i);
+                name = c.getName();
+                code = c.getConvertCode();
+                args = c.getArgs();
+                value = ConvertEnum.getHandler(code).handle(args, row.get(name));
+
+                row.put(name, value);
+            }
+        }
+    }
+
 }
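
A hedged usage sketch of the new per-row overload (the Convert list comes from the mapping or table-group configuration, as in ParserFactory above; the handler codes are whatever ConvertEnum defines):

    Map<String, Object> row = new HashMap<>();
    row.put("NAME", "alice");
    // Applies each configured handler to its column and replaces the value in place.
    ConvertUtil.convert(convertList, row);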

+ 22 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/PickerUtil.java

@@ -55,6 +55,28 @@ public abstract class PickerUtil {
                 target.add(r);
             }
 
 
+            picker.setTargetList(target);
+        }
+    }
+
+    public static void pickData(Picker picker, Map<String, Object> row) {
+        if (!CollectionUtils.isEmpty(row)) {
+            Map<String, Object> target = new HashMap<>();
+            List<Field> sFields = picker.getSourceFields();
+            List<Field> tFields = picker.getTargetFields();
+
+            final int kSize = sFields.size();
+            String sName = null;
+            String tName = null;
+            Object v = null;
+            for (int k = 0; k < kSize; k++) {
+                sName = sFields.get(k).getName();
+                v = row.get(sName);
+
+                tName = tFields.get(k).getName();
+                target.put(tName, v);
+            }
+
             picker.setTarget(target);
         }
     }
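
A hedged usage sketch of the single-row picker path (mirroring ParserFactory.execute above; tableGroup and sourceRow are assumed to be in scope): pickFields must run first so the source and target field lists are aligned by position, then pickData copies each value from its source column name to the mapped target column name.

    Picker picker = new Picker();
    PickerUtil.pickFields(picker, tableGroup.getFieldMapping());
    PickerUtil.pickData(picker, sourceRow);              // sourceRow: Map<String, Object> from the change event
    Map<String, Object> targetRow = picker.getTarget();  // keyed by target column names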

+ 5 - 0
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java

@@ -29,4 +29,9 @@ public class PluginFactory {
             // TODO plugin conversion
         }
     }
+
+    public void convert(Plugin plugin, String event, Map<String, Object> source, Map<String, Object> target) {
+        if (null != plugin) {
+        }
+    }
 }
 }