AE86 2 anos atrás
pai
commit
12399d41bf

+ 1 - 15
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -13,7 +13,6 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -36,9 +35,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
     private static final int CAPACITY = 10_0000;
 
-    private static final double BUFFER_THRESHOLD = 0.8;
-
-    private static final int MAX_BATCH_COUNT = 3000;
+    private static final int MAX_BATCH_COUNT = 2000;
 
     private static final int PERIOD = 300;
 
@@ -93,17 +90,6 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     @Override
     public void offer(BufferRequest request) {
         buffer.offer((Request) request);
-
-        // TODO 临时解决方案:生产大于消费问题,限制生产速度
-        int size = buffer.size();
-        if (size >= (CAPACITY * BUFFER_THRESHOLD)) {
-            try {
-                TimeUnit.SECONDS.sleep(30);
-                logger.warn("当前任务队列大小{}已达上限{},请稍等{}秒", size, CAPACITY * BUFFER_THRESHOLD, 30);
-            } catch (InterruptedException e) {
-                logger.error(e.getMessage());
-            }
-        }
     }
 
     @Override

+ 26 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableWriterBufferActuatorStrategy.java

@@ -3,22 +3,48 @@ package org.dbsyncer.parser.strategy.impl;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.strategy.ParserStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 @Component
 @ConditionalOnProperty(value = "dbsyncer.parser.writer.buffer.actuator.enabled", havingValue = "true")
 public final class EnableWriterBufferActuatorStrategy implements ParserStrategy {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private static final double BUFFER_THRESHOLD = 0.8;
+
     @Autowired
     private BufferActuator writerBufferActuator;
 
+    private double limit;
+
+    @PostConstruct
+    private void init() {
+        limit = Math.ceil(writerBufferActuator.getQueueCapacity() * BUFFER_THRESHOLD);
+    }
+
     @Override
     public void execute(String tableGroupId, String event, Map<String, Object> data) {
         writerBufferActuator.offer(new WriterRequest(tableGroupId, event, data));
+
+        // 超过容量限制,限制生产速度
+        final int size = writerBufferActuator.getQueue().size();
+        if (size >= limit) {
+            try {
+                TimeUnit.SECONDS.sleep(30);
+                logger.warn("当前任务队列大小{}已达上限{},请稍等{}秒", size, limit, 30);
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage());
+            }
+        }
     }
 
 }