Przeglądaj źródła

新增异步循环condition层级线程池

jason 6 miesięcy temu
rodzic
commit
a27397c9d4

+ 23 - 7
liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/ThreadPoolOperator.java

@@ -1,27 +1,43 @@
 package com.yomahub.liteflow.builder.el.operator;
 
+import com.ql.util.express.exception.QLException;
 import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
 import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
+import com.yomahub.liteflow.flow.element.Condition;
+import com.yomahub.liteflow.flow.element.condition.LoopCondition;
 import com.yomahub.liteflow.flow.element.condition.WhenCondition;
 
 /**
- * EL规则中的threadPool的操作符
+ * EL规则中的threadPool的操作符 有四种用法  WHEN().threadPool() FOR...DO().threadPool() WHILE...DO.threadPool() ITERATOR...DO
+ * .threadPool()
  *
  * @author Bryan.Zhang
  * @since 2.8.0
  */
-public class ThreadPoolOperator extends BaseOperator<WhenCondition> {
+public class ThreadPoolOperator extends BaseOperator<Condition> {
 
 	@Override
-	public WhenCondition build(Object[] objects) throws Exception {
+	public Condition build(Object[] objects) throws Exception {
 		OperatorHelper.checkObjectSizeEqTwo(objects);
 
-		String errorMsg = "The caller must be WhenCondition item";
-		WhenCondition whenCondition = OperatorHelper.convert(objects[0], WhenCondition.class, errorMsg);
+		if (objects[0] instanceof WhenCondition) {
+			String errorMsg = "The caller must be WhenCondition item";
 
-		whenCondition.setThreadExecutorClass(OperatorHelper.convert(objects[1], String.class));
+			WhenCondition condition = OperatorHelper.convert(objects[0], WhenCondition.class, errorMsg);
 
-		return whenCondition;
+			condition.setThreadExecutorClass(OperatorHelper.convert(objects[1], String.class));
+			return condition;
+		} else if (objects[0] instanceof LoopCondition) {
+			String errorMsg = "The caller must be LoopCondition item";
+
+			LoopCondition condition = OperatorHelper.convert(objects[0], LoopCondition.class, errorMsg);
+
+			condition.setThreadPoolExecutorClass(OperatorHelper.convert(objects[1], String.class));
+			return condition;
+		} else {
+			String errorMsg = "The caller must be LoopCondition or WhenCondition item";
+			throw new QLException(errorMsg);
+		}
 	}
 
 }

+ 2 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java

@@ -77,7 +77,8 @@ public class ForCondition extends LoopCondition {
                 //存储所有的并行执行子项的CompletableFuture
                 List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
                 //获取并行循环的线程池
-                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(slotIndex);
+                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(this,
+                                                                                                           slotIndex);
                 for (int i = 0; i < forCount; i++){
                     //提交异步任务
                     CompletableFuture<LoopFutureObj> future =

+ 2 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java

@@ -80,7 +80,8 @@ public class IteratorCondition extends LoopCondition {
                 //存储所有的并行执行子项的CompletableFuture
                 List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
                 //获取并行循环的线程池
-                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(slotIndex);
+                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(this,
+                                                                                                           slotIndex);
                 while (it.hasNext()) {
                     Object itObj = it.next();
                     //提交异步任务

+ 10 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java

@@ -20,6 +20,8 @@ import java.util.function.Supplier;
 public abstract class LoopCondition extends Condition {
     //判断循环是否并行执行,默认为false
     private boolean parallel = false;
+    //loop condition层级的线程池
+    private String threadPoolExecutorClass;
 
     protected Executable getBreakItem() {
         return this.getExecutableOne(ConditionKey.BREAK_KEY);
@@ -37,6 +39,14 @@ public abstract class LoopCondition extends Condition {
         this.addExecutable(ConditionKey.DO_KEY, executable);
     }
 
+    public String getThreadPoolExecutorClass() {
+        return threadPoolExecutorClass;
+    }
+
+    public void setThreadPoolExecutorClass(String threadPoolExecutorClass) {
+        this.threadPoolExecutorClass = threadPoolExecutorClass;
+    }
+
     protected void setLoopIndex(Executable executableItem, int index) {
         if (executableItem instanceof Chain) {
             ((Chain) executableItem).getConditionList().forEach(condition -> setLoopIndex(condition, index));

+ 1 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java

@@ -61,7 +61,7 @@ public class WhileCondition extends LoopCondition {
 			//并行循环逻辑
 			List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
 			//获取并行循环的线程池
-			ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(slotIndex);
+            ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(this, slotIndex);
 			while (getWhileResult(slotIndex, index)){
 				CompletableFuture<LoopFutureObj> future =
 						CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, index), parallelExecutor);

+ 10 - 4
liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java

@@ -14,6 +14,7 @@ import cn.hutool.core.util.StrUtil;
 import com.yomahub.liteflow.exception.ThreadExecutorServiceCreateException;
 import com.yomahub.liteflow.flow.FlowBus;
 import com.yomahub.liteflow.flow.element.Chain;
+import com.yomahub.liteflow.flow.element.condition.LoopCondition;
 import com.yomahub.liteflow.log.LFLog;
 import com.yomahub.liteflow.log.LFLoggerManager;
 import com.yomahub.liteflow.property.LiteflowConfig;
@@ -134,25 +135,30 @@ public class ExecutorHelper {
 	}
 
 	//构造并行循环的线程池
-	public ExecutorService buildLoopParallelExecutor(Integer slotIndex) {
+	public ExecutorService buildLoopParallelExecutor(LoopCondition loopCondition, Integer slotIndex) {
 		ExecutorService parallelExecutor;
 		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
 		//获取chain的hash
 		String chainId = DataBus.getSlot(slotIndex).getChainId();
 		Chain chain = FlowBus.getChain(chainId);
 
-		//condition层级线程池 TODO
+		//condition层级线程池
+		if (ObjectUtil.isNotEmpty(loopCondition.getThreadPoolExecutorClass())) {
+			parallelExecutor = getExecutorService(loopCondition.getThreadPoolExecutorClass(),
+												  String.valueOf(loopCondition.hashCode()));
 
-		//chain层级线程池
-		if (ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass())) {
+		} else if (ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass())) {
 			//chain层级线程池
 			parallelExecutor = getExecutorService(chain.getThreadPoolExecutorClass(),
 												  String.valueOf(chain.hashCode()));
+
 		} else {
 			//全局线程池
 			parallelExecutor = getExecutorService(Optional.ofNullable(liteflowConfig.getParallelLoopExecutorClass())
 														  .orElse(liteflowConfig.getGlobalThreadPoolExecutorClass()));
+
 		}
+
 		return parallelExecutor;
 	}
 

+ 3 - 3
liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/ConditionThreadPoolELSpringbootTest.java

@@ -50,7 +50,7 @@ public class ConditionThreadPoolELSpringbootTest extends BaseTest {
 		LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg");
 		DefaultContext context = response1.getFirstContextBean();
 		Assertions.assertTrue(response1.isSuccess());
-		Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
+		Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
 	}
 
 	/**
@@ -61,7 +61,7 @@ public class ConditionThreadPoolELSpringbootTest extends BaseTest {
 		LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg");
 		DefaultContext context = response1.getFirstContextBean();
 		Assertions.assertTrue(response1.isSuccess());
-		Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
+		Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
 	}
 
 	/**
@@ -73,7 +73,7 @@ public class ConditionThreadPoolELSpringbootTest extends BaseTest {
 		LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list);
 		DefaultContext context = response1.getFirstContextBean();
 		Assertions.assertTrue(response1.isSuccess());
-		Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
+		Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
 	}
 
 }

+ 23 - 0
liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomLoopThreadExecutor.java

@@ -0,0 +1,23 @@
+package com.yomahub.liteflow.test.chainThreadPool;
+
+import cn.hutool.core.util.ObjectUtil;
+import com.yomahub.liteflow.property.LiteflowConfig;
+import com.yomahub.liteflow.property.LiteflowConfigGetter;
+import com.yomahub.liteflow.thread.ExecutorBuilder;
+
+import java.util.concurrent.ExecutorService;
+
+public class CustomLoopThreadExecutor implements ExecutorBuilder {
+
+	@Override
+	public ExecutorService buildExecutor() {
+		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
+		// 只有在非spring的场景下liteflowConfig才会为null
+		if (ObjectUtil.isNull(liteflowConfig)) {
+			liteflowConfig = new LiteflowConfig();
+		}
+		return buildDefaultExecutor(16, 16,
+									512, "customer-loop-thead");
+	}
+
+}

+ 3 - 5
liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow2.el.xml

@@ -7,18 +7,16 @@
 
     <chain name="chain2"
            thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
-        FOR(5).parallel(true).DO(THEN(a,f
-        )
-        );
+        FOR(5).parallel(true).DO(THEN(a,f)).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomLoopThreadExecutor");
     </chain>
 
     <chain name="chain3"
            thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
-        WHILE(z).parallel(true).DO(THEN(w,d));
+        WHILE(z).parallel(true).DO(THEN(w,d)).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomLoopThreadExecutor");
     </chain>
 
     <chain name="chain4"
            thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
-        ITERATOR(it).parallel(true).DO(THEN(a,i));
+        ITERATOR(it).parallel(true).DO(THEN(a,i)).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomLoopThreadExecutor");
     </chain>
 </flow>