Przeglądaj źródła

!246 when 线程池隔离
Merge pull request !246 from luoyi/dev

铂赛东 1 rok temu
rodzic
commit
e50082b7d6

+ 49 - 24
liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java

@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * 并发策略执行器抽象类
@@ -82,45 +83,69 @@ public abstract class ParallelStrategyExecutor {
     }
 
     /**
-     * 获取所有任务 CompletableFuture 集合
-     * @param whenCondition
+     * 过滤 WHEN 待执行任务
+     * @param executableList 所有任务列表
      * @param slotIndex
      * @return
      */
-    protected List<CompletableFuture<WhenFutureObj>> getWhenAllTaskList(WhenCondition whenCondition, Integer slotIndex) {
+    protected Stream<Executable> filterWhenTaskList(List<Executable> executableList, Integer slotIndex) {
+        // 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了
+        // 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了
+        return executableList.stream()
+                .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
+                .filter(executable -> {
+                    try {
+                        return executable.isAccess(slotIndex);
+                    } catch (Exception e) {
+                        LOG.error("there was an error when executing the when component isAccess", e);
+                        return false;
+                    }
+                });
+    }
 
-        String currChainName = whenCondition.getCurrChainId();
+    /**
+     * 获取 WHEN 所需线程池
+     * @param whenCondition
+     * @return
+     */
+    protected ExecutorService getWhenExecutorService(WhenCondition whenCondition) {
 
         LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
 
-        // 如果设置了线程池隔离,则每个when都会有对应的线程池,这是为了避免多层嵌套时如果线程池数量不够时出现单个线程池死锁。用线程池隔离的方式会更加好
-        // 如果when没有超多层的嵌套,还是用默认的比较好。
-        // 默认设置不隔离。也就是说,默认情况是一个线程池类一个实例,如果什么都不配置,那也就是在when的情况下,全局一个线程池。
+        // 如果设置了线程池隔离,则每个 when 都会有对应的线程池,这是为了避免多层嵌套时如果线程池数量不够时出现单个线程池死锁。用线程池隔离的方式会更加好
+        // 如果 when 没有超多层的嵌套,还是用默认的比较好。
+        // 默认设置不隔离。也就是说,默认情况是一个线程池类一个实例,如果什么都不配置,那也就是在 when 的情况下,全局一个线程池。
         ExecutorService parallelExecutor;
-        if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())){
+
+        if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())) {
             parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(), String.valueOf(whenCondition.hashCode()));
-        }else{
+        } else {
             parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
         }
 
+        return parallelExecutor;
+
+    }
+
+    /**
+     * 获取所有任务 CompletableFuture 集合
+     * @param whenCondition
+     * @param slotIndex
+     * @return
+     */
+    protected List<CompletableFuture<WhenFutureObj>> getWhenAllTaskList(WhenCondition whenCondition, Integer slotIndex) {
+
+        String currChainName = whenCondition.getCurrChainId();
+
         // 设置 whenCondition 参数
         setWhenConditionParams(whenCondition);
 
-        // 这里主要是做了封装 CompletableFuture 对象,用 lumbda 表达式做了很多事情,这句代码要仔细理清
-        // 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了
-        // 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了
-        // 3.根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List<CompletableFuture<WhenFutureObj>>
-        List<CompletableFuture<WhenFutureObj>> completableFutureList = whenCondition.getExecutableList()
-                .stream()
-                .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
-                .filter(executable -> {
-                    try {
-                        return executable.isAccess(slotIndex);
-                    } catch (Exception e) {
-                        LOG.error("there was an error when executing the when component isAccess", e);
-                        return false;
-                    }
-                })
+        // 获取 WHEN 所需线程池
+        ExecutorService parallelExecutor = getWhenExecutorService(whenCondition);
+
+        // 这里主要是做了封装 CompletableFuture 对象,用 lambda 表达式做了很多事情,这句代码要仔细理清
+        // 根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List<CompletableFuture<WhenFutureObj>>
+        List<CompletableFuture<WhenFutureObj>> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex)
                 .map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex))
                 .collect(Collectors.toList());
 

+ 3 - 16
liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java

@@ -1,11 +1,8 @@
 package com.yomahub.liteflow.flow.parallel.strategy;
 
 import cn.hutool.core.collection.CollUtil;
-import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
-import com.yomahub.liteflow.flow.element.condition.PreCondition;
 import com.yomahub.liteflow.flow.element.condition.WhenCondition;
 import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
-import com.yomahub.liteflow.thread.ExecutorHelper;
 
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -27,8 +24,8 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
         // 设置 whenCondition 参数
         this.setWhenConditionParams(whenCondition);
 
-        // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor 是唯一的
-        ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
+        // 获取 WHEN 所需线程池
+        ExecutorService parallelExecutor = getWhenExecutorService(whenCondition);
 
         // 指定完成的任务
         CompletableFuture<?> specifyTask;
@@ -43,17 +40,7 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
         List<CompletableFuture<WhenFutureObj>> allTaskList = new ArrayList<>();
 
         // 遍历 when 所有 node,进行筛选及处理
-        whenCondition.getExecutableList()
-                .stream()
-                .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
-                .filter(executable -> {
-                    try {
-                        return executable.isAccess(slotIndex);
-                    } catch (Exception e) {
-                        LOG.error("there was an error when executing the when component isAccess", e);
-                        return false;
-                    }
-                })
+        filterWhenTaskList(whenCondition.getExecutableList(), slotIndex)
                 .forEach(executable -> {
                     // 处理 task,封装成 CompletableFuture 对象
                     CompletableFuture<WhenFutureObj> completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex);