Sfoglia il codice sorgente

enhancement 支持etcd分离chain以及脚本的存储结构

zendwang 2 anni fa
parent
commit
c71efffa6a

+ 57 - 0
liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdClient.java

@@ -5,16 +5,20 @@ import io.etcd.jetcd.ByteSequence;
 import io.etcd.jetcd.Client;
 import io.etcd.jetcd.KeyValue;
 import io.etcd.jetcd.Watch;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.WatchOption;
 import io.etcd.jetcd.watch.WatchEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
  * Etcd 客户端封装类.
@@ -79,6 +83,42 @@ public class EtcdClient {
 		return prevKv;
 	}
 
+	/**
+	 * get node sub nodes.
+	 *
+	 * @param prefix    node prefix.
+	 * @param separator separator char
+	 * @return sub nodes
+	 * @throws ExecutionException   the exception
+	 * @throws InterruptedException the exception
+	 */
+	public List<String> getChildrenKeys(final String prefix, final String separator) throws ExecutionException, InterruptedException {
+		ByteSequence prefixByteSequence = ByteSequence.from(prefix, StandardCharsets.UTF_8);
+		GetOption getOption = GetOption.newBuilder()
+				.withPrefix(prefixByteSequence)
+				.withSortField(GetOption.SortTarget.KEY)
+				.withSortOrder(GetOption.SortOrder.ASCEND)
+				.build();
+
+		List<KeyValue> keyValues = client.getKVClient()
+				.get(prefixByteSequence, getOption)
+				.get()
+				.getKvs();
+
+		return keyValues.stream()
+				.map(e -> getSubNodeKeyName(prefix, e.getKey().toString(StandardCharsets.UTF_8), separator))
+				.distinct()
+				.filter(e -> Objects.nonNull(e))
+				.collect(Collectors.toList());
+	}
+
+	private String getSubNodeKeyName(final String prefix, final String fullPath, final String separator) {
+		if (prefix.length() > fullPath.length()) {
+			return null;
+		}
+		String pathWithoutPrefix = fullPath.substring(prefix.length());
+		return pathWithoutPrefix.contains(separator) ? pathWithoutPrefix.substring(1) : pathWithoutPrefix;
+	}
 	/**
 	 * subscribe data change.
 	 *
@@ -94,6 +134,23 @@ public class EtcdClient {
 		watchCache.put(key, watch);
 	}
 
+	/**
+	 * subscribe sub node change.
+	 *
+	 * @param key           param node name.
+	 * @param updateHandler sub node handler of update
+	 * @param deleteHandler sub node delete of delete
+	 */
+	public void watchChildChange(final String key,
+								 final BiConsumer<String, String> updateHandler,
+								 final Consumer<String> deleteHandler) {
+		Watch.Listener listener = watch(updateHandler, deleteHandler);
+		WatchOption option = WatchOption.newBuilder()
+				.withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
+				.build();
+		Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), option, listener);
+		watchCache.put(key, watch);
+	}
 	private Watch.Listener watch(final BiConsumer<String, String> updateHandler,
 	                             final Consumer<String> deleteHandler) {
 		return Watch.listener(response -> {

+ 16 - 13
liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/EtcdXmlELParser.java

@@ -39,11 +39,11 @@ public class EtcdXmlELParser extends ClassXmlFlowELParser {
 				throw new EtcdException("rule-source-ext-data is empty");
 			}
 
-			if (StrUtil.isBlank(etcdParserVO.getNodePath())){
-				etcdParserVO.setNodePath("/lite-flow/flow");
+			if (StrUtil.isBlank(etcdParserVO.getChainPath())){
+				throw new EtcdException("You must configure the chainPath property");
 			}
-			if (StrUtil.isBlank(etcdParserVO.getConnectStr())){
-				throw new EtcdException("Etcd connect string is empty");
+			if (StrUtil.isBlank(etcdParserVO.getEndpoints())){
+				throw new EtcdException("etcd endpoints is empty");
 			}
 
 			etcdParserHelper = new EtcdParserHelper(etcdParserVO);
@@ -54,17 +54,20 @@ public class EtcdXmlELParser extends ClassXmlFlowELParser {
 
 	@Override
 	public String parseCustom() {
-		Consumer<String> parseConsumer = t -> {
-			try {
-				parse(t);
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-		};
+
 		try {
 			String content = etcdParserHelper.getContent();
-			etcdParserHelper.checkContent(content);
-			etcdParserHelper.listen(parseConsumer);
+
+			Consumer<String> listenerConsumer = t -> {
+				try {
+					parse(t);
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+			};
+
+			etcdParserHelper.listen(listenerConsumer);
+
 			return content;
 		} catch (Exception e){
 			throw new EtcdException(e.getMessage());

+ 161 - 19
liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/util/EtcdParserHelper.java

@@ -1,17 +1,23 @@
 package com.yomahub.liteflow.parser.etcd.util;
 
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.core.util.CharsetUtil;
+import cn.hutool.core.util.ReUtil;
 import cn.hutool.core.util.StrUtil;
-import com.yomahub.liteflow.exception.ParseException;
-import com.yomahub.liteflow.parser.el.XmlFlowELParser;
 import com.yomahub.liteflow.parser.etcd.EtcdClient;
 import com.yomahub.liteflow.parser.etcd.exception.EtcdException;
 import com.yomahub.liteflow.parser.etcd.vo.EtcdParserVO;
 import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
+import io.etcd.jetcd.ByteSequence;
 import io.etcd.jetcd.Client;
+import io.etcd.jetcd.ClientBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
 import java.util.function.Consumer;
 
 /**
@@ -22,22 +28,38 @@ public class EtcdParserHelper {
 
 	private static final Logger LOG = LoggerFactory.getLogger(EtcdParserHelper.class);
 
+	private final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
+
+	private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
+
+	private final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
+
+	private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
+
+	private static final String SEPARATOR = "/";
+
 	private final EtcdParserVO etcdParserVO;
 
-	private EtcdClient etcdClient;
+	private EtcdClient client;
 
 	public EtcdParserHelper(EtcdParserVO etcdParserVO) {
 		this.etcdParserVO = etcdParserVO;
 
 		try{
 			try{
-				this.etcdClient  = ContextAwareHolder.loadContextAware().getBean(EtcdClient.class);
+				this.client  = ContextAwareHolder.loadContextAware().getBean(EtcdClient.class);
 			}catch (Exception ignored){}
-			if (this.etcdClient == null) {
-				Client client = Client.builder()
-						.endpoints(etcdParserVO.getConnectStr().split(","))
-						.build();
-				this.etcdClient = new EtcdClient(client);
+			if (this.client == null) {
+				ClientBuilder clientBuilder = Client.builder()
+						.endpoints(etcdParserVO.getEndpoints().split(","));
+				if (StrUtil.isNotBlank(etcdParserVO.getNamespace())) {
+					clientBuilder.namespace(ByteSequence.from(etcdParserVO.getNamespace(), CharsetUtil.CHARSET_UTF_8));
+				}
+				if (StrUtil.isAllNotBlank(etcdParserVO.getUser(), etcdParserVO.getPassword())) {
+					clientBuilder.user(ByteSequence.from(etcdParserVO.getUser(), CharsetUtil.CHARSET_UTF_8));
+					clientBuilder.password(ByteSequence.from(etcdParserVO.getPassword(), CharsetUtil.CHARSET_UTF_8));
+				}
+				this.client = new EtcdClient(clientBuilder.build());
 			}
 		}catch (Exception e){
 			throw new EtcdException(e.getMessage());
@@ -46,29 +68,149 @@ public class EtcdParserHelper {
 
 	public String getContent(){
 		try{
-			return this.etcdClient.get(etcdParserVO.getNodePath());
+			//检查zk上有没有chainPath节点
+//			if (client.get(etcdParserVO.getChainPath()) == null) {
+//				throw new EtcdException(StrUtil.format("etcd node[{}] is not exist", etcdParserVO.getChainPath()));
+//			}
+
+			//检查chainPath路径下有没有子节点
+			List<String> chainNameList = client.getChildrenKeys(etcdParserVO.getChainPath(), SEPARATOR);
+			if (CollectionUtil.isEmpty(chainNameList)){
+				throw new EtcdException(StrUtil.format("There are no chains in path [{}]", etcdParserVO.getChainPath()));
+			}
+
+			//获取chainPath路径下的所有子节点内容List
+			List<String> chainItemContentList = new ArrayList<>();
+			for (String chainName : chainNameList){
+				String chainData = client.get(StrUtil.format("{}/{}", etcdParserVO.getChainPath(), chainName));
+				if (StrUtil.isNotBlank(chainData)) {
+					chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
+				}
+			}
+			//合并成所有chain的xml内容
+			String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
+
+			//检查是否有脚本内容,如果有,进行脚本内容的获取
+			String scriptAllContent = StrUtil.EMPTY;
+			if (hasScript()){
+				List<String> scriptNodeValueList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR);
+
+				List<String> scriptItemContentList = new ArrayList<>();
+				for (String scriptNodeValue: scriptNodeValueList){
+					NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
+					if (Objects.isNull(nodeSimpleVO)){
+						throw new EtcdException(StrUtil.format("The name of the etcd node is invalid:{}", scriptNodeValue));
+					}
+					String scriptData = client.get(StrUtil.format("{}/{}", etcdParserVO.getScriptPath(), scriptNodeValue));
+
+					scriptItemContentList.add(
+							StrUtil.format(NODE_ITEM_XML_PATTERN,
+									nodeSimpleVO.getNodeId(),
+									nodeSimpleVO.getName(),
+									nodeSimpleVO.getType(),
+									scriptData)
+					);
+				}
+
+				scriptAllContent = StrUtil.format(NODE_XML_PATTERN, CollUtil.join(scriptItemContentList, StrUtil.EMPTY));
+			}
+
+			return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
 		}catch (Exception e){
 			throw new EtcdException(e.getMessage());
 		}
 	}
 
-	/**
-	 * 检查 content 是否合法
-	 */
-	public void checkContent(String content) {
-		if (StrUtil.isBlank(content)) {
-			String error = MessageFormat.format("the node[{0}] value is empty", etcdParserVO.getNodePath());
-			throw new ParseException(error);
+	public boolean hasScript(){
+		//没有配置scriptPath
+		if (StrUtil.isBlank(etcdParserVO.getScriptPath())){
+			return false;
+		}
+
+		try{
+			//配置了,但是不存在这个节点
+//			if (client.get(etcdParserVO.getScriptPath()) == null){
+//				return false;
+//			}
+
+			//存在这个节点,但是子节点不存在
+			List<String> chainNameList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR);
+			if (CollUtil.isEmpty(chainNameList)){
+				return false;
+			}
+
+			return true;
+		}catch (Exception e){
+			return false;
 		}
 	}
 
+
 	/**
 	 * 监听 etcd 节点
 	 */
 	public void listen(Consumer<String> parseConsumer) {
-		this.etcdClient.watchDataChange(this.etcdParserVO.getNodePath(), (updatePath, updateValue) -> {
+		this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> {
 			LOG.info("starting load flow config....");
 			parseConsumer.accept(updateValue);
 			}, null);
+		this.client.watchChildChange(this.etcdParserVO.getScriptPath(), (updatePath, updateValue) -> {
+			LOG.info("starting load flow config....");
+			parseConsumer.accept(updateValue);
+		}, null);
+	}
+
+	public NodeSimpleVO convert(String str){
+		//不需要去理解这串正则,就是一个匹配冒号的
+		//一定得是a:b,或是a:b:c...这种完整类型的字符串的
+		List<String> matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", str);
+		if (CollUtil.isEmpty(matchItemList)){
+			return null;
+		}
+
+		NodeSimpleVO nodeSimpleVO = new NodeSimpleVO();
+		if (matchItemList.size() > 1){
+			nodeSimpleVO.setNodeId(matchItemList.get(0));
+			nodeSimpleVO.setType(matchItemList.get(1));
+		}
+
+		if (matchItemList.size() > 2){
+			nodeSimpleVO.setName(matchItemList.get(2));
+		}
+
+		return nodeSimpleVO;
+	}
+
+	private static class NodeSimpleVO{
+
+		private String nodeId;
+
+		private String type;
+
+		private String name="";
+
+		public String getNodeId() {
+			return nodeId;
+		}
+
+		public void setNodeId(String nodeId) {
+			this.nodeId = nodeId;
+		}
+
+		public String getType() {
+			return type;
+		}
+
+		public void setType(String type) {
+			this.type = type;
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		public void setName(String name) {
+			this.name = name;
+		}
 	}
 }

+ 50 - 10
liteflow-rule-plugin/liteflow-rule-etcd/src/main/java/com/yomahub/liteflow/parser/etcd/vo/EtcdParserVO.java

@@ -7,23 +7,63 @@ package com.yomahub.liteflow.parser.etcd.vo;
  */
 public class EtcdParserVO {
 
-    private String connectStr;
+    private String endpoints;
 
-    private String nodePath;
+    private String user;
 
-    public String getConnectStr() {
-        return connectStr;
+    private String password;
+
+    private String namespace;
+
+    private String chainPath;
+
+    private String scriptPath;
+
+    public String getEndpoints() {
+        return endpoints;
+    }
+
+    public void setEndpoints(String endpoints) {
+        this.endpoints = endpoints;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public String getChainPath() {
+        return chainPath;
     }
 
-    public void setConnectStr(String connectStr) {
-        this.connectStr = connectStr;
+    public void setChainPath(String chainPath) {
+        this.chainPath = chainPath;
     }
 
-    public String getNodePath() {
-        return nodePath;
+    public String getScriptPath() {
+        return scriptPath;
     }
 
-    public void setNodePath(String nodePath) {
-        this.nodePath = nodePath;
+    public void setScriptPath(String scriptPath) {
+        this.scriptPath = scriptPath;
     }
 }

+ 6 - 0
liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/pom.xml

@@ -26,6 +26,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.yomahub</groupId>
+            <artifactId>liteflow-script-groovy</artifactId>
+            <version>${revision}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>

+ 17 - 14
liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/java/com/yomahub/liteflow/test/etcd/EtcdWithXmlELSpringbootTest.java

@@ -8,6 +8,7 @@ import com.yomahub.liteflow.flow.LiteflowResponse;
 import com.yomahub.liteflow.parser.etcd.EtcdClient;
 import com.yomahub.liteflow.parser.etcd.EtcdXmlELParser;
 import com.yomahub.liteflow.parser.etcd.util.EtcdParserHelper;
+import com.yomahub.liteflow.slot.DefaultContext;
 import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
 import com.yomahub.liteflow.test.BaseTest;
 import org.junit.*;
@@ -34,14 +35,14 @@ import static org.mockito.Mockito.*;
  * springboot环境下的etcd 规则解析器 测试
  */
 @RunWith(SpringRunner.class)
-@TestPropertySource(value = "classpath:/etcd/application-xml-cluster.properties")
+@TestPropertySource(value = "classpath:/etcd/application-xml.properties")
 @SpringBootTest(classes = EtcdWithXmlELSpringbootTest.class)
 @EnableAutoConfiguration
 @ComponentScan({"com.yomahub.liteflow.test.etcd.cmp"})
 public class EtcdWithXmlELSpringbootTest extends BaseTest {
 
-    @MockBean
-    private EtcdClient etcdClient;
+    //@MockBean
+    //private EtcdClient etcdClient;
 
     @Resource
     private FlowExecutor flowExecutor;
@@ -58,29 +59,31 @@ public class EtcdWithXmlELSpringbootTest extends BaseTest {
 
     @Test
     public void testEtcdNodeWithXml1() throws Exception {
-        String flowXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow><chain name=\"chain1\">THEN(a, b, c);</chain></flow>";
-        when(etcdClient.get(anyString())).thenReturn(flowXml);
+        //String flowXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow><chain name=\"chain1\">THEN(a, b, c);</chain></flow>";
+        //when(etcdClient.get(anyString())).thenReturn(flowXml);
 
         LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
+        DefaultContext context = response.getFirstContextBean();
         Assert.assertTrue(response.isSuccess());
-        Assert.assertEquals("a==>b==>c", response.getExecuteStepStr());
+        Assert.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr());
+        Assert.assertEquals("hello", context.getData("test"));
     }
 
     @Test
     public void testEtcdNodeWithXml2() throws Exception {
-        String flowXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow><chain name=\"chain1\">THEN(a, b, c);</chain></flow>";
-        String changedFlowXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow><chain name=\"chain1\">THEN(a, c);</chain></flow>";
-        when(etcdClient.get(anyString())).thenReturn(flowXml).thenReturn(changedFlowXml);
-
+//        String flowXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow><chain name=\"chain1\">THEN(a, b, c);</chain></flow>";
+//        String changedFlowXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow><chain name=\"chain1\">THEN(a, c);</chain></flow>";
+//        when(etcdClient.get(anyString())).thenReturn(flowXml).thenReturn(changedFlowXml);
+//
         LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
         Assert.assertTrue(response.isSuccess());
-        Assert.assertEquals("a==>b==>c", response.getExecuteStepStr());
+        Assert.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr());
 
         // 手动触发一次 模拟节点数据变更
-        FlowBus.refreshFlowMetaData(FlowParserTypeEnum.TYPE_EL_XML,changedFlowXml);
-
+        //FlowBus.refreshFlowMetaData(FlowParserTypeEnum.TYPE_EL_XML,changedFlowXml);
+        Thread.sleep(9000);
         LiteflowResponse response2 = flowExecutor.execute2Resp("chain1", "arg");
         Assert.assertTrue(response2.isSuccess());
-        Assert.assertEquals("a==>c", response2.getExecuteStepStr());
+        Assert.assertEquals("a==>b==>s1[脚本s1]", response2.getExecuteStepStr());
     }
 }

+ 5 - 1
liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml-cluster.properties

@@ -1,2 +1,6 @@
-liteflow.rule-source-ext-data={"connectStr":"http://localhost:2379,http://localhost:3379,http://localhost:4379"}
+liteflow.rule-source-ext-data={\
+  "endpoints":"http://127.0.0.1:2379,http://127.0.0.1:3379,http://127.0.0.1:4379",\
+  "chainPath": "/liteflow/chain",\
+  "scriptPath": "/liteflow/script"\
+  }
 liteflow.parse-on-start=false

+ 5 - 1
liteflow-testcase-el/liteflow-testcase-el-etcd-springboot/src/test/resources/etcd/application-xml.properties

@@ -1 +1,5 @@
-liteflow.rule-source-ext-data={"connectStr":"http://localhost:2379"}
+liteflow.rule-source-ext-data={\
+  "endpoints":"http://127.0.0.1:2379",\
+  "chainPath": "/liteflow/chain",\
+  "scriptPath": "/liteflow/script"\
+  }