1
0
Эх сурвалжийг харах

消息通知 https://gitee.com/ghi/dbsyncer/issues/I66LUK

AE86 2 жил өмнө
parent
commit
d134c20697

+ 41 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

@@ -15,6 +15,8 @@ import org.dbsyncer.biz.vo.LogVo;
 import org.dbsyncer.biz.vo.MetaVo;
 import org.dbsyncer.biz.vo.MetricResponseVo;
 import org.dbsyncer.common.model.Paging;
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.NumberUtil;
@@ -52,7 +54,7 @@ import java.util.stream.Collectors;
  * @date 2020/04/27 10:20
  */
 @Service
-public class MonitorServiceImpl implements MonitorService {
+public class MonitorServiceImpl extends BaseServiceImpl implements MonitorService, ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -62,6 +64,9 @@ public class MonitorServiceImpl implements MonitorService {
     @Resource
     private DataSyncService dataSyncService;
 
+    @Resource
+    private ScheduledTaskService scheduledTaskService;
+
     private Map<String, MetricDetailFormatter> metricDetailFormatterMap = new LinkedHashMap<>();
 
     @PostConstruct
@@ -84,6 +89,9 @@ public class MonitorServiceImpl implements MonitorService {
         metricDetailFormatterMap.putIfAbsent(DiskMetricEnum.THRESHOLD.getCode(), new DiskMetricDetailFormatter());
         metricDetailFormatterMap.putIfAbsent(DiskMetricEnum.FREE.getCode(), new DiskMetricDetailFormatter());
         metricDetailFormatterMap.putIfAbsent(DiskMetricEnum.TOTAL.getCode(), new DiskMetricDetailFormatter());
+
+        // 间隔10分钟预警
+        scheduledTaskService.start("0 */10 * * * ?", this);
     }
 
     @Override
@@ -180,6 +188,37 @@ public class MonitorServiceImpl implements MonitorService {
         return vo;
     }
 
+    @Override
+    public void run() {
+        // 预警:驱动出现失败记录,发送通知消息
+        List<Meta> metaAll = monitor.getMetaAll();
+        if (CollectionUtils.isEmpty(metaAll)) {
+            return;
+        }
+
+        StringBuilder content = new StringBuilder();
+        metaAll.forEach(meta -> {
+            // 有失败记录
+            if (meta.getFail().get() > 0) {
+                Mapping mapping = monitor.getMapping(meta.getMappingId());
+                if (null != mapping) {
+                    ModelEnum modelEnum = ModelEnum.getModelEnum(mapping.getModel());
+                    content.append("<p>");
+                    content.append(String.format("%s(%s) 失败:%s, 成功:%s", mapping.getName(), modelEnum.getName(), meta.getFail(), meta.getSuccess()));
+                    if (ModelEnum.FULL == modelEnum) {
+                        content.append(String.format(", 总数:%s", meta.getTotal()));
+                    }
+                    content.append("<p>");
+                }
+            }
+        });
+
+        String msg = content.toString();
+        if (StringUtil.isNotBlank(msg)) {
+            sendNotifyMessage("同步失败", msg);
+        }
+    }
+
     private MetaVo convertMeta2Vo(Meta meta) {
         Mapping mapping = monitor.getMapping(meta.getMappingId());
         Assert.notNull(mapping, "驱动不存在.");
@@ -221,4 +260,5 @@ public class MonitorServiceImpl implements MonitorService {
             return vo;
         }).collect(Collectors.toList());
     }
+
 }

+ 3 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/AbstractPuller.java

@@ -2,12 +2,13 @@ package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.common.event.ClosedEvent;
 import org.dbsyncer.manager.Puller;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 
+import javax.annotation.Resource;
+
 public abstract class AbstractPuller implements Puller {
 
-    @Autowired
+    @Resource
     private ApplicationContext applicationContext;
 
     protected void publishClosedEvent(String metaId) {

+ 7 - 7
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -13,11 +13,11 @@ import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import javax.annotation.Resource;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
@@ -37,20 +37,20 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    @Autowired
+    @Resource
     private Parser parser;
 
-    @Autowired
+    @Resource
     private Manager manager;
 
-    @Autowired
+    @Resource
     private LogService logService;
 
     private Map<String, Task> map = new ConcurrentHashMap<>();
 
     @Override
     public void start(Mapping mapping) {
-        Thread worker = new Thread(()->{
+        Thread worker = new Thread(() -> {
             final String metaId = mapping.getMetaId();
             try {
                 List<TableGroup> list = manager.getSortedTableGroupAll(mapping.getId());
@@ -105,7 +105,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         flush(task);
 
         int i = task.getTableGroupIndex();
-        while (i < list.size()){
+        while (i < list.size()) {
             parser.execute(task, mapping, list.get(i), executorService);
             if (!task.isRunning()) {
                 break;
@@ -128,7 +128,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
 
         // 全量的过程中,有新数据则更新总数
         long finished = meta.getSuccess().get() + meta.getFail().get();
-        if(meta.getTotal().get() < finished){
+        if (meta.getTotal().get() < finished) {
             meta.getTotal().set(finished);
         }
 

+ 10 - 10
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -30,12 +30,12 @@ import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -63,26 +63,26 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    @Autowired
+    @Resource
     private Parser parser;
 
-    @Autowired
+    @Resource
     private Listener listener;
 
-    @Autowired
+    @Resource
     private Manager manager;
 
-    @Autowired
+    @Resource
     private LogService logService;
 
-    @Autowired
+    @Resource
     private ScheduledTaskService scheduledTaskService;
 
-    @Autowired
+    @Resource
     private ConnectorFactory connectorFactory;
 
     @Qualifier("taskExecutor")
-    @Autowired
+    @Resource
     private Executor taskExecutor;
 
     private Map<String, Extractor> map = new ConcurrentHashMap<>();
@@ -104,7 +104,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         Meta meta = manager.getMeta(metaId);
         Assert.notNull(meta, "Meta不能为空.");
 
-        Thread worker = new Thread(()->{
+        Thread worker = new Thread(() -> {
             try {
                 long now = Instant.now().toEpochMilli();
                 meta.setBeginTime(now);
@@ -195,7 +195,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
             // 30s内更新,执行写入
             Meta meta = manager.getMeta(metaId);
             LocalDateTime lastSeconds = LocalDateTime.now().minusSeconds(FLUSH_DELAYED_SECONDS);
-            if(meta.getUpdateTime() > Timestamp.valueOf(lastSeconds).getTime()){
+            if (meta.getUpdateTime() > Timestamp.valueOf(lastSeconds).getTime()) {
                 if (!CollectionUtils.isEmpty(snapshot)) {
                     logger.debug("{}", snapshot);
                 }

+ 17 - 1
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/notify/MailNotifyService.java

@@ -75,7 +75,7 @@ public class MailNotifyService implements NotifyService {
             checkMail(notifyMessage);
             // 统一应用标题
             String title = String.format("【%s通知】%s", appConfig.getName(), notifyMessage.getTitle());
-            String content = notifyMessage.getContent();
+            String content = createTemplate(appConfig.getName(), notifyMessage.getContent());
 
             // 创建邮件消息
             MimeMessage message = new MimeMessage(session);
@@ -104,6 +104,22 @@ public class MailNotifyService implements NotifyService {
         }
     }
 
+    private String createTemplate(String appName, String content) {
+        String temp = "<!DOCTYPE html>\n" +
+                "<html lang=\"en\">\n" +
+                "<meta charset=\"UTF-8\">\n" +
+                "<title>${appName}通知</title>\n" +
+                "</head>\n" +
+                "<body>\n" +
+                "${content}\n" +
+                "<p><a href=\"http://gitee.com/ghi/dbsyncer\">访问项目</a></p>\n" +
+                "</body>\n" +
+                "</html>";
+        String replace = StringUtil.replace(temp, "${appName}", appName);
+        replace = StringUtil.replace(replace, "${content}", content);
+        return replace;
+    }
+
     private void checkMail(NotifyMessage notifyMessage) {
         Assert.notNull(notifyMessage, "通知请求不能为空");
         Assert.notNull(notifyMessage.getTitle(), "邮件主题不能为空");

+ 3 - 3
dbsyncer-web/src/main/resources/application.properties

@@ -20,9 +20,9 @@ dbsyncer.parser.flush.buffer.actuator.queue-capacity=100000
 dbsyncer.parser.flush.buffer.actuator.period-millisecond=300
 
 #plugin
-#dbsyncer.plugin.notify.mail.enabled=true
-#dbsyncer.plugin.notify.mail.username=your mail username
-#dbsyncer.plugin.notify.mail.password=your mail authorization code
+dbsyncer.plugin.notify.mail.enabled=false
+dbsyncer.plugin.notify.mail.username=your mail username
+dbsyncer.plugin.notify.mail.password=your mail authorization code
 
 #storage
 dbsyncer.storage.binlog.recorder.batch-count=3000