|
@@ -6,10 +6,12 @@ import cn.hutool.core.util.ObjectUtil;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
import cn.hutool.crypto.digest.DigestUtil;
|
|
|
import com.yomahub.liteflow.parser.redis.exception.RedisException;
|
|
|
+import com.yomahub.liteflow.parser.redis.mode.RClient;
|
|
|
import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper;
|
|
|
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
|
|
|
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
|
|
|
-import redis.clients.jedis.Jedis;
|
|
|
+import org.redisson.Redisson;
|
|
|
+import org.redisson.config.Config;
|
|
|
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
@@ -25,9 +27,9 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
|
|
|
|
|
private final RedisParserVO redisParserVO;
|
|
|
|
|
|
- private Jedis chainJedis;
|
|
|
+ private RClient chainClient;
|
|
|
|
|
|
- private Jedis scriptJedis;
|
|
|
+ private RClient scriptClient;
|
|
|
|
|
|
//chainKey中chain总数
|
|
|
private Integer chainNum = 0;
|
|
@@ -63,25 +65,18 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
|
|
|
|
|
try{
|
|
|
try{
|
|
|
- this.chainJedis = ContextAwareHolder.loadContextAware().getBean("chainJedis");
|
|
|
- this.scriptJedis = ContextAwareHolder.loadContextAware().getBean("scriptJedis");
|
|
|
+ this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainClient");
|
|
|
+ this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptClient");
|
|
|
}
|
|
|
catch (Exception ignored) {
|
|
|
}
|
|
|
- if (ObjectUtil.isNull(chainJedis)) {
|
|
|
- chainJedis = new Jedis(redisParserVO.getHost(), redisParserVO.getPort());
|
|
|
- //如果配置了密码
|
|
|
- if (StrUtil.isNotBlank(redisParserVO.getPassword())) {
|
|
|
- chainJedis.auth(redisParserVO.getPassword());
|
|
|
- }
|
|
|
- chainJedis.select(redisParserVO.getChainDataBase());
|
|
|
+ if (ObjectUtil.isNull(chainClient)) {
|
|
|
+ Config config = getRedissonConfig(redisParserVO, redisParserVO.getChainDataBase());
|
|
|
+ this.chainClient = new RClient(Redisson.create(config));
|
|
|
//如果有脚本数据
|
|
|
if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
|
|
|
- scriptJedis = new Jedis(redisParserVO.getHost(), redisParserVO.getPort());
|
|
|
- if (StrUtil.isNotBlank(redisParserVO.getPassword())) {
|
|
|
- scriptJedis.auth(redisParserVO.getPassword());
|
|
|
- }
|
|
|
- scriptJedis.select(redisParserVO.getScriptDataBase());
|
|
|
+ config = getRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase());
|
|
|
+ this.scriptClient = new RClient(Redisson.create(config));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -95,7 +90,7 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
|
|
try {
|
|
|
// 检查chainKey下有没有子节点
|
|
|
String chainKey = redisParserVO.getChainKey();
|
|
|
- Set<String> chainNameSet = chainJedis.hkeys(chainKey);
|
|
|
+ Set<String> chainNameSet = chainClient.hkeys(chainKey);
|
|
|
if (CollectionUtil.isEmpty(chainNameSet)) {
|
|
|
throw new RedisException(StrUtil.format("There are no chains in key [{}]", chainKey));
|
|
|
}
|
|
@@ -103,7 +98,7 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
|
|
// 获取chainKey下的所有子节点内容List
|
|
|
List<String> chainItemContentList = new ArrayList<>();
|
|
|
for (String chainName : chainNameSet) {
|
|
|
- String chainData = chainJedis.hget(chainKey, chainName);
|
|
|
+ String chainData = chainClient.hget(chainKey, chainName);
|
|
|
if (StrUtil.isNotBlank(chainData)) {
|
|
|
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
|
|
|
}
|
|
@@ -119,7 +114,7 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
|
|
String scriptAllContent = StrUtil.EMPTY;
|
|
|
if (hasScript()) {
|
|
|
String scriptKey = redisParserVO.getScriptKey();
|
|
|
- Set<String> scriptFieldSet = scriptJedis.hkeys(scriptKey);
|
|
|
+ Set<String> scriptFieldSet = scriptClient.hkeys(scriptKey);
|
|
|
scriptNum = scriptFieldSet.size();
|
|
|
|
|
|
List<String> scriptItemContentList = new ArrayList<>();
|
|
@@ -130,7 +125,7 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
|
|
StrUtil.format("The name of the redis field [{}] in scriptKey [{}] is invalid",
|
|
|
scriptFieldValue, scriptKey));
|
|
|
}
|
|
|
- String scriptData = scriptJedis.hget(scriptKey, scriptFieldValue);
|
|
|
+ String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
|
|
|
|
|
|
// 有语言类型
|
|
|
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
|
|
@@ -161,7 +156,7 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
|
|
}
|
|
|
|
|
|
public boolean hasScript() {
|
|
|
- if (ObjectUtil.isNull(scriptJedis) || ObjectUtil.isNull(redisParserVO.getScriptDataBase())) {
|
|
|
+ if (ObjectUtil.isNull(scriptClient) || ObjectUtil.isNull(redisParserVO.getScriptDataBase())) {
|
|
|
return false;
|
|
|
}
|
|
|
try{
|
|
@@ -169,7 +164,7 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
|
|
if (StrUtil.isBlank(scriptKey)) {
|
|
|
return false;
|
|
|
}
|
|
|
- Set<String> scriptKeySet = scriptJedis.hkeys(scriptKey);
|
|
|
+ Set<String> scriptKeySet = scriptClient.hkeys(scriptKey);
|
|
|
return !CollUtil.isEmpty(scriptKeySet);
|
|
|
}
|
|
|
catch (Exception e) {
|
|
@@ -183,8 +178,8 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
|
|
@Override
|
|
|
public void listenRedis() {
|
|
|
//将lua脚本添加到chainJedis脚本缓存
|
|
|
- String keyLuaOfChain = chainJedis.scriptLoad(luaOfKey);
|
|
|
- String valueLuaOfChain = chainJedis.scriptLoad(luaOfValue);
|
|
|
+ String keyLuaOfChain = chainClient.scriptLoad(luaOfKey);
|
|
|
+ String valueLuaOfChain = chainClient.scriptLoad(luaOfValue);
|
|
|
|
|
|
//定时任务线程池
|
|
|
ScheduledThreadPoolExecutor pollExecutor = new ScheduledThreadPoolExecutor(
|
|
@@ -192,19 +187,19 @@ public class RedisParserPollingMode implements RedisParserHelper {
|
|
|
new ThreadPoolExecutor.DiscardOldestPolicy());
|
|
|
|
|
|
//添加轮询chain的定时任务
|
|
|
- ChainPollingTask chainTask = new ChainPollingTask(redisParserVO, chainJedis, chainNum, chainSHAMap, LOG);
|
|
|
+ ChainPollingTask chainTask = new ChainPollingTask(redisParserVO, chainClient, chainNum, chainSHAMap, LOG);
|
|
|
pollExecutor.scheduleAtFixedRate(chainTask.pollChainTask(keyLuaOfChain, valueLuaOfChain),
|
|
|
60, redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS);
|
|
|
|
|
|
//如果有脚本
|
|
|
- if (ObjectUtil.isNotNull(scriptJedis) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())
|
|
|
+ if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())
|
|
|
&& StrUtil.isNotBlank(redisParserVO.getScriptKey())) {
|
|
|
//将lua脚本添加到scriptJedis脚本缓存
|
|
|
- String keyLuaOfScript = scriptJedis.scriptLoad(luaOfKey);
|
|
|
- String valueLuaOfScript = scriptJedis.scriptLoad(luaOfValue);
|
|
|
+ String keyLuaOfScript = scriptClient.scriptLoad(luaOfKey);
|
|
|
+ String valueLuaOfScript = scriptClient.scriptLoad(luaOfValue);
|
|
|
|
|
|
//添加轮询script的定时任务
|
|
|
- ScriptPollingTask scriptTask = new ScriptPollingTask(redisParserVO, scriptJedis, scriptNum, scriptSHAMap, LOG);
|
|
|
+ ScriptPollingTask scriptTask = new ScriptPollingTask(redisParserVO, scriptClient, scriptNum, scriptSHAMap, LOG);
|
|
|
pollExecutor.scheduleAtFixedRate(scriptTask.pollScriptTask(keyLuaOfScript, valueLuaOfScript),
|
|
|
60, redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS);
|
|
|
}
|