Browse Source

enhancement #I49JP1 DataBus中SlotSize的大小不支持动态扩展,无法应对高并发下的流量突增

bryan31 3 years ago
parent
commit
a052ad50ff

+ 3 - 0
liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java

@@ -137,6 +137,9 @@ public class FlowExecutor {
                 throw new FlowExecutorNotInitException(errorMsg);
             }
         }
+
+        //初始化DataBus
+        DataBus.init();
     }
 
     /**

+ 7 - 3
liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java

@@ -32,14 +32,18 @@ public class DataBus {
 	//这里为什么采用ConcurrentHashMap作为slot存放的容器?
 	//因为ConcurrentHashMap的随机取值复杂度也和数组一样为O(1),并且没有并发问题,还有自动扩容的功能
 	//用数组的话,扩容涉及copy,线程安全问题还要自己处理
-	private static final ConcurrentHashMap<Integer, Slot> SLOTS;
+	private static ConcurrentHashMap<Integer, Slot> SLOTS;
 
-	private static final ConcurrentLinkedQueue<Integer> QUEUE;
+	private static ConcurrentLinkedQueue<Integer> QUEUE;
 
 	//当前slot的下标index的最大值
 	private static Integer currentIndexMaxValue;
 
-	static {
+	//这里原先版本中是static块,现在改成init静态方法,由FlowExecutor中的init去调用
+	//这样的改动对项目来说没有什么实际意义,但是在单元测试中,却有意义。
+	//因为单元测试中所有的一起跑,jvm是不退出的,所以如果是static块的话,跑多个testsuite只会执行一次。
+	//而由FlowExecutor中的init去调用,是会被执行多次的。保证了每个单元测试都能初始化一遍
+	public static void init() {
 		LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
 		currentIndexMaxValue = liteflowConfig.getSlotSize();
 

+ 1 - 0
liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorInit.java

@@ -1,6 +1,7 @@
 package com.yomahub.liteflow.springboot;
 
 import com.yomahub.liteflow.core.FlowExecutor;
+import com.yomahub.liteflow.entity.data.DataBus;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 

+ 4 - 2
liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/ResizeSlotSpringbootTest.java

@@ -55,7 +55,9 @@ public class ResizeSlotSpringbootTest extends BaseTest {
         Field field = ReflectUtil.getField(DataBus.class, "QUEUE");
         ConcurrentLinkedQueue<Integer> queue = (ConcurrentLinkedQueue<Integer>) ReflectUtil.getStaticFieldValue(field);
 
-        //因为初始slotSize是4,按照0.75的扩容比,要满足100个线程,应该扩容6次,6次之后应该是扩容到114
-        Assert.assertEquals(queue.size(),114);
+        //因为初始slotSize是4,按照0.75的扩容比,要满足100个线程,应该扩容5~6次,5次=65,6次=114
+        //为什么不是直接114呢?
+        //因为在单测中根据机器的性能,在多线程情况下,有些机器跑的慢一点,也就是说65个就足够了。有些机器跑的快一点,是能真正扩容到114个的
+        Assert.assertTrue(queue.size() ==65 || queue.size() == 114);
     }
 }