Bläddra i källkod

支持采集TPS

AE86 11 månader sedan
förälder
incheckning
46d144deae

+ 28 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/metric/Bucket.java

@@ -0,0 +1,28 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.common.metric;
+
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2024-06-02 22:45
+ */
+public final class Bucket {
+
+    private LongAdder longAdder = new LongAdder();
+
+    public void add(long count) {
+        longAdder.add(count);
+    }
+
+    public long get() {
+        return longAdder.sum();
+    }
+
+    public void reset() {
+        this.longAdder.reset();
+    }
+}

+ 119 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/metric/TimeMetric.java

@@ -0,0 +1,119 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.common.metric;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2024-06-02 22:44
+ */
+public class TimeMetric {
+    /**
+     * 槽位的数量
+     */
+    private int bucketSize;
+
+    /**
+     * 时间片,单位毫秒
+     */
+    private int slice;
+
+    /**
+     * 用于判断是否可跳过锁争抢
+     */
+    private int timeSliceUsed;
+
+    /**
+     * 槽位
+     */
+    private Bucket[] buckets;
+
+    /**
+     * 目标槽位的位置
+     */
+    private volatile Integer targetBucketPosition;
+
+    /**
+     * 接近目标槽位最新时间
+     */
+    private volatile Long latestPassedTime;
+
+    /**
+     * 进入下一个槽位时使用的锁
+     */
+    private ReentrantLock enterNextBucketLock;
+
+    /**
+     * 默认60个槽位,槽位的时间片为1000毫秒
+     */
+    public TimeMetric() {
+        this(60, 1000);
+    }
+
+    /**
+     * 初始化Bucket数量与每个Bucket的时间片
+     *
+     * @param bucketSize
+     * @param slice
+     */
+    public TimeMetric(int bucketSize, int slice) {
+        this.bucketSize = bucketSize;
+        this.slice = slice;
+        this.latestPassedTime = System.currentTimeMillis() - (2 * slice);
+        this.timeSliceUsed = 3 * slice;
+        this.targetBucketPosition = getTargetBucketPosition(System.currentTimeMillis());
+        this.enterNextBucketLock = new ReentrantLock();
+        this.buckets = new Bucket[bucketSize];
+        for (int i = 0; i < bucketSize; i++) {
+            this.buckets[i] = new Bucket();
+        }
+    }
+
+    public void add(long count) {
+        long passTime = System.currentTimeMillis();
+        Bucket currentBucket = buckets[targetBucketPosition];
+        if (passTime - latestPassedTime < slice) {
+            currentBucket.add(count);
+            return;
+        }
+
+        if (enterNextBucketLock.isLocked() && passTime - latestPassedTime < timeSliceUsed) {
+            currentBucket.add(count);
+            return;
+        }
+
+        try {
+            enterNextBucketLock.lock();
+            if (passTime - latestPassedTime < slice) {
+                currentBucket.add(count);
+                return;
+            }
+
+            int nextTargetBucketPosition = getTargetBucketPosition(passTime);
+            Bucket nextBucket = buckets[nextTargetBucketPosition];
+            if (nextBucket.equals(currentBucket)) {
+                if (passTime - latestPassedTime >= slice) {
+                    latestPassedTime = passTime;
+                }
+            } else {
+                nextBucket.reset();
+                targetBucketPosition = nextTargetBucketPosition;
+                latestPassedTime = passTime;
+            }
+            nextBucket.add(count);
+        } finally {
+            enterNextBucketLock.unlock();
+        }
+    }
+
+    public long getSecondsRate() {
+        return System.currentTimeMillis() - latestPassedTime < slice ? buckets[targetBucketPosition].get() : 0L;
+    }
+
+    private Integer getTargetBucketPosition(long now) {
+        return (int) (now / slice) % bucketSize;
+    }
+}

+ 24 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/metric/TimeRegistry.java

@@ -0,0 +1,24 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.common.metric;
+
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2024-06-02 22:53
+ */
+@Component
+public final class TimeRegistry {
+
+    private Map<String, TimeMetric> metricMap = new ConcurrentHashMap<>();
+
+    public TimeMetric meter(String name) {
+        return metricMap.computeIfAbsent(name, k -> new TimeMetric());
+    }
+}

+ 16 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -4,6 +4,7 @@
 package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.common.config.BufferActuatorConfig;
+import org.dbsyncer.common.metric.TimeRegistry;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.parser.ProfileComponent;
@@ -48,6 +49,9 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
     @Resource
     private ProfileComponent profileComponent;
 
+    @Resource
+    private TimeRegistry timeRegistry;
+
     public AbstractBufferActuator() {
         int level = 5;
         Class<?> aClass = getClass();
@@ -130,7 +134,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
      *
      * @param map
      */
-    protected void process(Map<String, Response> map){
+    protected void process(Map<String, Response> map) {
         map.forEach((key, response) -> {
             long now = Instant.now().toEpochMilli();
             try {
@@ -150,6 +154,16 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
      */
     protected abstract void offerFailed(BlockingQueue<Request> queue, Request request);
 
+    /**
+     * 统计消费 TPS
+     *
+     * @param timeRegistry
+     * @param count
+     */
+    protected void meter(TimeRegistry timeRegistry, long count) {
+
+    }
+
     @Override
     public void offer(BufferRequest request) {
         if (queue.offer((Request) request)) {
@@ -212,6 +226,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
 
         process(map);
         map.clear();
+        meter(timeRegistry, batchCounter.get());
         map = null;
         batchCounter = null;
     }

+ 7 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -5,6 +5,7 @@ package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.common.QueueOverflowException;
 import org.dbsyncer.common.config.GeneralBufferConfig;
+import org.dbsyncer.common.metric.TimeRegistry;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
@@ -170,6 +171,12 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         throw new QueueOverflowException("缓存队列已满");
     }
 
+    @Override
+    protected void meter(TimeRegistry timeRegistry, long count) {
+        // 统计执行器同步效率TPS
+        timeRegistry.meter("general.buffer.actuator.tps").add(count);
+    }
+
     @Override
     public Executor getExecutor() {
         return generalExecutor;