AE86 3 vuotta sitten
vanhempi
säilyke
fb56071421

+ 3 - 14
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -10,22 +10,11 @@ import java.util.Map;
 public interface Event {
 
     /**
-     * 日志数据变更事件
+     * 数据变更事件
      *
-     * @param rowChangedEvent
+     * @param event
      */
-    default void changedLogEvent(RowChangedEvent rowChangedEvent) {
-        // nothing to do
-    }
-
-    /**
-     * 定时数据变更事件
-     *
-     * @param rowChangedEvent
-     */
-    default void changedQuartzEvent(RowChangedEvent rowChangedEvent){
-        // nothing to do
-    }
+    void changedEvent(RowChangedEvent event);
 
     /**
      * 写入增量点事件

+ 20 - 14
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -13,10 +13,9 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * @version 1.0.0
@@ -26,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 public abstract class AbstractExtractor implements Extractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
-    protected BlockingQueue queue = new LinkedBlockingQueue<>(100);
     protected Executor taskExecutor;
     protected ConnectorFactory connectorFactory;
     protected ScheduledTaskService scheduledTaskService;
@@ -34,8 +32,14 @@ public abstract class AbstractExtractor implements Extractor {
     protected ListenerConfig listenerConfig;
     protected Map<String, String> snapshot;
     protected Set<String> filterTable;
+    protected AtomicInteger taskCounter = new AtomicInteger();
     private List<Event> watcher;
 
+    @Override
+    public int getStackingSize() {
+        return taskCounter.get();
+    }
+
     @Override
     public void addListener(Event event) {
         if (null != event) {
@@ -55,16 +59,18 @@ public abstract class AbstractExtractor implements Extractor {
     }
 
     @Override
-    public void changedQuartzEvent(RowChangedEvent rowChangedEvent) {
-        if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.changedQuartzEvent(rowChangedEvent));
-        }
-    }
-
-    @Override
-    public void changedLogEvent(RowChangedEvent rowChangedEvent) {
+    public void changedEvent(RowChangedEvent event) {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.changedLogEvent(rowChangedEvent));
+            watcher.forEach(w -> {
+                try {
+                    taskCounter.incrementAndGet();
+                    w.changedEvent(event);
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                } finally {
+                    taskCounter.decrementAndGet();
+                }
+            });
         }
     }
 
@@ -97,8 +103,8 @@ public abstract class AbstractExtractor implements Extractor {
         }
     }
 
-    protected void asynSendRowChangedEvent(RowChangedEvent rowChangedEvent) {
-        taskExecutor.execute(() -> changedLogEvent(rowChangedEvent));
+    protected void asynSendRowChangedEvent(RowChangedEvent event) {
+        taskExecutor.execute(() -> changedEvent(event));
     }
 
     public void setTaskExecutor(Executor taskExecutor) {

+ 10 - 10
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -28,18 +28,11 @@ public interface Extractor {
     void clearAllListener();
 
     /**
-     * 定时模式: 监听增量事件
+     * 数据变更事件
      *
-     * @param rowChangedEvent
-     */
-    void changedQuartzEvent(RowChangedEvent rowChangedEvent);
-
-    /**
-     * 日志模式: 监听增量事件
-     *
-     * @param rowChangedEvent
+     * @param event
      */
-    void changedLogEvent(RowChangedEvent rowChangedEvent);
+    void changedEvent(RowChangedEvent event);
 
     /**
      * 刷新增量点事件
@@ -65,4 +58,11 @@ public interface Extractor {
      */
     void interruptException(Exception e);
 
+    /**
+     * 获取堆积任务数
+     *
+     * @return
+     */
+    int getStackingSize();
+
 }

+ 1 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -118,6 +118,7 @@ public class MysqlExtractor extends AbstractExtractor {
     }
 
     private void reStart() {
+        taskCounter.set(0);
         for (int i = 1; i <= RETRY_TIMES; i++) {
             try {
                 if (null != client) {

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/OracleExtractor.java

@@ -25,7 +25,7 @@ public class OracleExtractor extends AbstractExtractor {
             String username = config.getUsername();
             String password = config.getPassword();
             String url = config.getUrl();
-            client = new DBChangeNotification(username, password, url, queue);
+            client = new DBChangeNotification(username, password, url);
             client.setFilterTable(filterTable);
             client.addRowEventListener((e) -> asynSendRowChangedEvent(e));
             client.start();

+ 1 - 9
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -48,23 +48,15 @@ public class DBChangeNotification {
     private OracleStatement            statement;
     private DatabaseChangeRegistration dcr;
     private Map<Integer, String>       tables;
-    private BlockingQueue<DCNEvent>    queue;
     private Worker                     worker;
     private Set<String>                filterTable;
     private List<RowEventListener>     listeners = new ArrayList<>();
+    private BlockingQueue<DCNEvent>    queue = new LinkedBlockingQueue<> (100);
 
     public DBChangeNotification(String username, String password, String url) {
         this.username = username;
         this.password = password;
         this.url = url;
-        this.queue = new LinkedBlockingQueue<> (100);
-    }
-
-    public DBChangeNotification(String username, String password, String url, BlockingQueue queue) {
-        this.username = username;
-        this.password = password;
-        this.url = url;
-        this.queue = queue;
     }
 
     public void addRowEventListener(RowEventListener rowEventListener) {

+ 3 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java

@@ -90,15 +90,15 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
             for (Map<String, Object> row : data) {
                 event = row.get(eventFieldName);
                 if (update.contains(event)) {
-                    changedQuartzEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row));
+                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row));
                     continue;
                 }
                 if (insert.contains(event)) {
-                    changedQuartzEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_MAP, row));
+                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_MAP, row));
                     continue;
                 }
                 if (delete.contains(event)) {
-                    changedQuartzEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_DELETE, row, Collections.EMPTY_MAP));
+                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_DELETE, row, Collections.EMPTY_MAP));
                     continue;
                 }
 

+ 4 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -119,6 +119,7 @@ public class SqlServerExtractor extends AbstractExtractor {
     }
 
     private void connect() {
+        taskCounter.set(0);
         DatabaseConfig cfg = (DatabaseConfig) connectorConfig;
         if (connectorFactory.isAlive(cfg)) {
             connectorMapper = connectorFactory.connect(cfg);
@@ -257,17 +258,17 @@ public class SqlServerExtractor extends AbstractExtractor {
         for (CDCEvent event : list) {
             int code = event.getCode();
             if (TableOperationEnum.isUpdateAfter(code)) {
-                changedLogEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, event.getRow()));
+                changedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, event.getRow()));
                 continue;
             }
 
             if (TableOperationEnum.isInsert(code)) {
-                changedLogEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, event.getRow()));
+                changedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, event.getRow()));
                 continue;
             }
 
             if (TableOperationEnum.isDelete(code)) {
-                changedLogEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, event.getRow(), Collections.EMPTY_LIST));
+                changedEvent(new RowChangedEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, event.getRow(), Collections.EMPTY_LIST));
             }
         }
     }

+ 9 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/Puller.java

@@ -10,4 +10,13 @@ public interface Puller {
 
     void close(String metaId);
 
+    /**
+     * 获取堆积任务数
+     *
+     * @return
+     */
+    default int getStackingSize() {
+        return 0;
+    }
+
 }

+ 12 - 5
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -132,6 +132,13 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         logger.info("关闭成功:{}", metaId);
     }
 
+    @Override
+    public int getStackingSize() {
+        AtomicInteger counter = new AtomicInteger();
+        map.values().forEach(extractor -> counter.getAndAdd(extractor.getStackingSize()));
+        return counter.get();
+    }
+
     @Override
     public void run() {
         // 定时同步增量信息
@@ -187,9 +194,9 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
     }
 
     abstract class AbstractListener implements Event {
-        protected Mapping                   mapping;
-        protected String                    metaId;
-        protected AtomicBoolean             changed = new AtomicBoolean();
+        protected Mapping mapping;
+        protected String metaId;
+        protected AtomicBoolean changed = new AtomicBoolean();
 
         @Override
         public void flushEvent(Map<String, String> map) {
@@ -252,7 +259,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         }
 
         @Override
-        public void changedQuartzEvent(RowChangedEvent rowChangedEvent) {
+        public void changedEvent(RowChangedEvent rowChangedEvent) {
             final FieldPicker picker = tablePicker.get(rowChangedEvent.getTableGroupIndex());
             logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", picker.getTableGroup().getSourceTable().getName(),
                     rowChangedEvent.getEvent(),
@@ -311,7 +318,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         }
 
         @Override
-        public void changedLogEvent(RowChangedEvent rowChangedEvent) {
+        public void changedEvent(RowChangedEvent rowChangedEvent) {
             // 处理过程有异常向上抛
             List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getTableName());
             if (!CollectionUtils.isEmpty(pickers)) {

+ 5 - 1
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -5,6 +5,7 @@ import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.manager.puller.Puller;
 import org.dbsyncer.monitor.enums.MetricEnum;
 import org.dbsyncer.monitor.enums.StatisticEnum;
 import org.dbsyncer.monitor.enums.ThreadPoolMetricEnum;
@@ -44,6 +45,9 @@ public class MonitorFactory implements Monitor {
     @Autowired
     private Manager manager;
 
+    @Autowired
+    private Puller incrementPuller;
+
     @Autowired
     private Executor taskExecutor;
 
@@ -193,7 +197,7 @@ public class MonitorFactory implements Monitor {
      * @return
      */
     private long getTaskNumber() {
-        return 0;
+        return incrementPuller.getStackingSize();
     }
 
     private MetricResponse createMetricResponse(ThreadPoolMetricEnum metricEnum, Object value) {

+ 0 - 49
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/databus/DataBusController.java

@@ -1,49 +0,0 @@
-package org.dbsyncer.web.controller.databus;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.messaging.simp.SimpMessagingTemplate;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Controller;
-import org.springframework.ui.Model;
-import org.springframework.web.bind.annotation.RequestMapping;
-
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-
-@Controller
-@RequestMapping("/dataBus")
-public class DataBusController {
-
-    @Autowired
-    private SimpMessagingTemplate messagingTemplate;
-
-    @RequestMapping("")
-    public String getView(Model model) {
-        return "databus/databus.html";
-    }
-
-    /**
-     * 定时推送消息
-     */
-    @Scheduled(fixedRate = 1000)
-    public void callback() {
-        String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
-        messagingTemplate.convertAndSend("/topic/callback", "定时推送消息时间: " + format);
-    }
-
-    /**
-     * 第一位,表示秒,取值0-59
-     * 第二位,表示分,取值0-59
-     * 第三位,表示小时,取值0-23
-     * 第四位,日期天/日,取值1-31
-     * 第五位,日期月份,取值1-12
-     * 第六位,星期,取值1-7
-     * [秒 分 时 日 月 星期]
-     */
-    @Scheduled(cron = "0 0 */1 * * ?")
-    public void testCron() {
-        String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
-        System.out.println("定时任务:" + format);
-    }
-
-}

+ 0 - 56
dbsyncer-web/src/main/resources/public/databus/databus.html

@@ -1,56 +0,0 @@
-<html>
-<head>
-    <meta charset="UTF-8"/>
-    <title>广播式WebSocket</title>
-    <script src="https://cdn.bootcss.com/sockjs-client/1.4.0/sockjs.min.js"></script>
-    <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
-    <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>
-</head>
-<body onload="disconnect()">
-<noscript><h2 style="color: #e80b0a;">Sorry,浏览器不支持WebSocket</h2></noscript>
-<div>
-    <div>
-        <button id="connect" onclick="connect();">连接</button>
-        <button id="disconnect" disabled="disabled" onclick="disconnect();">断开连接</button>
-    </div>
-
-    <div id="conversationDiv">
-        <label>服务端响应:</label>
-        <p id="callback"></p>
-    </div>
-</div>
-<script type="text/javascript">
-    var stompClient = null;
-
-    function setConnected(connected) {
-        document.getElementById("connect").disabled = connected;
-        document.getElementById("disconnect").disabled = !connected;
-        document.getElementById("conversationDiv").style.visibility = connected ? 'visible' : 'hidden';
-        $("#response").html();
-        $("#callback").html();
-    }
-
-    function connect() {
-        var socket = new SockJS('/simple');
-        stompClient = Stomp.over(socket);
-        stompClient.connect({}, function (frame) {
-            setConnected(true);
-            console.log('Connected:' + frame);
-
-            stompClient.subscribe('/topic/callback', function (response) {
-                $("#callback").html(response.body);
-            });
-        });
-    }
-
-    function disconnect() {
-        if (stompClient != null) {
-            stompClient.disconnect();
-        }
-        setConnected(false);
-        console.log('Disconnected');
-    }
-
-</script>
-</body>
-</html>

+ 72 - 62
dbsyncer-web/src/main/resources/public/monitor/monitor.html

@@ -6,68 +6,6 @@
 <div class="container-fluid">
     <div class="row">
         <form class="form-horizontal" role="form" method="post">
-
-            <!-- 数据 -->
-            <div class="col-md-12">
-                <div class="form-group">
-                    <div class="col-md-3">
-                        <!-- 驱动下拉 -->
-                        <select id="searchMetaData" class="form-control select-control">
-                            <option th:each="m,s:${meta}" th:value="${m?.id}" th:text="${m?.mappingName} +' (' + ${m?.model} +')'" th:selected="${m?.id eq metaId}"/>
-                        </select>
-                    </div>
-                    <div class="col-md-1">
-                        <!-- 是否包含成功 -->
-                        <select id="searchDataSuccess" class="form-control select-control">
-                            <option th:value="${c?.value}" th:text="${c?.message}" th:each="c,state : ${storageDataStatus}"/>
-                        </select>
-                    </div>
-                    <div class="col-sm-4">
-                        <input id="searchDataKeyword" class="form-control" type="text" maxlength="32" placeholder="请输入异常关键字(最多32个字)." />
-                    </div>
-                    <div class="col-md-1">
-                        <button id="queryDataBtn" type="button" class="btn btn-primary">查询数据</button>
-                    </div>
-                    <div class="col-md-3 text-right">
-                        <button th:id="${metaId}" type="button" class="btn btn-default clearDataBtn">清空数据</button>
-                    </div>
-                </div>
-
-                <table class="table table-hover metaDataList">
-                    <thead>
-                    <tr>
-                        <th style="width:3%;"></th>
-                        <th style="width:5%;">事件</th>
-                        <th style="width:5%;">结果</th>
-                        <th style="width:60%;">异常</th>
-                        <th style="width:17%;">时间</th>
-                        <th style="width:10%;">详情</th>
-                    </tr>
-                    </thead>
-                    <tbody id="dataList">
-                        <tr th:each="d,s : ${pagingData?.data}">
-                            <td th:text="${s.index}+1"></td>
-                            <td th:text="${d?.event}"></td>
-                            <td>
-                                <span th:if="${d?.success == 1}" class="label label-success">成功</span>
-                                <span th:if="${d?.success == 0}" class="label label-warning">失败</span>
-                            </td>
-                            <td style="max-width:100px;" class="dbsyncer_over_hidden"><a href="javascript:;" class="dbsyncer_pointer queryError">[[${d?.error}]]</a></td>
-                            <td th:text="${#dates.format(d?.createTime, 'yyyy-MM-dd HH:mm:ss')}"></td>
-                            <td><a th:json="${d?.json}" href="javascript:;" class="label label-info queryData">查看数据</a><div class="hidden" th:text="${d?.json}"></div></td>
-                        </tr>
-                    </tbody>
-                </table>
-
-                <div class="form-group">
-                    <div class="col-md-5">共计: <span id="dataTotal">[[${pagingData?.total}]]</span>条</div>
-                    <div class="col-md-7">
-                        <a href="javascript:void(0);" id="queryDataMore" num="1">显示更多<i class="fa fa-angle-double-down" aria-hidden="true"></i></a>
-                    </div>
-                </div>
-
-            </div>
-
             <!-- 应用性能 -->
             <div class="col-md-12">
                 <div class="panel-group">
@@ -125,6 +63,78 @@
                 </div>
             </div>
 
+            <!-- 数据 -->
+            <div class="col-md-12">
+                <div class="panel-group">
+                    <div class="panel panel-default">
+                        <div class="panel-heading">
+                            <h4 class="panel-title">
+                                <u data-toggle="collapse" class="dbsyncer_pointer" href="#queryDataPanel">查询数据</u>
+                            </h4>
+                        </div>
+                        <div id="queryDataPanel" class="panel-body panel-collapse collapse in">
+                            <div class="form-group">
+                                <div class="col-md-3">
+                                    <!-- 驱动下拉 -->
+                                    <select id="searchMetaData" class="form-control select-control">
+                                        <option th:each="m,s:${meta}" th:value="${m?.id}" th:text="${m?.mappingName} +' (' + ${m?.model} +')'" th:selected="${m?.id eq metaId}"/>
+                                    </select>
+                                </div>
+                                <div class="col-md-1">
+                                    <!-- 是否包含成功 -->
+                                    <select id="searchDataSuccess" class="form-control select-control">
+                                        <option th:value="${c?.value}" th:text="${c?.message}" th:each="c,state : ${storageDataStatus}"/>
+                                    </select>
+                                </div>
+                                <div class="col-sm-4">
+                                    <input id="searchDataKeyword" class="form-control" type="text" maxlength="32" placeholder="请输入异常关键字(最多32个字)." />
+                                </div>
+                                <div class="col-md-1">
+                                    <button id="queryDataBtn" type="button" class="btn btn-primary">查询数据</button>
+                                </div>
+                                <div class="col-md-3 text-right">
+                                    <button th:id="${metaId}" type="button" class="btn btn-default clearDataBtn">清空数据</button>
+                                </div>
+                            </div>
+
+                            <table class="table table-hover metaDataList">
+                                <thead>
+                                <tr>
+                                    <th style="width:3%;"></th>
+                                    <th style="width:5%;">事件</th>
+                                    <th style="width:5%;">结果</th>
+                                    <th style="width:60%;">异常</th>
+                                    <th style="width:17%;">时间</th>
+                                    <th style="width:10%;">详情</th>
+                                </tr>
+                                </thead>
+                                <tbody id="dataList">
+                                <tr th:each="d,s : ${pagingData?.data}">
+                                    <td th:text="${s.index}+1"></td>
+                                    <td th:text="${d?.event}"></td>
+                                    <td>
+                                        <span th:if="${d?.success == 1}" class="label label-success">成功</span>
+                                        <span th:if="${d?.success == 0}" class="label label-warning">失败</span>
+                                    </td>
+                                    <td style="max-width:100px;" class="dbsyncer_over_hidden"><a href="javascript:;" class="dbsyncer_pointer queryError">[[${d?.error}]]</a></td>
+                                    <td th:text="${#dates.format(d?.createTime, 'yyyy-MM-dd HH:mm:ss')}"></td>
+                                    <td><a th:json="${d?.json}" href="javascript:;" class="label label-info queryData">查看数据</a><div class="hidden" th:text="${d?.json}"></div></td>
+                                </tr>
+                                </tbody>
+                            </table>
+
+                            <div class="form-group">
+                                <div class="col-md-5">共计: <span id="dataTotal">[[${pagingData?.total}]]</span>条</div>
+                                <div class="col-md-7">
+                                    <a href="javascript:void(0);" id="queryDataMore" num="1">显示更多<i class="fa fa-angle-double-down" aria-hidden="true"></i></a>
+                                </div>
+                            </div>
+                        </div>
+
+                    </div>
+                </div>
+            </div>
+
             <!-- 日志 -->
             <div class="col-md-12">
                 <div class="panel-group">