Browse Source

add thread pooll

AE86 5 years ago
parent
commit
014ebce4c8

+ 2 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -101,6 +101,8 @@ public class MappingChecker extends AbstractChecker implements ApplicationContex
         }
 
         // 全量配置
+        String readNum = params.get("readNum");
+        mapping.setReadNum(NumberUtils.toInt(readNum, mapping.getReadNum()));
         String threadNum = params.get("threadNum");
         mapping.setThreadNum(NumberUtils.toInt(threadNum, mapping.getThreadNum()));
         String batchNum = params.get("batchNum");

+ 61 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser;
 
 import org.apache.commons.lang.math.NumberUtils;
+import org.apache.commons.lang.math.RandomUtils;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.RefreshEvent;
 import org.dbsyncer.common.task.Result;
@@ -27,13 +28,16 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.*;
 
 /**
  * @author AE86
@@ -178,8 +182,9 @@ public class ParserFactory implements Parser {
         // 检查分页参数
         Map<String, String> params = getMeta(metaId).getMap();
         params.putIfAbsent(ParserEnum.PAGE_INDEX.getCode(), ParserEnum.PAGE_INDEX.getDefaultValue());
-        int pageSize = mapping.getBatchNum();
+        int pageSize = mapping.getReadNum();
         int threadSize = mapping.getThreadNum();
+        int batchSize = mapping.getBatchNum();
 
         for (; ; ) {
             if (!task.isRunning()) {
@@ -208,7 +213,7 @@ public class ParserFactory implements Parser {
             pluginFactory.convert(plugin, data, target);
 
             // 5、写入目标源
-            Result writer = executeBatch(tConfig, command, picker.getTargetFields(), target, threadSize);
+            Result writer = executeBatch(tConfig, command, picker.getTargetFields(), target, threadSize, batchSize);
 
             // 6、更新结果
             flush(task, writer, target.size());
@@ -275,13 +280,65 @@ public class ParserFactory implements Parser {
      * @param targetFields
      * @param target
      * @param threadSize
+     * @param batchSize
      * @return
      */
-    private Result executeBatch(ConnectorConfig tConfig, Map<String, String> command, List<Field> targetFields, List<Map<String, Object>> target, int threadSize) {
+    private Result executeBatch(ConnectorConfig tConfig, Map<String, String> command, List<Field> targetFields, List<Map<String, Object>> target, int threadSize, int batchSize) {
+        // TODO 拆分任务
+        // 总数
+        int total = target.size();
+        int taskSize = 1;
 
-        // TODO 多线程run
 
+        // 单次任务
+        if(taskSize <= 1){
+            return connectorFactory.writer(tConfig, command, targetFields, target);
+        }
+
+        // TODO 批量任务走线程池
         return connectorFactory.writer(tConfig, command, targetFields, target);
     }
 
+    private ThreadPoolTaskExecutor getThreadPoolTaskExecutor(int threadSize){
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(threadSize);
+        executor.setMaxPoolSize(threadSize * 2);
+        executor.setQueueCapacity(50);
+        executor.setKeepAliveSeconds(30);
+        executor.setAwaitTerminationSeconds(30);
+        executor.setThreadNamePrefix("ParserExecutor");
+        executor.setWaitForTasksToCompleteOnShutdown(true);
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+        return executor;
+    }
+
+    public static void main(String[] args) {
+        int threadSize = 10;
+
+        ParserFactory factory = new ParserFactory();
+
+        ThreadPoolTaskExecutor executor = factory.getThreadPoolTaskExecutor(threadSize);
+        CountDownLatch latch = new CountDownLatch(threadSize);
+        for (int i = 0; i < threadSize; i++) {
+            executor.execute(() -> {
+                try {
+                    TimeUnit.SECONDS.sleep(RandomUtils.nextInt(5));
+                    System.out.println(String.format("%s: %s完成", LocalDateTime.now(), Thread.currentThread().getName()));
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        executor.shutdown();
+        try {
+            latch.wait();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        System.out.println("ok");
+
+    }
+
 }

+ 11 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Mapping.java

@@ -40,6 +40,9 @@ public class Mapping extends AbstractConfigModel {
     // 元信息ID
     private String metaId;
 
+    // 一次最多读取数
+    private int readNum = 10000;
+
     // 批量数
     private int batchNum = 200;
 
@@ -106,6 +109,14 @@ public class Mapping extends AbstractConfigModel {
         this.metaId = metaId;
     }
 
+    public int getReadNum() {
+        return readNum;
+    }
+
+    public void setReadNum(int readNum) {
+        this.readNum = readNum;
+    }
+
     public int getBatchNum() {
         return batchNum;
     }