浏览代码

enhancement #I4GZ1Q 增强异步线程超时的情况下打印出具体超时节点的信息

bryan31 3 年之前
父节点
当前提交
6d113d1b2b

+ 25 - 17
liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java

@@ -22,10 +22,13 @@ import com.yomahub.liteflow.property.LiteflowConfigGetter;
 import com.yomahub.liteflow.util.ExecutorHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+
+import java.lang.reflect.Array;
+import java.util.*;
 import java.util.concurrent.*;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
 
 /**
  * chain对象,实现可执行器
@@ -122,27 +125,32 @@ public class Chain implements Executable {
     //  使用线程池执行when并发流程
     private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) {
         final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size());
-        final List<Future<Boolean>> futures = new ArrayList<>(condition.getNodeList().size());
+        final Map<String, Future<Boolean>> futureMap = new HashMap<>();
 
         //此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的
         ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutor();
 
         LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
 
-        for (int i = 0; i < condition.getNodeList().size(); i++) {
-            futures.add(parallelExecutor.submit(
-                    TtlCallable.get(new ParallelCallable(condition.getNodeList().get(i), slotIndex, requestId, latch, liteflowConfig.getRetryCount()))
-            ));
-        }
+        condition.getNodeList().forEach(executable -> {
+            Future<Boolean> future = parallelExecutor.submit(
+                    Objects.requireNonNull(TtlCallable.get(new ParallelCallable(executable, slotIndex, requestId, latch, liteflowConfig.getRetryCount())))
+            );
+            futureMap.put(executable.getExecuteName(), future);
+        });
 
         boolean interrupted = false;
         try {
             if (!latch.await(liteflowConfig.getWhenMaxWaitSeconds(), TimeUnit.SECONDS)) {
-                for (Future<Boolean> f : futures) {
-                    f.cancel(true);
-                }
+
+                futureMap.forEach((name, f) -> {
+                    boolean flag = f.cancel(true);
+                    //如果flag为true,说明线程被成功cancel掉了,需要打出这个线程对应的执行器单元的name,说明这个线程超时了
+                    if (flag){
+                        LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", requestId, name);
+                    }
+                });
                 interrupted = true;
-                LOG.warn("requestId [{}] executing async condition has reached max-wait-seconds, condition canceled.", requestId);
             }
         } catch (InterruptedException e) {
             interrupted = true;
@@ -154,15 +162,15 @@ public class Chain implements Executable {
                 throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", requestId));
             }
 
-            for (Future<Boolean> f : futures) {
+            futureMap.forEach((name, f) -> {
                 try {
                     if (!f.get()) {
-                        throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute failed. errorResume [false].", requestId));
+                        throw new WhenExecuteException(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", name, requestId));
                     }
                 } catch (InterruptedException | ExecutionException e) {
-                    throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute failed. errorResume [false].", requestId));
+                    throw new WhenExecuteException(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", name, requestId));
                 }
-            }
+            });
         } else if (interrupted) {
             //  这里由于配置了errorResume,所以只打印warn日志
             LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId);

+ 41 - 0
liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/WhenTimeOutSpringbootTest.java

@@ -0,0 +1,41 @@
+package com.yomahub.liteflow.test.whenTimeOut;
+
+import cn.hutool.core.io.resource.ResourceUtil;
+import com.yomahub.liteflow.core.FlowExecutor;
+import com.yomahub.liteflow.entity.data.DefaultSlot;
+import com.yomahub.liteflow.entity.data.LiteflowResponse;
+import com.yomahub.liteflow.enums.FlowParserTypeEnum;
+import com.yomahub.liteflow.flow.FlowBus;
+import com.yomahub.liteflow.test.BaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import javax.annotation.Resource;
+
+/**
+ * springboot环境下异步线程超时日志打印测试
+ * @author Bryan.Zhang
+ * @since 2.6.4
+ */
+@RunWith(SpringRunner.class)
+@TestPropertySource(value = "classpath:/whenTimeOut/application.properties")
+@SpringBootTest(classes = WhenTimeOutSpringbootTest.class)
+@EnableAutoConfiguration
+@ComponentScan({"com.yomahub.liteflow.test.whenTimeOut.cmp"})
+public class WhenTimeOutSpringbootTest extends BaseTest {
+
+    @Resource
+    private FlowExecutor flowExecutor;
+
+    @Test
+    public void testWhenTimeOut() throws Exception{
+        LiteflowResponse<DefaultSlot> response = flowExecutor.execute2Resp("chain1", "arg");
+        Assert.assertTrue(response.isSuccess());
+    }
+}

+ 20 - 0
liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/ACmp.java

@@ -0,0 +1,20 @@
+/**
+ * <p>Title: liteflow</p>
+ * <p>Description: 轻量级的组件式流程框架</p>
+ * @author Bryan.Zhang
+ * @email weenyc31@163.com
+ * @Date 2020/4/1
+ */
+package com.yomahub.liteflow.test.whenTimeOut.cmp;
+
+import com.yomahub.liteflow.core.NodeComponent;
+import org.springframework.stereotype.Component;
+
+@Component("a")
+public class ACmp extends NodeComponent {
+
+	@Override
+	public void process() {
+		System.out.println("ACmp executed!");
+	}
+}

+ 26 - 0
liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/BCmp.java

@@ -0,0 +1,26 @@
+/**
+ * <p>Title: liteflow</p>
+ * <p>Description: 轻量级的组件式流程框架</p>
+ * @author Bryan.Zhang
+ * @email weenyc31@163.com
+ * @Date 2020/4/1
+ */
+package com.yomahub.liteflow.test.whenTimeOut.cmp;
+
+import com.yomahub.liteflow.core.NodeComponent;
+import org.springframework.stereotype.Component;
+
+@Component("b")
+public class BCmp extends NodeComponent {
+
+	@Override
+	public void process() {
+		try {
+			Thread.sleep(6000);
+		}catch (Exception ignored){
+
+		}
+		System.out.println("BCmp executed!");
+	}
+
+}

+ 26 - 0
liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/CCmp.java

@@ -0,0 +1,26 @@
+/**
+ * <p>Title: liteflow</p>
+ * <p>Description: 轻量级的组件式流程框架</p>
+ * @author Bryan.Zhang
+ * @email weenyc31@163.com
+ * @Date 2020/4/1
+ */
+package com.yomahub.liteflow.test.whenTimeOut.cmp;
+
+import com.yomahub.liteflow.core.NodeComponent;
+import org.springframework.stereotype.Component;
+
+@Component("c")
+public class CCmp extends NodeComponent {
+
+	@Override
+	public void process() {
+		try {
+			Thread.sleep(8000);
+		}catch (Exception ignored){
+
+		}
+		System.out.println("CCmp executed!");
+	}
+
+}

+ 2 - 0
liteflow-testcase-springboot/src/test/resources/whenTimeOut/application.properties

@@ -0,0 +1,2 @@
+liteflow.rule-source=whenTimeOut/flow.xml
+liteflow.when-max-wait-seconds=5

+ 6 - 0
liteflow-testcase-springboot/src/test/resources/whenTimeOut/flow.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<flow>
+    <chain name="chain1">
+        <when value="a,b,c"/>
+    </chain>
+</flow>