|
@@ -99,58 +99,57 @@ public class FlowExecutor {
|
|
|
|
|
|
public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz,Integer slotIndex,boolean isInnerChain) throws Exception{
|
|
|
Slot slot = null;
|
|
|
- try{
|
|
|
- if(FlowBus.needInit()) {
|
|
|
- init();
|
|
|
- }
|
|
|
|
|
|
- Chain chain = FlowBus.getChain(chainId);
|
|
|
+ if(FlowBus.needInit()) {
|
|
|
+ init();
|
|
|
+ }
|
|
|
|
|
|
- if(chain == null){
|
|
|
- String errorMsg = MessageFormat.format("couldn't find chain with the id[{0}]", chainId);
|
|
|
- throw new ChainNotFoundException(errorMsg);
|
|
|
- }
|
|
|
+ Chain chain = FlowBus.getChain(chainId);
|
|
|
|
|
|
- if(!isInnerChain && slotIndex == null) {
|
|
|
- slotIndex = DataBus.offerSlot(slotClazz);
|
|
|
- LOG.info("slot[{}] offered",slotIndex);
|
|
|
- }
|
|
|
+ if(chain == null){
|
|
|
+ String errorMsg = MessageFormat.format("couldn't find chain with the id[{0}]", chainId);
|
|
|
+ throw new ChainNotFoundException(errorMsg);
|
|
|
+ }
|
|
|
|
|
|
- if(slotIndex == -1){
|
|
|
- throw new NoAvailableSlotException("there is no available slot");
|
|
|
- }
|
|
|
+ if(!isInnerChain && slotIndex == null) {
|
|
|
+ slotIndex = DataBus.offerSlot(slotClazz);
|
|
|
+ LOG.info("slot[{}] offered",slotIndex);
|
|
|
+ }
|
|
|
|
|
|
- slot = DataBus.getSlot(slotIndex);
|
|
|
- if(slot == null) {
|
|
|
- throw new NoAvailableSlotException("the slot is not exist");
|
|
|
- }
|
|
|
+ if(slotIndex == -1){
|
|
|
+ throw new NoAvailableSlotException("there is no available slot");
|
|
|
+ }
|
|
|
|
|
|
- if(StringUtils.isBlank(slot.getRequestId())) {
|
|
|
- slot.generateRequestId();
|
|
|
- LOG.info("requestId[{}] has generated",slot.getRequestId());
|
|
|
- }
|
|
|
+ slot = DataBus.getSlot(slotIndex);
|
|
|
+ if(slot == null) {
|
|
|
+ throw new NoAvailableSlotException("the slot is not exist");
|
|
|
+ }
|
|
|
|
|
|
- if(!isInnerChain) {
|
|
|
- slot.setRequestData(param);
|
|
|
- slot.setChainName(chainId);
|
|
|
- }else {
|
|
|
- slot.setChainReqData(chainId, param);
|
|
|
- }
|
|
|
+ if(StringUtils.isBlank(slot.getRequestId())) {
|
|
|
+ slot.generateRequestId();
|
|
|
+ LOG.info("requestId[{}] has generated",slot.getRequestId());
|
|
|
+ }
|
|
|
+
|
|
|
+ if(!isInnerChain) {
|
|
|
+ slot.setRequestData(param);
|
|
|
+ slot.setChainName(chainId);
|
|
|
+ }else {
|
|
|
+ slot.setChainReqData(chainId, param);
|
|
|
+ }
|
|
|
|
|
|
+ try {
|
|
|
//执行chain
|
|
|
chain.execute(slotIndex);
|
|
|
-
|
|
|
- return (T)slot;
|
|
|
}catch(Exception e){
|
|
|
- String errorMsg = MessageFormat.format("[{0}]executor cause error", slot.getRequestId());
|
|
|
- LOG.error(errorMsg,e);
|
|
|
- throw e;
|
|
|
+ slot.setSuccess(false);
|
|
|
+ slot.setErrorMsg(e.getMessage());
|
|
|
}finally{
|
|
|
if(!isInnerChain) {
|
|
|
slot.printStep();
|
|
|
DataBus.releaseSlot(slotIndex);
|
|
|
}
|
|
|
}
|
|
|
+ return (T)slot;
|
|
|
}
|
|
|
|
|
|
private class WhenConditionThread extends Thread{
|