Przeglądaj źródła

feature #I4892Y slot这种结构体系,对于多个子线程进入同一个组件的情况下,不容易区分不同的传值。无法做到重用组件

bryan31 3 lat temu
rodzic
commit
004c639eb8

+ 8 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/core/NodeComponent.java

@@ -183,4 +183,12 @@ public abstract class NodeComponent {
 	public void setType(NodeTypeEnum type) {
 		this.type = type;
 	}
+
+	public <T> void sendPrivateDeliveryData(String nodeId, T t){
+		this.getSlot().setPrivateDeliveryData(nodeId, t);
+	}
+
+	public <T> T getPrivateDeliveryData(){
+		return this.getSlot().getPrivateDeliveryData(this.getNodeId());
+	}
 }

+ 37 - 10
liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java

@@ -7,6 +7,7 @@
  */
 package com.yomahub.liteflow.entity.data;
 
+import cn.hutool.core.util.StrUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Deque;
@@ -24,23 +25,25 @@ public abstract class AbsSlot implements Slot {
 
 	private static final Logger LOG = LoggerFactory.getLogger(Slot.class);
 
-	private static final String REQUEST = "request";
+	private static final String REQUEST = "_request";
 
-	private static final String RESPONSE = "response";
+	private static final String RESPONSE = "_response";
 
-	private static final String CHAINNAME = "chain_name";
+	private static final String CHAINNAME = "_chain_name";
 
-	private static final String COND_NODE_PREFIX = "cond_";
+	private static final String COND_NODE_PREFIX = "_cond_";
 
-	private static final String NODE_INPUT_PREFIX = "input_";
+	private static final String NODE_INPUT_PREFIX = "_input_";
 
-	private static final String NODE_OUTPUT_PREFIX = "output_";
+	private static final String NODE_OUTPUT_PREFIX = "_output_";
 
-	private static final String CHAIN_REQ_PREFIX = "chain_req_";
+	private static final String CHAIN_REQ_PREFIX = "_chain_req_";
 
-	private static final String REQUEST_ID = "req_id";
+	private static final String REQUEST_ID = "_req_id";
 
-	private static final String EXCEPTION = "exception";
+	private static final String EXCEPTION = "_exception";
+
+	private static final String PRIVATE_DELIVERY_PREFIX = "_private_d_";
 
 	private final Queue<CmpStep> executeSteps = new ConcurrentLinkedQueue<>();
 
@@ -94,6 +97,30 @@ public abstract class AbsSlot implements Slot {
 		dataMap.put(key, t);
 	}
 
+	public <T> void setPrivateDeliveryData(String nodeId, T t){
+		String privateDKey = PRIVATE_DELIVERY_PREFIX + nodeId;
+		synchronized (nodeId){
+			if (dataMap.containsKey(privateDKey)){
+				Queue<T> queue = (Queue<T>) dataMap.get(privateDKey);
+				queue.add(t);
+			}else{
+				Queue<T> queue = new ConcurrentLinkedQueue<>();
+				queue.add(t);
+				this.setData(privateDKey, queue);
+			}
+		}
+	}
+
+	public <T> T getPrivateDeliveryData(String nodeId){
+		String privateDKey = PRIVATE_DELIVERY_PREFIX + nodeId;
+		if(dataMap.containsKey(privateDKey)){
+			Queue<T> queue = (Queue<T>) dataMap.get(privateDKey);
+			return queue.poll();
+		}else{
+			return null;
+		}
+	}
+
 	public <T> void setCondResult(String key, T t){
 		dataMap.put(COND_NODE_PREFIX + key, t);
 	}
@@ -124,7 +151,7 @@ public abstract class AbsSlot implements Slot {
 				str.append("==>");
 			}
 		}
-		LOG.info("[{}]:CHAIN_NAME[{}]\n{}",getRequestId(),this.getChainName(),str.toString());
+		LOG.info("[{}]:CHAIN_NAME[{}]\n{}",getRequestId(),this.getChainName(), str);
 		return str.toString();
 	}
 

+ 4 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java

@@ -55,4 +55,8 @@ public interface Slot {
 	void setException(Exception e);
 
 	Exception getException();
+
+	<T> void setPrivateDeliveryData(String nodeId, T t);
+
+	<T> T getPrivateDeliveryData(String nodeId);
 }

+ 51 - 0
liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/privateDelivery/PirvateDeliverySpringbootTest.java

@@ -0,0 +1,51 @@
+package com.yomahub.liteflow.test.privateDelivery;
+
+import cn.hutool.core.util.ReflectUtil;
+import com.yomahub.liteflow.core.FlowExecutor;
+import com.yomahub.liteflow.entity.data.DataBus;
+import com.yomahub.liteflow.entity.data.DefaultSlot;
+import com.yomahub.liteflow.entity.data.LiteflowResponse;
+import com.yomahub.liteflow.entity.data.Slot;
+import com.yomahub.liteflow.test.BaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import javax.annotation.Resource;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * springboot环境下隐私投递的测试
+ * @author Bryan.Zhang
+ * @since 2.5.0
+ */
+@RunWith(SpringRunner.class)
+@TestPropertySource(value = "classpath:/privateDelivery/application.properties")
+@SpringBootTest(classes = PirvateDeliverySpringbootTest.class)
+@EnableAutoConfiguration
+@ComponentScan({"com.yomahub.liteflow.test.privateDelivery.cmp"})
+public class PirvateDeliverySpringbootTest extends BaseTest {
+
+    @Resource
+    private FlowExecutor flowExecutor;
+
+    @Test
+    public void testSpringboot() throws Exception{
+        LiteflowResponse<DefaultSlot> response = flowExecutor.execute2Resp("chain1", "arg");
+        Set<Integer> set = response.getSlot().getData("testSet");
+        Assert.assertTrue(response.isSuccess());
+        Assert.assertEquals(100, set.size());
+    }
+}

+ 29 - 0
liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/privateDelivery/cmp/ACmp.java

@@ -0,0 +1,29 @@
+/**
+ * <p>Title: liteflow</p>
+ * <p>Description: 轻量级的组件式流程框架</p>
+ * @author Bryan.Zhang
+ * @email weenyc31@163.com
+ * @Date 2020/4/1
+ */
+package com.yomahub.liteflow.test.privateDelivery.cmp;
+
+import com.yomahub.liteflow.core.NodeComponent;
+import com.yomahub.liteflow.entity.data.Slot;
+import org.springframework.stereotype.Component;
+
+import java.util.HashSet;
+
+@Component("a")
+public class ACmp extends NodeComponent {
+
+	@Override
+	public void process() {
+		System.out.println("ACmp executed!");
+		Slot slot = getSlot();
+		slot.setData("testSet", new HashSet<>());
+
+		for (int i = 0; i < 100; i++) {
+			this.sendPrivateDeliveryData("b",i+1);
+		}
+	}
+}

+ 25 - 0
liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/privateDelivery/cmp/BCmp.java

@@ -0,0 +1,25 @@
+/**
+ * <p>Title: liteflow</p>
+ * <p>Description: 轻量级的组件式流程框架</p>
+ * @author Bryan.Zhang
+ * @email weenyc31@163.com
+ * @Date 2020/4/1
+ */
+package com.yomahub.liteflow.test.privateDelivery.cmp;
+
+import com.yomahub.liteflow.core.NodeComponent;
+import org.springframework.stereotype.Component;
+
+import java.util.Set;
+
+@Component("b")
+public class BCmp extends NodeComponent {
+
+	@Override
+	public void process() {
+		System.out.println("BCmp executed!");
+		Integer value = this.getPrivateDeliveryData();
+		Set<Integer> testSet = this.getSlot().getData("testSet");
+		testSet.add(value);
+	}
+}

+ 21 - 0
liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/privateDelivery/cmp/CCmp.java

@@ -0,0 +1,21 @@
+/**
+ * <p>Title: liteflow</p>
+ * <p>Description: 轻量级的组件式流程框架</p>
+ * @author Bryan.Zhang
+ * @email weenyc31@163.com
+ * @Date 2020/4/1
+ */
+package com.yomahub.liteflow.test.privateDelivery.cmp;
+
+import com.yomahub.liteflow.core.NodeComponent;
+import org.springframework.stereotype.Component;
+
+@Component("c")
+public class CCmp extends NodeComponent {
+
+	@Override
+	public void process() {
+		System.out.println("CCmp executed!");
+	}
+
+}

+ 1 - 0
liteflow-testcase-springboot/src/test/resources/privateDelivery/application.properties

@@ -0,0 +1 @@
+liteflow.rule-source=privateDelivery/flow.xml

+ 18 - 0
liteflow-testcase-springboot/src/test/resources/privateDelivery/flow.xml

@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<flow>
+    <chain name="chain1">
+        <then value="a"/>
+        <!-- 100个b组件并发 -->
+        <when value="b,b,b,b,b,b,b,b,b,b"/>
+        <when value="b,b,b,b,b,b,b,b,b,b"/>
+        <when value="b,b,b,b,b,b,b,b,b,b"/>
+        <when value="b,b,b,b,b,b,b,b,b,b"/>
+        <when value="b,b,b,b,b,b,b,b,b,b"/>
+        <when value="b,b,b,b,b,b,b,b,b,b"/>
+        <when value="b,b,b,b,b,b,b,b,b,b"/>
+        <when value="b,b,b,b,b,b,b,b,b,b"/>
+        <when value="b,b,b,b,b,b,b,b,b,b"/>
+        <when value="b,b,b,b,b,b,b,b,b,b"/>
+        <then value="c"/>
+    </chain>
+</flow>