瀏覽代碼

修复全量同步未显示统计数

AE86 3 年之前
父節點
當前提交
e94719586e

+ 0 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -136,7 +136,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             });
             });
         } catch (Exception e) {
         } catch (Exception e) {
             // 记录错误数据
             // 记录错误数据
-            result.getFailData().addAll(data);
             result.getFail().set(size);
             result.getFail().set(size);
             result.getError().append(e.getMessage()).append(System.lineSeparator());
             result.getError().append(e.getMessage()).append(System.lineSeparator());
             logger.error(e.getMessage());
             logger.error(e.getMessage());

+ 10 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -18,7 +18,7 @@ import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.enums.ParserEnum;
 import org.dbsyncer.parser.enums.ParserEnum;
-import org.dbsyncer.parser.flush.FlushService;
+import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.strategy.FlushStrategy;
@@ -64,7 +64,7 @@ public class ParserFactory implements Parser {
     private CacheService cacheService;
     private CacheService cacheService;
 
 
     @Autowired
     @Autowired
-    private FlushService flushService;
+    private LogService logService;
 
 
     @Autowired
     @Autowired
     private FlushStrategy flushStrategy;
     private FlushStrategy flushStrategy;
@@ -93,16 +93,16 @@ public class ParserFactory implements Parser {
             alive = connectorFactory.isAlive(config);
             alive = connectorFactory.isAlive(config);
         } catch (Exception e) {
         } catch (Exception e) {
             LogType.ConnectorLog logType = LogType.ConnectorLog.FAILED;
             LogType.ConnectorLog logType = LogType.ConnectorLog.FAILED;
-            flushService.asyncWrite(logType.getType(), String.format("%s%s", logType.getName(), e.getMessage()));
+            logService.log(logType, "%s%s", logType.getName(), e.getMessage());
         }
         }
         // 断线重连
         // 断线重连
-        if(!alive){
+        if (!alive) {
             try {
             try {
                 alive = connectorFactory.refresh(config);
                 alive = connectorFactory.refresh(config);
             } catch (Exception e) {
             } catch (Exception e) {
                 // nothing to do
                 // nothing to do
             }
             }
-            if(alive){
+            if (alive) {
                 logger.info(LogType.ConnectorLog.RECONNECT_SUCCESS.getMessage());
                 logger.info(LogType.ConnectorLog.RECONNECT_SUCCESS.getMessage());
             }
             }
         }
         }
@@ -119,9 +119,9 @@ public class ParserFactory implements Parser {
         Connector connector = getConnector(connectorId);
         Connector connector = getConnector(connectorId);
         ConnectorMapper connectorMapper = connectorFactory.connect(connector.getConfig());
         ConnectorMapper connectorMapper = connectorFactory.connect(connector.getConfig());
         MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableName);
         MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableName);
-        if(!CollectionUtils.isEmpty(connector.getTable())){
-            for(Table t :connector.getTable()){
-                if(t.getName().equals(tableName)){
+        if (!CollectionUtils.isEmpty(connector.getTable())) {
+            for (Table t : connector.getTable()) {
+                if (t.getName().equals(tableName)) {
                     metaInfo.setTableType(t.getType());
                     metaInfo.setTableType(t.getType());
                     break;
                     break;
                 }
                 }
@@ -178,12 +178,12 @@ public class ParserFactory implements Parser {
             List<Table> tableList = new ArrayList<>();
             List<Table> tableList = new ArrayList<>();
             boolean exist = false;
             boolean exist = false;
             for (int i = 0; i < table.length(); i++) {
             for (int i = 0; i < table.length(); i++) {
-                if(table.get(i) instanceof String){
+                if (table.get(i) instanceof String) {
                     tableList.add(new Table(table.getString(i)));
                     tableList.add(new Table(table.getString(i)));
                     exist = true;
                     exist = true;
                 }
                 }
             }
             }
-            if(!exist){
+            if (!exist) {
                 tableList = JsonUtil.jsonToArray(table.toString(), Table.class);
                 tableList = JsonUtil.jsonToArray(table.toString(), Table.class);
             }
             }
             connector.setTable(tableList);
             connector.setTable(tableList);

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/config/ParserFlushStrategyConfiguration.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.parser.config;
 package org.dbsyncer.parser.config;
 
 
-import org.dbsyncer.parser.strategy.impl.DisableFullFlushStrategy;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.strategy.FlushStrategy;
+import org.dbsyncer.parser.strategy.impl.DisableFullFlushStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Configuration;

+ 2 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/logger/LogType.java

@@ -147,7 +147,8 @@ public interface LogType {
         INSERT("40", "新增"),
         INSERT("40", "新增"),
         UPDATE("41", "修改"),
         UPDATE("41", "修改"),
         DELETE("42", "删除"),
         DELETE("42", "删除"),
-        INCREMENT_FAILED("43", "增量同步异常");
+        INCREMENT_FAILED("43", "增量同步异常"),
+        FULL_FAILED("44", "全量同步异常");
 
 
         private String type;
         private String type;
         private String message;
         private String message;

+ 12 - 15
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/AbstractFlushStrategy.java

@@ -2,7 +2,6 @@ package org.dbsyncer.parser.strategy;
 
 
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Result;
-import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Meta;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -10,7 +9,6 @@ import org.springframework.util.Assert;
 
 
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.Queue;
 
 
 /**
 /**
  * @author AE86
  * @author AE86
@@ -36,22 +34,21 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     }
     }
 
 
     protected void flush(String metaId, Result writer, String event, List<Map> data) {
     protected void flush(String metaId, Result writer, String event, List<Map> data) {
-        // 引用传递
-        long total = data.size();
-        long fail = writer.getFail().get();
-        Meta meta = getMeta(metaId);
-        meta.getFail().getAndAdd(fail);
-        meta.getSuccess().getAndAdd(total - fail);
+        refreshTotal(metaId, writer, data);
 
 
-        // 记录错误数据
-        Queue<Map> failData = writer.getFailData();
-        boolean success = CollectionUtils.isEmpty(failData);
-        if (!success) {
+        boolean fail = 0 < writer.getFail().get();
+        if (fail) {
             data.clear();
             data.clear();
-            data.addAll(failData);
+            data.addAll(writer.getFailData());
         }
         }
-        String error = writer.getError().toString();
-        flushService.asyncWrite(metaId, event, success, data, error);
+        flushService.asyncWrite(metaId, event, fail, data, writer.getError().toString());
+    }
+
+    protected void refreshTotal(String metaId, Result writer, List<Map> data){
+        long fail = writer.getFail().get();
+        Meta meta = getMeta(metaId);
+        meta.getFail().getAndAdd(fail);
+        meta.getSuccess().getAndAdd(data.size() - fail);
     }
     }
 
 
     protected Meta getMeta(String metaId) {
     protected Meta getMeta(String metaId) {

+ 14 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableFullFlushStrategy.java

@@ -1,13 +1,16 @@
 package org.dbsyncer.parser.strategy.impl;
 package org.dbsyncer.parser.strategy.impl;
 
 
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.parser.logger.LogService;
+import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.strategy.AbstractFlushStrategy;
 import org.dbsyncer.parser.strategy.AbstractFlushStrategy;
+import org.springframework.beans.factory.annotation.Autowired;
 
 
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
 /**
 /**
- * 不记录全量数据, 只记录增量同步数据
+ * 不记录全量数据, 只记录增量同步数据, 将异常记录到系统日志中
  *
  *
  * @author AE86
  * @author AE86
  * @version 1.0.0
  * @version 1.0.0
@@ -15,9 +18,18 @@ import java.util.Map;
  */
  */
 public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
 public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
 
 
+    @Autowired
+    private LogService logService;
+
     @Override
     @Override
     public void flushFullData(String metaId, Result writer, String event, List<Map> data) {
     public void flushFullData(String metaId, Result writer, String event, List<Map> data) {
-        // 不记录全量数据
+        // 不记录全量数据,只统计成功失败总数
+        refreshTotal(metaId, writer, data);
+
+        if (0 < writer.getFail().get()) {
+            LogType logType = LogType.TableGroupLog.FULL_FAILED;
+            logService.log(logType, "%s:%s", logType.getMessage(), writer.getError().toString());
+        }
     }
     }
 
 
 }
 }