Jelajahi Sumber

代码优化

jason 6 bulan lalu
induk
melakukan
0a092c7c78

+ 3 - 2
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java

@@ -77,8 +77,9 @@ public class ForCondition extends LoopCondition {
                 //存储所有的并行执行子项的CompletableFuture
                 List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
                 //获取并行循环的线程池
-                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(this,
-                                                                                                           slotIndex);
+                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(this,
+                                                                                                      slotIndex,
+                                                                                                      this.getConditionType());
                 for (int i = 0; i < forCount; i++){
                     //提交异步任务
                     CompletableFuture<LoopFutureObj> future =

+ 4 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java

@@ -170,7 +170,10 @@ public abstract class ParallelStrategyExecutor {
         this.setWhenConditionParams(whenCondition);
 
         // 获取 WHEN 所需线程池
-        ExecutorService parallelExecutor = getWhenExecutorService(whenCondition, slotIndex);
+        ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(whenCondition,
+                                                                                              slotIndex,
+                                                                                              whenCondition.getConditionType());
+
 
         // 这里主要是做了封装 CompletableFuture 对象,用 lambda 表达式做了很多事情,这句代码要仔细理清
         // 根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List<CompletableFuture<WhenFutureObj>>

+ 5 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java

@@ -3,6 +3,7 @@ package com.yomahub.liteflow.flow.parallel.strategy;
 import cn.hutool.core.collection.CollUtil;
 import com.yomahub.liteflow.flow.element.condition.WhenCondition;
 import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
+import com.yomahub.liteflow.thread.ExecutorHelper;
 
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -26,7 +27,10 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
         this.setWhenConditionParams(whenCondition);
 
         // 获取 WHEN 所需线程池
-        ExecutorService parallelExecutor = getWhenExecutorService(whenCondition, slotIndex);
+        ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(whenCondition,
+                                                                                              slotIndex,
+                                                                                              whenCondition.getConditionType());
+        ;
 
         // 指定完成的任务
         CompletableFuture<?> specifyTask;

+ 44 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorCondition.java

@@ -0,0 +1,44 @@
+package com.yomahub.liteflow.thread.ExecutorCondition;
+
+
+/**
+ * 执行器条件对象
+ */
+public class ExecutorCondition {
+    private final boolean conditionLevel;
+    private final boolean chainLevel;
+    private final String conditionExecutorClass;
+
+    private ExecutorCondition(
+            boolean conditionLevel,
+            boolean chainLevel,
+            String conditionExecutorClass) {
+        this.conditionLevel = conditionLevel;
+        this.chainLevel = chainLevel;
+        this.conditionExecutorClass = conditionExecutorClass;
+    }
+
+    public static ExecutorCondition create(
+            boolean conditionLevel,
+            boolean chainLevel,
+            String conditionExecutorClass
+    ) {
+        return new ExecutorCondition(
+                conditionLevel,
+                chainLevel,
+                conditionExecutorClass
+        );
+    }
+
+    public boolean isConditionLevel() {
+        return conditionLevel;
+    }
+
+    public boolean isChainLevel() {
+        return chainLevel;
+    }
+
+    public String getConditionExecutorClass() {
+        return conditionExecutorClass;
+    }
+}

+ 50 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorConditionBuilder.java

@@ -0,0 +1,50 @@
+package com.yomahub.liteflow.thread.ExecutorCondition;
+
+
+import cn.hutool.core.util.BooleanUtil;
+import cn.hutool.core.util.ObjectUtil;
+import com.yomahub.liteflow.enums.ConditionTypeEnum;
+import com.yomahub.liteflow.flow.element.Chain;
+import com.yomahub.liteflow.flow.element.Condition;
+import com.yomahub.liteflow.flow.element.condition.LoopCondition;
+import com.yomahub.liteflow.flow.element.condition.WhenCondition;
+import com.yomahub.liteflow.property.LiteflowConfig;
+
+public class ExecutorConditionBuilder {
+
+    /**
+     * 构建执行器条件
+     */
+    public static ExecutorCondition buildExecutorCondition(
+            Condition condition,
+            Chain chain,
+            LiteflowConfig liteflowConfig,
+            ConditionTypeEnum type) {
+
+        boolean conditionLevel;
+        String conditionExecutorClass;
+
+        switch (type) {
+            case TYPE_FOR:
+            case TYPE_WHILE:
+            case TYPE_ITERATOR:
+                LoopCondition loopCondition = (LoopCondition) condition;
+                conditionLevel = ObjectUtil.isNotEmpty(loopCondition.getThreadPoolExecutorClass());
+                conditionExecutorClass = loopCondition.getThreadPoolExecutorClass();
+                break;
+            case TYPE_WHEN:
+                WhenCondition whenCondition = (WhenCondition) condition;
+                conditionLevel = BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate());
+                conditionExecutorClass = whenCondition.getThreadExecutorClass();
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported condition type: " + type);
+        }
+
+        return ExecutorCondition.create(
+                conditionLevel,
+                ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass()),
+                conditionExecutorClass
+        );
+    }
+}

+ 53 - 8
liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java

@@ -11,9 +11,11 @@ package com.yomahub.liteflow.thread;
 import cn.hutool.core.map.MapUtil;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
+import com.yomahub.liteflow.enums.ConditionTypeEnum;
 import com.yomahub.liteflow.exception.ThreadExecutorServiceCreateException;
 import com.yomahub.liteflow.flow.FlowBus;
 import com.yomahub.liteflow.flow.element.Chain;
+import com.yomahub.liteflow.flow.element.Condition;
 import com.yomahub.liteflow.flow.element.condition.LoopCondition;
 import com.yomahub.liteflow.log.LFLog;
 import com.yomahub.liteflow.log.LFLoggerManager;
@@ -21,6 +23,8 @@ import com.yomahub.liteflow.property.LiteflowConfig;
 import com.yomahub.liteflow.property.LiteflowConfigGetter;
 import com.yomahub.liteflow.slot.DataBus;
 import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
+import com.yomahub.liteflow.thread.ExecutorCondition.ExecutorCondition;
+import com.yomahub.liteflow.thread.ExecutorCondition.ExecutorConditionBuilder;
 
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -104,17 +108,17 @@ public class ExecutorHelper {
 	}
 
 	// 构建when线程池 - clazz和condition的hash值共同作为缓存key
-	public ExecutorService buildWhenExecutorWithHash(String conditionHash) {
+	public ExecutorService buildWhenExecutorWithHash(String hash) {
 		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
-		return buildWhenExecutorWithHash(liteflowConfig.getGlobalThreadPoolExecutorClass(), conditionHash);
+		return buildWhenExecutorWithHash(liteflowConfig.getGlobalThreadPoolExecutorClass(), hash);
 	}
 
 	// 构建when线程池 - clazz和condition的hash值共同作为缓存key
-	public ExecutorService buildWhenExecutorWithHash(String clazz, String conditionHash) {
+	public ExecutorService buildWhenExecutorWithHash(String clazz, String hash) {
 		if (StrUtil.isBlank(clazz)) {
-			return buildWhenExecutorWithHash(conditionHash);
+			return buildWhenExecutorWithHash(hash);
 		}
-		return getExecutorService(clazz, conditionHash);
+		return getExecutorService(clazz, hash);
 	}
 
 	// 构建默认的FlowExecutor线程池,用于execute2Future方法
@@ -168,13 +172,13 @@ public class ExecutorHelper {
 	/**
 	 * 根据线程执行构建者Class类名获取ExecutorService实例
 	 */
-	private ExecutorService getExecutorService(String clazz, String conditionHash) {
+	private ExecutorService getExecutorService(String clazz, String hash) {
 		try {
 			String key;
-			if (StrUtil.isBlank(conditionHash)){
+			if (StrUtil.isBlank(hash)) {
 				key = clazz;
 			}else{
-				key = StrUtil.format("{}_{}", clazz, conditionHash);
+				key = StrUtil.format("{}_{}", clazz, hash);
 			}
 
 			ExecutorService executorServiceFromCache = executorServiceMap.get(key);
@@ -201,4 +205,45 @@ public class ExecutorHelper {
 		}
 	}
 
+	/**
+	 * 构建执行器服务
+	 *
+	 * @param condition 条件对象(Loop或When条件)
+	 * @param slotIndex 槽索引
+	 * @param type      condition类型
+	 * @return ExecutorService
+	 */
+	public ExecutorService buildExecutorService(Condition condition, Integer slotIndex, ConditionTypeEnum type) {
+		ExecutorService executor;
+		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
+		String chainId = DataBus.getSlot(slotIndex).getChainId();
+		Chain chain = FlowBus.getChain(chainId);
+
+		// 构建条件判断对象
+		ExecutorCondition execCondition = ExecutorConditionBuilder.buildExecutorCondition(
+				condition,
+				chain,
+				liteflowConfig,
+				type
+		);
+
+		// 根据条件选择执行器
+		if (execCondition.isConditionLevel()) {
+			// condition层级线程池
+			executor = getExecutorService(execCondition.getConditionExecutorClass(),
+										  String.valueOf(condition.hashCode()));
+
+		} else if (execCondition.isChainLevel()) {
+			// chain层级线程池
+			executor = getExecutorService(chain.getThreadPoolExecutorClass(),
+										  String.valueOf(chain.hashCode()));
+		} else {
+			// 全局线程池
+			executor = getExecutorService(liteflowConfig.getGlobalThreadPoolExecutorClass());
+		}
+
+		return executor;
+	}
+
+
 }