浏览代码

enhancement #I3DM92 集成asyncTool作为线程编排的核心

bryan31 3 年之前
父节点
当前提交
e84ae73018

+ 19 - 0
liteflow-async-tool/pom.xml

@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>liteflow</artifactId>
+        <groupId>com.yomahub</groupId>
+        <version>2.6.4</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>liteflow-async-tool</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+</project>

+ 20 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/DefaultCallback.java

@@ -0,0 +1,20 @@
+package com.yomahub.liteflow.asynctool.callback;
+
+import com.yomahub.liteflow.asynctool.worker.WorkResult;
+
+/**
+ * 默认回调类,如果不设置的话,会默认给这个回调
+ * @author wuweifeng wrote on 2019-11-19.
+ */
+public class DefaultCallback<T, V> implements ICallback<T, V> {
+    @Override
+    public void begin() {
+        
+    }
+
+    @Override
+    public void result(boolean success, T param, WorkResult<V> workResult) {
+
+    }
+
+}

+ 21 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/DefaultGroupCallback.java

@@ -0,0 +1,21 @@
+package com.yomahub.liteflow.asynctool.callback;
+
+import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper;
+
+import java.util.List;
+
+/**
+ * @author wuweifeng wrote on 2019-12-27
+ * @version 1.0
+ */
+public class DefaultGroupCallback implements IGroupCallback {
+    @Override
+    public void success(List<WorkerWrapper> workerWrappers) {
+
+    }
+
+    @Override
+    public void failure(List<WorkerWrapper> workerWrappers, Exception e) {
+
+    }
+}

+ 26 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/ICallback.java

@@ -0,0 +1,26 @@
+package com.yomahub.liteflow.asynctool.callback;
+
+
+import com.yomahub.liteflow.asynctool.worker.WorkResult;
+
+/**
+ * 每个执行单元执行完毕后,会回调该接口</p>
+ * 需要监听执行结果的,实现该接口即可
+ *
+ * @author wuweifeng wrote on 2019-11-19.
+ */
+@FunctionalInterface
+public interface ICallback<T, V> {
+
+    /**
+     * 任务开始的监听
+     */
+    default void begin() {
+
+    }
+
+    /**
+     * 耗时操作执行完毕后,就给value注入值
+     */
+    void result(boolean success, T param, WorkResult<V> workResult);
+}

+ 20 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IGroupCallback.java

@@ -0,0 +1,20 @@
+package com.yomahub.liteflow.asynctool.callback;
+
+import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper;
+
+import java.util.List;
+
+/**
+ * 如果是异步执行整组的话,可以用这个组回调。不推荐使用
+ * @author wuweifeng wrote on 2019-11-19.
+ */
+public interface IGroupCallback {
+    /**
+     * 成功后,可以从wrapper里去getWorkResult
+     */
+    void success(List<WorkerWrapper> workerWrappers);
+    /**
+     * 失败了,也可以从wrapper里去getWorkResult
+     */
+    void failure(List<WorkerWrapper> workerWrappers, Exception e);
+}

+ 20 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/ITimeoutWorker.java

@@ -0,0 +1,20 @@
+package com.yomahub.liteflow.asynctool.callback;
+
+/**
+ * @author wuweifeng wrote on 2019-12-20
+ * @version 1.0
+ */
+public interface ITimeoutWorker<T, V> extends IWorker<T, V> {
+    /**
+     * 每个worker都可以设置超时时间
+     * @return 毫秒超时时间
+     */
+    long timeOut();
+
+    /**
+     * 是否开启单个执行单元的超时功能(有时是一个group设置个超时,而不具备关心单个worker的超时)
+     * <p>注意,如果开启了单个执行单元的超时检测,将使线程池数量多出一倍</p>
+     * @return 是否开启
+     */
+    boolean enableTimeOut();
+}

+ 30 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IWorker.java

@@ -0,0 +1,30 @@
+package com.yomahub.liteflow.asynctool.callback;
+
+import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper;
+
+import java.util.Map;
+
+/**
+ * 每个最小执行单元需要实现该接口
+ *
+ * @author wuweifeng wrote on 2019-11-19.
+ */
+@FunctionalInterface
+public interface IWorker<T, V> {
+    /**
+     * 在这里做耗时操作,如rpc请求、IO等
+     *
+     * @param object      object
+     * @param allWrappers 任务包装
+     */
+    V action(T object, Map<String, WorkerWrapper> allWrappers);
+
+    /**
+     * 超时、异常时,返回的默认值
+     *
+     * @return 默认值
+     */
+    default V defaultValue() {
+        return null;
+    }
+}

+ 16 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/exception/SkippedException.java

@@ -0,0 +1,16 @@
+package com.yomahub.liteflow.asynctool.exception;
+
+/**
+ * 如果任务在执行之前,自己后面的任务已经执行完或正在被执行,则抛该exception
+ * @author wuweifeng wrote on 2020-02-18
+ * @version 1.0
+ */
+public class SkippedException extends RuntimeException {
+    public SkippedException() {
+        super();
+    }
+
+    public SkippedException(String message) {
+        super(message);
+    }
+}

+ 160 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/executor/Async.java

@@ -0,0 +1,160 @@
+package com.yomahub.liteflow.asynctool.executor;
+
+import com.yomahub.liteflow.asynctool.callback.DefaultGroupCallback;
+import com.yomahub.liteflow.asynctool.callback.IGroupCallback;
+import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+/**
+ * 类入口,可以根据自己情况调整core线程的数量
+ * @author wuweifeng wrote on 2019-12-18
+ * @version 1.0
+ */
+public class Async {
+    /**
+     * 默认线程池
+     */
+    private static final ThreadPoolExecutor COMMON_POOL =
+            new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 1024,
+                    15L, TimeUnit.SECONDS,
+                    new LinkedBlockingQueue<>(),
+                    (ThreadFactory) Thread::new);
+    /**
+     * 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个
+     */
+    private static ExecutorService executorService;
+
+    /**
+     * 出发点
+     */
+    public static boolean beginWork(long timeout, ExecutorService executorService, List<WorkerWrapper> workerWrappers) throws ExecutionException, InterruptedException {
+        if(workerWrappers == null || workerWrappers.size() == 0) {
+            return false;
+        }
+        //保存线程池变量
+        Async.executorService = executorService;
+        //定义一个map,存放所有的wrapper,key为wrapper的唯一id,value是该wrapper,可以从value中获取wrapper的result
+        Map<String, WorkerWrapper> forParamUseWrappers = new ConcurrentHashMap<>();
+        CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()];
+        for (int i = 0; i < workerWrappers.size(); i++) {
+            WorkerWrapper wrapper = workerWrappers.get(i);
+            futures[i] = CompletableFuture.runAsync(() -> wrapper.work(executorService, timeout, forParamUseWrappers), executorService);
+        }
+        try {
+            CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
+            return true;
+        } catch (TimeoutException e) {
+            Set<WorkerWrapper> set = new HashSet<>();
+            totalWorkers(workerWrappers, set);
+            for (WorkerWrapper wrapper : set) {
+                wrapper.stopNow();
+            }
+            return false;
+        }
+    }
+
+    /**
+     * 如果想自定义线程池,请传pool。不自定义的话,就走默认的COMMON_POOL
+     */
+    public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
+        if(workerWrapper == null || workerWrapper.length == 0) {
+            return false;
+        }
+        List<WorkerWrapper> workerWrappers =  Arrays.stream(workerWrapper).collect(Collectors.toList());
+        return beginWork(timeout, executorService, workerWrappers);
+    }
+
+    /**
+     * 同步阻塞,直到所有都完成,或失败
+     */
+    public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
+        return beginWork(timeout, COMMON_POOL, workerWrapper);
+    }
+
+    public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
+        beginWorkAsync(timeout, COMMON_POOL, groupCallback, workerWrapper);
+    }
+
+    /**
+     * 异步执行,直到所有都完成,或失败后,发起回调
+     */
+    public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
+        if (groupCallback == null) {
+            groupCallback = new DefaultGroupCallback();
+        }
+        IGroupCallback finalGroupCallback = groupCallback;
+        if (executorService != null) {
+            executorService.submit(() -> {
+                try {
+                    boolean success = beginWork(timeout, executorService, workerWrapper);
+                    if (success) {
+                        finalGroupCallback.success(Arrays.asList(workerWrapper));
+                    } else {
+                        finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
+                    }
+                } catch (ExecutionException | InterruptedException e) {
+                    e.printStackTrace();
+                    finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
+                }
+            });
+        } else {
+            COMMON_POOL.submit(() -> {
+                try {
+                    boolean success = beginWork(timeout, COMMON_POOL, workerWrapper);
+                    if (success) {
+                        finalGroupCallback.success(Arrays.asList(workerWrapper));
+                    } else {
+                        finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
+                    }
+                } catch (ExecutionException | InterruptedException e) {
+                    e.printStackTrace();
+                    finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
+                }
+            });
+        }
+
+    }
+
+    /**
+     * 总共多少个执行单元
+     */
+    @SuppressWarnings("unchecked")
+    private static void totalWorkers(List<WorkerWrapper> workerWrappers, Set<WorkerWrapper> set) {
+        set.addAll(workerWrappers);
+        for (WorkerWrapper wrapper : workerWrappers) {
+            if (wrapper.getNextWrappers() == null) {
+                continue;
+            }
+            List<WorkerWrapper> wrappers = wrapper.getNextWrappers();
+            totalWorkers(wrappers, set);
+        }
+
+    }
+
+    /**
+     * 关闭线程池
+     */
+    public static void shutDown() {
+        shutDown(executorService);
+    }
+
+    /**
+     * 关闭线程池
+     */
+    public static void shutDown(ExecutorService executorService) {
+        if (executorService != null) {
+            executorService.shutdown();
+        } else {
+            COMMON_POOL.shutdown();
+        }
+    }
+
+    public static String getThreadCount() {
+        return "activeCount=" + COMMON_POOL.getActiveCount() +
+                "  completedCount " + COMMON_POOL.getCompletedTaskCount() +
+                "  largestCount " + COMMON_POOL.getLargestPoolSize();
+    }
+}

+ 51 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/executor/timer/SystemClock.java

@@ -0,0 +1,51 @@
+package com.yomahub.liteflow.asynctool.executor.timer;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * 用于解决高并发下System.currentTimeMillis卡顿
+ * @author lry
+ */
+public class SystemClock {
+
+    private final int period;
+
+    private final AtomicLong now;
+
+    private static class InstanceHolder {
+        private static final SystemClock INSTANCE = new SystemClock(1);
+    }
+
+    private SystemClock(int period) {
+        this.period = period;
+        this.now = new AtomicLong(System.currentTimeMillis());
+        scheduleClockUpdating();
+    }
+
+    private static SystemClock instance() {
+        return InstanceHolder.INSTANCE;
+    }
+
+    private void scheduleClockUpdating() {
+        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
+            Thread thread = new Thread(runnable, "System Clock");
+            thread.setDaemon(true);
+            return thread;
+        });
+        scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);
+    }
+
+    private long currentTimeMillis() {
+        return now.get();
+    }
+
+    /**
+     * 用来替换原来的System.currentTimeMillis()
+     */
+    public static long now() {
+        return instance().currentTimeMillis();
+    }
+}

+ 58 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/DependWrapper.java

@@ -0,0 +1,58 @@
+package com.yomahub.liteflow.asynctool.worker;
+
+import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper;
+
+/**
+ * 对依赖的wrapper的封装
+ * @author wuweifeng wrote on 2019-12-20
+ * @version 1.0
+ */
+public class DependWrapper {
+    private WorkerWrapper<?, ?> dependWrapper;
+    /**
+     * 是否该依赖必须完成后才能执行自己.<p>
+     * 因为存在一个任务,依赖于多个任务,是让这多个任务全部完成后才执行自己,还是某几个执行完毕就可以执行自己
+     * 如
+     * 1
+     * ---3
+     * 2
+     * 或
+     * 1---3
+     * 2---3
+     * 这两种就不一样,上面的就是必须12都完毕,才能3
+     * 下面的就是1完毕就可以3
+     */
+    private boolean must = true;
+
+    public DependWrapper(WorkerWrapper<?, ?> dependWrapper, boolean must) {
+        this.dependWrapper = dependWrapper;
+        this.must = must;
+    }
+
+    public DependWrapper() {
+    }
+
+    public WorkerWrapper<?, ?> getDependWrapper() {
+        return dependWrapper;
+    }
+
+    public void setDependWrapper(WorkerWrapper<?, ?> dependWrapper) {
+        this.dependWrapper = dependWrapper;
+    }
+
+    public boolean isMust() {
+        return must;
+    }
+
+    public void setMust(boolean must) {
+        this.must = must;
+    }
+
+    @Override
+    public String toString() {
+        return "DependWrapper{" +
+                "dependWrapper=" + dependWrapper +
+                ", must=" + must +
+                '}';
+    }
+}

+ 12 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/ResultState.java

@@ -0,0 +1,12 @@
+package com.yomahub.liteflow.asynctool.worker;
+
+/**
+ * 结果状态
+ * @author wuweifeng wrote on 2019-11-19.
+ */
+public enum ResultState {
+    SUCCESS,
+    TIMEOUT,
+    EXCEPTION,
+    DEFAULT  //默认状态
+}

+ 63 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/WorkResult.java

@@ -0,0 +1,63 @@
+package com.yomahub.liteflow.asynctool.worker;
+
+/**
+ * 执行结果
+ */
+public class WorkResult<V> {
+    /**
+     * 执行的结果
+     */
+    private V result;
+    /**
+     * 结果状态
+     */
+    private ResultState resultState;
+    private Exception ex;
+
+    public WorkResult(V result, ResultState resultState) {
+        this(result, resultState, null);
+    }
+
+    public WorkResult(V result, ResultState resultState, Exception ex) {
+        this.result = result;
+        this.resultState = resultState;
+        this.ex = ex;
+    }
+
+    public static <V> WorkResult<V> defaultResult() {
+        return new WorkResult<>(null, ResultState.DEFAULT);
+    }
+
+    @Override
+    public String toString() {
+        return "WorkResult{" +
+                "result=" + result +
+                ", resultState=" + resultState +
+                ", ex=" + ex +
+                '}';
+    }
+
+    public Exception getEx() {
+        return ex;
+    }
+
+    public void setEx(Exception ex) {
+        this.ex = ex;
+    }
+
+    public V getResult() {
+        return result;
+    }
+
+    public void setResult(V result) {
+        this.result = result;
+    }
+
+    public ResultState getResultState() {
+        return resultState;
+    }
+
+    public void setResultState(ResultState resultState) {
+        this.resultState = resultState;
+    }
+}

+ 610 - 0
liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/wrapper/WorkerWrapper.java

@@ -0,0 +1,610 @@
+package com.yomahub.liteflow.asynctool.wrapper;
+
+
+import com.yomahub.liteflow.asynctool.callback.DefaultCallback;
+import com.yomahub.liteflow.asynctool.callback.ICallback;
+import com.yomahub.liteflow.asynctool.callback.IWorker;
+import com.yomahub.liteflow.asynctool.exception.SkippedException;
+import com.yomahub.liteflow.asynctool.executor.timer.SystemClock;
+import com.yomahub.liteflow.asynctool.worker.DependWrapper;
+import com.yomahub.liteflow.asynctool.worker.ResultState;
+import com.yomahub.liteflow.asynctool.worker.WorkResult;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * 对每个worker及callback进行包装,一对一
+ *
+ * @author wuweifeng wrote on 2019-11-19.
+ */
+public class WorkerWrapper<T, V> {
+    /**
+     * 该wrapper的唯一标识
+     */
+    private String id;
+    /**
+     * worker将来要处理的param
+     */
+    private T param;
+    private IWorker<T, V> worker;
+    private ICallback<T, V> callback;
+    /**
+     * 在自己后面的wrapper,如果没有,自己就是末尾;如果有一个,就是串行;如果有多个,有几个就需要开几个线程</p>
+     * -------2
+     * 1
+     * -------3
+     * 如1后面有2、3
+     */
+    private List<WorkerWrapper<?, ?>> nextWrappers;
+    /**
+     * 依赖的wrappers,有2种情况,1:必须依赖的全部完成后,才能执行自己 2:依赖的任何一个、多个完成了,就可以执行自己
+     * 通过must字段来控制是否依赖项必须完成
+     * 1
+     * -------3
+     * 2
+     * 1、2执行完毕后才能执行3
+     */
+    private List<DependWrapper> dependWrappers;
+    /**
+     * 标记该事件是否已经被处理过了,譬如已经超时返回false了,后续rpc又收到返回值了,则不再二次回调
+     * 经试验,volatile并不能保证"同一毫秒"内,多线程对该值的修改和拉取
+     * <p>
+     * 1-finish, 2-error, 3-working
+     */
+    private AtomicInteger state = new AtomicInteger(0);
+    /**
+     * 该map存放所有wrapper的id和wrapper映射
+     */
+    private Map<String, WorkerWrapper> forParamUseWrappers;
+    /**
+     * 也是个钩子变量,用来存临时的结果
+     */
+    private volatile WorkResult<V> workResult = WorkResult.defaultResult();
+    /**
+     * 是否在执行自己前,去校验nextWrapper的执行结果<p>
+     * 1   4
+     * -------3
+     * 2
+     * 如这种在4执行前,可能3已经执行完毕了(被2执行完后触发的),那么4就没必要执行了。
+     * 注意,该属性仅在nextWrapper数量<=1时有效,>1时的情况是不存在的
+     */
+    private volatile boolean needCheckNextWrapperResult = true;
+
+    private static final int FINISH = 1;
+    private static final int ERROR = 2;
+    private static final int WORKING = 3;
+    private static final int INIT = 0;
+
+    private WorkerWrapper(String id, IWorker<T, V> worker, T param, ICallback<T, V> callback) {
+        if (worker == null) {
+            throw new NullPointerException("async.worker is null");
+        }
+        this.worker = worker;
+        this.param = param;
+        this.id = id;
+        //允许不设置回调
+        if (callback == null) {
+            callback = new DefaultCallback<>();
+        }
+        this.callback = callback;
+    }
+
+    /**
+     * 开始工作
+     * fromWrapper代表这次work是由哪个上游wrapper发起的
+     */
+    private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
+        this.forParamUseWrappers = forParamUseWrappers;
+        //将自己放到所有wrapper的集合里去
+        forParamUseWrappers.put(id, this);
+        long now = SystemClock.now();
+        //总的已经超时了,就快速失败,进行下一个
+        if (remainTime <= 0) {
+            fastFail(INIT, null);
+            beginNext(executorService, now, remainTime);
+            return;
+        }
+        //如果自己已经执行过了。
+        //可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
+        if (getState() == FINISH || getState() == ERROR) {
+            beginNext(executorService, now, remainTime);
+            return;
+        }
+
+        //如果在执行前需要校验nextWrapper的状态
+        if (needCheckNextWrapperResult) {
+            //如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了
+            if (!checkNextWrapperResult()) {
+                fastFail(INIT, new SkippedException());
+                beginNext(executorService, now, remainTime);
+                return;
+            }
+        }
+
+        //如果没有任何依赖,说明自己就是第一批要执行的
+        if (dependWrappers == null || dependWrappers.size() == 0) {
+            fire();
+            beginNext(executorService, now, remainTime);
+            return;
+        }
+
+        /*如果有前方依赖,存在两种情况
+         一种是前面只有一个wrapper。即 A  ->  B
+        一种是前面有多个wrapper。A C D ->   B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。
+        所以需要B来做判断,必须A、C、D都完成,自己才能执行 */
+
+        //只有一个依赖
+        if (dependWrappers.size() == 1) {
+            doDependsOneJob(fromWrapper);
+            beginNext(executorService, now, remainTime);
+        } else {
+            //有多个依赖时
+            doDependsJobs(executorService, dependWrappers, fromWrapper, now, remainTime);
+        }
+
+    }
+
+
+    public void work(ExecutorService executorService, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
+        work(executorService, null, remainTime, forParamUseWrappers);
+    }
+
+    /**
+     * 总控制台超时,停止所有任务
+     */
+    public void stopNow() {
+        if (getState() == INIT || getState() == WORKING) {
+            fastFail(getState(), null);
+        }
+    }
+
+    /**
+     * 判断自己下游链路上,是否存在已经出结果的或已经开始执行的
+     * 如果没有返回true,如果有返回false
+     */
+    private boolean checkNextWrapperResult() {
+        //如果自己就是最后一个,或者后面有并行的多个,就返回true
+        if (nextWrappers == null || nextWrappers.size() != 1) {
+            return getState() == INIT;
+        }
+        WorkerWrapper nextWrapper = nextWrappers.get(0);
+        boolean state = nextWrapper.getState() == INIT;
+        //继续校验自己的next的状态
+        return state && nextWrapper.checkNextWrapperResult();
+    }
+
+    /**
+     * 进行下一个任务
+     */
+    private void beginNext(ExecutorService executorService, long now, long remainTime) {
+        //花费的时间
+        long costTime = SystemClock.now() - now;
+        if (nextWrappers == null) {
+            return;
+        }
+        if (nextWrappers.size() == 1) {
+            nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
+            return;
+        }
+        CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
+        for (int i = 0; i < nextWrappers.size(); i++) {
+            int finalI = i;
+            futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
+                    .work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService);
+        }
+        try {
+            CompletableFuture.allOf(futures).get();
+        } catch (InterruptedException | ExecutionException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void doDependsOneJob(WorkerWrapper dependWrapper) {
+        if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) {
+            workResult = defaultResult();
+            fastFail(INIT, null);
+        } else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) {
+            workResult = defaultExResult(dependWrapper.getWorkResult().getEx());
+            fastFail(INIT, null);
+        } else {
+            //前面任务正常完毕了,该自己了
+            fire();
+        }
+    }
+
+    private synchronized void doDependsJobs(ExecutorService executorService, List<DependWrapper> dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) {
+        boolean nowDependIsMust = false;
+        //创建必须完成的上游wrapper集合
+        Set<DependWrapper> mustWrapper = new HashSet<>();
+        for (DependWrapper dependWrapper : dependWrappers) {
+            if (dependWrapper.isMust()) {
+                mustWrapper.add(dependWrapper);
+            }
+            if (dependWrapper.getDependWrapper().equals(fromWrapper)) {
+                nowDependIsMust = dependWrapper.isMust();
+            }
+        }
+
+        //如果全部是不必须的条件,那么只要到了这里,就执行自己。
+        if (mustWrapper.size() == 0) {
+            if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) {
+                fastFail(INIT, null);
+            } else {
+                fire();
+            }
+            beginNext(executorService, now, remainTime);
+            return;
+        }
+
+        //如果存在需要必须完成的,且fromWrapper不是必须的,就什么也不干
+        if (!nowDependIsMust) {
+            return;
+        }
+
+        //如果fromWrapper是必须的
+        boolean existNoFinish = false;
+        boolean hasError = false;
+        //先判断前面必须要执行的依赖任务的执行结果,如果有任何一个失败,那就不用走action了,直接给自己设置为失败,进行下一步就是了
+        for (DependWrapper dependWrapper : mustWrapper) {
+            WorkerWrapper workerWrapper = dependWrapper.getDependWrapper();
+            WorkResult tempWorkResult = workerWrapper.getWorkResult();
+            //为null或者isWorking,说明它依赖的某个任务还没执行到或没执行完
+            if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) {
+                existNoFinish = true;
+                break;
+            }
+            if (ResultState.TIMEOUT == tempWorkResult.getResultState()) {
+                workResult = defaultResult();
+                hasError = true;
+                break;
+            }
+            if (ResultState.EXCEPTION == tempWorkResult.getResultState()) {
+                workResult = defaultExResult(workerWrapper.getWorkResult().getEx());
+                hasError = true;
+                break;
+            }
+
+        }
+        //只要有失败的
+        if (hasError) {
+            fastFail(INIT, null);
+            beginNext(executorService, now, remainTime);
+            return;
+        }
+
+        //如果上游都没有失败,分为两种情况,一种是都finish了,一种是有的在working
+        //都finish的话
+        if (!existNoFinish) {
+            //上游都finish了,进行自己
+            fire();
+            beginNext(executorService, now, remainTime);
+            return;
+        }
+    }
+
+    /**
+     * 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
+     */
+    private void fire() {
+        //阻塞取结果
+        workResult = workerDoJob();
+    }
+
+    /**
+     * 快速失败
+     */
+    private boolean fastFail(int expect, Exception e) {
+        //试图将它从expect状态,改成Error
+        if (!compareAndSetState(expect, ERROR)) {
+            return false;
+        }
+
+        //尚未处理过结果
+        if (checkIsNullResult()) {
+            if (e == null) {
+                workResult = defaultResult();
+            } else {
+                workResult = defaultExResult(e);
+            }
+        }
+
+        callback.result(false, param, workResult);
+        return true;
+    }
+
+    /**
+     * 具体的单个worker执行任务
+     */
+    private WorkResult<V> workerDoJob() {
+        //避免重复执行
+        if (!checkIsNullResult()) {
+            return workResult;
+        }
+        try {
+            //如果已经不是init状态了,说明正在被执行或已执行完毕。这一步很重要,可以保证任务不被重复执行
+            if (!compareAndSetState(INIT, WORKING)) {
+                return workResult;
+            }
+
+            callback.begin();
+
+            //执行耗时操作
+            V resultValue = worker.action(param, forParamUseWrappers);
+
+            //如果状态不是在working,说明别的地方已经修改了
+            if (!compareAndSetState(WORKING, FINISH)) {
+                return workResult;
+            }
+
+            workResult.setResultState(ResultState.SUCCESS);
+            workResult.setResult(resultValue);
+            //回调成功
+            callback.result(true, param, workResult);
+
+            return workResult;
+        } catch (Exception e) {
+            //避免重复回调
+            if (!checkIsNullResult()) {
+                return workResult;
+            }
+            fastFail(WORKING, e);
+            return workResult;
+        }
+    }
+
+    public WorkResult<V> getWorkResult() {
+        return workResult;
+    }
+
+    public List<WorkerWrapper<?, ?>> getNextWrappers() {
+        return nextWrappers;
+    }
+
+    public void setParam(T param) {
+        this.param = param;
+    }
+
+    private boolean checkIsNullResult() {
+        return ResultState.DEFAULT == workResult.getResultState();
+    }
+
+    private void addDepend(WorkerWrapper<?, ?> workerWrapper, boolean must) {
+        addDepend(new DependWrapper(workerWrapper, must));
+    }
+
+    private void addDepend(DependWrapper dependWrapper) {
+        if (dependWrappers == null) {
+            dependWrappers = new ArrayList<>();
+        }
+        //如果依赖的是重复的同一个,就不重复添加了
+        for (DependWrapper wrapper : dependWrappers) {
+            if (wrapper.equals(dependWrapper)) {
+                return;
+            }
+        }
+        dependWrappers.add(dependWrapper);
+    }
+
+    private void addNext(WorkerWrapper<?, ?> workerWrapper) {
+        if (nextWrappers == null) {
+            nextWrappers = new ArrayList<>();
+        }
+        //避免添加重复
+        for (WorkerWrapper wrapper : nextWrappers) {
+            if (workerWrapper.equals(wrapper)) {
+                return;
+            }
+        }
+        nextWrappers.add(workerWrapper);
+    }
+
+    private void addNextWrappers(List<WorkerWrapper<?, ?>> wrappers) {
+        if (wrappers == null) {
+            return;
+        }
+        for (WorkerWrapper<?, ?> wrapper : wrappers) {
+            addNext(wrapper);
+        }
+    }
+
+    private void addDependWrappers(List<DependWrapper> dependWrappers) {
+        if (dependWrappers == null) {
+            return;
+        }
+        for (DependWrapper wrapper : dependWrappers) {
+            addDepend(wrapper);
+        }
+    }
+
+    private WorkResult<V> defaultResult() {
+        workResult.setResultState(ResultState.TIMEOUT);
+        workResult.setResult(worker.defaultValue());
+        return workResult;
+    }
+
+    private WorkResult<V> defaultExResult(Exception ex) {
+        workResult.setResultState(ResultState.EXCEPTION);
+        workResult.setResult(worker.defaultValue());
+        workResult.setEx(ex);
+        return workResult;
+    }
+
+
+    private int getState() {
+        return state.get();
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    private boolean compareAndSetState(int expect, int update) {
+        return this.state.compareAndSet(expect, update);
+    }
+
+    private void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
+        this.needCheckNextWrapperResult = needCheckNextWrapperResult;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)  {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        WorkerWrapper<?, ?> that = (WorkerWrapper<?, ?>) o;
+        return needCheckNextWrapperResult == that.needCheckNextWrapperResult &&
+                Objects.equals(param, that.param) &&
+                Objects.equals(worker, that.worker) &&
+                Objects.equals(callback, that.callback) &&
+                Objects.equals(nextWrappers, that.nextWrappers) &&
+                Objects.equals(dependWrappers, that.dependWrappers) &&
+                Objects.equals(state, that.state) &&
+                Objects.equals(workResult, that.workResult);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(param, worker, callback, nextWrappers, dependWrappers, state, workResult, needCheckNextWrapperResult);
+    }
+
+    public static class Builder<W, C> {
+        /**
+         * 该wrapper的唯一标识
+         */
+        private String id = UUID.randomUUID().toString();
+        /**
+         * worker将来要处理的param
+         */
+        private W param;
+        private IWorker<W, C> worker;
+        private ICallback<W, C> callback;
+        /**
+         * 自己后面的所有
+         */
+        private List<WorkerWrapper<?, ?>> nextWrappers;
+        /**
+         * 自己依赖的所有
+         */
+        private List<DependWrapper> dependWrappers;
+        /**
+         * 存储强依赖于自己的wrapper集合
+         */
+        private Set<WorkerWrapper<?, ?>> selfIsMustSet;
+
+        private boolean needCheckNextWrapperResult = true;
+
+        public Builder<W, C> worker(IWorker<W, C> worker) {
+            this.worker = worker;
+            return this;
+        }
+
+        public Builder<W, C> param(W w) {
+            this.param = w;
+            return this;
+        }
+
+        public Builder<W, C> id(String id) {
+            if (id != null) {
+                this.id = id;
+            }
+            return this;
+        }
+
+        public Builder<W, C> needCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
+            this.needCheckNextWrapperResult = needCheckNextWrapperResult;
+            return this;
+        }
+
+        public Builder<W, C> callback(ICallback<W, C> callback) {
+            this.callback = callback;
+            return this;
+        }
+
+        public Builder<W, C> depend(WorkerWrapper<?, ?>... wrappers) {
+            if (wrappers == null) {
+                return this;
+            }
+            for (WorkerWrapper<?, ?> wrapper : wrappers) {
+                depend(wrapper);
+            }
+            return this;
+        }
+
+        public Builder<W, C> depend(WorkerWrapper<?, ?> wrapper) {
+            return depend(wrapper, true);
+        }
+
+        public Builder<W, C> depend(WorkerWrapper<?, ?> wrapper, boolean isMust) {
+            if (wrapper == null) {
+                return this;
+            }
+            DependWrapper dependWrapper = new DependWrapper(wrapper, isMust);
+            if (dependWrappers == null) {
+                dependWrappers = new ArrayList<>();
+            }
+            dependWrappers.add(dependWrapper);
+            return this;
+        }
+
+        public Builder<W, C> next(WorkerWrapper<?, ?> wrapper) {
+            return next(wrapper, true);
+        }
+
+        public Builder<W, C> next(WorkerWrapper<?, ?> wrapper, boolean selfIsMust) {
+            if (nextWrappers == null) {
+                nextWrappers = new ArrayList<>();
+            }
+            nextWrappers.add(wrapper);
+
+            //强依赖自己
+            if (selfIsMust) {
+                if (selfIsMustSet == null) {
+                    selfIsMustSet = new HashSet<>();
+                }
+                selfIsMustSet.add(wrapper);
+            }
+            return this;
+        }
+
+        public Builder<W, C> next(WorkerWrapper<?, ?>... wrappers) {
+            if (wrappers == null) {
+                return this;
+            }
+            for (WorkerWrapper<?, ?> wrapper : wrappers) {
+                next(wrapper);
+            }
+            return this;
+        }
+
+        public WorkerWrapper<W, C> build() {
+            WorkerWrapper<W, C> wrapper = new WorkerWrapper<>(id, worker, param, callback);
+            wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult);
+            if (dependWrappers != null) {
+                for (DependWrapper workerWrapper : dependWrappers) {
+                    workerWrapper.getDependWrapper().addNext(wrapper);
+                    wrapper.addDepend(workerWrapper);
+                }
+            }
+            if (nextWrappers != null) {
+                for (WorkerWrapper<?, ?> workerWrapper : nextWrappers) {
+                    boolean must = false;
+                    if (selfIsMustSet != null && selfIsMustSet.contains(workerWrapper)) {
+                        must = true;
+                    }
+                    workerWrapper.addDepend(wrapper, must);
+                    wrapper.addNext(workerWrapper);
+                }
+            }
+
+            return wrapper;
+        }
+
+    }
+}

+ 5 - 0
liteflow-core/pom.xml

@@ -13,6 +13,11 @@
 	</parent>
 
 	<dependencies>
+		<dependency>
+			<groupId>com.yomahub</groupId>
+			<artifactId>liteflow-async-tool</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 		<dependency>
 			<groupId>com.yomahub</groupId>
 			<artifactId>liteflow-script-common</artifactId>

+ 4 - 13
pom.xml

@@ -39,8 +39,8 @@
 	</scm>
 
     <properties>
-    	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<java.version>1.8</java.version>
+		<maven.compiler.source>8</maven.compiler.source>
+		<maven.compiler.target>8</maven.compiler.target>
 		<springboot.version>2.0.5.RELEASE</springboot.version>
 		<spring.version>5.0.9.RELEASE</spring.version>
 		<org.slf4j.version>1.7.21</org.slf4j.version>
@@ -175,16 +175,6 @@
 
     <build>
         <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-				<version>3.0</version>
-                <configuration>
-                    <encoding>UTF-8</encoding>
-                    <source>${java.version}</source>
-                    <target>${java.version}</target>
-                </configuration>
-            </plugin>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-surefire-plugin</artifactId>
@@ -256,7 +246,8 @@
 		<module>liteflow-testcase-springnative</module>
 		<module>liteflow-testcase-script-qlexpress</module>
 		<module>liteflow-testcase-script-groovy</module>
-	</modules>
+        <module>liteflow-async-tool</module>
+    </modules>
 
 	<distributionManagement>
 		<snapshotRepository>