Pārlūkot izejas kodu

优化监控

Signed-off-by: AE86 <836391306@qq.com>
AE86 1 gadu atpakaļ
vecāks
revīzija
28a6b73e9a

+ 2 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java

@@ -10,10 +10,10 @@ public abstract class StringUtil {
 
     public static final String COLON = ":";
 
-    public static final String COMMA = ",";
-
     public static final String SPACE = " ";
 
+    public static final String FORWARD_SLASH = "/";
+
     public static boolean equals(CharSequence cs1, CharSequence cs2) {
         return StringUtils.equals(cs1, cs2);
     }

+ 45 - 28
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -16,8 +16,10 @@ import org.dbsyncer.monitor.enums.ThreadPoolMetricEnum;
 import org.dbsyncer.monitor.model.AppReportMetric;
 import org.dbsyncer.monitor.model.MappingReportMetric;
 import org.dbsyncer.monitor.model.MetricResponse;
+import org.dbsyncer.monitor.model.MetricResponseInfo;
 import org.dbsyncer.monitor.model.Sample;
 import org.dbsyncer.parser.flush.BufferActuator;
+import org.dbsyncer.parser.flush.impl.TableGroupBufferActuator;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
@@ -37,11 +39,14 @@ import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
  * @author AE86
@@ -174,34 +179,35 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
 
     @Override
     public List<MetricResponse> getMetricInfo() {
-        List<MetricResponse> list = new ArrayList<>();
-        collect(list, generalBufferActuator, BufferActuatorMetricEnum.GENERAL);
-        collect(list, storageBufferActuator, BufferActuatorMetricEnum.STORAGE);
+        List<MetricResponseInfo> list = new ArrayList<>();
+        BufferActuatorMetricEnum general = BufferActuatorMetricEnum.GENERAL;
+        BufferActuatorMetricEnum storage = BufferActuatorMetricEnum.STORAGE;
+        list.add(collect(generalBufferActuator, general.getCode(), general.getGroup(), general.getMetricName()));
+        list.add(collect(storageBufferActuator, storage.getCode(), storage.getGroup(), storage.getMetricName()));
         if (!CollectionUtils.isEmpty(bufferActuatorRouter.getRouter())) {
-            List<MetricResponse> tableResult = new ArrayList<>();
+            List<MetricResponseInfo> tableList = new ArrayList<>();
+            String tableGroupCode = BufferActuatorMetricEnum.TABLE_GROUP.getCode();
             bufferActuatorRouter.getRouter().forEach((metaId, group) -> {
-                group.forEach((k, bufferActuator) ->
-                        collect(tableResult, bufferActuator, BufferActuatorMetricEnum.TABLE_GROUP)
-                );
+                Meta meta = manager.getMeta(metaId);
+                Mapping mapping = manager.getMapping(meta.getMappingId());
+                group.forEach((k, bufferActuator) -> {
+                    if (bufferActuator instanceof TableGroupBufferActuator) {
+                        TableGroupBufferActuator actuator = bufferActuator;
+                        TableGroup tableGroup = manager.getTableGroup(actuator.getTableGroupId());
+                        String metricName = new StringBuilder()
+                                .append(tableGroup.getSourceTable().getName())
+                                .append(" >> ")
+                                .append(tableGroup.getTargetTable().getName()).toString();
+                        tableList.add(collect(bufferActuator, tableGroupCode, mapping.getName(), metricName));
+                    }
+                });
             });
-            // TODO sort by active
-            list.addAll(tableResult.size() <= SHOW_BUFFER_ACTUATOR_SIZE ? tableResult : tableResult.subList(0, SHOW_BUFFER_ACTUATOR_SIZE));
+            List<MetricResponseInfo> sortList = tableList.stream()
+                    .sorted(Comparator.comparing(MetricResponseInfo::getQueueUp).reversed())
+                    .collect(Collectors.toList());
+            list.addAll(sortList.size() <= SHOW_BUFFER_ACTUATOR_SIZE ? sortList : sortList.subList(0, SHOW_BUFFER_ACTUATOR_SIZE));
         }
-        return list;
-    }
-
-    private void collect(List<MetricResponse> result, BufferActuator bufferActuator, BufferActuatorMetricEnum metricEnum) {
-        ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) bufferActuator.getExecutor();
-        ThreadPoolExecutor pool = threadTask.getThreadPoolExecutor();
-        StringBuilder msg = new StringBuilder();
-        msg.append(ThreadPoolMetricEnum.CORE_SIZE.getMetricName()).append(StringUtil.COLON).append(pool.getCorePoolSize()).append(StringUtil.SPACE);
-        msg.append(ThreadPoolMetricEnum.QUEUE_UP.getMetricName()).append(StringUtil.COLON).append(pool.getQueue().size()).append(StringUtil.SPACE);
-        msg.append(ThreadPoolMetricEnum.ACTIVE.getMetricName()).append(StringUtil.COLON).append(pool.getActiveCount()).append(StringUtil.SPACE);
-        msg.append(ThreadPoolMetricEnum.REMAINING_CAPACITY.getMetricName()).append(StringUtil.COLON).append(pool.getQueue().remainingCapacity()).append(StringUtil.SPACE);
-        msg.append("堆积").append(StringUtil.COLON).append(bufferActuator.getQueue().size()).append(StringUtil.SPACE);
-        msg.append("容量").append(StringUtil.COLON).append(bufferActuator.getQueueCapacity()).append(StringUtil.SPACE);
-        msg.append(ThreadPoolMetricEnum.COMPLETED.getMetricName()).append(StringUtil.COLON).append(pool.getCompletedTaskCount());
-        result.add(new MetricResponse(metricEnum.getCode(), metricEnum.getGroup(), metricEnum.getMetricName(), Arrays.asList(new Sample(StatisticEnum.COUNT.getTagValueRepresentation(), msg.toString()))));
+        return list.stream().map(info -> info.getResponse()).collect(Collectors.toList());
     }
 
     @Override
@@ -298,13 +304,13 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
         return queryMappingMetricCount(metaAll, (query) -> query.addFilter(ConfigConstant.DATA_EVENT, ConnectorConstant.OPERTION_DELETE));
     }
 
-    private long queryMappingMetricCount(List<Meta> metaAll, QueryMappingOperation operation) {
+    private long queryMappingMetricCount(List<Meta> metaAll, Consumer<Query> operation) {
         AtomicLong total = new AtomicLong(0);
         if (!CollectionUtils.isEmpty(metaAll)) {
             Query query = new Query(1, 1);
             query.setQueryTotal(true);
             query.setType(StorageEnum.DATA);
-            operation.apply(query);
+            operation.accept(query);
             metaAll.forEach(meta -> {
                 query.setMetaId(meta.getId());
                 Paging paging = manager.queryData(query);
@@ -314,8 +320,19 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
         return total.get();
     }
 
-    private interface QueryMappingOperation {
-        void apply(Query query);
+    private MetricResponseInfo collect(BufferActuator bufferActuator, String code, String group, String metricName) {
+        MetricResponseInfo info = new MetricResponseInfo();
+        ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) bufferActuator.getExecutor();
+        ThreadPoolExecutor pool = threadTask.getThreadPoolExecutor();
+        info.setQueueUp(bufferActuator.getQueue().size());
+        StringBuilder msg = new StringBuilder();
+        msg.append("堆积").append(StringUtil.COLON).append(info.getQueueUp());
+        msg.append(StringUtil.FORWARD_SLASH).append(bufferActuator.getQueueCapacity()).append(StringUtil.SPACE);
+        msg.append(ThreadPoolMetricEnum.CORE_SIZE.getMetricName()).append(StringUtil.COLON).append(pool.getActiveCount());
+        msg.append(StringUtil.FORWARD_SLASH).append(pool.getCorePoolSize()).append(StringUtil.SPACE);
+        msg.append(ThreadPoolMetricEnum.COMPLETED.getMetricName()).append(StringUtil.COLON).append(pool.getCompletedTaskCount());
+        info.setResponse(new MetricResponse(code, group, metricName, Arrays.asList(new Sample(StatisticEnum.COUNT.getTagValueRepresentation(), msg.toString()))));
+        return info;
     }
 
 }

+ 24 - 0
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/model/MetricResponseInfo.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.monitor.model;
+
+public class MetricResponseInfo {
+
+    private MetricResponse response;
+
+    private long queueUp;
+
+    public MetricResponse getResponse() {
+        return response;
+    }
+
+    public void setResponse(MetricResponse response) {
+        this.response = response;
+    }
+
+    public long getQueueUp() {
+        return queueUp;
+    }
+
+    public void setQueueUp(long queueUp) {
+        this.queueUp = queueUp;
+    }
+}