|
@@ -14,8 +14,8 @@ import cn.hutool.core.util.ReUtil;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
import com.google.common.collect.Lists;
|
|
|
import com.yomahub.liteflow.slot.DataBus;
|
|
|
-import com.yomahub.liteflow.slot.DefaultSlot;
|
|
|
import com.yomahub.liteflow.flow.LiteflowResponse;
|
|
|
+import com.yomahub.liteflow.slot.DefaultContext;
|
|
|
import com.yomahub.liteflow.slot.Slot;
|
|
|
import com.yomahub.liteflow.flow.element.Chain;
|
|
|
import com.yomahub.liteflow.flow.element.Node;
|
|
@@ -30,10 +30,7 @@ import com.yomahub.liteflow.thread.ExecutorHelper;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.HashSet;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Set;
|
|
|
+import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
|
/**
|
|
@@ -169,10 +166,6 @@ public class FlowExecutor {
|
|
|
|
|
|
/**
|
|
|
* 匹配路径配置,生成对应的解析器
|
|
|
- *
|
|
|
- * @param path 配置路径
|
|
|
- * @param pattern 格式
|
|
|
- * @return
|
|
|
*/
|
|
|
private FlowParser matchFormatParser(String path, FlowParserTypeEnum pattern) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
|
|
|
boolean isLocalFile = isLocalConfig(path);
|
|
@@ -217,9 +210,6 @@ public class FlowExecutor {
|
|
|
|
|
|
/**
|
|
|
* 判定是否为本地文件
|
|
|
- *
|
|
|
- * @param path
|
|
|
- * @return
|
|
|
*/
|
|
|
private boolean isLocalConfig(String path) {
|
|
|
return ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path)
|
|
@@ -229,9 +219,6 @@ public class FlowExecutor {
|
|
|
|
|
|
/**
|
|
|
* 判定是否为自定义class配置
|
|
|
- *
|
|
|
- * @param path
|
|
|
- * @return
|
|
|
*/
|
|
|
private boolean isClassConfig(String path) {
|
|
|
return ReUtil.isMatch(CLASS_CONFIG_REGEX, path);
|
|
@@ -239,9 +226,6 @@ public class FlowExecutor {
|
|
|
|
|
|
/**
|
|
|
* 判定是否为zk配置
|
|
|
- *
|
|
|
- * @param path
|
|
|
- * @return
|
|
|
*/
|
|
|
private boolean isZKConfig(String path) {
|
|
|
return ReUtil.isMatch(ZK_CONFIG_REGEX, path);
|
|
@@ -249,9 +233,6 @@ public class FlowExecutor {
|
|
|
|
|
|
/**
|
|
|
* 匹配文本格式,支持xml,json和yml
|
|
|
- *
|
|
|
- * @param path
|
|
|
- * @return
|
|
|
*/
|
|
|
private FlowParserTypeEnum matchFormatConfig(String path) {
|
|
|
if (ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_XML_CONFIG_REGEX, path)) {
|
|
@@ -262,7 +243,7 @@ public class FlowExecutor {
|
|
|
return FlowParserTypeEnum.TYPE_YML;
|
|
|
} else if (isClassConfig(path)) {
|
|
|
try {
|
|
|
- Class clazz = Class.forName(path);
|
|
|
+ Class<?> clazz = Class.forName(path);
|
|
|
if (ClassXmlFlowParser.class.isAssignableFrom(clazz)) {
|
|
|
return FlowParserTypeEnum.TYPE_XML;
|
|
|
} else if (ClassJsonFlowParser.class.isAssignableFrom(clazz)) {
|
|
@@ -284,36 +265,35 @@ public class FlowExecutor {
|
|
|
}
|
|
|
|
|
|
//隐式流程的调用方法
|
|
|
- public <T extends Slot> void invoke(String chainId, Object param, Class<T> slotClazz,
|
|
|
- Integer slotIndex) throws Exception {
|
|
|
- this.execute(chainId, param, slotClazz, slotIndex, true);
|
|
|
+ public void invoke(String chainId, Object param, Integer slotIndex) throws Exception {
|
|
|
+ this.execute(chainId, param, null, slotIndex, true);
|
|
|
}
|
|
|
|
|
|
- public <T extends Slot> LiteflowResponse<T> invoke2Resp(String chainId, Object param, Class<T> slotClazz,
|
|
|
- Integer slotIndex){
|
|
|
- return this.execute2Resp(chainId, param, slotClazz, slotIndex, true);
|
|
|
+ public <T> LiteflowResponse<T> invoke2Resp(String chainId, Object param, Integer slotIndex){
|
|
|
+ return this.execute2Resp(chainId, param, null, slotIndex, true);
|
|
|
}
|
|
|
|
|
|
- public <T extends Slot> void invoke(String nodeId, Integer slotIndex) throws Exception {
|
|
|
+ //单独调用某一个node
|
|
|
+ public void invoke(String nodeId, Integer slotIndex) throws Exception {
|
|
|
Node node = FlowBus.getNode(nodeId);
|
|
|
node.execute(slotIndex);
|
|
|
}
|
|
|
|
|
|
- public DefaultSlot execute(String chainId) throws Exception {
|
|
|
- return this.execute(chainId, null, DefaultSlot.class, null, false);
|
|
|
+ public Slot<DefaultContext> execute(String chainId) throws Exception {
|
|
|
+ return this.execute(chainId, null, DefaultContext.class, null, false);
|
|
|
}
|
|
|
|
|
|
- public DefaultSlot execute(String chainId, Object param) throws Exception {
|
|
|
- return this.execute(chainId, param, DefaultSlot.class, null, false);
|
|
|
+ public Slot<DefaultContext> execute(String chainId, Object param) throws Exception {
|
|
|
+ return this.execute(chainId, param, DefaultContext.class, null, false);
|
|
|
}
|
|
|
|
|
|
- public <T extends Slot> T execute(String chainId, Object param, Class<T> slotClazz) throws Exception {
|
|
|
- return this.execute(chainId, param, slotClazz, null, false);
|
|
|
+ public <T> Slot<T> execute(String chainId, Object param, Class<T> contextBeanClazz) throws Exception {
|
|
|
+ return this.execute(chainId, param, contextBeanClazz, null, false);
|
|
|
}
|
|
|
|
|
|
- public <T extends Slot> T execute(String chainId, Object param, Class<T> slotClazz,
|
|
|
+ public <T> Slot<T> execute(String chainId, Object param, Class<T> contextBeanClazz,
|
|
|
Integer slotIndex, boolean isInnerChain) throws Exception {
|
|
|
- T slot = this.doExecute(chainId, param, slotClazz, slotIndex, isInnerChain);
|
|
|
+ Slot<T> slot = this.doExecute(chainId, param, contextBeanClazz, slotIndex, isInnerChain);
|
|
|
if (ObjectUtil.isNotNull(slot.getException())) {
|
|
|
throw slot.getException();
|
|
|
} else {
|
|
@@ -321,29 +301,29 @@ public class FlowExecutor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public LiteflowResponse<DefaultSlot> execute2Resp(String chainId) {
|
|
|
- return this.execute2Resp(chainId, null, DefaultSlot.class);
|
|
|
+ public LiteflowResponse<DefaultContext> execute2Resp(String chainId) {
|
|
|
+ return this.execute2Resp(chainId, null, DefaultContext.class);
|
|
|
}
|
|
|
|
|
|
- public LiteflowResponse<DefaultSlot> execute2Resp(String chainId, Object param) {
|
|
|
- return this.execute2Resp(chainId, param, DefaultSlot.class);
|
|
|
+ public LiteflowResponse<DefaultContext> execute2Resp(String chainId, Object param) {
|
|
|
+ return this.execute2Resp(chainId, param, DefaultContext.class);
|
|
|
}
|
|
|
|
|
|
- public <T extends Slot> LiteflowResponse<T> execute2Resp(String chainId, Object param, Class<T> slotClazz) {
|
|
|
- return this.execute2Resp(chainId, param, slotClazz, null, false);
|
|
|
+ public <T> LiteflowResponse<T> execute2Resp(String chainId, Object param, Class<T> contextBeanClazz) {
|
|
|
+ return this.execute2Resp(chainId, param, contextBeanClazz, null, false);
|
|
|
}
|
|
|
|
|
|
- public <T extends Slot> Future<LiteflowResponse<T>> execute2Future(String chainId, Object param, Class<T> slotClazz) {
|
|
|
+ public <T> Future<LiteflowResponse<T>> execute2Future(String chainId, Object param, Class<T> contextBeanClazz) {
|
|
|
return ExecutorHelper.loadInstance().buildMainExecutor(liteflowConfig.getMainExecutorClass()).submit(()
|
|
|
- -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, slotClazz, null, false));
|
|
|
+ -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazz, null, false));
|
|
|
|
|
|
}
|
|
|
|
|
|
- public <T extends Slot> LiteflowResponse<T> execute2Resp(String chainId, Object param, Class<T> slotClazz, Integer slotIndex,
|
|
|
- boolean isInnerChain) {
|
|
|
+ public <T> LiteflowResponse<T> execute2Resp(String chainId, Object param, Class<T> contextBeanClazz,
|
|
|
+ Integer slotIndex, boolean isInnerChain) {
|
|
|
LiteflowResponse<T> response = new LiteflowResponse<>();
|
|
|
|
|
|
- T slot = doExecute(chainId, param, slotClazz, slotIndex, isInnerChain);
|
|
|
+ Slot<T> slot = doExecute(chainId, param, contextBeanClazz, slotIndex, isInnerChain);
|
|
|
|
|
|
if (ObjectUtil.isNotNull(slot.getException())) {
|
|
|
response.setSuccess(false);
|
|
@@ -356,14 +336,14 @@ public class FlowExecutor {
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
- private <T extends Slot> T doExecute(String chainId, Object param, Class<T> slotClazz, Integer slotIndex,
|
|
|
+ private <T> Slot<T> doExecute(String chainId, Object param, Class<T> contextBeanClazz, Integer slotIndex,
|
|
|
boolean isInnerChain) {
|
|
|
if (FlowBus.needInit()) {
|
|
|
init();
|
|
|
}
|
|
|
|
|
|
if (!isInnerChain && ObjectUtil.isNull(slotIndex)) {
|
|
|
- slotIndex = DataBus.offerSlot(slotClazz);
|
|
|
+ slotIndex = DataBus.offerSlot(contextBeanClazz);
|
|
|
LOG.info("slot[{}] offered", slotIndex);
|
|
|
}
|
|
|
|
|
@@ -371,7 +351,7 @@ public class FlowExecutor {
|
|
|
throw new NoAvailableSlotException("there is no available slot");
|
|
|
}
|
|
|
|
|
|
- T slot = DataBus.getSlot(slotIndex);
|
|
|
+ Slot<T> slot = DataBus.getSlot(slotIndex);
|
|
|
if (ObjectUtil.isNull(slot)) {
|
|
|
throw new NoAvailableSlotException(StrUtil.format("the slot[{}] is not exist", slotIndex));
|
|
|
}
|