Ver código fonte

feature #I3ZVEA 流程组件支持重试

bryan31 3 anos atrás
pai
commit
1c4d673ffb

+ 17 - 11
liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java

@@ -96,10 +96,19 @@ public class Chain implements Executable {
         for (Condition condition : conditionList) {
             if (condition instanceof ThenCondition) {
                 for (Executable executableItem : condition.getNodeList()) {
-                    try {
-                        executableItem.execute(slotIndex);
-                    } catch (Exception e) {
-                        throw e;
+                    //进行重试循环判断,如果重试次数为0,则只进行一次循环
+                    for (int i = 0; i <= liteflowConfig.getRetryCount(); i++) {
+                        try {
+                            if (i > 0){
+                                LOG.info("[{}]:component[{}] performs {} retry", slot.getRequestId(), executableItem.getExecuteName(), i+1);
+                            }
+                            executableItem.execute(slotIndex);
+                            break;
+                        } catch (Exception e) {
+                            if (i >= liteflowConfig.getRetryCount()){
+                                throw e;
+                            }
+                        }
                     }
                 }
             } else if (condition instanceof WhenCondition) {
@@ -127,7 +136,7 @@ public class Chain implements Executable {
 
         for (int i = 0; i < condition.getNodeList().size(); i++) {
             futures.add(parallelExecutor.submit(
-                    new ParallelCallable(condition.getNodeList().get(i), slotIndex, requestId, latch)
+                    new ParallelCallable(condition.getNodeList().get(i), slotIndex, requestId, latch, liteflowConfig.getRetryCount())
             ));
         }
 
@@ -147,19 +156,16 @@ public class Chain implements Executable {
         //当配置了errorResume = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException
         if (!condition.isErrorResume()) {
             if (interrupted) {
-                throw new WhenExecuteException(StrUtil.format(
-                        "requestId [{}] when execute interrupted. errorResume [false].", requestId));
+                throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", requestId));
             }
 
             for (Future<Boolean> f : futures) {
                 try {
                     if (!f.get()) {
-                        throw new WhenExecuteException(StrUtil.format(
-                                "requestId [{}] when execute failed. errorResume [false].", requestId));
+                        throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute failed. errorResume [false].", requestId));
                     }
                 } catch (InterruptedException | ExecutionException e) {
-                    throw new WhenExecuteException(StrUtil.format(
-                            "requestId [{}] when execute failed. errorResume [false].", requestId));
+                    throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute failed. errorResume [false].", requestId));
                 }
             }
         } else if (interrupted) {

+ 26 - 12
liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java

@@ -14,31 +14,45 @@ public class ParallelCallable implements Callable<Boolean> {
 
     private static final Logger LOG = LoggerFactory.getLogger(ParallelCallable.class);
 
-    private Executable executableItem;
+    private final Executable executableItem;
 
-    private Integer slotIndex;
+    private final Integer slotIndex;
 
-    private String requestId;
+    private final String requestId;
 
-    private CountDownLatch latch;
+    private final CountDownLatch latch;
 
-    public ParallelCallable(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch) {
+    private final int retryCount;
+
+    public ParallelCallable(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch, int retryCount) {
         this.executableItem = executableItem;
         this.slotIndex = slotIndex;
         this.requestId = requestId;
         this.latch = latch;
+        this.retryCount = retryCount;
     }
 
     @Override
     public Boolean call() throws Exception {
         try {
-            executableItem.execute(slotIndex);
-
-            return true;
-        }catch(Exception e){
-            LOG.error("requestId [{}], item [{}] execute error", requestId, executableItem.getExecuteName());
-
-            return false;
+            boolean flag = true;
+            for (int i = 0; i <= retryCount; i++) {
+                try{
+                    if (i > 0){
+                        LOG.info("[{}]:component[{}] performs {} retry", requestId, executableItem.getExecuteName(), i+1);
+                    }
+                    executableItem.execute(slotIndex);
+                    flag = true;
+                    break;
+                }catch (Exception e){
+                    if (i >= retryCount){
+                        LOG.error("requestId [{}], item [{}] execute error", requestId, executableItem.getExecuteName());
+                        flag = false;
+                        break;
+                    }
+                }
+            }
+            return flag;
         } finally {
             latch.countDown();
         }

+ 38 - 22
liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java

@@ -1,6 +1,7 @@
 /**
  * <p>Title: liteflow</p>
  * <p>Description: 轻量级的组件式流程框架</p>
+ *
  * @author Bryan.Zhang
  * @email weenyc31@163.com
  * @Date 2021/3/18
@@ -59,10 +60,13 @@ public class LiteflowConfig {
     //但是要注意,不能将主流程和子流程分配在不同类型配置文件中
     private Boolean supportMultipleType;
 
+    //重试次数
+    private Integer retryCount;
+
     public Boolean getEnable() {
-        if (ObjectUtil.isNull(enable)){
+        if (ObjectUtil.isNull(enable)) {
             return true;
-        }else{
+        } else {
             return enable;
         }
     }
@@ -80,9 +84,9 @@ public class LiteflowConfig {
     }
 
     public Integer getSlotSize() {
-        if (ObjectUtil.isNull(slotSize)){
+        if (ObjectUtil.isNull(slotSize)) {
             return 1024;
-        }else{
+        } else {
             return slotSize;
         }
     }
@@ -92,9 +96,9 @@ public class LiteflowConfig {
     }
 
     public Integer getWhenMaxWaitSeconds() {
-        if (ObjectUtil.isNull(whenMaxWaitSeconds)){
+        if (ObjectUtil.isNull(whenMaxWaitSeconds)) {
             return 15;
-        }else{
+        } else {
             return whenMaxWaitSeconds;
         }
     }
@@ -104,9 +108,9 @@ public class LiteflowConfig {
     }
 
     public Integer getQueueLimit() {
-        if (ObjectUtil.isNull(queueLimit)){
+        if (ObjectUtil.isNull(queueLimit)) {
             return 200;
-        }else{
+        } else {
             return queueLimit;
         }
     }
@@ -116,9 +120,9 @@ public class LiteflowConfig {
     }
 
     public Long getDelay() {
-        if (ObjectUtil.isNull(delay)){
+        if (ObjectUtil.isNull(delay)) {
             return 300000L;
-        }else{
+        } else {
             return delay;
         }
     }
@@ -128,9 +132,9 @@ public class LiteflowConfig {
     }
 
     public Long getPeriod() {
-        if (ObjectUtil.isNull(period)){
+        if (ObjectUtil.isNull(period)) {
             return 300000L;
-        }else{
+        } else {
             return period;
         }
     }
@@ -140,9 +144,9 @@ public class LiteflowConfig {
     }
 
     public Boolean getEnableLog() {
-        if (ObjectUtil.isNull(enableLog)){
+        if (ObjectUtil.isNull(enableLog)) {
             return false;
-        }else{
+        } else {
             return enableLog;
         }
     }
@@ -152,9 +156,9 @@ public class LiteflowConfig {
     }
 
     public Integer getWhenMaxWorkers() {
-        if (ObjectUtil.isNull(whenMaxWorkers)){
+        if (ObjectUtil.isNull(whenMaxWorkers)) {
             return Runtime.getRuntime().availableProcessors() * 2;
-        }else{
+        } else {
             return whenMaxWorkers;
         }
     }
@@ -164,9 +168,9 @@ public class LiteflowConfig {
     }
 
     public Integer getWhenQueueLimit() {
-        if (ObjectUtil.isNull(whenQueueLimit)){
+        if (ObjectUtil.isNull(whenQueueLimit)) {
             return 100;
-        }else{
+        } else {
             return whenQueueLimit;
         }
     }
@@ -176,9 +180,9 @@ public class LiteflowConfig {
     }
 
     public Boolean isParseOnStart() {
-        if (ObjectUtil.isNull(parseOnStart)){
+        if (ObjectUtil.isNull(parseOnStart)) {
             return true;
-        }else{
+        } else {
             return parseOnStart;
         }
     }
@@ -188,9 +192,9 @@ public class LiteflowConfig {
     }
 
     public Boolean isSupportMultipleType() {
-        if (ObjectUtil.isNull(supportMultipleType)){
+        if (ObjectUtil.isNull(supportMultipleType)) {
             return true;
-        }else{
+        } else {
             return supportMultipleType;
         }
     }
@@ -198,4 +202,16 @@ public class LiteflowConfig {
     public void setSupportMultipleType(Boolean supportMultipleType) {
         this.supportMultipleType = supportMultipleType;
     }
+
+    public int getRetryCount() {
+        if (ObjectUtil.isNull(retryCount) || retryCount < 0) {
+            return 0;
+        } else {
+            return retryCount;
+        }
+    }
+
+    public void setRetryCount(int retryCount) {
+        this.retryCount = retryCount;
+    }
 }

+ 11 - 0
liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java

@@ -35,6 +35,9 @@ public class LiteflowProperty {
     //但是要注意,不能将主流程和子流程分配在不同类型配置文件中
     private boolean supportMultipleType;
 
+    //重试次数
+    private int retryCount;
+
     public boolean isEnable() {
         return enable;
     }
@@ -98,4 +101,12 @@ public class LiteflowProperty {
     public void setSupportMultipleType(boolean supportMultipleType) {
         this.supportMultipleType = supportMultipleType;
     }
+
+    public int getRetryCount() {
+        return retryCount;
+    }
+
+    public void setRetryCount(int retryCount) {
+        this.retryCount = retryCount;
+    }
 }

+ 1 - 0
liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowPropertyAutoConfiguration.java

@@ -35,6 +35,7 @@ public class LiteflowPropertyAutoConfiguration {
         liteflowConfig.setParseOnStart(property.isParseOnStart());
         liteflowConfig.setEnable(property.isEnable());
         liteflowConfig.setSupportMultipleType(property.isSupportMultipleType());
+        liteflowConfig.setRetryCount(property.getRetryCount());
         return liteflowConfig;
     }
 }

+ 8 - 1
liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json

@@ -51,7 +51,14 @@
       "type": "java.lang.Boolean",
       "description": "Whether to support multiple types of configuration.",
       "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
-      "defaultValue": true
+      "defaultValue": false
+    },
+    {
+      "name": "liteflow.retry_count",
+      "type": "java.lang.Integer",
+      "description": "Number of component retries.",
+      "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty",
+      "defaultValue": 0
     },
     {
       "name": "liteflow.monitor.enable-log",

+ 1 - 0
liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties

@@ -5,6 +5,7 @@ liteflow.when-max-wait-seconds=15
 liteflow.when-max-workers=4
 liteflow.when-queue-limit=512
 liteflow.parse-on-start=true
+liteflow.retry_count=0
 liteflow.support-multiple-type=false
 liteflow.monitor.enable-log=false
 liteflow.monitor.queue-limit=200

+ 39 - 0
liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/retry/LiteflowRetrySpringbootTest.java

@@ -0,0 +1,39 @@
+package com.yomahub.liteflow.test.retry;
+
+import com.yomahub.liteflow.core.FlowExecutor;
+import com.yomahub.liteflow.entity.data.DefaultSlot;
+import com.yomahub.liteflow.entity.data.LiteflowResponse;
+import com.yomahub.liteflow.test.BaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+
+
+/**
+ * 测试springboot下的enable参数
+ * @author Bryan.Zhang
+ * @since 2.5.10
+ */
+@RunWith(SpringRunner.class)
+@TestPropertySource(value = "classpath:/retry/application.properties")
+@SpringBootTest(classes = LiteflowRetrySpringbootTest.class)
+@EnableAutoConfiguration
+@ComponentScan({"com.yomahub.liteflow.test.retry.cmp"})
+public class LiteflowRetrySpringbootTest extends BaseTest {
+
+    @Autowired
+    private FlowExecutor flowExecutor;
+
+    @Test
+    public void testConfig() {
+        LiteflowResponse<DefaultSlot> response = flowExecutor.execute2Resp("chain1", "arg");
+        Assert.assertTrue(response.isSuccess());
+        Assert.assertEquals("a==>b==>b==>b==>c==>a==>d", response.getSlot().printStep());
+    }
+}

+ 20 - 0
liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/retry/cmp/ACmp.java

@@ -0,0 +1,20 @@
+/**
+ * <p>Title: liteflow</p>
+ * <p>Description: 轻量级的组件式流程框架</p>
+ * @author Bryan.Zhang
+ * @email weenyc31@163.com
+ * @Date 2020/4/1
+ */
+package com.yomahub.liteflow.test.retry.cmp;
+
+import com.yomahub.liteflow.annotation.LiteflowComponent;
+import com.yomahub.liteflow.core.NodeComponent;
+
+@LiteflowComponent("a")
+public class ACmp extends NodeComponent {
+
+	@Override
+	public void process() {
+		System.out.println("ACmp executed!");
+	}
+}

+ 27 - 0
liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/retry/cmp/BCmp.java

@@ -0,0 +1,27 @@
+/**
+ * <p>Title: liteflow</p>
+ * <p>Description: 轻量级的组件式流程框架</p>
+ * @author Bryan.Zhang
+ * @email weenyc31@163.com
+ * @Date 2020/4/1
+ */
+package com.yomahub.liteflow.test.retry.cmp;
+
+import com.yomahub.liteflow.annotation.LiteflowComponent;
+import com.yomahub.liteflow.core.NodeComponent;
+
+@LiteflowComponent("b")
+public class BCmp extends NodeComponent {
+
+	private int flag = 0;
+
+	@Override
+	public void process() {
+		System.out.println("BCmp executed!");
+		if (flag < 2){
+			flag++;
+			throw new RuntimeException("demo exception");
+		}
+	}
+
+}

+ 21 - 0
liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/retry/cmp/CCmp.java

@@ -0,0 +1,21 @@
+/**
+ * <p>Title: liteflow</p>
+ * <p>Description: 轻量级的组件式流程框架</p>
+ * @author Bryan.Zhang
+ * @email weenyc31@163.com
+ * @Date 2020/4/1
+ */
+package com.yomahub.liteflow.test.retry.cmp;
+
+import com.yomahub.liteflow.annotation.LiteflowComponent;
+import com.yomahub.liteflow.core.NodeComponent;
+
+@LiteflowComponent("c")
+public class CCmp extends NodeComponent {
+
+	@Override
+	public void process() {
+		System.out.println("CCmp executed!");
+	}
+
+}

+ 21 - 0
liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/retry/cmp/DCmp.java

@@ -0,0 +1,21 @@
+/**
+ * <p>Title: liteflow</p>
+ * <p>Description: 轻量级的组件式流程框架</p>
+ * @author Bryan.Zhang
+ * @email weenyc31@163.com
+ * @Date 2020/4/1
+ */
+package com.yomahub.liteflow.test.retry.cmp;
+
+import com.yomahub.liteflow.annotation.LiteflowComponent;
+import com.yomahub.liteflow.core.NodeComponent;
+
+@LiteflowComponent("d")
+public class DCmp extends NodeComponent {
+
+	@Override
+	public void process() {
+		System.out.println("DCmp executed!");
+	}
+
+}

+ 2 - 0
liteflow-spring-boot-starter/src/test/resources/retry/application.properties

@@ -0,0 +1,2 @@
+liteflow.rule-source=retry/flow.xml
+liteflow.retry-count=3

+ 10 - 0
liteflow-spring-boot-starter/src/test/resources/retry/flow.xml

@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<flow>
+    <chain name="chain1">
+        <then value="a,b,chain2"/>
+    </chain>
+
+    <chain name="chain2">
+        <then value="c,a,d"/>
+    </chain>
+</flow>