Просмотр исходного кода

新增chain-thread-pool-isolate控制when+异步循环的线程池隔离

jason 7 месяцев назад
Родитель
Сommit
a41a4d9a5b
16 измененных файлов с 160 добавлено и 11 удалено
  1. 3 0
      liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/WhenOperator.java
  2. 1 1
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java
  3. 1 1
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java
  4. 1 1
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java
  5. 11 2
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java
  6. 1 1
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java
  7. 32 1
      liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java
  8. 36 4
      liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java
  9. 28 0
      liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultChainExecutorBuilder.java
  10. 1 0
      liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowAutoConfiguration.java
  11. 23 0
      liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowProperty.java
  12. 1 0
      liteflow-solon-plugin/src/main/resources/META-INF/liteflow-default.properties
  13. 12 0
      liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java
  14. 1 0
      liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java
  15. 7 0
      liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json
  16. 1 0
      liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties

+ 3 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/WhenOperator.java

@@ -26,6 +26,9 @@ public class WhenOperator extends BaseOperator<WhenCondition> {
 			OperatorHelper.checkObjMustBeCommonTypeItem(obj);
 			whenCondition.addExecutable(OperatorHelper.convert(obj, Executable.class));
 			whenCondition.setThreadExecutorClass(liteflowConfig.getThreadExecutorClass());
+			if (liteflowConfig.getChainThreadPoolIsolate()) {
+				whenCondition.setThreadExecutorClass(liteflowConfig.getChainThreadExecutorClass());
+			}
 		}
 		return whenCondition;
 	}

+ 1 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java

@@ -77,7 +77,7 @@ public class ForCondition extends LoopCondition {
                 //存储所有的并行执行子项的CompletableFuture
                 List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
                 //获取并行循环的线程池
-                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor();
+                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(slotIndex);
                 for (int i = 0; i < forCount; i++){
                     //提交异步任务
                     CompletableFuture<LoopFutureObj> future =

+ 1 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java

@@ -80,7 +80,7 @@ public class IteratorCondition extends LoopCondition {
                 //存储所有的并行执行子项的CompletableFuture
                 List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
                 //获取并行循环的线程池
-                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor();
+                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(slotIndex);
                 while (it.hasNext()) {
                     Object itObj = it.next();
                     //提交异步任务

+ 1 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java

@@ -61,7 +61,7 @@ public class WhileCondition extends LoopCondition {
 			//并行循环逻辑
 			List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
 			//获取并行循环的线程池
-			ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor();
+			ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(slotIndex);
 			while (getWhileResult(slotIndex, index)){
 				CompletableFuture<LoopFutureObj> future =
 						CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, index), parallelExecutor);

+ 11 - 2
liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java

@@ -5,6 +5,8 @@ import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
 import com.yomahub.liteflow.enums.ParallelStrategyEnum;
 import com.yomahub.liteflow.exception.WhenExecuteException;
+import com.yomahub.liteflow.flow.FlowBus;
+import com.yomahub.liteflow.flow.element.Chain;
 import com.yomahub.liteflow.flow.element.Executable;
 import com.yomahub.liteflow.flow.element.Node;
 import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
@@ -122,7 +124,7 @@ public abstract class ParallelStrategyExecutor {
      * @param whenCondition
      * @return
      */
-    protected ExecutorService getWhenExecutorService(WhenCondition whenCondition) {
+    protected ExecutorService getWhenExecutorService(WhenCondition whenCondition, Integer slotIndex) {
 
         LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
 
@@ -133,6 +135,13 @@ public abstract class ParallelStrategyExecutor {
 
         if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())) {
             parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(), String.valueOf(whenCondition.hashCode()));
+        } else if (BooleanUtil.isTrue(liteflowConfig.getChainThreadPoolIsolate())) {
+            //chain 线程池隔离
+            String chainId = DataBus.getSlot(slotIndex).getChainId();
+            Chain chain = FlowBus.getChain(chainId);
+            parallelExecutor =
+                    ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(),
+                                                                            String.valueOf(chain.hashCode()));
         } else {
             parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
         }
@@ -155,7 +164,7 @@ public abstract class ParallelStrategyExecutor {
         this.setWhenConditionParams(whenCondition);
 
         // 获取 WHEN 所需线程池
-        ExecutorService parallelExecutor = getWhenExecutorService(whenCondition);
+        ExecutorService parallelExecutor = getWhenExecutorService(whenCondition, slotIndex);
 
         // 这里主要是做了封装 CompletableFuture 对象,用 lambda 表达式做了很多事情,这句代码要仔细理清
         // 根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List<CompletableFuture<WhenFutureObj>>

+ 1 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java

@@ -26,7 +26,7 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
         this.setWhenConditionParams(whenCondition);
 
         // 获取 WHEN 所需线程池
-        ExecutorService parallelExecutor = getWhenExecutorService(whenCondition);
+        ExecutorService parallelExecutor = getWhenExecutorService(whenCondition, slotIndex);
 
         // 指定完成的任务
         CompletableFuture<?> specifyTask;

+ 32 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java

@@ -13,7 +13,6 @@ import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
 import com.yomahub.liteflow.enums.ParseModeEnum;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -45,6 +44,9 @@ public class LiteflowConfig {
 	// 并行线程执行器class路径
 	private String threadExecutorClass;
 
+	// chain线程执行器class路径
+	private String chainThreadExecutorClass;
+
 	// 异步线程最大等待秒数
 	@Deprecated
 	private Integer whenMaxWaitSeconds;
@@ -124,6 +126,11 @@ public class LiteflowConfig {
 	//脚本特殊设置选项
 	private Map<String, String> scriptSetting;
 
+	// chain线程池是否隔离
+	// 每一个chain里的when和异步循环合并起来都用单独的线程池。也就是说定义了多少个chain,就有多少个线程池
+	private Boolean chainThreadPoolIsolate;
+
+
 	public Boolean getEnableMonitorFile() {
 		return enableMonitorFile;
 	}
@@ -311,6 +318,18 @@ public class LiteflowConfig {
 		this.threadExecutorClass = threadExecutorClass;
 	}
 
+	public String getChainThreadExecutorClass() {
+		if (StrUtil.isBlank(chainThreadExecutorClass)) {
+			return "com.yomahub.liteflow.thread.LiteFlowDefaultChainExecutorBuilder";
+		} else {
+			return chainThreadExecutorClass;
+		}
+	}
+
+	public void setChainThreadExecutorClass(String threadExecutorClass) {
+		this.threadExecutorClass = threadExecutorClass;
+	}
+
 	public String getNodeExecutorClass() {
 		if (StrUtil.isBlank(nodeExecutorClass)) {
 			return "com.yomahub.liteflow.flow.executor.DefaultNodeExecutor";
@@ -509,4 +528,16 @@ public class LiteflowConfig {
 	public void setScriptSetting(Map<String, String> scriptSetting) {
 		this.scriptSetting = scriptSetting;
 	}
+
+	public Boolean getChainThreadPoolIsolate() {
+		if (ObjectUtil.isNull(chainThreadPoolIsolate)) {
+			return Boolean.FALSE;
+		} else {
+			return chainThreadPoolIsolate;
+		}
+	}
+
+	public void setChainThreadPoolIsolate(Boolean chainThreadPoolIsolate) {
+		this.chainThreadPoolIsolate = chainThreadPoolIsolate;
+	}
 }

+ 36 - 4
liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java

@@ -9,19 +9,22 @@
 package com.yomahub.liteflow.thread;
 
 import cn.hutool.core.map.MapUtil;
+import cn.hutool.core.util.BooleanUtil;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
 import com.yomahub.liteflow.exception.ThreadExecutorServiceCreateException;
+import com.yomahub.liteflow.flow.FlowBus;
+import com.yomahub.liteflow.flow.element.Chain;
 import com.yomahub.liteflow.log.LFLog;
 import com.yomahub.liteflow.log.LFLoggerManager;
 import com.yomahub.liteflow.property.LiteflowConfig;
 import com.yomahub.liteflow.property.LiteflowConfigGetter;
+import com.yomahub.liteflow.slot.DataBus;
 import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 线程池工具类
@@ -128,8 +131,15 @@ public class ExecutorHelper {
 	}
 
 	//构造并行循环的线程池
-	public ExecutorService buildLoopParallelExecutor(){
+	public ExecutorService buildLoopParallelExecutor(Integer slotIndex) {
 		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
+		//chain线程池
+		if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())) {
+			//获取chain的hash
+			String chainId = DataBus.getSlot(slotIndex).getChainId();
+			Chain chain = FlowBus.getChain(chainId);
+			return getExecutorService(liteflowConfig.getThreadExecutorClass(), String.valueOf(chain.hashCode()));
+		}
 		return getExecutorService(liteflowConfig.getParallelLoopExecutorClass());
 	}
 
@@ -173,4 +183,26 @@ public class ExecutorHelper {
 		}
 	}
 
+	// 构建when线程池 - clazz和condition的hash值共同作为缓存key
+	public ExecutorService buildChainExecutorWithHash(String conditionHash) {
+		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
+		return buildChainExecutorWithHash(liteflowConfig.getThreadExecutorClass(), conditionHash);
+	}
+
+	// 构建when线程池 - clazz和condition的hash值共同作为缓存key
+	public ExecutorService buildChainExecutorWithHash(String clazz, String conditionHash) {
+		if (StrUtil.isBlank(clazz)) {
+			return buildWhenExecutorWithHash(conditionHash);
+		}
+		return getExecutorService(clazz, conditionHash);
+	}
+
+	// 构建when线程池 - 支持多个when公用一个线程池
+	public ExecutorService buildChainExecutor(String clazz) {
+		if (StrUtil.isBlank(clazz)) {
+			return buildWhenExecutor();
+		}
+		return getExecutorService(clazz);
+	}
+
 }

+ 28 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultChainExecutorBuilder.java

@@ -0,0 +1,28 @@
+package com.yomahub.liteflow.thread;
+
+import cn.hutool.core.util.ObjectUtil;
+import com.yomahub.liteflow.property.LiteflowConfig;
+import com.yomahub.liteflow.property.LiteflowConfigGetter;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * LiteFlow默认的并行多线程执行器实现
+ *
+ * @author Bryan.Zhang
+ * @since 2.6.6
+ */
+public class LiteFlowDefaultChainExecutorBuilder implements ExecutorBuilder {
+
+    @Override
+    public ExecutorService buildExecutor() {
+        LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
+        // 只有在非spring的场景下liteflowConfig才会为null
+        if (ObjectUtil.isNull(liteflowConfig)) {
+            liteflowConfig = new LiteflowConfig();
+        }
+        return buildDefaultExecutor(liteflowConfig.getWhenMaxWorkers(), liteflowConfig.getWhenMaxWorkers(),
+                                    liteflowConfig.getWhenQueueLimit(), "chain-thread-");
+    }
+
+}

+ 1 - 0
liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowAutoConfiguration.java

@@ -30,6 +30,7 @@ public class LiteflowAutoConfiguration {
 		liteflowConfig.setRuleSourceExtDataMap(property.getRuleSourceExtDataMap());
 		liteflowConfig.setSlotSize(property.getSlotSize());
 		liteflowConfig.setThreadExecutorClass(property.getThreadExecutorClass());
+		liteflowConfig.setChainThreadExecutorClass(property.getChainThreadExecutorClass());
 		liteflowConfig.setWhenMaxWaitSeconds(property.getWhenMaxWaitSeconds());
 		liteflowConfig.setEnableLog(liteflowMonitorProperty.isEnableLog());
 		liteflowConfig.setQueueLimit(liteflowMonitorProperty.getQueueLimit());

+ 23 - 0
liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowProperty.java

@@ -40,6 +40,9 @@ public class LiteflowProperty {
 	// 并行线程执行器class路径
 	private String threadExecutorClass;
 
+    // chain线程执行器class路径
+    private String chainThreadExecutorClass;
+
 	// 异步线程最大等待描述
 	private int whenMaxWaitSeconds;
 
@@ -83,6 +86,10 @@ public class LiteflowProperty {
 	// 是否启用组件降级
 	private Boolean fallbackCmpEnable;
 
+    // chain线程池是否隔离
+    // 每一个chain里的when和异步循环合并起来都用单独的线程池。也就是说定义了多少个chain,就有多少个线程池
+    private boolean chainThreadPoolIsolate;
+
 	public boolean isEnable() {
 		return enable;
 	}
@@ -168,6 +175,14 @@ public class LiteflowProperty {
 		this.threadExecutorClass = threadExecutorClass;
 	}
 
+    public String getChainThreadExecutorClass() {
+        return chainThreadExecutorClass;
+    }
+
+    public void setChainThreadExecutorClass(String chainThreadExecutorClass) {
+        this.chainThreadExecutorClass = chainThreadExecutorClass;
+    }
+
 	public String getNodeExecutorClass() {
 		return nodeExecutorClass;
 	}
@@ -267,4 +282,12 @@ public class LiteflowProperty {
 	public Boolean getFallbackCmpEnable() {
 		return fallbackCmpEnable;
 	}
+
+    public void setChainThreadPoolIsolate(boolean chainThreadPoolIsolate) {
+        this.chainThreadPoolIsolate = chainThreadPoolIsolate;
+    }
+
+    public boolean isChainThreadPoolIsolate() {
+        return chainThreadPoolIsolate;
+    }
 }

+ 1 - 0
liteflow-solon-plugin/src/main/resources/META-INF/liteflow-default.properties

@@ -5,6 +5,7 @@ liteflow.main-executor-works=64
 liteflow.main-executor-class=com.yomahub.liteflow.thread.LiteFlowDefaultMainExecutorBuilder
 liteflow.request-id-generator-class=com.yomahub.liteflow.flow.id.DefaultRequestIdGenerator
 liteflow.thread-executor-class=com.yomahub.liteflow.thread.LiteFlowDefaultWhenExecutorBuilder
+liteflow.chain-thread-executor-class=com.yomahub.liteflow.thread.LiteFlowDefaultChainExecutorBuilder
 liteflow.when-max-wait-seconds=15
 liteflow.when-max-workers=16
 liteflow.when-queue-limit=512

+ 12 - 0
liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java

@@ -101,6 +101,10 @@ public class LiteflowProperty {
 	//脚本特殊设置选项
 	private Map<String, String> scriptSetting;
 
+	// chain线程池是否隔离
+	// 每一个chain里的when和异步循环合并起来都用单独的线程池。也就是说定义了多少个chain,就有多少个线程池
+	private Boolean chainThreadPoolIsolate;
+
 	public boolean isEnableMonitorFile() {
 		return enableMonitorFile;
 	}
@@ -336,4 +340,12 @@ public class LiteflowProperty {
 	public void setScriptSetting(Map<String, String> scriptSetting) {
 		this.scriptSetting = scriptSetting;
 	}
+
+	public void setChainThreadPoolIsolate(boolean chainThreadPoolIsolate) {
+		this.chainThreadPoolIsolate = chainThreadPoolIsolate;
+	}
+
+	public boolean isChainThreadPoolIsolate() {
+		return chainThreadPoolIsolate;
+	}
 }

+ 1 - 0
liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java

@@ -54,6 +54,7 @@ public class LiteflowPropertyAutoConfiguration {
 		liteflowConfig.setDelay(liteflowMonitorProperty.getDelay());
 		liteflowConfig.setPeriod(liteflowMonitorProperty.getPeriod());
 		liteflowConfig.setScriptSetting(property.getScriptSetting());
+		liteflowConfig.setChainThreadPoolIsolate(property.isChainThreadPoolIsolate());
 		return liteflowConfig;
 	}
 

+ 7 - 0
liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json

@@ -110,6 +110,13 @@
       "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
       "defaultValue": false
     },
+    {
+      "name": "liteflow.chain-thread-pool-isolate",
+      "type": "java.lang.Boolean",
+      "description": "set whether the chain thread pool is isolated.",
+      "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
+      "defaultValue": false
+    },
     {
       "name": "liteflow.parse-mode",
       "type": "com.yomahub.liteflow.enums.ParseModeEnum",

+ 1 - 0
liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties

@@ -10,6 +10,7 @@ liteflow.when-max-wait-time-unit=MILLISECONDS
 liteflow.when-max-workers=16
 liteflow.when-queue-limit=512
 liteflow.when-thread-pool-isolate=false
+liteflow.chain-thread-pool-isolate=false
 liteflow.parse-mode=PARSE_ALL_ON_START
 liteflow.retry-count=0
 liteflow.support-multiple-type=false