Bladeren bron

enhancement #I6Q061 WHEN组件最大等待时间希望能支持到毫秒级别,或者可以通过配置TimeUnit来指定

everywhere.z 2 jaren geleden
bovenliggende
commit
3518464fb3

+ 39 - 33
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java

@@ -7,6 +7,7 @@
  */
 package com.yomahub.liteflow.flow.element.condition;
 
+import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
 import com.yomahub.liteflow.common.LocalDefaultFlowConstant;
 import com.yomahub.liteflow.enums.ConditionTypeEnum;
@@ -70,7 +71,7 @@ public class WhenCondition extends Condition {
 
 		// 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的
 		ExecutorService parallelExecutor = ExecutorHelper.loadInstance()
-			.buildWhenExecutor(this.getThreadExecutorClass());
+				.buildWhenExecutor(this.getThreadExecutorClass());
 
 		// 获得liteflow的参数
 		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
@@ -85,24 +86,34 @@ public class WhenCondition extends Condition {
 		// 3.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List<CompletableFuture<WhenFutureObj>>
 		// 4.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象
 		// 5.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象
+		Integer whenMaxWaitTime;
+		TimeUnit whenMaxWaitTimeUnit;
+
+		if (ObjectUtil.isNotNull(liteflowConfig.getWhenMaxWaitSeconds())){
+			whenMaxWaitTime = liteflowConfig.getWhenMaxWaitSeconds();
+			whenMaxWaitTimeUnit = TimeUnit.SECONDS;
+		}else{
+			whenMaxWaitTime = liteflowConfig.getWhenMaxWaitTime();
+			whenMaxWaitTimeUnit = liteflowConfig.getWhenMaxWaitTimeUnit();
+		}
+
 		List<CompletableFuture<WhenFutureObj>> completableFutureList = this.getExecutableList()
-			.stream()
-			.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
-			.filter(executable -> {
-				try {
-					return executable.isAccess(slotIndex);
-				}
-				catch (Exception e) {
-					LOG.error("there was an error when executing the when component isAccess", e);
-					return false;
-				}
-			})
-			.map(executable -> CompletableFutureTimeout.completeOnTimeout(
-					WhenFutureObj.timeOut(executable.getId()),
-					CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex),
-							parallelExecutor),
-					liteflowConfig.getWhenMaxWaitSeconds(), TimeUnit.SECONDS))
-			.collect(Collectors.toList());
+				.stream()
+				.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
+				.filter(executable -> {
+					try {
+						return executable.isAccess(slotIndex);
+					} catch (Exception e) {
+						LOG.error("there was an error when executing the when component isAccess", e);
+						return false;
+					}
+				})
+				.map(executable -> CompletableFutureTimeout.completeOnTimeout(
+						WhenFutureObj.timeOut(executable.getId()),
+						CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex),
+								parallelExecutor),
+						whenMaxWaitTime, whenMaxWaitTimeUnit))
+				.collect(Collectors.toList());
 
 		CompletableFuture<?> resultCompletableFuture;
 
@@ -112,19 +123,17 @@ public class WhenCondition extends Condition {
 		if (this.isAny()) {
 			// 把这些CompletableFuture通过anyOf合成一个CompletableFuture
 			resultCompletableFuture = CompletableFuture
-				.anyOf(completableFutureList.toArray(new CompletableFuture[] {}));
-		}
-		else {
+					.anyOf(completableFutureList.toArray(new CompletableFuture[] {}));
+		} else {
 			// 把这些CompletableFuture通过allOf合成一个CompletableFuture
 			resultCompletableFuture = CompletableFuture
-				.allOf(completableFutureList.toArray(new CompletableFuture[] {}));
+					.allOf(completableFutureList.toArray(new CompletableFuture[] {}));
 		}
 
 		try {
 			// 进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回
 			resultCompletableFuture.get();
-		}
-		catch (InterruptedException | ExecutionException e) {
+		} catch (InterruptedException | ExecutionException e) {
 			LOG.error("there was an error when executing the CompletableFuture", e);
 			interrupted[0] = true;
 		}
@@ -137,16 +146,14 @@ public class WhenCondition extends Condition {
 			// 过滤出已经完成的,没完成的就直接终止
 			if (f.isDone()) {
 				return true;
-			}
-			else {
+			} else {
 				f.cancel(true);
 				return false;
 			}
 		}).map(f -> {
 			try {
 				return f.get();
-			}
-			catch (InterruptedException | ExecutionException e) {
+			} catch (InterruptedException | ExecutionException e) {
 				interrupted[0] = true;
 				return null;
 			}
@@ -155,8 +162,8 @@ public class WhenCondition extends Condition {
 		// 判断超时,上面已经拿到了所有已经完成的CompletableFuture
 		// 那我们只要过滤出超时的CompletableFuture
 		List<WhenFutureObj> timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream()
-			.filter(WhenFutureObj::isTimeout)
-			.collect(Collectors.toList());
+				.filter(WhenFutureObj::isTimeout)
+				.collect(Collectors.toList());
 
 		// 输出超时信息
 		timeOutWhenFutureObjList.forEach(whenFutureObj -> LOG.warn(
@@ -167,7 +174,7 @@ public class WhenCondition extends Condition {
 		if (!this.isIgnoreError()) {
 			if (interrupted[0]) {
 				throw new WhenExecuteException(StrUtil
-					.format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId()));
+						.format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId()));
 			}
 
 			// 循环判断CompletableFuture的返回值,如果异步执行失败,则抛出相应的业务异常
@@ -178,8 +185,7 @@ public class WhenCondition extends Condition {
 					throw whenFutureObj.getEx();
 				}
 			}
-		}
-		else if (interrupted[0]) {
+		} else if (interrupted[0]) {
 			// 这里由于配置了ignoreError,所以只打印warn日志
 			LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.",
 					slot.getRequestId());

+ 1 - 2
liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/WhenFutureObj.java

@@ -43,8 +43,7 @@ public class WhenFutureObj {
 		result.setTimeout(true);
 		result.setExecutorName(executorName);
 		result.setEx(new WhenTimeoutException(
-				StrUtil.format("Timed out when executing the component[{}],when-max-timeout-seconds config is:{}(s)",
-						executorName, LiteflowConfigGetter.get().getWhenMaxWaitSeconds())));
+				StrUtil.format("Timed out when executing the component[{}]",executorName)));
 		return result;
 	}
 

+ 32 - 5
liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java

@@ -12,6 +12,7 @@ import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * liteflow的配置实体类 这个类中的属性为什么不用基本类型,而用包装类型呢
@@ -42,8 +43,13 @@ public class LiteflowConfig {
 	private String threadExecutorClass;
 
 	// 异步线程最大等待秒数
+	@Deprecated
 	private Integer whenMaxWaitSeconds;
 
+	private Integer whenMaxWaitTime;
+
+	private TimeUnit whenMaxWaitTimeUnit;
+
 	// 是否打印监控log
 	private Boolean enableLog;
 
@@ -139,15 +145,15 @@ public class LiteflowConfig {
 		this.slotSize = slotSize;
 	}
 
+	@Deprecated
 	public Integer getWhenMaxWaitSeconds() {
-		if (ObjectUtil.isNull(whenMaxWaitSeconds)) {
-			return 15;
-		}
-		else {
-			return whenMaxWaitSeconds;
+		if (whenMaxWaitSeconds == null || whenMaxWaitSeconds == 0){
+			return null;
 		}
+		return whenMaxWaitSeconds;
 	}
 
+	@Deprecated
 	public void setWhenMaxWaitSeconds(Integer whenMaxWaitSeconds) {
 		this.whenMaxWaitSeconds = whenMaxWaitSeconds;
 	}
@@ -382,4 +388,25 @@ public class LiteflowConfig {
 		this.ruleSourceExtDataMap = ruleSourceExtDataMap;
 	}
 
+	public Integer getWhenMaxWaitTime() {
+		if (ObjectUtil.isNull(whenMaxWaitTime)){
+			return 15000;
+		}
+		return whenMaxWaitTime;
+	}
+
+	public void setWhenMaxWaitTime(Integer whenMaxWaitTime) {
+		this.whenMaxWaitTime = whenMaxWaitTime;
+	}
+
+	public TimeUnit getWhenMaxWaitTimeUnit() {
+		if (ObjectUtil.isNull(whenMaxWaitTimeUnit)){
+			return TimeUnit.MILLISECONDS;
+		}
+		return whenMaxWaitTimeUnit;
+	}
+
+	public void setWhenMaxWaitTimeUnit(TimeUnit whenMaxWaitTimeUnit) {
+		this.whenMaxWaitTimeUnit = whenMaxWaitTimeUnit;
+	}
 }

+ 23 - 0
liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java

@@ -3,6 +3,7 @@ package com.yomahub.liteflow.springboot;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 执行流程主要的参数类
@@ -37,8 +38,13 @@ public class LiteflowProperty {
 	private String threadExecutorClass;
 
 	// 异步线程最大等待描述
+	@Deprecated
 	private int whenMaxWaitSeconds;
 
+	private int whenMaxWaitTime;
+
+	private TimeUnit whenMaxWaitTimeUnit;
+
 	// 异步线程池最大线程数
 	private int whenMaxWorkers;
 
@@ -106,10 +112,12 @@ public class LiteflowProperty {
 		this.slotSize = slotSize;
 	}
 
+	@Deprecated
 	public int getWhenMaxWaitSeconds() {
 		return whenMaxWaitSeconds;
 	}
 
+	@Deprecated
 	public void setWhenMaxWaitSeconds(int whenMaxWaitSeconds) {
 		this.whenMaxWaitSeconds = whenMaxWaitSeconds;
 	}
@@ -234,4 +242,19 @@ public class LiteflowProperty {
 		this.ruleSourceExtDataMap = ruleSourceExtDataMap;
 	}
 
+	public int getWhenMaxWaitTime() {
+		return whenMaxWaitTime;
+	}
+
+	public void setWhenMaxWaitTime(int whenMaxWaitTime) {
+		this.whenMaxWaitTime = whenMaxWaitTime;
+	}
+
+	public TimeUnit getWhenMaxWaitTimeUnit() {
+		return whenMaxWaitTimeUnit;
+	}
+
+	public void setWhenMaxWaitTimeUnit(TimeUnit whenMaxWaitTimeUnit) {
+		this.whenMaxWaitTimeUnit = whenMaxWaitTimeUnit;
+	}
 }

+ 2 - 0
liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java

@@ -28,6 +28,8 @@ public class LiteflowPropertyAutoConfiguration {
 		liteflowConfig.setSlotSize(property.getSlotSize());
 		liteflowConfig.setThreadExecutorClass(property.getThreadExecutorClass());
 		liteflowConfig.setWhenMaxWaitSeconds(property.getWhenMaxWaitSeconds());
+		liteflowConfig.setWhenMaxWaitTime(property.getWhenMaxWaitTime());
+		liteflowConfig.setWhenMaxWaitTimeUnit(property.getWhenMaxWaitTimeUnit());
 		liteflowConfig.setEnableLog(liteflowMonitorProperty.isEnableLog());
 		liteflowConfig.setQueueLimit(liteflowMonitorProperty.getQueueLimit());
 		liteflowConfig.setDelay(liteflowMonitorProperty.getDelay());

+ 15 - 1
liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json

@@ -73,7 +73,21 @@
       "type": "java.lang.Integer",
       "description": "Set the async thread max wait seconds on \" when \" mode.",
       "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
-      "defaultValue": 15
+      "defaultValue": 0
+    },
+    {
+      "name": "liteflow.when-max-wait-time",
+      "type": "java.lang.Integer",
+      "description": "Set the async thread max wait time on \" when \" mode.",
+      "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
+      "defaultValue": 15000
+    },
+    {
+      "name": "liteflow.when-max-wait-time-unit",
+      "type": "java.util.concurrent.TimeUnit",
+      "description": "Set the async thread max wait time unit on \" when \" mode.",
+      "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
+      "defaultValue": "MILLISECONDS"
     },
     {
       "name": "liteflow.when-max-workers",

+ 2 - 1
liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties

@@ -5,7 +5,8 @@ liteflow.main-executor-works=64
 liteflow.main-executor-class=com.yomahub.liteflow.thread.LiteFlowDefaultMainExecutorBuilder
 liteflow.request-id-generator-class=com.yomahub.liteflow.flow.id.DefaultRequestIdGenerator
 liteflow.thread-executor-class=com.yomahub.liteflow.thread.LiteFlowDefaultWhenExecutorBuilder
-liteflow.when-max-wait-seconds=15
+liteflow.when-max-wait-time=15000
+liteflow.when-max-wait-time-unit=MILLISECONDS
 liteflow.when-max-workers=16
 liteflow.when-queue-limit=512
 liteflow.parse-on-start=true

+ 4 - 1
liteflow-testcase-el/liteflow-testcase-el-nospring/src/test/java/com/yomahub/liteflow/test/config/LiteflowConfigTest1.java

@@ -10,6 +10,8 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * 非spring环境下参数单元测试
  *
@@ -33,7 +35,8 @@ public class LiteflowConfigTest1 extends BaseTest {
 		LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
 		Assert.assertTrue(response.isSuccess());
 		Assert.assertEquals("config/flow.el.xml", config.getRuleSource());
-		Assert.assertEquals(15, config.getWhenMaxWaitSeconds().intValue());
+		Assert.assertEquals(15000, config.getWhenMaxWaitTime().intValue());
+		Assert.assertEquals(TimeUnit.MILLISECONDS, config.getWhenMaxWaitTimeUnit());
 		Assert.assertEquals(200, config.getQueueLimit().intValue());
 		Assert.assertEquals(300000L, config.getDelay().longValue());
 		Assert.assertEquals(300000L, config.getPeriod().longValue());

+ 3 - 1
liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/config/LiteflowConfigELSpringTest.java

@@ -13,6 +13,7 @@ import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import javax.annotation.Resource;
+import java.util.concurrent.TimeUnit;
 
 /**
  * spring环境下参数单元测试
@@ -36,7 +37,8 @@ public class LiteflowConfigELSpringTest extends BaseTest {
 		LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
 		Assert.assertTrue(response.isSuccess());
 		Assert.assertEquals("config/flow.el.json", config.getRuleSource());
-		Assert.assertEquals(15, config.getWhenMaxWaitSeconds().intValue());
+		Assert.assertEquals(15000, config.getWhenMaxWaitTime().intValue());
+		Assert.assertEquals(TimeUnit.MILLISECONDS, config.getWhenMaxWaitTimeUnit());
 		Assert.assertEquals(200, config.getQueueLimit().intValue());
 		Assert.assertEquals(300000L, config.getDelay().longValue());
 		Assert.assertEquals(300000L, config.getPeriod().longValue());