Przeglądaj źródła

优化并发性能

AE86 2 lat temu
rodzic
commit
34f0e993e9

+ 10 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -13,6 +13,7 @@ import java.time.Instant;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -37,7 +38,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     @Autowired
     private BufferActuatorConfig bufferActuatorConfig;
 
-    private Queue<Request> buffer;
+    private BlockingQueue<Request> buffer;
 
     private final Lock lock = new ReentrantLock(true);
 
@@ -81,7 +82,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
      * @param response
      * @return
      */
-    protected boolean skipPartition(Request nextRequest, Response response){
+    protected boolean skipPartition(Request nextRequest, Response response) {
         return false;
     }
 
@@ -96,8 +97,12 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     }
 
     @Override
-    public void offer(BufferRequest request) {
-        buffer.offer((Request) request);
+    public boolean offer(BufferRequest request) {
+        boolean offer = buffer.offer((Request) request);
+        if (!offer) {
+            logger.warn("[{}]缓存队列容量已达上限,建议修改参数[dbsyncer.parser.flush.buffer.actuator.queue-capacity={}], ", this.getClass().getSimpleName(), getQueueCapacity());
+        }
+        return offer;
     }
 
     @Override
@@ -139,7 +144,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
                 batchCounter.incrementAndGet();
 
                 Request next = queue.peek();
-                if(null != next && skipPartition(next, response)){
+                if (null != next && skipPartition(next, response)) {
                     break;
                 }
             }

+ 2 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -27,7 +27,8 @@ public interface BufferActuator {
      * 提交任务
      *
      * @param request
+     * @return true/false
      */
-    void offer(BufferRequest request);
+    boolean offer(BufferRequest request);
 
 }

+ 6 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -76,7 +76,11 @@ public class FlushServiceImpl implements FlushService {
                 row.put(ConfigConstant.CONFIG_MODEL_JSON, r.toString());
             }
             row.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
-            storageBufferActuator.offer(new StorageRequest(metaId, row));
+
+            // 缓存队列满时,打印日志
+            if (!storageBufferActuator.offer(new StorageRequest(metaId, row))) {
+                logger.error("缓存队列容量已达上限, 无法持久化:{}", r);
+            }
         });
     }
 
@@ -86,7 +90,7 @@ public class FlushServiceImpl implements FlushService {
      * @param error
      * @return
      */
-    private String substring(String error){
+    private String substring(String error) {
         return StringUtil.substring(error, 0, flushDataConfig.getMaxErrorLength());
     }
 

+ 0 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -19,11 +19,6 @@ public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest
     @Autowired
     private StorageService storageService;
 
-    @Override
-    public int getQueueCapacity() {
-        return super.getQueueCapacity() / 4;
-    }
-
     @Override
     protected String getPartitionKey(StorageRequest bufferTask) {
         return bufferTask.getMetaId();

+ 2 - 13
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableWriterBufferActuatorStrategy.java

@@ -8,7 +8,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
 import java.util.List;
 import java.util.Map;
 
@@ -16,25 +15,15 @@ import java.util.Map;
 @ConditionalOnProperty(value = "dbsyncer.parser.flush.buffer.actuator.speed.enabled", havingValue = "true")
 public final class EnableWriterBufferActuatorStrategy extends AbstractWriterBinlog implements ParserStrategy {
 
-    private static final double BUFFER_THRESHOLD = 0.8;
-
     @Autowired
     private BufferActuator writerBufferActuator;
 
-    private static double limit;
-
-    @PostConstruct
-    private void init() {
-        limit = Math.ceil(getQueueCapacity() * BUFFER_THRESHOLD);
-    }
-
     @Override
     public void execute(String tableGroupId, String event, Map<String, Object> data) {
-        if (getQueue().size() >= limit) {
+        // 缓存队列满时转写磁盘
+        if (!writerBufferActuator.offer(new WriterRequest(tableGroupId, event, data))) {
             super.flush(tableGroupId, event, data);
-            return;
         }
-        writerBufferActuator.offer(new WriterRequest(tableGroupId, event, data));
     }
 
     @Override