1
0
AE86 1 жил өмнө
parent
commit
f0903cf9ad

+ 2 - 3
dbsyncer-connector/dbsyncer-connector-base/src/test/java/ConnectionTest.java

@@ -235,7 +235,7 @@ public class ConnectionTest {
 
         long begin = Instant.now().toEpochMilli();
         final int threadSize = 10;
-        final int num = 10;
+        final int num = 100;
         final int batch = 2;
         final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
         final CountDownLatch latch = new CountDownLatch(threadSize);
@@ -244,7 +244,6 @@ public class ConnectionTest {
         final String delete = "DELETE from `test`.`vote_records_test` WHERE `id` = ?";
 
         // 模拟单表增删改事件,每个事件间隔2条数据
-        logger.info("-----------------模拟总数:{}", threadSize * batch * num * 3);
         for (int i = 0; i < threadSize; i++) {
             final int offset = i == 0 ? 0 : i * threadSize;
             pool.submit(() -> {
@@ -266,7 +265,7 @@ public class ConnectionTest {
             logger.error(e.getMessage());
         }
         pool.shutdown();
-        logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
+        logger.info("总数:{}, 耗时:{}秒", (threadSize * batch * num * 3), (Instant.now().toEpochMilli() - begin) / 1000);
     }
 
     private void mockData(DatabaseConnectorInstance connectorInstance, int num, int batch, int offset, String insert, String update, String delete) {

+ 21 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
 package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.common.config.BufferActuatorConfig;
@@ -122,6 +125,23 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
      */
     protected abstract void pull(Response response);
 
+    /**
+     * 批量处理分区数据
+     *
+     * @param map
+     */
+    protected void process(Map<String, Response> map){
+        map.forEach((key, response) -> {
+            long now = Instant.now().toEpochMilli();
+            try {
+                pull(response);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+            logger.info("[{}{}]{}, {}ms", key, response.getSuffixName(), response.getTaskSize(), (Instant.now().toEpochMilli() - now));
+        });
+    }
+
     /**
      * 提交任务失败
      *
@@ -190,15 +210,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
             }
         }
 
-        map.forEach((key, response) -> {
-            long now = Instant.now().toEpochMilli();
-            try {
-                pull(response);
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-            }
-            logger.info("[{}{}]{}, {}ms", key, response.getSuffixName(), response.getTaskSize(), (Instant.now().toEpochMilli() - now));
-        });
+        process(map);
         map.clear();
         map = null;
         batchCounter = null;