AE86 3 yıl önce
ebeveyn
işleme
51325e3049

+ 1 - 1
README.md

@@ -1,6 +1,6 @@
 <div>
     <h3>介绍</h3>
-    <p>DBSyncer是一款开源的数据同步软件,提供Mysql、Oracle、SqlServer、SQL结果集等场景,支持自定义同步转换业务。</p>
+    <p>DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServer、SQL结果集等同步场景。支持上传插件自定义同步转换业务,提供监控全量和增量数据统计图、应用性能预警等。</p>
     <p>特点</p>
     <ol>
         <li>组合驱动,自定义库同步到库组合,关系型数据库与非关系型之间组合,任意搭配表同步映射关系</li>

+ 1 - 17
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -15,7 +15,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * @version 1.0.0
@@ -32,14 +31,8 @@ public abstract class AbstractExtractor implements Extractor {
     protected ListenerConfig listenerConfig;
     protected Map<String, String> snapshot;
     protected Set<String> filterTable;
-    protected AtomicInteger taskCounter = new AtomicInteger();
     private List<Event> watcher;
 
-    @Override
-    public int getStackingSize() {
-        return taskCounter.get();
-    }
-
     @Override
     public void addListener(Event event) {
         if (null != event) {
@@ -61,16 +54,7 @@ public abstract class AbstractExtractor implements Extractor {
     @Override
     public void changedEvent(RowChangedEvent event) {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> {
-                try {
-                    taskCounter.incrementAndGet();
-                    w.changedEvent(event);
-                } catch (Exception e) {
-                    logger.error(e.getMessage());
-                } finally {
-                    taskCounter.decrementAndGet();
-                }
-            });
+            watcher.forEach(w -> w.changedEvent(event));
         }
     }
 

+ 0 - 7
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -58,11 +58,4 @@ public interface Extractor {
      */
     void interruptException(Exception e);
 
-    /**
-     * 获取堆积任务数
-     *
-     * @return
-     */
-    int getStackingSize();
-
 }

+ 0 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -118,7 +118,6 @@ public class MysqlExtractor extends AbstractExtractor {
     }
 
     private void reStart() {
-        taskCounter.set(0);
         for (int i = 1; i <= RETRY_TIMES; i++) {
             try {
                 if (null != client) {

+ 0 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -119,7 +119,6 @@ public class SqlServerExtractor extends AbstractExtractor {
     }
 
     private void connect() {
-        taskCounter.set(0);
         DatabaseConfig cfg = (DatabaseConfig) connectorConfig;
         if (connectorFactory.isAlive(cfg)) {
             connectorMapper = connectorFactory.connect(cfg);

+ 0 - 9
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/Puller.java

@@ -10,13 +10,4 @@ public interface Puller {
 
     void close(String metaId);
 
-    /**
-     * 获取堆积任务数
-     *
-     * @return
-     */
-    default int getStackingSize() {
-        return 0;
-    }
-
 }

+ 0 - 7
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -132,13 +132,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         logger.info("关闭成功:{}", metaId);
     }
 
-    @Override
-    public int getStackingSize() {
-        AtomicInteger counter = new AtomicInteger();
-        map.values().forEach(extractor -> counter.getAndAdd(extractor.getStackingSize()));
-        return counter.get();
-    }
-
     @Override
     public void run() {
         // 定时同步增量信息

+ 15 - 14
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -5,7 +5,6 @@ import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.manager.Manager;
-import org.dbsyncer.manager.puller.Puller;
 import org.dbsyncer.monitor.enums.MetricEnum;
 import org.dbsyncer.monitor.enums.StatisticEnum;
 import org.dbsyncer.monitor.enums.ThreadPoolMetricEnum;
@@ -21,6 +20,7 @@ import org.dbsyncer.storage.query.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.cache.annotation.Cacheable;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
@@ -28,6 +28,7 @@ import org.springframework.stereotype.Component;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
@@ -45,12 +46,15 @@ public class MonitorFactory implements Monitor {
     @Autowired
     private Manager manager;
 
-    @Autowired
-    private Puller incrementPuller;
-
     @Autowired
     private Executor taskExecutor;
 
+    /**
+     * 工作线程池队列容量
+     */
+    @Value(value = "${dbsyncer.web.thread.pool.queue.capacity}")
+    private int queueCapacity;
+
     @Override
     @Cacheable(value = "connector", keyGenerator = "cacheKeyGenerator")
     public boolean isAlive(String id) {
@@ -137,7 +141,13 @@ public class MonitorFactory implements Monitor {
         report.setInsert(getMappingInsert(metaAll));
         report.setUpdate(getMappingUpdate(metaAll));
         report.setDelete(getMappingDelete(metaAll));
-        report.setTaskNumber(getTaskNumber());
+
+        // 线程池使用情况
+        ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) taskExecutor;
+        ThreadPoolExecutor pool = threadTask.getThreadPoolExecutor();
+        BlockingQueue<Runnable> queue = pool.getQueue();
+        report.setQueueUp(queue.size());
+        report.setQueueCapacity(queueCapacity);
         return report;
     }
 
@@ -191,15 +201,6 @@ public class MonitorFactory implements Monitor {
         return queryMappingMetricCount(metaAll, (query) -> query.addFilter(ConfigConstant.DATA_EVENT, ConnectorConstant.OPERTION_DELETE));
     }
 
-    /**
-     * 获取所有监听器队列待处理数
-     *
-     * @return
-     */
-    private long getTaskNumber() {
-        return incrementPuller.getStackingSize();
-    }
-
     private MetricResponse createMetricResponse(ThreadPoolMetricEnum metricEnum, Object value) {
         return new MetricResponse(metricEnum.getCode(), metricEnum.getGroup(), metricEnum.getMetricName(), Arrays.asList(new Sample(StatisticEnum.COUNT.getTagValueRepresentation(), value)));
     }

+ 2 - 2
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/enums/ThreadPoolMetricEnum.java

@@ -26,9 +26,9 @@ public enum ThreadPoolMetricEnum {
      */
     COMPLETED("thread.pool.completed", "线程池", "已完成"),
     /**
-     * 队列长度
+     * 空闲队列
      */
-    REMAINING_CAPACITY("thread.pool.remaining.capacity", "线程池", "队列长度");
+    REMAINING_CAPACITY("thread.pool.remaining.capacity", "线程池", "空闲队列");
 
     private String code;
     private String group;

+ 18 - 5
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/model/AppReportMetric.java

@@ -30,7 +30,12 @@ public class AppReportMetric {
     /**
      * 待处理数
      */
-    private long taskNumber;
+    private long queueUp;
+
+    /**
+     * 队列长度
+     */
+    private long queueCapacity;
 
     public long getSuccess() {
         return success;
@@ -72,11 +77,19 @@ public class AppReportMetric {
         this.delete = delete;
     }
 
-    public long getTaskNumber() {
-        return taskNumber;
+    public long getQueueUp() {
+        return queueUp;
+    }
+
+    public void setQueueUp(long queueUp) {
+        this.queueUp = queueUp;
+    }
+
+    public long getQueueCapacity() {
+        return queueCapacity;
     }
 
-    public void setTaskNumber(long taskNumber) {
-        this.taskNumber = taskNumber;
+    public void setQueueCapacity(long queueCapacity) {
+        this.queueCapacity = queueCapacity;
     }
 }

+ 9 - 2
dbsyncer-web/src/main/java/org/dbsyncer/web/config/TaskPoolConfig.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.web.config;
 
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -15,6 +16,12 @@ import java.util.concurrent.ThreadPoolExecutor;
 @Configuration
 public class TaskPoolConfig {
 
+    /**
+     * 工作线程池队列容量
+     */
+    @Value(value = "${dbsyncer.web.thread.pool.queue.capacity}")
+    private int queueCapacity;
+
     @Bean("taskExecutor")
     public Executor taskExecutor() {
         //注意这一行日志:2. do submit,taskCount [101], completedTaskCount [87], activeCount [5], queueSize [9]
@@ -25,8 +32,8 @@ public class TaskPoolConfig {
         //最大线程数128:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
         //maxPoolSize 当系统负载大道最大值时,核心线程数已无法按时处理完所有任务,这是就需要增加线程.每秒200个任务需要20个线程,那么当每秒1000个任务时,则需要(1000-queueCapacity)*(20/200),即60个线程,可将maxPoolSize设置为60;
         executor.setMaxPoolSize(128);
-        //缓冲队列1000:用来缓冲执行任务的队列
-        executor.setQueueCapacity(1000);
+        //缓冲队列:用来缓冲执行任务的队列
+        executor.setQueueCapacity(queueCapacity);
         //允许线程的空闲时间30秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
         executor.setKeepAliveSeconds(30);
         //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池

+ 1 - 0
dbsyncer-web/src/main/resources/application.properties

@@ -5,6 +5,7 @@ server.port=18686
 dbsyncer.web.login.username=admin
 dbsyncer.web.login.password=0DPiKuNIrrVmD8IUCuw1hQxNqZc=
 dbsyncer.web.cache.connector.timeout=5
+dbsyncer.web.thread.pool.queue.capacity=1000
 server.servlet.session.timeout=1800
 server.servlet.context-path=/
 

+ 29 - 4
dbsyncer-web/src/main/resources/public/monitor/monitor.html

@@ -19,19 +19,44 @@
                             <div class="col-md-6">
                                 <div class="row">
                                     <div class="col-md-6">
-                                        <div id="totalChart" style="height: 260px; max-width: 260px; margin: 0px auto;"></div>
+                                        <div class="row">
+                                            <div class="col-md-6">
+                                                <h3><small>总共</small>&nbsp;<span class="label label-info" id="totalSpan">0</span></h3>
+                                            </div>
+                                            <div class="col-md-6">
+                                                <h3><small>新增</small>&nbsp;<span class="label label-default" id="insertSpan">0</span></h3>
+                                            </div>
+                                        </div>
+                                        <div class="row">
+                                            <div class="col-md-6">
+                                                <h3><small>成功</small>&nbsp;<span class="label label-success" id="successSpan">0</span></h3>
+                                            </div>
+                                            <div class="col-md-6">
+                                                <h3><small>修改</small>&nbsp;<span class="label label-default" id="updateSpan">0</span></h3>
+                                            </div>
+                                        </div>
+                                        <div class="row">
+                                            <div class="col-md-6">
+                                                <h3><small>失败</small>&nbsp;<span class="label label-warning" id="failSpan">0</span></h3>
+                                            </div>
+                                            <div class="col-md-6">
+                                                <h3><small>删除</small>&nbsp;<span class="label label-default" id="deleteSpan">0</span></h3>
+                                            </div>
+                                        </div>
                                     </div>
 
                                     <div class="col-md-6">
-                                        <div id="eventChart" style="height: 260px; max-width: 260px; margin: 0px auto;"></div>
+                                        <div id="queueChart" style="height: 260px; max-width: 260px; margin: 0px auto;"></div>
                                     </div>
                                 </div>
                                 <div class="row">
                                     <div class="col-md-6">
-                                        <div id="queueChart" style="height: 260px; max-width: 260px; margin: 0px auto;"></div>
+                                        <div id="totalChart" style="height: 260px; max-width: 260px; margin: 0px auto;"></div>
                                     </div>
 
-                                    <div class="col-md-6"></div>
+                                    <div class="col-md-6">
+                                        <div id="eventChart" style="height: 260px; max-width: 260px; margin: 0px auto;"></div>
+                                    </div>
                                 </div>
                             </div>
 

+ 14 - 7
dbsyncer-web/src/main/resources/static/js/monitor/index.js

@@ -212,7 +212,7 @@ function showLog($logList, arr, append){
 }
 
 // 堆积数据
-function showQueueChart(taskNumber){
+function showQueueChart(queueUp, queueCapacity){
     var option={
         title:{
             text:"堆积数据",
@@ -229,11 +229,11 @@ function showQueueChart(taskNumber){
                 animation: true,
                 type: 'gauge',
                 min: 0,
-                max: 1000,
+                max: queueCapacity,
                 splitNumber: 5,
                 axisLine: {            // 坐标轴线
                     lineStyle: {       // 属性lineStyle控制线条样式
-                        color: [[0.1, '#d9534f'], [0.3, '#f0ad4e'],[0.8, '#5bc0de'],[1, '#5cb85c']],
+                        color: [[0.1, '#5cb85c'], [0.3, '#5bc0de'],[0.8, '#f0ad4e'],[1, '#d9534f']],
                         width: 10
                     }
                 },
@@ -250,7 +250,7 @@ function showQueueChart(taskNumber){
                     }
                 },
                 detail: {fontSize:12, offsetCenter:[0,'65%']},
-                data: [{value: taskNumber, name: ''}]
+                data: [{value: queueUp, name: ''}]
             }
         ]
     };
@@ -296,6 +296,10 @@ function showEventChart(ins, upd, del){
         ]
     };
     echarts.init(document.getElementById('eventChart')).setOption(option);
+
+    $("#insertSpan").html(ins);
+    $("#updateSpan").html(upd);
+    $("#deleteSpan").html(del);
 }
 
 // 统计成功失败
@@ -336,6 +340,10 @@ function showTotalChart(success, fail){
         ]
     };
     echarts.init(document.getElementById('totalChart')).setOption(option);
+
+    $("#totalSpan").html(success + fail);
+    $("#successSpan").html(success);
+    $("#failSpan").html(fail);
 }
 
 $(function () {
@@ -347,8 +355,7 @@ $(function () {
 
     // 连接类型切换事件
     $("#searchMetaData").change(function () {
-        var $id = $(this).val();
-        doLoader('/monitor?id=' + $id);
+        $("#queryDataBtn").click();
     });
     // 数据状态切换事件
     $("#searchDataSuccess").change(function () {
@@ -369,7 +376,7 @@ $(function () {
             var report = data.resultValue;
             showTotalChart(report.success, report.fail);
             showEventChart(report.insert, report.update, report.delete);
-            showQueueChart(report.taskNumber);
+            showQueueChart(report.queueUp, report.queueCapacity);
         } else {
             bootGrowl(data.resultValue, "danger");
         }