Parcourir la source

add full function

AE86 il y a 5 ans
Parent
commit
43765f2d92

+ 14 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/task/Result.java

@@ -12,8 +12,22 @@ public class Result {
 
     private String error;
 
+    public Result() {
+        init();
+    }
+
     public Result(List<Map<String, Object>> data) {
         this.data = data;
+        init();
+    }
+
+    public Result(String error) {
+        this.error = error;
+        init();
+    }
+
+    private void init(){
+        this.fail = new AtomicLong(0);
     }
 
     public List<Map<String, Object>> getData() {

+ 1 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java

@@ -75,10 +75,9 @@ public interface Connector {
      *
      * @param config 数据源配置
      * @param command 执行命令
-     * @param threadSize 线程数
      * @param data 数据
      * @return
      */
-    Result writer(ConnectorConfig config, Map<String, String> command, int threadSize, List<Field> fields, List<Map<String, Object>> data);
+    Result writer(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> data);
 
 }

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -81,9 +81,9 @@ public class ConnectorFactory {
         return result;
     }
 
-    public Result writer(ConnectorConfig config, Map<String, String> command, int threadSize, List<Field> fields, List<Map<String, Object>> data) {
+    public Result writer(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> data) {
         Connector connector = getConnector(config.getConnectorType());
-        Result result = connector.writer(config, command, threadSize, fields, data);
+        Result result = connector.writer(config, command, fields, data);
         Assert.notNull(result, "Connector writer result can not null");
         return result;
     }

+ 56 - 14
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -13,6 +13,7 @@ import org.dbsyncer.connector.util.DatabaseUtil;
 import org.dbsyncer.connector.util.JDBCUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.BatchPreparedStatementSetter;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.util.Assert;
 
@@ -147,13 +148,58 @@ public abstract class AbstractDatabaseConnector implements Database {
     }
 
     @Override
-    public Result writer(ConnectorConfig config, Map<String, String> command, int threadSize, List<Field> fields, List<Map<String, Object>> data) {
-        // TODO 实现批量写入
-        // 1、获取连接
-        // 2、获取insert SQL
-        // 3、设置参数
-        // 4、记录失败数量和异常信息
-        return null;
+    public Result writer(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> data) {
+        // 1、获取select SQL
+        String insertSql = command.get(SqlBuilderEnum.INSERT.getName());
+        Assert.hasText(insertSql, "插入语句不能为空.");
+        if (CollectionUtils.isEmpty(fields)) {
+            logger.error("writer fields can not be empty.");
+            throw new ConnectorException(String.format("writer fields can not be empty."));
+        }
+        if (CollectionUtils.isEmpty(data)) {
+            logger.error("writer data can not be empty.");
+            return new Result("writer data can not be empty.");
+        }
+        final int size = data.size();
+        final int fSize = fields.size();
+
+        DatabaseConfig cfg = (DatabaseConfig) config;
+        JdbcTemplate jdbcTemplate = null;
+        Result result = new Result();
+        try {
+            // 2、获取连接
+            jdbcTemplate = getJdbcTemplate(cfg);
+
+            // 3、设置参数
+            int[] batchUpdate = jdbcTemplate.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
+                @Override
+                public void setValues(PreparedStatement preparedStatement, int i) throws SQLException {
+                    batchRowsSetter(preparedStatement, fields, fSize, data.get(i));
+                }
+
+                @Override
+                public int getBatchSize() {
+                    return size;
+                }
+            });
+
+            // 4、返回结果集
+            int length = batchUpdate.length;
+            for (int i = 0; i < length; i++) {
+                // TODO oracle返回值可能不一样
+                if (0 == batchUpdate[i]) {
+                    result.getFail().getAndIncrement();
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+            result.setError(e.getMessage());
+            result.getFail().set(size);
+        } finally {
+            // 释放连接
+            this.close(jdbcTemplate);
+        }
+        return result;
     }
 
     @Override
@@ -318,18 +364,14 @@ public abstract class AbstractDatabaseConnector implements Database {
     /**
      * @param ps     参数构造器
      * @param fields 同步字段,例如[{name=ID, type=4}, {name=NAME, type=12}]
+     * @param fSize  同步字段个数
      * @param row    同步字段对应的值,例如{ID=123, NAME=张三11}
      */
-    private void batchRowsSetter(PreparedStatement ps, List<Field> fields, Map<String, Object> row) {
-        if (CollectionUtils.isEmpty(fields)) {
-            logger.error("Rows fields can not be empty.");
-            throw new ConnectorException(String.format("Rows fields can not be empty."));
-        }
-        int fieldSize = fields.size();
+    private void batchRowsSetter(PreparedStatement ps, List<Field> fields, int fSize, Map<String, Object> row) {
         Field f = null;
         int type;
         Object val = null;
-        for (int i = 0; i < fieldSize; i++) {
+        for (int i = 0; i < fSize; i++) {
             // 取出字段和对应值
             f = fields.get(i);
             type = f.getType();

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ldap/LdapConnector.java

@@ -59,7 +59,7 @@ public final class LdapConnector implements Ldap {
 	}
 
 	@Override
-	public Result writer(ConnectorConfig config, Map<String, String> command, int threadSize, List<Field> fields, List<Map<String, Object>> data) {
+	public Result writer(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> data) {
 		return null;
 	}
 

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/RedisConnector.java

@@ -63,7 +63,7 @@ public final class RedisConnector implements Redis {
     }
 
     @Override
-    public Result writer(ConnectorConfig config, Map<String, String> command, int threadSize, List<Field> fields, List<Map<String, Object>> data) {
+    public Result writer(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> data) {
         return null;
     }
 

+ 23 - 15
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -8,10 +8,7 @@ import org.dbsyncer.common.task.Task;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.config.MetaInfo;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
@@ -186,15 +183,8 @@ public class ParserFactory implements Parser {
         int threadSize = mapping.getThreadNum();
 
         for (; ; ) {
-            // TODO 模拟测试
-            logger.info("模拟迁移5s");
-            try {
-                TimeUnit.SECONDS.sleep(5);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-
             if (!task.isRunning()) {
+                logger.warn("任务被终止:{}", metaId);
                 break;
             }
 
@@ -203,6 +193,8 @@ public class ParserFactory implements Parser {
             Result reader = connectorFactory.reader(sConfig, command, pageIndex, pageSize);
             List<Map<String, Object>> data = reader.getData();
             if (CollectionUtils.isEmpty(data)) {
+                params.clear();
+                logger.info("完成任务:{}, 全量同步表{}>>表{}", metaId, sTableName, tTableName);
                 break;
             }
 
@@ -217,15 +209,14 @@ public class ParserFactory implements Parser {
             pluginFactory.convert(plugin, data, target);
 
             // 5、写入目标源
-            Result writer = connectorFactory.writer(tConfig, command, threadSize, picker.getTargetFields(), target);
+            Result writer = executeBatch(tConfig, command, picker.getTargetFields(), target, threadSize);
 
             // 6、更新结果
             flush(task, writer, target.size());
 
             // 7、更新分页数
-            params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(pageIndex++));
+            params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
         }
-        logger.info("完成任务:{}, 表[%s]写入到表[%s]", metaId, sTableName, tTableName);
     }
 
     /**
@@ -277,4 +268,21 @@ public class ParserFactory implements Parser {
         return connector.getConfig();
     }
 
+    /**
+     * 批量写入
+     *
+     * @param tConfig
+     * @param command
+     * @param targetFields
+     * @param target
+     * @param threadSize
+     * @return
+     */
+    private Result executeBatch(ConnectorConfig tConfig, Map<String, String> command, List<Field> targetFields, List<Map<String, Object>> target, int threadSize) {
+
+        // TODO 多线程run
+
+        return connectorFactory.writer(tConfig, command, targetFields, target);
+    }
+
 }