1
0
Эх сурвалжийг харах

feature #I883LB when线程池隔离支持

everywhere.z 1 жил өмнө
parent
commit
1ca5861e40
13 өөрчлөгдсөн 87 нэмэгдсэн , 15 устгасан
  1. 12 2
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java
  2. 16 0
      liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java
  3. 2 2
      liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java
  4. 27 2
      liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java
  5. 1 1
      liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultMainExecutorBuilder.java
  6. 11 0
      liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java
  7. 5 4
      liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java
  8. 7 0
      liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json
  9. 1 0
      liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties
  10. 1 0
      liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeThreadPoolELSpringbootTest.java
  11. 0 1
      liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/FCmp.java
  12. 3 2
      liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/application2.properties
  13. 1 1
      liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow2.xml

+ 12 - 2
liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java

@@ -1,5 +1,6 @@
 package com.yomahub.liteflow.flow.parallel.strategy;
 
+import cn.hutool.core.util.BooleanUtil;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
 import com.yomahub.liteflow.enums.ParallelStrategyEnum;
@@ -90,8 +91,17 @@ public abstract class ParallelStrategyExecutor {
 
         String currChainName = whenCondition.getCurrChainId();
 
-        // 此方法其实只会初始化一次 Executor,不会每次都会初始化。Executor是唯一的
-        ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
+        LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
+
+        // 如果设置了线程池隔离,则每个when都会有对应的线程池,这是为了避免多层嵌套时如果线程池数量不够时出现单个线程池死锁。用线程池隔离的方式会更加好
+        // 如果when没有超多层的嵌套,还是用默认的比较好。
+        // 默认设置不隔离。也就是说,默认情况是一个线程池类一个实例,如果什么都不配置,那也就是在when的情况下,全局一个线程池。
+        ExecutorService parallelExecutor;
+        if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())){
+            parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(), String.valueOf(whenCondition.hashCode()));
+        }else{
+            parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
+        }
 
         // 设置 whenCondition 参数
         setWhenConditionParams(whenCondition);

+ 16 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java

@@ -50,6 +50,9 @@ public class LiteflowConfig {
 
 	private TimeUnit whenMaxWaitTimeUnit;
 
+	// 异步线程池是否隔离
+	private Boolean whenThreadPoolIsolate;
+
 	// 是否打印监控log
 	private Boolean enableLog;
 
@@ -458,4 +461,17 @@ public class LiteflowConfig {
 	public void setFallbackCmpEnable(Boolean fallbackCmpEnable) {
 		this.fallbackCmpEnable = fallbackCmpEnable;
 	}
+
+	public Boolean getWhenThreadPoolIsolate() {
+		if (ObjectUtil.isNull(whenThreadPoolIsolate)) {
+			return Boolean.FALSE;
+		}
+		else {
+			return whenThreadPoolIsolate;
+		}
+	}
+
+	public void setWhenThreadPoolIsolate(Boolean whenThreadPoolIsolate) {
+		this.whenThreadPoolIsolate = whenThreadPoolIsolate;
+	}
 }

+ 2 - 2
liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java

@@ -18,8 +18,8 @@ public interface ExecutorBuilder {
 	// 构建默认的线程池对象
 	default ExecutorService buildDefaultExecutor(int corePoolSize, int maximumPoolSize, int queueCapacity,
 			String threadName) {
-		return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L,
-				TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueCapacity), new ThreadFactory() {
+		return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60,
+				TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity), new ThreadFactory() {
 					private final AtomicLong number = new AtomicLong();
 
 					@Override

+ 27 - 2
liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java

@@ -100,6 +100,20 @@ public class ExecutorHelper {
 		return getExecutorService(clazz);
 	}
 
+	// 构建when线程池 - clazz和condition的hash值共同作为缓存key
+	public ExecutorService buildWhenExecutorWithHash(String conditionHash) {
+		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
+		return buildWhenExecutorWithHash(liteflowConfig.getThreadExecutorClass(), conditionHash);
+	}
+
+	// 构建when线程池 - clazz和condition的hash值共同作为缓存key
+	public ExecutorService buildWhenExecutorWithHash(String clazz, String conditionHash) {
+		if (StrUtil.isBlank(clazz)) {
+			return buildWhenExecutorWithHash(conditionHash);
+		}
+		return getExecutorService(clazz, conditionHash);
+	}
+
 	// 构建默认的FlowExecutor线程池,用于execute2Future方法
 	public ExecutorService buildMainExecutor() {
 		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
@@ -119,12 +133,23 @@ public class ExecutorHelper {
 		return getExecutorService(liteflowConfig.getParallelLoopExecutorClass());
 	}
 
+	private ExecutorService getExecutorService(String clazz){
+		return getExecutorService(clazz, null);
+	}
+
 	/**
 	 * 根据线程执行构建者Class类名获取ExecutorService实例
 	 */
-	private ExecutorService getExecutorService(String clazz) {
+	private ExecutorService getExecutorService(String clazz, String conditionHash) {
 		try {
-			ExecutorService executorServiceFromCache = executorServiceMap.get(clazz);
+			String key;
+			if (StrUtil.isBlank(conditionHash)){
+				key = clazz;
+			}else{
+				key = StrUtil.format("{}_{}", clazz, conditionHash);
+			}
+
+			ExecutorService executorServiceFromCache = executorServiceMap.get(key);
 			if (ObjectUtil.isNotNull(executorServiceFromCache)) {
 				return executorServiceFromCache;
 			}

+ 1 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultMainExecutorBuilder.java

@@ -20,7 +20,7 @@ public class LiteFlowDefaultMainExecutorBuilder implements ExecutorBuilder {
 		if (ObjectUtil.isNull(liteflowConfig)) {
 			liteflowConfig = new LiteflowConfig();
 		}
-		return buildDefaultExecutor(liteflowConfig.getMainExecutorWorks(), liteflowConfig.getMainExecutorWorks(), 200,
+		return buildDefaultExecutor(liteflowConfig.getMainExecutorWorks(), liteflowConfig.getMainExecutorWorks()*2, 200,
 				"main-thread-");
 	}
 

+ 11 - 0
liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java

@@ -51,6 +51,9 @@ public class LiteflowProperty {
 	// 异步线程池最大队列数量
 	private int whenQueueLimit;
 
+	// 异步线程池是否隔离
+	private boolean whenThreadPoolIsolate;
+
 	// 是否在启动时解析规则文件
 	// 这个参数主要给编码式注册元数据的场景用的,结合FlowBus.addNode一起用
 	private boolean parseOnStart;
@@ -289,4 +292,12 @@ public class LiteflowProperty {
 	public void setFallbackCmpEnable(Boolean fallbackCmpEnable) {
 		this.fallbackCmpEnable = fallbackCmpEnable;
 	}
+
+	public boolean isWhenThreadPoolIsolate() {
+		return whenThreadPoolIsolate;
+	}
+
+	public void setWhenThreadPoolIsolate(boolean whenThreadPoolIsolate) {
+		this.whenThreadPoolIsolate = whenThreadPoolIsolate;
+	}
 }

+ 5 - 4
liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java

@@ -30,12 +30,9 @@ public class LiteflowPropertyAutoConfiguration {
 		liteflowConfig.setWhenMaxWaitSeconds(property.getWhenMaxWaitSeconds());
 		liteflowConfig.setWhenMaxWaitTime(property.getWhenMaxWaitTime());
 		liteflowConfig.setWhenMaxWaitTimeUnit(property.getWhenMaxWaitTimeUnit());
-		liteflowConfig.setEnableLog(liteflowMonitorProperty.isEnableLog());
-		liteflowConfig.setQueueLimit(liteflowMonitorProperty.getQueueLimit());
-		liteflowConfig.setDelay(liteflowMonitorProperty.getDelay());
-		liteflowConfig.setPeriod(liteflowMonitorProperty.getPeriod());
 		liteflowConfig.setWhenMaxWorkers(property.getWhenMaxWorkers());
 		liteflowConfig.setWhenQueueLimit(property.getWhenQueueLimit());
+		liteflowConfig.setWhenThreadPoolIsolate(property.isWhenThreadPoolIsolate());
 		liteflowConfig.setParseOnStart(property.isParseOnStart());
 		liteflowConfig.setEnable(property.isEnable());
 		liteflowConfig.setSupportMultipleType(property.isSupportMultipleType());
@@ -51,6 +48,10 @@ public class LiteflowPropertyAutoConfiguration {
 		liteflowConfig.setParallelQueueLimit(property.getParallelQueueLimit());
 		liteflowConfig.setParallelLoopExecutorClass(property.getParallelLoopExecutorClass());
 		liteflowConfig.setFallbackCmpEnable(property.isFallbackCmpEnable());
+		liteflowConfig.setEnableLog(liteflowMonitorProperty.isEnableLog());
+		liteflowConfig.setQueueLimit(liteflowMonitorProperty.getQueueLimit());
+		liteflowConfig.setDelay(liteflowMonitorProperty.getDelay());
+		liteflowConfig.setPeriod(liteflowMonitorProperty.getPeriod());
 		return liteflowConfig;
 	}
 

+ 7 - 0
liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json

@@ -103,6 +103,13 @@
       "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
       "defaultValue": 512
     },
+    {
+      "name": "liteflow.when-thread-pool-isolate",
+      "type": "java.lang.Boolean",
+      "description": "set whether the asynchronous thread pool is isolated.",
+      "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
+      "defaultValue": false
+    },
     {
       "name": "liteflow.parse-on-start",
       "type": "java.lang.Boolean",

+ 1 - 0
liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties

@@ -9,6 +9,7 @@ liteflow.when-max-wait-time=15000
 liteflow.when-max-wait-time-unit=MILLISECONDS
 liteflow.when-max-workers=16
 liteflow.when-queue-limit=512
+liteflow.when-thread-pool-isolate=false
 liteflow.parse-on-start=true
 liteflow.retry-count=0
 liteflow.support-multiple-type=false

+ 1 - 0
liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeThreadPoolELSpringbootTest.java

@@ -20,6 +20,7 @@ public class AsyncNodeThreadPoolELSpringbootTest {
     @Resource
     private FlowExecutor flowExecutor;
 
+    // 测试当when嵌套层数大于最大线程个数时,并开启线程池隔离机制的正确性
     @Test
     public void testAsyncFlow1() {
         LiteflowResponse response = flowExecutor.execute2Resp("chain1", "it's a base request");

+ 0 - 1
liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/FCmp.java

@@ -8,7 +8,6 @@ public class FCmp extends NodeComponent {
 
 	@Override
 	public void process() throws Exception {
-		Thread.sleep(1500);
 		System.out.println("Fcomp executed!");
 	}
 

+ 3 - 2
liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/application2.properties

@@ -1,4 +1,5 @@
 liteflow.rule-source=asyncNode/flow2.xml
 liteflow.when-max-workers=4
-liteflow.when-max-wait-time=20
-liteflow.when-max-wait-time-unit=SECONDS
+liteflow.when-max-wait-time=3600000
+liteflow.when-max-wait-time-unit=MILLISECONDS
+liteflow.when-thread-pool-isolate=true

+ 1 - 1
liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow2.xml

@@ -2,6 +2,6 @@
 <flow>
     <!-- base test -->
     <chain name="chain1">
-        WHEN(f, f, WHEN(f, WHEN(f, f, f, f, WHEN(f, f, f, f, f, f, f))), f, f);
+        WHEN(f, WHEN(f, WHEN(f, WHEN(f, WHEN(f, WHEN(f, WHEN(f)))))));
     </chain>
 </flow>