Browse Source

enhancement #I4FSHW 优雅刷新配置的支持

bryan31 3 years ago
parent
commit
8be6d00c82

+ 2 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java

@@ -257,8 +257,9 @@ public class FlowExecutor {
         return null;
     }
 
+    //此方法就是从原有的配置源主动拉取新的进行刷新
+    //和FlowBus.refreshFlowMetaData的区别就是一个为主动拉取,一个为被动监听到新的内容进行刷新
     public void reloadRule() {
-        FlowBus.cleanCache();
         init();
     }
 

+ 9 - 8
liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java

@@ -25,9 +25,12 @@ import com.yomahub.liteflow.exception.NodeTypeNotSupportException;
 import com.yomahub.liteflow.parser.LocalJsonFlowParser;
 import com.yomahub.liteflow.parser.LocalXmlFlowParser;
 import com.yomahub.liteflow.parser.LocalYmlFlowParser;
+import com.yomahub.liteflow.property.LiteflowConfig;
+import com.yomahub.liteflow.property.LiteflowConfigGetter;
 import com.yomahub.liteflow.script.ScriptExecutor;
 import com.yomahub.liteflow.script.ScriptExecutorFactory;
 import com.yomahub.liteflow.script.exception.ScriptSpiException;
+import com.yomahub.liteflow.util.CopyOnWriteHashMap;
 import com.yomahub.liteflow.util.SpringAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,9 +47,9 @@ public class FlowBus {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlowBus.class);
 
-    private static final Map<String, Chain> chainMap = new HashMap<>();
+    private static final Map<String, Chain> chainMap = new CopyOnWriteHashMap<>();
 
-    private static final Map<String, Node> nodeMap = new HashMap<>();
+    private static final Map<String, Node> nodeMap = new CopyOnWriteHashMap<>();
 
     private FlowBus() {
     }
@@ -75,12 +78,10 @@ public class FlowBus {
     }
 
     public static void addSpringScanNode(String nodeId, NodeComponent nodeComponent) {
-        if (containNode(nodeId)) return;
         nodeMap.put(nodeId, new Node(ComponentInitializer.loadInstance().initComponent(nodeComponent, NodeTypeEnum.COMMON, null, nodeId)));
     }
 
     public static void addCommonNode(String nodeId, String name, String cmpClazzStr) throws Exception {
-        if (containNode(nodeId)) return;
         Class<NodeComponent> cmpClazz = (Class<NodeComponent>) Class.forName(cmpClazzStr);
         addNode(nodeId, name, NodeTypeEnum.COMMON, cmpClazz, null);
     }
@@ -90,17 +91,14 @@ public class FlowBus {
     }
 
     public static void addCommonScriptNode(String nodeId, String name, String script){
-        if (containNode(nodeId)) return;
         addNode(nodeId, name, NodeTypeEnum.SCRIPT, ScriptComponent.class, script);
     }
 
     public static void addCondScriptNode(String nodeId, String name, String script){
-        if (containNode(nodeId)) return;
         addNode(nodeId, name, NodeTypeEnum.COND_SCRIPT, ScriptCondComponent.class, script);
     }
 
     private static void addNode(String nodeId, String name, NodeTypeEnum type, Class<? extends NodeComponent> cmpClazz, String script) {
-        if (containNode(nodeId)) return;
         try {
             //以node方式配置,本质上是为了适配无spring的环境,如果有spring环境,其实不用这么配置
             //这里的逻辑是判断是否能从spring上下文中取到,如果没有spring,则就是new instance了
@@ -149,6 +147,10 @@ public class FlowBus {
     public static void cleanCache() {
         chainMap.clear();
         nodeMap.clear();
+        cleanScriptCache();
+    }
+
+    public static void cleanScriptCache() {
         //如果引入了脚本组件SPI,则还需要清理脚本的缓存
         try{
             ScriptExecutor scriptExecutor = ScriptExecutorFactory.loadInstance().getScriptExecutor();
@@ -160,7 +162,6 @@ public class FlowBus {
 
     //目前这种方式刷新不完全平滑
     public static void refreshFlowMetaData(FlowParserTypeEnum type, String content) throws Exception {
-        FlowBus.cleanCache();
         if (type.equals(FlowParserTypeEnum.TYPE_XML)) {
             new LocalXmlFlowParser().parse(content);
         } else if (type.equals(FlowParserTypeEnum.TYPE_JSON)) {

+ 14 - 21
liteflow-core/src/main/java/com/yomahub/liteflow/parser/JsonFlowParser.java

@@ -85,24 +85,20 @@ public abstract class JsonFlowParser extends FlowParser {
                         //这里区分是普通java节点还是脚本节点
                         //如果是脚本节点,又区分是普通脚本节点,还是条件脚本节点
                         if (nodeTypeEnum.equals(NodeTypeEnum.COMMON) && StrUtil.isNotBlank(clazz)){
-                            if (!FlowBus.containNode(id)){
-                                FlowBus.addCommonNode(id, name, clazz);
-                            }
+                            FlowBus.addCommonNode(id, name, clazz);
                         }else if(nodeTypeEnum.equals(NodeTypeEnum.SCRIPT) || nodeTypeEnum.equals(NodeTypeEnum.COND_SCRIPT)){
-                            if (!FlowBus.containNode(id)){
-                                //如果file字段不为空,则优先从file里面读取脚本文本
-                                if (StrUtil.isNotBlank(file)){
-                                    script = ResourceUtil.readUtf8Str(StrUtil.format("classpath: {}", file));
-                                }else{
-                                    script = nodeObject.getString("value");
-                                }
-
-                                //根据节点类型把脚本添加到元数据里
-                                if (nodeTypeEnum.equals(NodeTypeEnum.SCRIPT)){
-                                    FlowBus.addCommonScriptNode(id, name, script);
-                                }else {
-                                    FlowBus.addCondScriptNode(id, name, script);
-                                }
+                            //如果file字段不为空,则优先从file里面读取脚本文本
+                            if (StrUtil.isNotBlank(file)){
+                                script = ResourceUtil.readUtf8Str(StrUtil.format("classpath: {}", file));
+                            }else{
+                                script = nodeObject.getString("value");
+                            }
+
+                            //根据节点类型把脚本添加到元数据里
+                            if (nodeTypeEnum.equals(NodeTypeEnum.SCRIPT)){
+                                FlowBus.addCommonScriptNode(id, name, script);
+                            }else {
+                                FlowBus.addCondScriptNode(id, name, script);
                             }
                         }
                     }
@@ -112,10 +108,7 @@ public abstract class JsonFlowParser extends FlowParser {
                 JSONArray chainArray = flowJsonObject.getJSONObject("flow").getJSONArray("chain");
                 for (int i = 0; i < chainArray.size(); i++) {
                     JSONObject jsonObject = chainArray.getJSONObject(i);
-                    String chainName = jsonObject.getString("name");
-                    if (!FlowBus.containChain(chainName)) {
-                        parseOneChain(jsonObject, flowJsonObjectList);
-                    }
+                    parseOneChain(jsonObject, flowJsonObjectList);
                 }
             }
         } catch (Exception e) {

+ 13 - 20
liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java

@@ -89,24 +89,20 @@ public abstract class XmlFlowParser extends FlowParser {
                     //这里区分是普通java节点还是脚本节点
                     //如果是脚本节点,又区分是普通脚本节点,还是条件脚本节点
                     if (nodeTypeEnum.equals(NodeTypeEnum.COMMON) && StrUtil.isNotBlank(clazz)){
-                        if (!FlowBus.containNode(id)){
-                            FlowBus.addCommonNode(id, name, clazz);
-                        }
+                        FlowBus.addCommonNode(id, name, clazz);
                     }else if(nodeTypeEnum.equals(NodeTypeEnum.SCRIPT) || nodeTypeEnum.equals(NodeTypeEnum.COND_SCRIPT)){
-                        if (!FlowBus.containNode(id)){
-                            //如果file字段不为空,则优先从file里面读取脚本文本
-                            if (StrUtil.isNotBlank(file)){
-                                script = ResourceUtil.readUtf8Str(StrUtil.format("classpath: {}", file));
-                            }else{
-                                script = e.getTextTrim();
-                            }
+                        //如果file字段不为空,则优先从file里面读取脚本文本
+                        if (StrUtil.isNotBlank(file)){
+                            script = ResourceUtil.readUtf8Str(StrUtil.format("classpath: {}", file));
+                        }else{
+                            script = e.getTextTrim();
+                        }
 
-                            //根据节点类型把脚本添加到元数据里
-                            if (nodeTypeEnum.equals(NodeTypeEnum.SCRIPT)){
-                                FlowBus.addCommonScriptNode(id, name, script);
-                            }else {
-                                FlowBus.addCondScriptNode(id, name, script);
-                            }
+                        //根据节点类型把脚本添加到元数据里
+                        if (nodeTypeEnum.equals(NodeTypeEnum.SCRIPT)){
+                            FlowBus.addCommonScriptNode(id, name, script);
+                        }else {
+                            FlowBus.addCondScriptNode(id, name, script);
                         }
                     }
                 }
@@ -115,10 +111,7 @@ public abstract class XmlFlowParser extends FlowParser {
             // 解析chain节点
             List<Element> chainList = rootElement.elements("chain");
             for (Element e : chainList) {
-                String chainName = e.attributeValue("name");
-                if (!FlowBus.containChain(chainName)) {
-                    parseOneChain(e, documentList);
-                }
+                parseOneChain(e, documentList);
             }
         }
     }

+ 0 - 1
liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java

@@ -59,7 +59,6 @@ public class ZookeeperXmlFlowParser extends XmlFlowParser{
         cache.getListenable().addListener(() -> {
             String content1 = new String(cache.getCurrentData().getData());
             LOG.info("stating load flow config....");
-            FlowBus.cleanCache();
             parse(content1);
         });
 	}

+ 171 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/util/CopyOnWriteHashMap.java

@@ -0,0 +1,171 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ *
+ * Copyright (c) 2010-2015 Oracle and/or its affiliates. All rights reserved.
+ *
+ * The contents of this file are subject to the terms of either the GNU
+ * General Public License Version 2 only ("GPL") or the Common Development
+ * and Distribution License("CDDL") (collectively, the "License").  You
+ * may not use this file except in compliance with the License.  You can
+ * obtain a copy of the License at
+ * http://glassfish.java.net/public/CDDL+GPL_1_1.html
+ * or packager/legal/LICENSE.txt.  See the License for the specific
+ * language governing permissions and limitations under the License.
+ *
+ * When distributing the software, include this License Header Notice in each
+ * file and include the License file at packager/legal/LICENSE.txt.
+ *
+ * GPL Classpath Exception:
+ * Oracle designates this particular file as subject to the "Classpath"
+ * exception as provided by Oracle in the GPL Version 2 section of the License
+ * file that accompanied this code.
+ *
+ * Modifications:
+ * If applicable, add the following below the License Header, with the fields
+ * enclosed by brackets [] replaced by your own identifying information:
+ * "Portions Copyright [year] [name of copyright owner]"
+ *
+ * Contributor(s):
+ * If you wish your version of this file to be governed by only the CDDL or
+ * only the GPL Version 2, indicate your decision by adding "[Contributor]
+ * elects to include this software in this distribution under the [CDDL or GPL
+ * Version 2] license."  If you don't indicate a single choice of license, a
+ * recipient has the option to distribute your version of this file under
+ * either the CDDL, the GPL Version 2 or to extend the choice of license to
+ * its licensees as provided above.  However, if you add GPL Version 2 code
+ * and therefore, elected the GPL Version 2 license, then the option applies
+ * only if the new code is made subject to such option by the copyright
+ * holder.
+ */
+
+package com.yomahub.liteflow.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A basic copy on write HashMap.
+ * <p>
+ * If an instance is cloned then any methods invoked on the instance or clone
+ * that result in state modification will result in copying of the state
+ * before modification.
+ *
+ * @author Paul.Sandoz@Oracle.Com
+ * @author pavel.bucek@oracle.com
+ */
+public class CopyOnWriteHashMap<K,V> implements Map<K,V> {
+    private volatile Map<K,V> core;
+
+    volatile Map<K,V> view;
+
+    private final AtomicBoolean requiresCopyOnWrite;
+
+    public CopyOnWriteHashMap() {
+        this.core = new HashMap<K, V>();
+        this.requiresCopyOnWrite = new AtomicBoolean(false);
+    }
+
+    private CopyOnWriteHashMap(CopyOnWriteHashMap<K,V> that) {
+        this.core = that.core;
+        this.requiresCopyOnWrite = new AtomicBoolean(true);
+    }
+
+    @Override
+    public CopyOnWriteHashMap<K,V> clone() {
+        try {
+            return new CopyOnWriteHashMap(this);
+        } finally {
+            requiresCopyOnWrite.set(true);
+        }
+    }
+
+    private void copy() {
+        if (requiresCopyOnWrite.compareAndSet(true, false)) {
+            core = new HashMap<K, V>(core);
+            view = null;
+        }
+    }
+
+    @Override
+    public int size() {
+        return core.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return core.isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return core.containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return core.containsValue(value);
+    }
+
+    @Override
+    public V get(Object key) {
+        return core.get(key);
+    }
+
+    @Override
+    public V put(K key, V value) {
+        copy();
+        return core.put(key, value);
+    }
+
+    @Override
+    public V remove(Object key) {
+        copy();
+        return core.remove(key);
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> t) {
+        copy();
+        core.putAll(t);
+    }
+
+    @Override
+    public void clear() {
+        core = new HashMap<K, V>();
+        view = null;
+        copy();
+    }
+
+    @Override
+    public Set<K> keySet() {
+        return getView().keySet();
+    }
+
+    @Override
+    public Collection<V> values() {
+        return getView().values();
+    }
+
+    @Override
+    public Set<Entry<K,V>> entrySet() {
+        return getView().entrySet();
+    }
+
+    @Override
+    public String toString() {
+        return core.toString();
+    }
+
+    private Map<K, V> getView() {
+        Map<K, V> result = view; // volatile read
+        if (result == null) {
+            result = Collections.unmodifiableMap(core);
+            view = result; // volatile write
+        }
+        return result;
+    }
+}

+ 3 - 4
liteflow-script-groovy/src/main/java/com/yomahub/liteflow/script/groovy/GroovyScriptExecutor.java

@@ -6,6 +6,7 @@ import com.yomahub.liteflow.entity.data.Slot;
 import com.yomahub.liteflow.script.ScriptExecutor;
 import com.yomahub.liteflow.script.exception.ScriptExecuteException;
 import com.yomahub.liteflow.script.exception.ScriptLoadException;
+import com.yomahub.liteflow.util.CopyOnWriteHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,7 +25,7 @@ public class GroovyScriptExecutor implements ScriptExecutor {
 
     private ScriptEngine scriptEngine;
 
-    private final Map<String, CompiledScript> compiledScriptMap = new HashMap<>();
+    private final Map<String, CompiledScript> compiledScriptMap = new CopyOnWriteHashMap<>();
 
     @Override
     public ScriptExecutor init() {
@@ -37,9 +38,7 @@ public class GroovyScriptExecutor implements ScriptExecutor {
     public void load(String nodeId, String script) {
         try{
             CompiledScript compiledScript = ((Compilable) scriptEngine).compile(script);
-            if (!compiledScriptMap.containsKey(nodeId)){
-                compiledScriptMap.put(nodeId, compiledScript);
-            }
+            compiledScriptMap.put(nodeId, compiledScript);
         }catch (Exception e){
             String errorMsg = StrUtil.format("script loading error for node[{}], error msg:{}", nodeId, e.getMessage());
             throw new ScriptLoadException(errorMsg);

+ 15 - 3
liteflow-script-qlexpress/src/main/java/com/yomahub/liteflow/script/qlexpress/QLExpressScriptExecutor.java

@@ -1,21 +1,23 @@
 package com.yomahub.liteflow.script.qlexpress;
 
-import cn.hutool.core.io.resource.ResourceUtil;
 import cn.hutool.core.util.ReflectUtil;
 import cn.hutool.core.util.StrUtil;
 import com.ql.util.express.DefaultContext;
 import com.ql.util.express.ExpressLoader;
 import com.ql.util.express.ExpressRunner;
+import com.ql.util.express.InstructionSet;
 import com.yomahub.liteflow.entity.data.DataBus;
 import com.yomahub.liteflow.entity.data.Slot;
 import com.yomahub.liteflow.script.ScriptExecutor;
 import com.yomahub.liteflow.script.exception.ScriptExecuteException;
 import com.yomahub.liteflow.script.exception.ScriptLoadException;
+import com.yomahub.liteflow.util.CopyOnWriteHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * 阿里QLExpress脚本语言的执行器实现
@@ -28,6 +30,8 @@ public class QLExpressScriptExecutor implements ScriptExecutor {
 
     private ExpressRunner expressRunner;
 
+    private final Map<String, InstructionSet> compiledScriptMap = new CopyOnWriteHashMap<>();
+
     @Override
     public ScriptExecutor init() {
         expressRunner = new ExpressRunner();
@@ -37,7 +41,8 @@ public class QLExpressScriptExecutor implements ScriptExecutor {
     @Override
     public void load(String nodeId, String script) {
         try{
-            expressRunner.loadMutilExpress(nodeId, script);
+            InstructionSet instructionSet = expressRunner.getInstructionSetFromLocalCache(script);
+            compiledScriptMap.put(nodeId, instructionSet);
         }catch (Exception e){
             String errorMsg = StrUtil.format("script loading error for node[{}],error msg:{}", nodeId, e.getMessage());
             throw new ScriptLoadException(errorMsg);
@@ -48,10 +53,16 @@ public class QLExpressScriptExecutor implements ScriptExecutor {
     public Object execute(String nodeId, int slotIndex) {
         List<String> errorList = new ArrayList<>();
         try{
+            if (!compiledScriptMap.containsKey(nodeId)){
+                String errorMsg = StrUtil.format("script for node[{}] is not loaded", nodeId);
+                throw new RuntimeException(errorMsg);
+            }
+
+            InstructionSet instructionSet = compiledScriptMap.get(nodeId);
             Slot slot = DataBus.getSlot(slotIndex);
             DefaultContext<String, Object> context = new DefaultContext<>();
             context.put("slot", slot);
-            return expressRunner.executeByExpressName(nodeId, context, errorList, true, false, null);
+            return expressRunner.execute(instructionSet, context, errorList, true, false, null);
         }catch (Exception e){
             for (String scriptErrorMsg : errorList){
                 log.error("\n{}", scriptErrorMsg);
@@ -63,6 +74,7 @@ public class QLExpressScriptExecutor implements ScriptExecutor {
 
     @Override
     public void cleanCache() {
+        compiledScriptMap.clear();
         expressRunner.clearExpressCache();
         ReflectUtil.setFieldValue(expressRunner,"loader",new ExpressLoader(expressRunner));
     }

+ 1 - 0
liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java

@@ -1,5 +1,6 @@
 package com.yomahub.liteflow.test;
 
+import com.yomahub.liteflow.entity.data.DataBus;
 import com.yomahub.liteflow.flow.FlowBus;
 import com.yomahub.liteflow.spring.ComponentScanner;
 import org.junit.AfterClass;

+ 29 - 2
liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/refreshRule/RefreshRuleSpringbootTest.java

@@ -33,11 +33,38 @@ public class RefreshRuleSpringbootTest extends BaseTest {
     @Resource
     private FlowExecutor flowExecutor;
 
+    //测试普通刷新流程的场景
     @Test
-    public void testRefresh() throws Exception{
-        String content = ResourceUtil.readUtf8Str("classpath: /refreshRule/flow.xml");
+    public void testRefresh1() throws Exception{
+        String content = ResourceUtil.readUtf8Str("classpath: /refreshRule/flow_update.xml");
         FlowBus.refreshFlowMetaData(FlowParserTypeEnum.TYPE_XML, content);
         LiteflowResponse<DefaultSlot> response = flowExecutor.execute2Resp("chain1", "arg");
         Assert.assertTrue(response.isSuccess());
     }
+
+    //测试优雅刷新的场景
+    @Test
+    public void testRefresh2() throws Exception{
+        new Thread(() -> {
+            try {
+                Thread.sleep(1000L);
+                String content = ResourceUtil.readUtf8Str("classpath: /refreshRule/flow_update.xml");
+                FlowBus.refreshFlowMetaData(FlowParserTypeEnum.TYPE_XML, content);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        }).start();
+
+        for (int i = 0; i < 500; i++) {
+            LiteflowResponse<DefaultSlot> response = flowExecutor.execute2Resp("chain1", "arg");
+            Assert.assertTrue(response.isSuccess());
+            try {
+                Thread.sleep(10L);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
 }

+ 1 - 2
liteflow-testcase-springboot/src/test/resources/refreshRule/application.properties

@@ -1,2 +1 @@
-liteflow.rule-source=refreshRule/flow.xml
-liteflow.parse-on-start=false
+liteflow.rule-source=refreshRule/flow.xml

+ 6 - 0
liteflow-testcase-springboot/src/test/resources/refreshRule/flow_update.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<flow>
+    <chain name="chain1">
+        <then value="c,b,a"/>
+    </chain>
+</flow>