
Optimize full-sync writing

AE86 · 3 years ago
commit c269ab9859
1 file changed, 18 insertions and 21 deletions

  1. dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java (+18 −21)

+ 18 - 21
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,10 +1,8 @@
 package org.dbsyncer.parser;
 
 import org.dbsyncer.cache.CacheService;
-import org.dbsyncer.parser.event.FullRefreshEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Result;
-import org.dbsyncer.parser.model.Task;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
@@ -18,6 +16,7 @@ import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.enums.ParserEnum;
+import org.dbsyncer.parser.event.FullRefreshEvent;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.*;
@@ -39,8 +38,10 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import java.time.Instant;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 
@@ -405,15 +406,24 @@ public class ParserFactory implements Parser {
         // Batch task: split into sub-batches
         int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
 
-        // Convert to a message queue for concurrent writes
-        Queue<Map> queue = new ConcurrentLinkedQueue<>(target);
-
         final Result result = new Result();
         final CountDownLatch latch = new CountDownLatch(taskSize);
+        int fromIndex = 0;
+        int toIndex = batchSize;
         for (int i = 0; i < taskSize; i++) {
+            final List<Map> data;
+            if (toIndex > total) {
+                toIndex = fromIndex + (total % batchSize);
+                data = target.subList(fromIndex, toIndex);
+            } else {
+                data = target.subList(fromIndex, toIndex);
+                fromIndex += batchSize;
+                toIndex += batchSize;
+            }
+
             taskExecutor.execute(() -> {
                 try {
-                    Result w = parallelTask(batchSize, queue, connectorMapper, command, fields);
+                    Result w = connectorFactory.writer(connectorMapper, new WriterBatchConfig(command, fields, data));
                     // CAS
                     result.getFailData().addAll(w.getFailData());
                     result.getFail().getAndAdd(w.getFail().get());
@@ -433,17 +443,4 @@ public class ParserFactory implements Parser {
         return result;
     }
 
-    private Result parallelTask(int batchSize, Queue<Map> queue, ConnectorMapper connectorMapper, Map<String, String> command,
-                                List<Field> fields) {
-        List<Map> data = new ArrayList<>();
-        for (int j = 0; j < batchSize; j++) {
-            Map poll = queue.poll();
-            if (null == poll) {
-                break;
-            }
-            data.add(poll);
-        }
-        return connectorFactory.writer(connectorMapper, new WriterBatchConfig(command, fields, data));
-    }
-
 }
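
For readers skimming the diff: instead of each worker draining rows from a shared ConcurrentLinkedQueue (the removed parallelTask method), the patched loop now hands every worker a precomputed subList view of the already-loaded result list. The standalone sketch below isolates that slicing logic; the class name BatchSplitSketch and the split helper are illustrative only, not part of the DBSyncer sources.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not DBSyncer code): split a loaded result list into
// batchSize-sized slices the same way the patched loop does, using subList
// views instead of polling rows from a shared ConcurrentLinkedQueue.
public class BatchSplitSketch {

    static <T> List<List<T>> split(List<T> target, int batchSize) {
        int total = target.size();
        // Same taskSize arithmetic as the patched method: round up the last partial batch.
        int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
        List<List<T>> batches = new ArrayList<>(taskSize);
        int fromIndex = 0;
        int toIndex = batchSize;
        for (int i = 0; i < taskSize; i++) {
            if (toIndex > total) {
                // Last slice may be shorter than batchSize.
                toIndex = fromIndex + (total % batchSize);
                batches.add(target.subList(fromIndex, toIndex));
            } else {
                batches.add(target.subList(fromIndex, toIndex));
                fromIndex += batchSize;
                toIndex += batchSize;
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            rows.add(i);
        }
        // 10 rows with batchSize 3 -> [0, 1, 2], [3, 4, 5], [6, 7, 8], [9]
        split(rows, 3).forEach(System.out::println);
    }
}

Note that subList returns a view backed by the original list, so no rows are copied; the trade-off is that the source list must not be modified while the worker tasks are still writing.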