AE86 3 سال پیش
والد
کامیت
7d5edc94d2

+ 34 - 29
dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java

@@ -1,54 +1,59 @@
 package org.dbsyncer.common.model;
 
+import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
 
-public class Result {
+public class Result<T> {
 
-    // 读取数据
-    private List<Map> data;
+    // 成功数据
+    private List<T> successData = new LinkedList<>();
 
     // 错误数据
-    private Queue<Map> failData;
-
-    // 错误数
-    private AtomicLong fail;
+    private List<T> failData = new LinkedList<>();
 
     // 错误日志
-    private StringBuffer error;
+    private StringBuffer error = new StringBuffer();
 
-    public Result() {
-        init();
-    }
+    private final Object LOCK = new Object();
 
-    public Result(List<Map> data) {
-        init();
-        this.data = data;
+    public Result() {
     }
 
-    private void init(){
-        this.failData = new ConcurrentLinkedQueue<>();
-        this.fail = new AtomicLong(0);
-        this.error = new StringBuffer();
+    public Result(List<T> data) {
+        this.successData.addAll(data);
     }
 
-    public List<Map> getData() {
-        return data;
+    public List<T> getSuccessData() {
+        return successData;
     }
 
-    public Queue<Map> getFailData() {
+    public List<T> getFailData() {
         return failData;
     }
 
-    public AtomicLong getFail() {
-        return fail;
-    }
-
     public StringBuffer getError() {
         return error;
     }
 
+    /**
+     * 线程安全添加集合
+     *
+     * @param failData
+     */
+    public void addFailData(List failData) {
+        synchronized (LOCK) {
+            this.failData.addAll(failData);
+        }
+    }
+
+    /**
+     * 线程安全添加集合
+     *
+     * @param successData
+     */
+    public void addSuccessData(List successData) {
+        synchronized (LOCK) {
+            this.successData.addAll(successData);
+        }
+    }
 }

+ 23 - 11
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -98,7 +98,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
 
         // 4、返回结果集
-        return new Result(new ArrayList<>(list));
+        return new Result(list);
     }
 
     @Override
@@ -130,10 +130,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         final int size = data.size();
         final int fSize = fields.size();
         Result result = new Result();
+        int[] execute = null;
         try {
             // 2、设置参数
-            connectorMapper.execute(databaseTemplate -> {
-                databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
+            execute = connectorMapper.execute(databaseTemplate -> {
+                int[] batchUpdate = databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
                     @Override
                     public void setValues(PreparedStatement preparedStatement, int i) {
                         batchRowsSetter(databaseTemplate.getConnection(), preparedStatement, fields, fSize, data.get(i));
@@ -144,13 +145,25 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
                         return size;
                     }
                 });
-                return true;
+                return batchUpdate;
             });
         } catch (Exception e) {
-            // 记录错误数据
-            result.getFail().set(size);
-            result.getError().append(e.getMessage()).append(System.lineSeparator());
-            logger.error(e.getMessage());
+            result.getError().append("SQL:").append(executeSql).append(System.lineSeparator())
+                    .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
+            logger.error(result.getError().toString());
+        }
+
+        if (null != execute) {
+            int batchSize = execute.length;
+            for (int i = 0; i < batchSize; i++) {
+                if (execute[i] == 0) {
+                    result.getFailData().add(data.get(i));
+                    result.getError().append("SQL:").append(executeSql).append(System.lineSeparator())
+                            .append("DATA:").append(data.get(i)).append(System.lineSeparator());
+                    continue;
+                }
+                result.getSuccessData().add(data.get(i));
+            }
         }
         return result;
     }
@@ -191,11 +204,10 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             // 记录错误数据
             if (!config.isForceUpdate()) {
                 result.getFailData().add(data);
-                result.getFail().set(1);
                 result.getError().append("SQL:").append(sql).append(System.lineSeparator())
                         .append("DATA:").append(data).append(System.lineSeparator())
                         .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
-                logger.error("SQL:{}, DATA:{}, ERROR:{}", sql, data, e.getMessage());
+                logger.error(result.getError().toString());
             }
         }
 
@@ -506,7 +518,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     private boolean existRow(DatabaseConnectorMapper connectorMapper, String sql, Object value) {
         int rowNum = 0;
         try {
-            rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, new Object[]{value}, Integer.class));
+            rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, new Object[] {value}, Integer.class));
         } catch (Exception e) {
             logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, value);
         }

+ 2 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -160,7 +160,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             for (SearchHit hit : searchHits) {
                 list.add(hit.getSourceAsMap());
             }
-            return new Result(new ArrayList<>(list));
+            return new Result(list);
         } catch (IOException e) {
             logger.error(e.getMessage());
             throw new ConnectorException(e.getMessage());
@@ -193,7 +193,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             }
         } catch (Exception e) {
             // 记录错误数据
-            result.getFail().set(data.size());
+            result.addFailData(data);
             result.getError().append(e.getMessage()).append(System.lineSeparator());
             logger.error(e.getMessage());
         }
@@ -331,7 +331,6 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         } catch (Exception e) {
             // 记录错误数据
             result.getFailData().add(data);
-            result.getFail().set(1);
             result.getError().append("INDEX:").append(config.getIndex()).append(System.lineSeparator())
                     .append("TYPE:").append(config.getType()).append(System.lineSeparator())
                     .append("ID:").append(id).append(System.lineSeparator())

+ 1 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -85,7 +85,7 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
             data.forEach(row -> connectorMapper.getConnection().send(topic, String.valueOf(row.get(pk)), row));
         } catch (Exception e) {
             // 记录错误数据
-            result.getFail().set(data.size());
+            result.addFailData(data);
             result.getError().append(e.getMessage()).append(System.lineSeparator());
             logger.error(e.getMessage());
         }
@@ -103,7 +103,6 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
         } catch (Exception e) {
             // 记录错误数据
             result.getFailData().add(data);
-            result.getFail().set(data.size());
             result.getError().append(e.getMessage()).append(System.lineSeparator());
             logger.error(e.getMessage());
         }

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -100,7 +100,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
         int pageIndex = 1;
         for (; ; ) {
             Result reader = connectorFactory.reader(connectionMapper, new ReaderConfig(point.getCommand(), point.getArgs(), pageIndex++, readNum));
-            List<Map> data = reader.getData();
+            List<Map> data = reader.getSuccessData();
             if (CollectionUtils.isEmpty(data)) {
                 break;
             }

+ 8 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -259,7 +259,7 @@ public class ParserFactory implements Parser {
             // 1、获取数据源数据
             int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
             Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), pageIndex, pageSize));
-            List<Map> data = reader.getData();
+            List<Map> data = reader.getSuccessData();
             if (CollectionUtils.isEmpty(data)) {
                 params.clear();
                 logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
@@ -267,7 +267,7 @@ public class ParserFactory implements Parser {
             }
 
             // 2、映射字段
-            List<Map> target = picker.pickData(reader.getData());
+            List<Map> target = picker.pickData(data);
 
             // 3、参数转换
             ConvertUtil.convert(group.getConvert(), target);
@@ -279,7 +279,7 @@ public class ParserFactory implements Parser {
             Result writer = writeBatch(tConnectorMapper, command, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
 
             // 6、更新结果
-            flush(task, writer, target);
+            flush(task, writer);
 
             // 7、更新分页数
             params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
@@ -349,12 +349,11 @@ public class ParserFactory implements Parser {
             taskExecutor.execute(() -> {
                 try {
                     Result w = connectorFactory.writer(connectorMapper, new WriterBatchConfig(event, command, fields, data));
-                    // CAS
-                    result.getFailData().addAll(w.getFailData());
-                    result.getFail().getAndAdd(w.getFail().get());
+                    result.addSuccessData(w.getSuccessData());
+                    result.addFailData(w.getFailData());
                     result.getError().append(w.getError());
                 } catch (Exception e) {
-                    result.getError().append(e.getMessage()).append(System.lineSeparator());
+                    logger.error(e.getMessage());
                 } finally {
                     latch.countDown();
                 }
@@ -373,10 +372,9 @@ public class ParserFactory implements Parser {
      *
      * @param task
      * @param writer
-     * @param data
      */
-    private void flush(Task task, Result writer, List<Map> data) {
-        flushStrategy.flushFullData(task.getId(), writer, ConnectorConstant.OPERTION_INSERT, data);
+    private void flush(Task task, Result writer) {
+        flushStrategy.flushFullData(task.getId(), writer, ConnectorConstant.OPERTION_INSERT);
 
         // 发布刷新事件给FullExtractor
         task.setEndTime(Instant.now().toEpochMilli());

+ 16 - 19
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -2,13 +2,11 @@ package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.parser.model.Meta;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.Assert;
 
-import java.util.List;
-import java.util.Map;
-
 /**
  * @author AE86
  * @version 1.0.0
@@ -23,31 +21,30 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     private CacheService cacheService;
 
     @Override
-    public void flushFullData(String metaId, Result result, String event, List<Map> dataList) {
-        flush(metaId, result, event, dataList);
+    public void flushFullData(String metaId, Result result, String event) {
+        flush(metaId, result, event);
     }
 
     @Override
-    public void flushIncrementData(String metaId, Result result, String event, List<Map> dataList) {
-        flush(metaId, result, event, dataList);
+    public void flushIncrementData(String metaId, Result result, String event) {
+        flush(metaId, result, event);
     }
 
-    protected void flush(String metaId, Result result, String event, List<Map> dataList) {
-        refreshTotal(metaId, result, dataList);
+    protected void flush(String metaId, Result result, String event) {
+        refreshTotal(metaId, result);
 
-        boolean success = 0 == result.getFail().get();
-        if (!success) {
-            dataList.clear();
-            dataList.addAll(result.getFailData());
+        if (!CollectionUtils.isEmpty(result.getFailData())) {
+            flushService.asyncWrite(metaId, event, false, result.getFailData(), result.getError().toString());
+        }
+        if (!CollectionUtils.isEmpty(result.getSuccessData())) {
+            flushService.asyncWrite(metaId, event, true, result.getSuccessData(), "");
         }
-        flushService.asyncWrite(metaId, event, success, dataList, result.getError().toString());
     }
 
-    protected void refreshTotal(String metaId, Result writer, List<Map> dataList){
-        long fail = writer.getFail().get();
+    protected void refreshTotal(String metaId, Result writer) {
         Meta meta = getMeta(metaId);
-        meta.getFail().getAndAdd(fail);
-        meta.getSuccess().getAndAdd(dataList.size() - fail);
+        meta.getFail().getAndAdd(writer.getFailData().size());
+        meta.getSuccess().getAndAdd(writer.getSuccessData().size());
     }
 
     protected Meta getMeta(String metaId) {
@@ -57,4 +54,4 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
         return meta;
     }
 
-}
+}

+ 2 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushStrategy.java

@@ -2,9 +2,6 @@ package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.common.model.Result;
 
-import java.util.List;
-import java.util.Map;
-
 /**
  * 记录同步数据策略
  *
@@ -20,9 +17,8 @@ public interface FlushStrategy {
      * @param metaId
      * @param result
      * @param event
-     * @param dataList
      */
-    void flushFullData(String metaId, Result result, String event, List<Map> dataList);
+    void flushFullData(String metaId, Result result, String event);
 
     /**
      * 记录增量同步数据
@@ -30,8 +26,7 @@ public interface FlushStrategy {
      * @param metaId
      * @param result
      * @param event
-     * @param dataList
      */
-    void flushIncrementData(String metaId, Result result, String event, List<Map> dataList);
+    void flushIncrementData(String metaId, Result result, String event);
 
 }

+ 4 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/DisableFullFlushStrategy.java

@@ -1,14 +1,12 @@
 package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.parser.flush.AbstractFlushStrategy;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.List;
-import java.util.Map;
-
 /**
  * 不记录全量数据, 只记录增量同步数据, 将异常记录到系统日志中
  *
@@ -22,11 +20,11 @@ public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
     private LogService logService;
 
     @Override
-    public void flushFullData(String metaId, Result result, String event, List<Map> dataList) {
+    public void flushFullData(String metaId, Result result, String event) {
         // 不记录全量数据,只统计成功失败总数
-        refreshTotal(metaId, result, dataList);
+        refreshTotal(metaId, result);
 
-        if (0 < result.getFail().get()) {
+        if (!CollectionUtils.isEmpty(result.getFailData())) {
             LogType logType = LogType.TableGroupLog.FULL_FAILED;
             logService.log(logType, "%s:%s", logType.getMessage(), result.getError().toString());
         }

+ 4 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -3,8 +3,8 @@ package org.dbsyncer.parser.flush.impl;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
-import org.dbsyncer.parser.flush.model.AbstractResponse;
 import org.dbsyncer.parser.flush.FlushStrategy;
+import org.dbsyncer.parser.flush.model.AbstractResponse;
 import org.dbsyncer.parser.flush.model.WriterRequest;
 import org.dbsyncer.parser.flush.model.WriterResponse;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -59,7 +59,8 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
 
     @Override
     protected void pull(WriterResponse response) {
-        Result result = parserFactory.writeBatch(response.getConnectorMapper(), response.getCommand(), response.getEvent(), response.getFields(), response.getDataList(), BATCH_SIZE);
-        flushStrategy.flushIncrementData(response.getMetaId(), result, response.getEvent(), response.getDataList());
+        Result result = parserFactory.writeBatch(response.getConnectorMapper(), response.getCommand(), response.getEvent(),
+                response.getFields(), response.getDataList(), BATCH_SIZE);
+        flushStrategy.flushIncrementData(response.getMetaId(), result, response.getEvent());
     }
 }