|
@@ -18,6 +18,7 @@ import com.yomahub.liteflow.flow.LiteflowResponse;
|
|
|
import com.yomahub.liteflow.flow.element.Chain;
|
|
|
import com.yomahub.liteflow.flow.element.Node;
|
|
|
import com.yomahub.liteflow.flow.id.IdGeneratorHolder;
|
|
|
+import com.yomahub.liteflow.monitor.MonitorFile;
|
|
|
import com.yomahub.liteflow.parser.base.FlowParser;
|
|
|
import com.yomahub.liteflow.parser.factory.FlowParserProvider;
|
|
|
import com.yomahub.liteflow.parser.spi.ParserClassNameSpi;
|
|
@@ -27,6 +28,7 @@ import com.yomahub.liteflow.slot.DataBus;
|
|
|
import com.yomahub.liteflow.slot.DefaultContext;
|
|
|
import com.yomahub.liteflow.slot.Slot;
|
|
|
import com.yomahub.liteflow.spi.holder.ContextCmpInitHolder;
|
|
|
+import com.yomahub.liteflow.spi.holder.PathContentParserHolder;
|
|
|
import com.yomahub.liteflow.thread.ExecutorHelper;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -89,11 +91,11 @@ public class FlowExecutor {
|
|
|
//所有的Parser的SPI实现都是以custom形式放入的,且只支持xml形式
|
|
|
ServiceLoader<ParserClassNameSpi> loader = ServiceLoader.load(ParserClassNameSpi.class);
|
|
|
Iterator<ParserClassNameSpi> it = loader.iterator();
|
|
|
- if (it.hasNext()){
|
|
|
+ if (it.hasNext()) {
|
|
|
ParserClassNameSpi parserClassNameSpi = it.next();
|
|
|
ruleSource = "el_xml:" + parserClassNameSpi.getSpiClassName();
|
|
|
liteflowConfig.setRuleSource(ruleSource);
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
//ruleSource为空,而且没有spi形式的扩展,那么说明真的没有ruleSource
|
|
|
//这种情况有可能是基于代码动态构建的
|
|
|
return;
|
|
@@ -124,6 +126,9 @@ public class FlowExecutor {
|
|
|
|
|
|
//支持多类型的配置文件,分别解析
|
|
|
if (BooleanUtil.isTrue(liteflowConfig.isSupportMultipleType())) {
|
|
|
+ // 添加监听文件路径
|
|
|
+ addMonitorFilePaths(ListUtil.toList(path));
|
|
|
+ // 解析文件
|
|
|
parser.parseMain(ListUtil.toList(path));
|
|
|
}
|
|
|
} catch (CyclicDependencyException e) {
|
|
@@ -148,6 +153,9 @@ public class FlowExecutor {
|
|
|
//进行多个配置文件的一起解析
|
|
|
try {
|
|
|
if (parser != null) {
|
|
|
+ // 添加监听文件路径
|
|
|
+ addMonitorFilePaths(rulePathList);
|
|
|
+ // 解析文件
|
|
|
parser.parseMain(rulePathList);
|
|
|
} else {
|
|
|
throw new ConfigErrorException("parse error, please check liteflow config property");
|
|
@@ -167,30 +175,37 @@ public class FlowExecutor {
|
|
|
}
|
|
|
|
|
|
//如果是ruleSource方式的,最后判断下有没有解析出来,如果没有解析出来则报错
|
|
|
- if (StrUtil.isBlank(liteflowConfig.getRuleSourceExtData()) && MapUtil.isEmpty(liteflowConfig.getRuleSourceExtDataMap())){
|
|
|
- if (FlowBus.getChainMap().isEmpty()){
|
|
|
+ if (StrUtil.isBlank(liteflowConfig.getRuleSourceExtData()) && MapUtil.isEmpty(liteflowConfig.getRuleSourceExtDataMap())) {
|
|
|
+ if (FlowBus.getChainMap().isEmpty()) {
|
|
|
String errMsg = StrUtil.format("no valid rule config found in rule path [{}]", liteflowConfig.getRuleSource());
|
|
|
throw new ConfigErrorException(errMsg);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//执行钩子
|
|
|
- if(hook){
|
|
|
+ if (hook) {
|
|
|
FlowInitHook.executeHook();
|
|
|
}
|
|
|
+
|
|
|
+ // 文件监听
|
|
|
+ if (liteflowConfig.getEnableMonitorFile()) {
|
|
|
+ MonitorFile.getInstance().create();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
//此方法就是从原有的配置源主动拉取新的进行刷新
|
|
|
//和FlowBus.refreshFlowMetaData的区别就是一个为主动拉取,一个为被动监听到新的内容进行刷新
|
|
|
public void reloadRule() {
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
init(false);
|
|
|
+ LOG.info("reload rules takes {}ms", System.currentTimeMillis() - start);
|
|
|
}
|
|
|
|
|
|
//隐式流程的调用方法
|
|
|
@Deprecated
|
|
|
public void invoke(String chainId, Object param, Integer slotIndex) throws Exception {
|
|
|
LiteflowResponse response = this.invoke2Resp(chainId, param, slotIndex, InnerChainTypeEnum.IN_SYNC);
|
|
|
- if (!response.isSuccess()){
|
|
|
+ if (!response.isSuccess()) {
|
|
|
throw response.getCause();
|
|
|
}
|
|
|
}
|
|
@@ -198,7 +213,7 @@ public class FlowExecutor {
|
|
|
@Deprecated
|
|
|
public void invokeInAsync(String chainId, Object param, Integer slotIndex) throws Exception {
|
|
|
LiteflowResponse response = this.invoke2Resp(chainId, param, slotIndex, InnerChainTypeEnum.IN_ASYNC);
|
|
|
- if (!response.isSuccess()){
|
|
|
+ if (!response.isSuccess()) {
|
|
|
throw response.getCause();
|
|
|
}
|
|
|
}
|
|
@@ -240,7 +255,7 @@ public class FlowExecutor {
|
|
|
//调用一个流程并返回Future<LiteflowResponse>,允许多上下文的传入
|
|
|
public Future<LiteflowResponse> execute2Future(String chainId, Object param, Class<?>... contextBeanClazzArray) {
|
|
|
return ExecutorHelper.loadInstance().buildMainExecutor(liteflowConfig.getMainExecutorClass()).submit(()
|
|
|
- -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray,null));
|
|
|
+ -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray, null));
|
|
|
}
|
|
|
|
|
|
|
|
@@ -251,11 +266,11 @@ public class FlowExecutor {
|
|
|
|
|
|
//调用一个流程,返回默认的上下文,适用于简单的调用
|
|
|
@Deprecated
|
|
|
- public DefaultContext execute(String chainId, Object param) throws Exception{
|
|
|
+ public DefaultContext execute(String chainId, Object param) throws Exception {
|
|
|
LiteflowResponse response = this.execute2Resp(chainId, param, DefaultContext.class);
|
|
|
- if (!response.isSuccess()){
|
|
|
+ if (!response.isSuccess()) {
|
|
|
throw response.getCause();
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
return response.getFirstContextBean();
|
|
|
}
|
|
|
}
|
|
@@ -269,8 +284,8 @@ public class FlowExecutor {
|
|
|
}
|
|
|
|
|
|
private LiteflowResponse invoke2Resp(String chainId,
|
|
|
- Object param,
|
|
|
- Integer slotIndex, InnerChainTypeEnum innerChainType) {
|
|
|
+ Object param,
|
|
|
+ Integer slotIndex, InnerChainTypeEnum innerChainType) {
|
|
|
Slot slot = doExecute(chainId, param, null, null, slotIndex, innerChainType);
|
|
|
return LiteflowResponse.newInnerResponse(chainId, slot);
|
|
|
}
|
|
@@ -288,9 +303,9 @@ public class FlowExecutor {
|
|
|
//如果不是隐式流程,那么需要分配Slot
|
|
|
if (innerChainType.equals(InnerChainTypeEnum.NONE) && ObjectUtil.isNull(slotIndex)) {
|
|
|
//这里可以根据class分配,也可以根据bean去分配
|
|
|
- if (ArrayUtil.isNotEmpty(contextBeanClazzArray)){
|
|
|
+ if (ArrayUtil.isNotEmpty(contextBeanClazzArray)) {
|
|
|
slotIndex = DataBus.offerSlotByClass(ListUtil.toList(contextBeanClazzArray));
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
slotIndex = DataBus.offerSlotByBean(ListUtil.toList(contextBeanArray));
|
|
|
}
|
|
|
if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) {
|
|
@@ -311,7 +326,7 @@ public class FlowExecutor {
|
|
|
//如果是隐式流程,事先把subException给置空,然后把隐式流程的chainId放入slot元数据中
|
|
|
//我知道这在多线程调用隐式流程中会有问题。但是考虑到这种场景的不会多,也有其他的转换方式。
|
|
|
//所以暂且这么做,以后再优化
|
|
|
- if (!innerChainType.equals(InnerChainTypeEnum.NONE)){
|
|
|
+ if (!innerChainType.equals(InnerChainTypeEnum.NONE)) {
|
|
|
slot.removeSubException(chainId);
|
|
|
slot.addSubChain(chainId);
|
|
|
}
|
|
@@ -326,9 +341,9 @@ public class FlowExecutor {
|
|
|
if (ObjectUtil.isNotNull(param)) {
|
|
|
if (innerChainType.equals(InnerChainTypeEnum.NONE)) {
|
|
|
slot.setRequestData(param);
|
|
|
- } else if(innerChainType.equals(InnerChainTypeEnum.IN_SYNC)){
|
|
|
+ } else if (innerChainType.equals(InnerChainTypeEnum.IN_SYNC)) {
|
|
|
slot.setChainReqData(chainId, param);
|
|
|
- } else if(innerChainType.equals(InnerChainTypeEnum.IN_ASYNC)){
|
|
|
+ } else if (innerChainType.equals(InnerChainTypeEnum.IN_ASYNC)) {
|
|
|
slot.setChainReqData2Queue(chainId, param);
|
|
|
}
|
|
|
}
|
|
@@ -351,15 +366,15 @@ public class FlowExecutor {
|
|
|
} catch (Exception e) {
|
|
|
if (ObjectUtil.isNotNull(chain)) {
|
|
|
String errMsg = StrUtil.format("[{}]:chain[{}] execute error on slot[{}]", slot.getRequestId(), chain.getChainName(), slotIndex);
|
|
|
- if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())){
|
|
|
+ if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) {
|
|
|
LOG.error(errMsg, e);
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
LOG.error(errMsg);
|
|
|
}
|
|
|
- }else{
|
|
|
- if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())){
|
|
|
+ } else {
|
|
|
+ if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) {
|
|
|
LOG.error(e.getMessage(), e);
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
LOG.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
@@ -368,7 +383,7 @@ public class FlowExecutor {
|
|
|
//如果是隐式流程,则需要设置到隐式流程的exception属性里
|
|
|
if (innerChainType.equals(InnerChainTypeEnum.NONE)) {
|
|
|
slot.setException(e);
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
slot.setSubException(chainId, e);
|
|
|
}
|
|
|
} finally {
|
|
@@ -389,4 +404,15 @@ public class FlowExecutor {
|
|
|
//把liteFlowConfig设到LiteFlowGetter中去
|
|
|
LiteflowConfigGetter.setLiteflowConfig(liteflowConfig);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 添加监听文件路径
|
|
|
+ *
|
|
|
+ * @param pathList 文件路径
|
|
|
+ */
|
|
|
+ private void addMonitorFilePaths(List<String> pathList) throws Exception {
|
|
|
+ // 添加规则文件监听
|
|
|
+ List<String> fileAbsolutePath = PathContentParserHolder.loadContextAware().getFileAbsolutePath(pathList);
|
|
|
+ MonitorFile.getInstance().addMonitorFilePaths(fileAbsolutePath);
|
|
|
+ }
|
|
|
}
|