Explorar el Código

增加SQL轮询chain部分

houxinyu hace 1 año
padre
commit
d07768d231

+ 6 - 0
liteflow-rule-plugin/liteflow-rule-sql/pom.xml

@@ -20,5 +20,11 @@
             <optional>true</optional>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-crypto</artifactId>
+            <version>${hutool-crypto.version}</version>
+        </dependency>
     </dependencies>
 </project>

+ 17 - 2
liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java

@@ -5,6 +5,7 @@ import cn.hutool.core.bean.copier.CopyOptions;
 import cn.hutool.core.map.MapUtil;
 import cn.hutool.core.text.StrFormatter;
 import cn.hutool.core.util.StrUtil;
+import com.yomahub.liteflow.core.FlowInitHook;
 import com.yomahub.liteflow.parser.el.ClassXmlFlowELParser;
 import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
 import com.yomahub.liteflow.parser.sql.util.JDBCHelper;
@@ -23,6 +24,8 @@ import java.util.Objects;
  */
 public class SQLXmlELParser extends ClassXmlFlowELParser {
 
+	private static SQLParserVO sqlParserVO;
+
 	private static final String ERROR_MSG_PATTERN = "rule-source-ext-data {} is blank";
 
 	private static final String ERROR_COMMON_MSG = "rule-source-ext-data is empty";
@@ -34,7 +37,6 @@ public class SQLXmlELParser extends ClassXmlFlowELParser {
 		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
 
 		try {
-			SQLParserVO sqlParserVO = null;
 			if (MapUtil.isNotEmpty((liteflowConfig.getRuleSourceExtDataMap()))) {
 				sqlParserVO = BeanUtil.toBean(liteflowConfig.getRuleSourceExtDataMap(), SQLParserVO.class,
 						CopyOptions.create());
@@ -63,7 +65,20 @@ public class SQLXmlELParser extends ClassXmlFlowELParser {
 
 	@Override
 	public String parseCustom() {
-		return JDBCHelper.getInstance().getContent();
+		try{
+			JDBCHelper jdbcHelper = JDBCHelper.getInstance();
+			String content = jdbcHelper.getContent();
+			if(sqlParserVO.getIfPolling()) {
+				FlowInitHook.addHook(() -> {
+					jdbcHelper.listenSQL();
+					return true;
+				});
+			}
+			return content;
+		}
+		catch (Exception ex) {
+			throw new ELSQLException(ex.getMessage());
+		}
 	}
 
 	/**

+ 149 - 0
liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java

@@ -0,0 +1,149 @@
+package com.yomahub.liteflow.parser.sql.util;
+
+import cn.hutool.core.util.StrUtil;
+import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
+import com.yomahub.liteflow.flow.FlowBus;
+import com.yomahub.liteflow.log.LFLog;
+import com.yomahub.liteflow.log.LFLoggerManager;
+import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
+import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * 用于轮询chain的定时任务
+ *
+ * @author hxinyu
+ * @since  2.11.1
+ */
+public class ChainPollingTask implements Runnable {
+
+    private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
+
+    private static final String NEW_CHAIN_PATTERN = "SELECT {} FROM {} WHERE {}=? AND {}=?";
+
+    private static final String SHA_PATTERN = "SHA1({})";
+
+    public static Connection conn;
+
+    private SQLParserVO sqlParserVO;
+
+    private Map<String, String> chainSHAMap;
+
+    private static final Integer FETCH_SIZE_MAX = 1000;
+
+    LFLog LOG = LFLoggerManager.getLogger(ChainPollingTask.class);
+
+    public ChainPollingTask(SQLParserVO sqlParserVO, Map<String, String> chainSHAMap) {
+        this.sqlParserVO = sqlParserVO;
+        this.chainSHAMap = chainSHAMap;
+        conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
+    }
+
+    @Override
+    public void run() {
+        try{
+            PreparedStatement stmt;
+            ResultSet rs;
+            String chainTableName = sqlParserVO.getChainTableName();
+            String elDataField = sqlParserVO.getElDataField();
+            String chainNameField = sqlParserVO.getChainNameField();
+            String chainApplicationNameField = sqlParserVO.getChainApplicationNameField();
+            String applicationName = sqlParserVO.getApplicationName();
+
+            String SHAField = StrUtil.format(SHA_PATTERN, elDataField);
+            String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, SHAField, chainTableName,
+                    chainApplicationNameField);
+            stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            // 设置游标拉取数量
+            stmt.setFetchSize(FETCH_SIZE_MAX);
+            stmt.setString(1, applicationName);
+            rs = stmt.executeQuery();
+
+            Set<String> newChainSet = new HashSet<>();
+
+            while(rs.next()) {
+                String chainName = getStringFromResultSet(rs, chainNameField);
+                String newSHA = getStringFromResultSet(rs, SHAField);
+                newChainSet.add(chainName);
+                //如果封装的SHAMap中不存在该chain, 表示该chain为新增
+                if(!chainSHAMap.containsKey(chainName)) {
+                    //获取新chain结果
+                    ResultSet newChainRS = getNewChainRS(elDataField, chainTableName, chainNameField,
+                            chainApplicationNameField, applicationName, chainName);
+                    if(newChainRS.next()) {
+                        String newELData = getStringFromResultSet(newChainRS, elDataField);
+                        //新增chain
+                        LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build();
+                        LOG.info("starting reload flow config... create key={} new value={},", chainName, newELData);
+                        //加入到shaMap
+                        chainSHAMap.put(chainName, newSHA);
+                    }
+                }
+                else if (!StrUtil.equals(newSHA, chainSHAMap.get(chainName))) {
+                    //SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain
+                    ResultSet newChainRS = getNewChainRS(elDataField, chainTableName, chainNameField,
+                            chainApplicationNameField, applicationName, chainName);
+                    if(newChainRS.next()) {
+                        String newELData = getStringFromResultSet(newChainRS, elDataField);
+                        //修改chain
+                        LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build();
+                        LOG.info("starting reload flow config... update key={} new value={},", chainName, newELData);
+                        //修改shaMap
+                        chainSHAMap.put(chainName, newSHA);
+                    }
+                }
+                //SHA值无变化,表示该chain未改变
+            }
+
+            if(chainSHAMap.size() > newChainSet.size()) {
+                //如果遍历prepareStatement后修改过的SHAMap数量比最新chain总数多, 说明有两种情况:
+                // 1、删除了chain
+                // 2、修改了chainName:因为遍历到新的name时会加到SHAMap里,但没有机会删除旧的chain
+                // 3、上述两者结合
+                //在此处遍历chainSHAMap,把不在newChainSet中的chain删除
+                for (String chainName : chainSHAMap.keySet()) {
+                    if(!newChainSet.contains(chainName)){
+                        FlowBus.removeChain(chainName);
+                        LOG.info("starting reload flow config... delete chain={}", chainName);
+                        //修改SHAMap
+                        chainSHAMap.remove(chainName);
+                    }
+                }
+            }
+
+        }catch (Exception e) {
+            LOG.error("[Exception during SQL chain polling] " + e.getMessage(), e);
+        }
+    }
+
+    private ResultSet getNewChainRS(String elDataField, String chainTableName, String chainNameField,
+                                    String chainApplicationNameField, String applicationName, String chainName) {
+        ResultSet rs = null;
+        String sqlCmd = StrUtil.format(NEW_CHAIN_PATTERN, elDataField, chainTableName,
+                chainNameField, chainApplicationNameField);
+        try{
+            PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            stmt.setString(1, chainName);
+            stmt.setString(2, applicationName);
+            rs = stmt.executeQuery();
+        }catch (Exception e) {
+            throw new ELSQLException(e.getMessage());
+        }
+        return rs;
+    }
+
+    private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
+        String data = rs.getString(field);
+        if (StrUtil.isBlank(data)) {
+            throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
+        }
+        return data;
+    }
+}

+ 49 - 3
liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java

@@ -1,8 +1,11 @@
 package com.yomahub.liteflow.parser.sql.util;
 
 import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.thread.NamedThreadFactory;
+import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.core.util.XmlUtil;
+import cn.hutool.crypto.digest.DigestUtil;
 import com.yomahub.liteflow.enums.NodeTypeEnum;
 import com.yomahub.liteflow.enums.ScriptTypeEnum;
 import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
@@ -12,9 +15,11 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * jdbc 工具类
@@ -48,6 +53,15 @@ public class JDBCHelper {
 
     private static JDBCHelper INSTANCE;
 
+    //定时任务线程池核心线程数
+    private static final int CORE_POOL_SIZE = 2;
+
+    //定时任务线程池
+    private static ScheduledThreadPoolExecutor pollExecutor;
+
+    //chain的SHA1加密值 用于轮询时确定elDataField是否变化
+    private Map<String, String> chainSHAMap = new HashMap<>();
+
     /**
      * 初始化 INSTANCE
      */
@@ -58,6 +72,15 @@ public class JDBCHelper {
                 Class.forName(sqlParserVO.getDriverClassName());
             }
             INSTANCE.setSqlParserVO(sqlParserVO);
+            //创建定时任务线程池
+            if (sqlParserVO.getIfPolling() && ObjectUtil.isNull(getPollExecutor())) {
+                ThreadFactory namedThreadFactory = new NamedThreadFactory("SQL-Polling-", false);
+                ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(
+                        CORE_POOL_SIZE,
+                        namedThreadFactory,
+                        new ThreadPoolExecutor.DiscardOldestPolicy());
+                setPollExecutor(threadPoolExecutor);
+            }
         } catch (ClassNotFoundException e) {
             throw new ELSQLException(e.getMessage());
         }
@@ -109,6 +132,12 @@ public class JDBCHelper {
                 String chainName = getStringFromResultSet(rs, chainNameField);
 
                 result.add(StrUtil.format(CHAIN_XML_PATTERN, XmlUtil.escape(chainName), elData));
+
+                //如果需要轮询 计算该chainData的SHA值
+                if(sqlParserVO.getIfPolling()){
+                    String chainSHA = DigestUtil.sha1Hex(elData);
+                    chainSHAMap.put(chainName, chainSHA);
+                }
             }
         } catch (Exception e) {
             throw new ELSQLException(e.getMessage());
@@ -129,6 +158,16 @@ public class JDBCHelper {
         return StrUtil.format(XML_PATTERN, nodesContent, chainsContent);
     }
 
+    /**
+     * 定时轮询拉取SQL中变化的数据
+     */
+    public void listenSQL() {
+        ChainPollingTask chainTask = new ChainPollingTask(sqlParserVO, chainSHAMap);
+        pollExecutor.scheduleAtFixedRate(chainTask, sqlParserVO.getPollingStartTime().longValue(),
+                sqlParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS);
+    }
+
+
     /**
      * 获取脚本节点内容
      */
@@ -299,4 +338,11 @@ public class JDBCHelper {
         this.sqlParserVO = sqlParserVO;
     }
 
+    public static ScheduledThreadPoolExecutor getPollExecutor() {
+        return pollExecutor;
+    }
+
+    public static void setPollExecutor(ScheduledThreadPoolExecutor pollExecutor) {
+        JDBCHelper.pollExecutor = pollExecutor;
+    }
 }

+ 32 - 0
liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java

@@ -90,6 +90,15 @@ public class SQLParserVO {
      */
     private String scriptLanguageField;
 
+    /*是否开启轮询机制 默认不开启*/
+    private Boolean ifPolling = false;
+
+    /*轮询时间间隔(s) 默认60s*/
+    private Integer pollingInterval = 60;
+
+    /*规则配置后首次轮询的起始时间 默认为60s*/
+    private Integer pollingStartTime = 60;
+
     public String getUrl() {
         return url;
     }
@@ -225,4 +234,27 @@ public class SQLParserVO {
         return StrUtil.isBlank(url) && StrUtil.isBlank(username) && StrUtil.isBlank(password) && StrUtil.isBlank(driverClassName);
     }
 
+    public Boolean getIfPolling() {
+        return ifPolling;
+    }
+
+    public void setIfPolling(Boolean ifPolling) {
+        this.ifPolling = ifPolling;
+    }
+
+    public Integer getPollingInterval() {
+        return pollingInterval;
+    }
+
+    public void setPollingInterval(Integer pollingInterval) {
+        this.pollingInterval = pollingInterval;
+    }
+
+    public Integer getPollingStartTime() {
+        return pollingStartTime;
+    }
+
+    public void setPollingStartTime(Integer pollingStartTime) {
+        this.pollingStartTime = pollingStartTime;
+    }
 }