|
@@ -10,11 +10,14 @@ package com.yomahub.liteflow.entity.flow;
|
|
|
|
|
|
import cn.hutool.core.collection.CollUtil;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
-import com.alibaba.ttl.TtlCallable;
|
|
|
+import com.alibaba.ttl.threadpool.TtlExecutors;
|
|
|
+import com.yomahub.liteflow.asynctool.executor.Async;
|
|
|
+import com.yomahub.liteflow.asynctool.worker.ResultState;
|
|
|
+import com.yomahub.liteflow.asynctool.worker.WorkResult;
|
|
|
+import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper;
|
|
|
import com.yomahub.liteflow.entity.data.DataBus;
|
|
|
import com.yomahub.liteflow.entity.data.Slot;
|
|
|
import com.yomahub.liteflow.enums.ExecuteTypeEnum;
|
|
|
-import com.yomahub.liteflow.exception.ChainEndException;
|
|
|
import com.yomahub.liteflow.exception.FlowSystemException;
|
|
|
import com.yomahub.liteflow.exception.WhenExecuteException;
|
|
|
import com.yomahub.liteflow.property.LiteflowConfig;
|
|
@@ -22,13 +25,9 @@ import com.yomahub.liteflow.property.LiteflowConfigGetter;
|
|
|
import com.yomahub.liteflow.util.ExecutorHelper;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
-
|
|
|
-import java.lang.reflect.Array;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
|
-import java.util.function.BiConsumer;
|
|
|
-import java.util.function.Consumer;
|
|
|
-import java.util.stream.IntStream;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* chain对象,实现可执行器
|
|
@@ -97,57 +96,62 @@ public class Chain implements Executable {
|
|
|
}
|
|
|
|
|
|
|
|
|
- // 使用线程池执行when并发流程
|
|
|
- private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) {
|
|
|
- final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size());
|
|
|
- final Map<String, Future<Boolean>> futureMap = new HashMap<>();
|
|
|
+ //使用线程池执行when并发流程
|
|
|
+ private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) throws Exception{
|
|
|
|
|
|
//此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的
|
|
|
- ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutor();
|
|
|
+ ExecutorService parallelExecutor = TtlExecutors.getTtlExecutorService(ExecutorHelper.loadInstance().buildExecutor());
|
|
|
+
|
|
|
|
|
|
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
|
|
|
|
|
- condition.getNodeList().forEach(executable -> {
|
|
|
- Future<Boolean> future = parallelExecutor.submit(
|
|
|
- Objects.requireNonNull(TtlCallable.get(new ParallelCallable(executable, slotIndex, requestId, latch)))
|
|
|
- );
|
|
|
- futureMap.put(executable.getExecuteName(), future);
|
|
|
- });
|
|
|
+ //封装asyncTool的workerWrapper对象
|
|
|
+ List<WorkerWrapper<Void, String>> parallelWorkerWrapperList = condition.getNodeList().stream()
|
|
|
+ .map(executable -> new WorkerWrapper.Builder<Void, String>()
|
|
|
+ .worker(new ParallelWorker(executable, slotIndex))
|
|
|
+ .next(new WorkerWrapper.Builder<Void, Void>().worker((object, allWrappers) -> Void.TYPE.newInstance()).build(), true)
|
|
|
+ .build())
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
|
boolean interrupted = false;
|
|
|
- try {
|
|
|
- if (!latch.await(liteflowConfig.getWhenMaxWaitSeconds(), TimeUnit.SECONDS)) {
|
|
|
-
|
|
|
- futureMap.forEach((name, f) -> {
|
|
|
- boolean flag = f.cancel(true);
|
|
|
- //如果flag为true,说明线程被成功cancel掉了,需要打出这个线程对应的执行器单元的name,说明这个线程超时了
|
|
|
- if (flag){
|
|
|
- LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", requestId, name);
|
|
|
- }
|
|
|
- });
|
|
|
- interrupted = true;
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
+ boolean asyncToolResult;
|
|
|
+
|
|
|
+ //这里利用asyncTool框架进行并行调用
|
|
|
+ try{
|
|
|
+ asyncToolResult = Async.beginWork(liteflowConfig.getWhenMaxWaitSeconds()*1000,
|
|
|
+ parallelExecutor,
|
|
|
+ parallelWorkerWrapperList.toArray(new WorkerWrapper[]{}));
|
|
|
+ }catch (Exception e){
|
|
|
+ throw new WhenExecuteException(StrUtil.format("requestId [{}] AsyncTool framework execution exception.", requestId));
|
|
|
+ }
|
|
|
+
|
|
|
+ //asyncToolResult为false,说明是timeout状态了
|
|
|
+ //遍历wrapper拿到worker,拿到defaultValue,其实就是nodeId,打印出来
|
|
|
+ if (!asyncToolResult){
|
|
|
+ parallelWorkerWrapperList.forEach(workerWrapper -> {
|
|
|
+ if(workerWrapper.getWorkResult().getResultState().equals(ResultState.TIMEOUT)){
|
|
|
+ LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]",
|
|
|
+ requestId, workerWrapper.getWorker().defaultValue());
|
|
|
+ }
|
|
|
+ });
|
|
|
interrupted = true;
|
|
|
}
|
|
|
|
|
|
- //当配置了errorResume = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException
|
|
|
+ //errorResume是一个condition里的参数,如果为true,表示即便出现了错误,也继续执行下一个condition
|
|
|
+ //当配置了errorResume = false,出现interrupted或者其中一个线程执行出错的情况,将抛出WhenExecuteException
|
|
|
if (!condition.isErrorResume()) {
|
|
|
if (interrupted) {
|
|
|
throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", requestId));
|
|
|
}
|
|
|
|
|
|
- futureMap.forEach((name, f) -> {
|
|
|
- try {
|
|
|
- if (!f.get()) {
|
|
|
- throw new WhenExecuteException(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", name, requestId));
|
|
|
- }
|
|
|
- } catch (InterruptedException | ExecutionException e) {
|
|
|
- throw new WhenExecuteException(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", name, requestId));
|
|
|
+ for (WorkerWrapper<Void, String> workerWrapper : parallelWorkerWrapperList){
|
|
|
+ WorkResult<String> workResult = workerWrapper.getWorkResult();
|
|
|
+ if (!workResult.getResultState().equals(ResultState.SUCCESS)){
|
|
|
+ throw workResult.getEx();
|
|
|
}
|
|
|
- });
|
|
|
+ }
|
|
|
} else if (interrupted) {
|
|
|
- // 这里由于配置了errorResume,所以只打印warn日志
|
|
|
+ // 这里由于配置了errorResume=true,所以只打印warn日志
|
|
|
LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId);
|
|
|
}
|
|
|
}
|