Bladeren bron

监控任务指标

AE86 2 jaren geleden
bovenliggende
commit
a3cf5b6cfb

+ 1 - 1
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/Monitor.java

@@ -33,7 +33,7 @@ public interface Monitor {
 
     List<MetricEnum> getMetricEnumAll();
 
-    List<MetricResponse> getThreadPoolInfo();
+    List<MetricResponse> getMetricInfo();
 
     AppReportMetric getAppReportMetric();
 

+ 21 - 17
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.monitor;
 
-import org.dbsyncer.common.config.ThreadPoolConfig;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
@@ -8,10 +7,12 @@ import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.monitor.enums.MetricEnum;
 import org.dbsyncer.monitor.enums.StatisticEnum;
+import org.dbsyncer.monitor.enums.TaskMetricEnum;
 import org.dbsyncer.monitor.enums.ThreadPoolMetricEnum;
 import org.dbsyncer.monitor.model.AppReportMetric;
 import org.dbsyncer.monitor.model.MetricResponse;
 import org.dbsyncer.monitor.model.Sample;
+import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.storage.constant.ConfigConstant;
@@ -25,7 +26,6 @@ import org.springframework.stereotype.Component;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
@@ -46,7 +46,10 @@ public class MonitorFactory implements Monitor {
     private Executor taskExecutor;
 
     @Autowired
-    private ThreadPoolConfig threadPoolConfig;
+    private BufferActuator writerBufferActuator;
+
+    @Autowired
+    private BufferActuator storageBufferActuator;
 
     @Override
     public Mapping getMapping(String mappingId) {
@@ -105,16 +108,18 @@ public class MonitorFactory implements Monitor {
     }
 
     @Override
-    public List<MetricResponse> getThreadPoolInfo() {
+    public List<MetricResponse> getMetricInfo() {
         ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) taskExecutor;
         ThreadPoolExecutor pool = threadTask.getThreadPoolExecutor();
 
         List<MetricResponse> list = new ArrayList<>();
-        list.add(createMetricResponse(ThreadPoolMetricEnum.TASK_SUBMITTED, pool.getTaskCount()));
-        list.add(createMetricResponse(ThreadPoolMetricEnum.QUEUE_UP, pool.getQueue().size()));
-        list.add(createMetricResponse(ThreadPoolMetricEnum.ACTIVE, pool.getActiveCount()));
-        list.add(createMetricResponse(ThreadPoolMetricEnum.COMPLETED, pool.getCompletedTaskCount()));
-        list.add(createMetricResponse(ThreadPoolMetricEnum.REMAINING_CAPACITY, pool.getQueue().remainingCapacity()));
+        list.add(createTaskMetricResponse(TaskMetricEnum.STORAGE_ACTIVE, storageBufferActuator.getQueue().size()));
+        list.add(createTaskMetricResponse(TaskMetricEnum.STORAGE_REMAINING_CAPACITY, storageBufferActuator.getQueueCapacity() - storageBufferActuator.getQueue().size()));
+        list.add(createThreadPoolMetricResponse(ThreadPoolMetricEnum.TASK_SUBMITTED, pool.getTaskCount()));
+        list.add(createThreadPoolMetricResponse(ThreadPoolMetricEnum.QUEUE_UP, pool.getQueue().size()));
+        list.add(createThreadPoolMetricResponse(ThreadPoolMetricEnum.ACTIVE, pool.getActiveCount()));
+        list.add(createThreadPoolMetricResponse(ThreadPoolMetricEnum.COMPLETED, pool.getCompletedTaskCount()));
+        list.add(createThreadPoolMetricResponse(ThreadPoolMetricEnum.REMAINING_CAPACITY, pool.getQueue().remainingCapacity()));
         return list;
     }
 
@@ -127,13 +132,8 @@ public class MonitorFactory implements Monitor {
         report.setInsert(getMappingInsert(metaAll));
         report.setUpdate(getMappingUpdate(metaAll));
         report.setDelete(getMappingDelete(metaAll));
-
-        // 线程池使用情况
-        ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) taskExecutor;
-        ThreadPoolExecutor pool = threadTask.getThreadPoolExecutor();
-        BlockingQueue<Runnable> queue = pool.getQueue();
-        report.setQueueUp(queue.size());
-        report.setQueueCapacity(threadPoolConfig.getQueueCapacity());
+        report.setQueueUp(writerBufferActuator.getQueue().size());
+        report.setQueueCapacity(writerBufferActuator.getQueueCapacity());
         return report;
     }
 
@@ -187,7 +187,11 @@ public class MonitorFactory implements Monitor {
         return queryMappingMetricCount(metaAll, (query) -> query.addFilter(ConfigConstant.DATA_EVENT, ConnectorConstant.OPERTION_DELETE));
     }
 
-    private MetricResponse createMetricResponse(ThreadPoolMetricEnum metricEnum, Object value) {
+    private MetricResponse createThreadPoolMetricResponse(ThreadPoolMetricEnum metricEnum, Object value) {
+        return new MetricResponse(metricEnum.getCode(), metricEnum.getGroup(), metricEnum.getMetricName(), Arrays.asList(new Sample(StatisticEnum.COUNT.getTagValueRepresentation(), value)));
+    }
+
+    private MetricResponse createTaskMetricResponse(TaskMetricEnum metricEnum, Object value) {
         return new MetricResponse(metricEnum.getCode(), metricEnum.getGroup(), metricEnum.getMetricName(), Arrays.asList(new Sample(StatisticEnum.COUNT.getTagValueRepresentation(), value)));
     }
 

+ 43 - 0
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/enums/TaskMetricEnum.java

@@ -0,0 +1,43 @@
+package org.dbsyncer.monitor.enums;
+
+/**
+ * 执行任务指标
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2021/07/23 0:19
+ */
+public enum TaskMetricEnum {
+
+    /**
+     * 处理中
+     */
+    STORAGE_ACTIVE("parser.storage.buffer.actuator.active", "持久化", "处理中"),
+
+    /**
+     * 空闲队列
+     */
+    STORAGE_REMAINING_CAPACITY("parser.storage.buffer.actuator.capacity", "持久化", "空闲队列");
+
+    private String code;
+    private String group;
+    private String metricName;
+
+    TaskMetricEnum(String code, String group, String metricName) {
+        this.code = code;
+        this.group = group;
+        this.metricName = metricName;
+    }
+
+    public String getCode() {
+        return code;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public String getMetricName() {
+        return metricName;
+    }
+}

+ 9 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -38,11 +38,11 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
     private static final double BUFFER_THRESHOLD = 0.8;
 
-    private static final long MAX_BATCH_COUNT = 1000L;
+    private static final int MAX_BATCH_COUNT = 1000;
 
-    private static final long PERIOD = 300;
+    private static final int PERIOD = 300;
 
-    private Queue<Request> buffer = new LinkedBlockingQueue(CAPACITY);
+    private Queue<Request> buffer;
 
     private final Lock lock = new ReentrantLock(true);
 
@@ -53,6 +53,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     @PostConstruct
     private void init() {
         responseClazz = (Class<Response>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
+        buffer = new LinkedBlockingQueue(getQueueCapacity());
         scheduledTaskService.start(PERIOD, this);
     }
 
@@ -84,6 +85,11 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
         return buffer;
     }
 
+    @Override
+    public int getQueueCapacity() {
+        return CAPACITY;
+    }
+
     @Override
     public void offer(BufferRequest request) {
         buffer.offer((Request) request);

+ 7 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -16,6 +16,13 @@ public interface BufferActuator {
      */
     Queue getQueue();
 
+    /**
+     * 获取缓存队列容量
+     *
+     * @return
+     */
+    int getQueueCapacity();
+
     /**
      * 提交任务
      *

+ 5 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -19,6 +19,11 @@ public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest
     @Autowired
     private StorageService storageService;
 
+    @Override
+    public int getQueueCapacity() {
+        return 1_0000;
+    }
+
     @Override
     protected String getPartitionKey(StorageRequest bufferTask) {
         return bufferTask.getMetaId();