|
@@ -11,7 +11,6 @@ import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
|
|
|
import com.yomahub.liteflow.flow.element.condition.PreCondition;
|
|
|
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
|
|
|
import com.yomahub.liteflow.flow.parallel.CompletableFutureExpand;
|
|
|
-import com.yomahub.liteflow.flow.parallel.CompletableFutureTimeout;
|
|
|
import com.yomahub.liteflow.flow.parallel.ParallelSupplier;
|
|
|
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
|
|
|
import com.yomahub.liteflow.log.LFLog;
|
|
@@ -89,20 +88,23 @@ public abstract class ParallelStrategyExecutor {
|
|
|
* 过滤 WHEN 待执行任务
|
|
|
* @param executableList 所有任务列表
|
|
|
* @param slotIndex
|
|
|
+ * @param currentChainId 当前执行的 chainId
|
|
|
* @return
|
|
|
*/
|
|
|
- protected Stream<Executable> filterWhenTaskList(List<Executable> executableList, Integer slotIndex) {
|
|
|
+ protected Stream<Executable> filterWhenTaskList(List<Executable> executableList, Integer slotIndex, String currentChainId) {
|
|
|
// 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了
|
|
|
// 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了
|
|
|
Stream<Executable> stream = executableList.stream()
|
|
|
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition));
|
|
|
- return filterAccess(stream, slotIndex);
|
|
|
+ return filterAccess(stream, slotIndex, currentChainId);
|
|
|
}
|
|
|
|
|
|
// 过滤 isAccess 的方法,默认实现,同时为避免同一个 node 的 isAccess 方法重复执行,给 node 设置 isAccess 方法执行结果
|
|
|
- protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
|
|
|
+ protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex, String currentChainId) {
|
|
|
return stream.filter(executable -> {
|
|
|
try {
|
|
|
+ // 提前设置 chainId,避免无法在 isAccess 方法中获取到
|
|
|
+ executable.setCurrChainId(currentChainId);
|
|
|
boolean access = executable.isAccess(slotIndex);
|
|
|
if (executable instanceof Node) {
|
|
|
((Node) executable).setAccessResult(access);
|
|
@@ -150,14 +152,14 @@ public abstract class ParallelStrategyExecutor {
|
|
|
String currChainName = whenCondition.getCurrChainId();
|
|
|
|
|
|
// 设置 whenCondition 参数
|
|
|
- setWhenConditionParams(whenCondition);
|
|
|
+ this.setWhenConditionParams(whenCondition);
|
|
|
|
|
|
// 获取 WHEN 所需线程池
|
|
|
ExecutorService parallelExecutor = getWhenExecutorService(whenCondition);
|
|
|
|
|
|
// 这里主要是做了封装 CompletableFuture 对象,用 lambda 表达式做了很多事情,这句代码要仔细理清
|
|
|
// 根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List<CompletableFuture<WhenFutureObj>>
|
|
|
- List<CompletableFuture<WhenFutureObj>> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex)
|
|
|
+ List<CompletableFuture<WhenFutureObj>> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainName)
|
|
|
.map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex))
|
|
|
.collect(Collectors.toList());
|
|
|
|