Ver Fonte

规范问题修复

jason há 6 meses atrás
pai
commit
eb38cbb0c5
21 ficheiros alterados com 60 adições e 97 exclusões
  1. 8 10
      liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java
  2. 1 0
      liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/ThreadPoolOperator.java
  3. 1 4
      liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/WhenOperator.java
  4. 1 0
      liteflow-core/src/main/java/com/yomahub/liteflow/common/ChainConstant.java
  5. 1 0
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Chain.java
  6. 3 3
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java
  7. 9 2
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java
  8. 1 1
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java
  9. 3 1
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java
  10. 1 39
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java
  11. 1 1
      liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java
  12. 1 0
      liteflow-core/src/main/java/com/yomahub/liteflow/parser/helper/ParserHelper.java
  13. 1 0
      liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java
  14. 5 1
      liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorCondition.java
  15. 17 3
      liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorConditionBuilder.java
  16. 1 32
      liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java
  17. 1 0
      liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowAutoConfiguration.java
  18. 1 0
      liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowProperty.java
  19. 1 0
      liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java
  20. 1 0
      liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java
  21. 1 0
      liteflow-testcase-el/liteflow-testcase-el-builder/src/test/java/com/yomahub/liteflow/test/builder/customTreadExecutor/CustomThreadExecutor1.java

+ 8 - 10
liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java

@@ -1,10 +1,6 @@
 package com.yomahub.liteflow.builder.el;
 
 import cn.hutool.core.collection.CollUtil;
-import cn.hutool.core.util.ArrayUtil;
-import cn.hutool.core.util.CharUtil;
-import cn.hutool.core.util.ObjectUtil;
-import cn.hutool.core.util.StrUtil;
 import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.util.*;
 import cn.hutool.crypto.digest.MD5;
@@ -31,19 +27,15 @@ import com.yomahub.liteflow.log.LFLog;
 import com.yomahub.liteflow.log.LFLoggerManager;
 import com.yomahub.liteflow.property.LiteflowConfig;
 import com.yomahub.liteflow.property.LiteflowConfigGetter;
-import com.yomahub.liteflow.util.ElRegexUtil;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
 import java.io.File;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.yomahub.liteflow.common.ChainConstant.NODE_INSTANCE_PATH;
 import static com.yomahub.liteflow.common.ChainConstant.USER_DIR;
-import static com.yomahub.liteflow.util.JsonUtil.*;
+import static com.yomahub.liteflow.util.JsonUtil.parseObject;
+import static com.yomahub.liteflow.util.JsonUtil.toJsonString;
 import static com.yomahub.liteflow.util.SerialsUtil.generateShortUUID;
 
 
@@ -52,6 +44,7 @@ import static com.yomahub.liteflow.util.SerialsUtil.generateShortUUID;
  *
  * @author Bryan.Zhang
  * @author Jay li
+ * @author jason
  * @since 2.8.0
  */
 public class LiteFlowChainELBuilder {
@@ -328,6 +321,11 @@ public class LiteFlowChainELBuilder {
 		return this;
 	}
 
+	public LiteFlowChainELBuilder setThreadPoolExecutorClass(String threadPoolExecutorClass) {
+		this.chain.setThreadPoolExecutorClass(threadPoolExecutorClass);
+		return this;
+	}
+
     /**
      * EL表达式校验,此方法已经过时,请使用 {@link LiteFlowChainELBuilder#validateWithEx(String)}
      *

+ 1 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/ThreadPoolOperator.java

@@ -12,6 +12,7 @@ import com.yomahub.liteflow.flow.element.condition.WhenCondition;
  * .threadPool()
  *
  * @author Bryan.Zhang
+ * @author jason
  * @since 2.8.0
  */
 public class ThreadPoolOperator extends BaseOperator<Condition> {

+ 1 - 4
liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/WhenOperator.java

@@ -4,13 +4,12 @@ import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
 import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
 import com.yomahub.liteflow.flow.element.Executable;
 import com.yomahub.liteflow.flow.element.condition.WhenCondition;
-import com.yomahub.liteflow.property.LiteflowConfig;
-import com.yomahub.liteflow.property.LiteflowConfigGetter;
 
 /**
  * EL规则中的WHEN的操作符
  *
  * @author Bryan.Zhang
+ * @author jason
  * @since 2.8.0
  */
 public class WhenOperator extends BaseOperator<WhenCondition> {
@@ -21,11 +20,9 @@ public class WhenOperator extends BaseOperator<WhenCondition> {
 
 		WhenCondition whenCondition = new WhenCondition();
 
-		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
 		for (Object obj : objects) {
 			OperatorHelper.checkObjMustBeCommonTypeItem(obj);
 			whenCondition.addExecutable(OperatorHelper.convert(obj, Executable.class));
-			whenCondition.setThreadExecutorClass(liteflowConfig.getGlobalThreadPoolExecutorClass());
 		}
 		return whenCondition;
 	}

+ 1 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/common/ChainConstant.java

@@ -4,6 +4,7 @@ package com.yomahub.liteflow.common;
  * Chain 常量
  *
  * @author tangkc
+ * @author jason
  */
 public interface ChainConstant {
 	String PARALLEL = "parallel";

+ 1 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Chain.java

@@ -28,6 +28,7 @@ import java.util.List;
  * chain对象,实现可执行器
  *
  * @author Bryan.Zhang
+ * @author jason
  */
 public class Chain implements Executable{
 

+ 3 - 3
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java

@@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService;
  * 循环次数Condition
  *
  * @author Bryan.Zhang
+ * @author jason
  * @since 2.9.0
  */
 public class ForCondition extends LoopCondition {
@@ -77,9 +78,8 @@ public class ForCondition extends LoopCondition {
                 //存储所有的并行执行子项的CompletableFuture
                 List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
                 //获取并行循环的线程池
-                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(this,
-                                                                                                      slotIndex,
-                                                                                                      this.getConditionType());
+                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(this, slotIndex
+                        , this.getConditionType());
                 for (int i = 0; i < forCount; i++){
                     //提交异步任务
                     CompletableFuture<LoopFutureObj> future =

+ 9 - 2
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java

@@ -17,6 +17,13 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 
+/**
+ * 迭代次数Condition
+ *
+ * @author jason
+ * @since 2.9.0
+ */
+
 public class IteratorCondition extends LoopCondition {
 
     @Override
@@ -80,8 +87,8 @@ public class IteratorCondition extends LoopCondition {
                 //存储所有的并行执行子项的CompletableFuture
                 List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
                 //获取并行循环的线程池
-                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(this,
-                                                                                                           slotIndex);
+                ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(this, slotIndex
+                        , this.getConditionType());
                 while (it.hasNext()) {
                     Object itObj = it.next();
                     //提交异步任务

+ 1 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java

@@ -8,13 +8,13 @@ import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 
 /**
  * 循环Condition的抽象类 主要继承对象有ForCondition和WhileCondition
  *
  * @author Bryan.Zhang
+ * @author jason
  * @since 2.9.0
  */
 public abstract class LoopCondition extends Condition {

+ 3 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java

@@ -15,6 +15,7 @@ import java.util.concurrent.ExecutorService;
  * 循环条件Condition
  *
  * @author Bryan.Zhang
+ * @author jason
  * @since 2.9.0
  */
 public class WhileCondition extends LoopCondition {
@@ -61,7 +62,8 @@ public class WhileCondition extends LoopCondition {
 			//并行循环逻辑
 			List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
 			//获取并行循环的线程池
-            ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(this, slotIndex);
+			ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(this, slotIndex,
+																								  this.getConditionType());
 			while (getWhileResult(slotIndex, index)){
 				CompletableFuture<LoopFutureObj> future =
 						CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, index), parallelExecutor);

+ 1 - 39
liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java

@@ -1,12 +1,9 @@
 package com.yomahub.liteflow.flow.parallel.strategy;
 
-import cn.hutool.core.util.BooleanUtil;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
 import com.yomahub.liteflow.enums.ParallelStrategyEnum;
 import com.yomahub.liteflow.exception.WhenExecuteException;
-import com.yomahub.liteflow.flow.FlowBus;
-import com.yomahub.liteflow.flow.element.Chain;
 import com.yomahub.liteflow.flow.element.Executable;
 import com.yomahub.liteflow.flow.element.Node;
 import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
@@ -36,6 +33,7 @@ import java.util.stream.Stream;
  *
  * @author luo yi
  * @author Bryan.Zhang
+ * @author jason
  * @since 2.11.0
  */
 public abstract class ParallelStrategyExecutor {
@@ -119,42 +117,6 @@ public abstract class ParallelStrategyExecutor {
         });
     }
 
-    /**
-     * 获取 WHEN 所需线程池
-     * @param whenCondition
-     * @return
-     */
-    protected ExecutorService getWhenExecutorService(WhenCondition whenCondition, Integer slotIndex) {
-
-        LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
-        //线程池的优先级 condition层级>chain层级>全局体系
-        // 1、如果设置了线程池隔离,则每个 when 都会有对应的线程池,这是为了避免多层嵌套时如果线程池数量不够时出现单个线程池死锁。用线程池隔离的方式会更加好
-        // 2、如果在chain上自定义线程池,同一个chain下的when+异步线程池共享一个线程池
-        // 3、默认全局一个线程池,所有的when+异步共享一个线程池
-        ExecutorService parallelExecutor;
-
-        String chainId = DataBus.getSlot(slotIndex).getChainId();
-
-        Chain chain = FlowBus.getChain(chainId);
-
-        if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())) {
-            //condition层级线程池
-            parallelExecutor =
-                    ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(),
-                                                                            String.valueOf(whenCondition.hashCode()));
-        } else if (ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass())) {
-            //chain层级线程池
-            parallelExecutor =
-                    ExecutorHelper.loadInstance().buildWhenExecutorWithHash(chain.getThreadPoolExecutorClass(),
-                                                                            String.valueOf(chain.hashCode()));
-        } else {
-            //全局线程池
-            parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
-        }
-
-        return parallelExecutor;
-
-    }
 
     /**
      * 获取所有任务 CompletableFuture 集合

+ 1 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java

@@ -14,6 +14,7 @@ import java.util.concurrent.ExecutorService;
  *
  * @author luo yi
  * @author Bryan.Zhang
+ * @author jason
  * @since 2.11.0
  */
 public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
@@ -30,7 +31,6 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
         ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(whenCondition,
                                                                                               slotIndex,
                                                                                               whenCondition.getConditionType());
-        ;
 
         // 指定完成的任务
         CompletableFuture<?> specifyTask;

+ 1 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/parser/helper/ParserHelper.java

@@ -25,6 +25,7 @@ import static com.yomahub.liteflow.common.ChainConstant.*;
  * Parser 通用 Helper
  *
  * @author tangkc
+ * @author jason
  * @author zy
  */
 public class ParserHelper {

+ 1 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java

@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
  * 所以为了要有null值出现,这里采用包装类型
  *
  * @author Bryan.Zhang
+ * @author jason
  */
 public class LiteflowConfig {
 

+ 5 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorCondition.java

@@ -2,7 +2,11 @@ package com.yomahub.liteflow.thread.ExecutorCondition;
 
 
 /**
- * 执行器条件对象
+ * <p>Title: ExecutorCondition</p>
+ * <p>Description: 执行器条件对象</p>
+ *
+ * @author jason
+ * @Date 2024/11/12
  */
 public class ExecutorCondition {
     private final boolean conditionLevel;

+ 17 - 3
liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorCondition/ExecutorConditionBuilder.java

@@ -10,6 +10,14 @@ import com.yomahub.liteflow.flow.element.condition.LoopCondition;
 import com.yomahub.liteflow.flow.element.condition.WhenCondition;
 import com.yomahub.liteflow.property.LiteflowConfig;
 
+/**
+ * <p>Title: ExecutorConditionBuilder</p>
+ * <p>Description: 执行器构建对象</p>
+ *
+ * @author jason
+ * @Date 2024/11/12
+ */
+
 public class ExecutorConditionBuilder {
 
     /**
@@ -22,6 +30,7 @@ public class ExecutorConditionBuilder {
             ConditionTypeEnum type) {
 
         boolean conditionLevel;
+        boolean chainLevel;
         String conditionExecutorClass;
 
         switch (type) {
@@ -31,11 +40,16 @@ public class ExecutorConditionBuilder {
                 LoopCondition loopCondition = (LoopCondition) condition;
                 conditionLevel = ObjectUtil.isNotEmpty(loopCondition.getThreadPoolExecutorClass());
                 conditionExecutorClass = loopCondition.getThreadPoolExecutorClass();
+                chainLevel = ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass());
                 break;
             case TYPE_WHEN:
                 WhenCondition whenCondition = (WhenCondition) condition;
-                conditionLevel = BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate());
-                conditionExecutorClass = whenCondition.getThreadExecutorClass();
+                conditionLevel =
+                        (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())) || (ObjectUtil.isNotEmpty(whenCondition.getThreadExecutorClass()));
+                //当whenThreadPoolIsolate为true,需要有默认值
+                conditionExecutorClass = whenCondition.getThreadExecutorClass() == null ?
+                        liteflowConfig.getGlobalThreadPoolExecutorClass() : whenCondition.getThreadExecutorClass();
+                chainLevel = ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass());
                 break;
             default:
                 throw new IllegalArgumentException("Unsupported condition type: " + type);
@@ -43,7 +57,7 @@ public class ExecutorConditionBuilder {
 
         return ExecutorCondition.create(
                 conditionLevel,
-                ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass()),
+                chainLevel,
                 conditionExecutorClass
         );
     }

+ 1 - 32
liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java

@@ -16,7 +16,6 @@ import com.yomahub.liteflow.exception.ThreadExecutorServiceCreateException;
 import com.yomahub.liteflow.flow.FlowBus;
 import com.yomahub.liteflow.flow.element.Chain;
 import com.yomahub.liteflow.flow.element.Condition;
-import com.yomahub.liteflow.flow.element.condition.LoopCondition;
 import com.yomahub.liteflow.log.LFLog;
 import com.yomahub.liteflow.log.LFLoggerManager;
 import com.yomahub.liteflow.property.LiteflowConfig;
@@ -34,6 +33,7 @@ import java.util.concurrent.TimeUnit;
  * 线程池工具类
  *
  * @author Bryan.Zhang
+ * @author jason
  */
 public class ExecutorHelper {
 
@@ -134,37 +134,6 @@ public class ExecutorHelper {
 		return getExecutorService(clazz);
 	}
 
-	//构造并行循环的线程池
-	public ExecutorService buildLoopParallelExecutor(LoopCondition loopCondition, Integer slotIndex) {
-		ExecutorService parallelExecutor;
-		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
-		String chainId = DataBus.getSlot(slotIndex).getChainId();
-		Chain chain = FlowBus.getChain(chainId);
-
-		//线程池的优先级 condition层级>chain层级>全局体系
-		// 1、如果设置了线程池隔离,则每个 异步 都会有对应的线程池,这是为了避免多层嵌套时如果线程池数量不够时出现单个线程池死锁。用线程池隔离的方式会更加好
-		// 2、如果在chain上自定义线程池,同一个chain下的when+异步线程池共享一个线程池
-		// 3、默认全局一个线程池,所有的when+异步共享一个线程池
-
-		if (ObjectUtil.isNotEmpty(loopCondition.getThreadPoolExecutorClass())) {
-			//condition层级线程池
-			parallelExecutor = getExecutorService(loopCondition.getThreadPoolExecutorClass(),
-												  String.valueOf(loopCondition.hashCode()));
-
-		} else if (ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass())) {
-			//chain层级线程池
-			parallelExecutor = getExecutorService(chain.getThreadPoolExecutorClass(),
-												  String.valueOf(chain.hashCode()));
-
-		} else {
-			//全局线程池
-			parallelExecutor = getExecutorService(liteflowConfig.getGlobalThreadPoolExecutorClass());
-
-		}
-
-		return parallelExecutor;
-	}
-
 	private ExecutorService getExecutorService(String clazz){
 		return getExecutorService(clazz, null);
 	}

+ 1 - 0
liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowAutoConfiguration.java

@@ -14,6 +14,7 @@ import org.noear.solon.annotation.Inject;
  *
  * @author Bryan.Zhang
  * @author noear
+ * @author jason
  * @since 2.9
  */
 @Configuration

+ 1 - 0
liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/solon/config/LiteflowProperty.java

@@ -13,6 +13,7 @@ import java.util.Map;
  *
  * @author Bryan.Zhang
  * @author noear
+ * @author jason
  * @since 2.9
  */
 @Inject("${liteflow}")

+ 1 - 0
liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java

@@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit;
  * 执行流程主要的参数类
  *
  * @author Bryan.Zhang
+ * @author jason
  */
 @ConfigurationProperties(prefix = "liteflow", ignoreUnknownFields = true)
 public class LiteflowProperty {

+ 1 - 0
liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java

@@ -13,6 +13,7 @@ import org.springframework.context.annotation.PropertySource;
  * 同时这里设置了默认的参数路径,如果在springboot的application.properties/yml里没取到的话,就取默认值
  *
  * @author Bryan.Zhang
+ * @author jason
  */
 @Configuration
 @EnableConfigurationProperties({ LiteflowProperty.class, LiteflowMonitorProperty.class })

+ 1 - 0
liteflow-testcase-el/liteflow-testcase-el-builder/src/test/java/com/yomahub/liteflow/test/builder/customTreadExecutor/CustomThreadExecutor1.java

@@ -11,6 +11,7 @@ import java.util.concurrent.ExecutorService;
  * EL表达式装配并执行测试
  *
  * @author gezuao
+ * @author jason
  * @since 2.11.1
  */
 public class CustomThreadExecutor1 implements ExecutorBuilder {