瀏覽代碼

optimize code

AE86 5 年之前
父節點
當前提交
202e174e62

+ 12 - 15
dbsyncer-common/src/main/java/org/dbsyncer/common/task/Result.java

@@ -2,14 +2,22 @@ package org.dbsyncer.common.task;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class Result {
 
+    // 读取数据
     private List<Map<String, Object>> data;
 
+    // 错误数据
+    private Queue<Map<String, Object>> failData;
+
+    // 错误数
     private AtomicLong fail;
 
+    // 错误日志
     private StringBuffer error;
 
     public Result() {
@@ -17,16 +25,12 @@ public class Result {
     }
 
     public Result(List<Map<String, Object>> data) {
-        this.data = data;
         init();
-    }
-
-    public Result(StringBuffer error) {
-        this.fail = new AtomicLong(0);
-        this.error = error;
+        this.data = data;
     }
 
     private void init(){
+        this.failData = new ConcurrentLinkedQueue<>();
         this.fail = new AtomicLong(0);
         this.error = new StringBuffer();
     }
@@ -35,23 +39,16 @@ public class Result {
         return data;
     }
 
-    public void setData(List<Map<String, Object>> data) {
-        this.data = data;
+    public Queue<Map<String, Object>> getFailData() {
+        return failData;
     }
 
     public AtomicLong getFail() {
         return fail;
     }
 
-    public void setFail(AtomicLong fail) {
-        this.fail = fail;
-    }
-
     public StringBuffer getError() {
         return error;
     }
 
-    public void setError(StringBuffer error) {
-        this.error = error;
-    }
 }

+ 7 - 14
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -8,7 +8,6 @@ import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.enums.SetterEnum;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
-import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.dbsyncer.connector.util.JDBCUtil;
 import org.slf4j.Logger;
@@ -154,11 +153,11 @@ public abstract class AbstractDatabaseConnector implements Database {
         Assert.hasText(insertSql, "插入语句不能为空.");
         if (CollectionUtils.isEmpty(fields)) {
             logger.error("writer fields can not be empty.");
-            throw new ConnectorException(String.format("writer fields can not be empty."));
+            throw new ConnectorException("writer fields can not be empty.");
         }
         if (CollectionUtils.isEmpty(data)) {
             logger.error("writer data can not be empty.");
-            return new Result(new StringBuffer("writer data can not be empty."));
+            throw new ConnectorException("writer data can not be empty.");
         }
         final int size = data.size();
         final int fSize = fields.size();
@@ -171,7 +170,7 @@ public abstract class AbstractDatabaseConnector implements Database {
             jdbcTemplate = getJdbcTemplate(cfg);
 
             // 3、设置参数
-            int[] batchUpdate = jdbcTemplate.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
+            jdbcTemplate.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
                 @Override
                 public void setValues(PreparedStatement preparedStatement, int i) throws SQLException {
                     batchRowsSetter(preparedStatement, fields, fSize, data.get(i));
@@ -183,18 +182,12 @@ public abstract class AbstractDatabaseConnector implements Database {
                 }
             });
 
-            // 4、返回结果集
-            int length = batchUpdate.length;
-            for (int i = 0; i < length; i++) {
-                // TODO oracle返回值可能不一样
-                if (0 == batchUpdate[i]) {
-                    result.getFail().getAndIncrement();
-                }
-            }
         } catch (Exception e) {
-            logger.error(e.getMessage());
-            result.getError().append(e.getMessage());
+            // 记录错误数据
+            result.getFailData().addAll(data);
             result.getFail().set(size);
+            result.getError().append(e.getMessage()).append("\r\n");
+            logger.error(e.getMessage());
         } finally {
             // 释放连接
             this.close(jdbcTemplate);

+ 4 - 37
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,7 +1,5 @@
 package org.dbsyncer.parser;
 
-import org.apache.commons.lang.math.NumberUtils;
-import org.apache.commons.lang.math.RandomUtils;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.RefreshEvent;
 import org.dbsyncer.common.task.Result;
@@ -31,12 +29,10 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
-import java.time.LocalDateTime;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * @author AE86
@@ -192,12 +188,12 @@ public class ParserFactory implements Parser {
             }
 
             // 1、获取数据源数据
-            int pageIndex = NumberUtils.toInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
+            int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
             Result reader = connectorFactory.reader(sConfig, command, pageIndex, pageSize);
             List<Map<String, Object>> data = reader.getData();
             if (CollectionUtils.isEmpty(data)) {
                 params.clear();
-                logger.info("完成任务:{}, 全量同步表{}>>表{}", metaId, sTableName, tTableName);
+                logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                 break;
             }
 
@@ -310,11 +306,11 @@ public class ParserFactory implements Parser {
                     try {
                         Result w = parallelTask(batchSize, queue, config, command, fields);
                         // CAS
+                        result.getFailData().addAll(w.getFailData());
                         result.getFail().getAndAdd(w.getFail().get());
-                        result.getError().append(w.getError()).append("\r\n");
+                        result.getError().append(w.getError());
                     } catch (Exception e) {
                         result.getError().append(e.getMessage()).append("\r\n");
-                        logger.error(e.getMessage());
                     } finally {
                         latch.countDown();
                     }
@@ -358,33 +354,4 @@ public class ParserFactory implements Parser {
         return executor;
     }
 
-    public static void main(String[] args) {
-        int threadSize = 10;
-
-        ParserFactory factory = new ParserFactory();
-
-        final ThreadPoolTaskExecutor executor = factory.getThreadPoolTaskExecutor(threadSize);
-        CountDownLatch latch = new CountDownLatch(threadSize);
-        for (int i = 0; i < threadSize; i++) {
-            executor.execute(() -> {
-                try {
-                    TimeUnit.SECONDS.sleep(RandomUtils.nextInt(5));
-                    System.out.println(String.format("%s: %s完成", LocalDateTime.now(), Thread.currentThread().getName()));
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                } finally {
-                    latch.countDown();
-                }
-            });
-        }
-        try {
-            latch.await();
-            executor.shutdown();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        System.out.println("ok");
-
-    }
-
 }