浏览代码

优化buffer

AE86 3 年之前
父节点
当前提交
28a23df279
共有 1 个文件被更改,包括 6 次插入1 次删除
  1. 6 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

+ 6 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -14,6 +14,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -37,6 +38,8 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
     private volatile boolean running;
 
+    private final static long MAX_BATCH_COUNT = 1000L;
+
     @PostConstruct
     private void init() {
         scheduledTaskService.start(getPeriod(), this);
@@ -116,14 +119,16 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
     private void flush(Queue<Request> queue) {
         if (!queue.isEmpty()) {
+            AtomicLong batchCounter = new AtomicLong();
             final Map<String, AbstractResponse> map = new LinkedHashMap<>();
-            while (!queue.isEmpty()) {
+            while (!queue.isEmpty() && batchCounter.get() < MAX_BATCH_COUNT) {
                 Request poll = queue.poll();
                 String key = getPartitionKey(poll);
                 if (!map.containsKey(key)) {
                     map.putIfAbsent(key, getValue());
                 }
                 partition(poll, (Response) map.get(key));
+                batchCounter.incrementAndGet();
             }
 
             map.forEach((key, flushTask) -> {