Explorar el Código

新增可配置记录全量数据日志开关

AE86 hace 3 años
padre
commit
d7e4f06197

+ 6 - 21
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -21,6 +21,7 @@ import org.dbsyncer.parser.enums.ParserEnum;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.plugin.PluginFactory;
@@ -65,6 +66,9 @@ public class ParserFactory implements Parser {
     @Autowired
     @Autowired
     private FlushService flushService;
     private FlushService flushService;
 
 
+    @Autowired
+    private FlushStrategy flushStrategy;
+
     @Autowired
     @Autowired
     @Qualifier("taskExecutor")
     @Qualifier("taskExecutor")
     private Executor taskExecutor;
     private Executor taskExecutor;
@@ -322,7 +326,7 @@ public class ParserFactory implements Parser {
         Result writer = connectorFactory.writer(tConnectorMapper, new WriterSingleConfig(picker.getTargetFields(), tableGroup.getCommand(), event, target, rowChangedEvent.getTableName(), rowChangedEvent.isForceUpdate()));
         Result writer = connectorFactory.writer(tConnectorMapper, new WriterSingleConfig(picker.getTargetFields(), tableGroup.getCommand(), event, target, rowChangedEvent.getTableName(), rowChangedEvent.isForceUpdate()));
 
 
         // 5、更新结果
         // 5、更新结果
-        flush(metaId, writer, event, picker.getTargetMapList());
+        flushStrategy.flushIncrementData(metaId, writer, event, picker.getTargetMapList());
     }
     }
 
 
     /**
     /**
@@ -333,32 +337,13 @@ public class ParserFactory implements Parser {
      * @param data
      * @param data
      */
      */
     private void flush(Task task, Result writer, List<Map> data) {
     private void flush(Task task, Result writer, List<Map> data) {
-        flush(task.getId(), writer, ConnectorConstant.OPERTION_INSERT, data);
+        flushStrategy.flushFullData(task.getId(), writer, ConnectorConstant.OPERTION_INSERT, data);
 
 
         // 发布刷新事件给FullExtractor
         // 发布刷新事件给FullExtractor
         task.setEndTime(Instant.now().toEpochMilli());
         task.setEndTime(Instant.now().toEpochMilli());
         applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
         applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
     }
     }
 
 
-    private void flush(String metaId, Result writer, String event, List<Map> data) {
-        // 引用传递
-        long total = data.size();
-        long fail = writer.getFail().get();
-        Meta meta = getMeta(metaId);
-        meta.getFail().getAndAdd(fail);
-        meta.getSuccess().getAndAdd(total - fail);
-
-        // 记录错误数据
-        Queue<Map> failData = writer.getFailData();
-        boolean success = CollectionUtils.isEmpty(failData);
-        if (!success) {
-            data.clear();
-            data.addAll(failData);
-        }
-        String error = writer.getError().toString();
-        flushService.asyncWrite(metaId, event, success, data, error);
-    }
-
     /**
     /**
      * 获取Meta(注: 没有bean拷贝, 便于直接更新缓存)
      * 获取Meta(注: 没有bean拷贝, 便于直接更新缓存)
      *
      *

+ 23 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/config/ParserFlushStrategyConfiguration.java

@@ -0,0 +1,23 @@
+package org.dbsyncer.parser.config;
+
+import org.dbsyncer.parser.strategy.impl.DisableFullFlushStrategy;
+import org.dbsyncer.parser.strategy.FlushStrategy;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2021/11/18 21:36
+ */
+@Configuration
+public class ParserFlushStrategyConfiguration {
+
+    @Bean
+    @ConditionalOnMissingBean
+    public FlushStrategy flushStrategy() {
+        return new DisableFullFlushStrategy();
+    }
+
+}

+ 64 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/AbstractFlushStrategy.java

@@ -0,0 +1,64 @@
+package org.dbsyncer.parser.strategy;
+
+import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.parser.flush.FlushService;
+import org.dbsyncer.parser.model.Meta;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.util.Assert;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2021/11/18 22:22
+ */
+public abstract class AbstractFlushStrategy implements FlushStrategy {
+
+    @Autowired
+    private FlushService flushService;
+
+    @Autowired
+    private CacheService cacheService;
+
+    @Override
+    public void flushFullData(String metaId, Result writer, String event, List<Map> data) {
+        flush(metaId, writer, event, data);
+    }
+
+    @Override
+    public void flushIncrementData(String metaId, Result writer, String event, List<Map> data) {
+        flush(metaId, writer, event, data);
+    }
+
+    protected void flush(String metaId, Result writer, String event, List<Map> data) {
+        // 引用传递
+        long total = data.size();
+        long fail = writer.getFail().get();
+        Meta meta = getMeta(metaId);
+        meta.getFail().getAndAdd(fail);
+        meta.getSuccess().getAndAdd(total - fail);
+
+        // 记录错误数据
+        Queue<Map> failData = writer.getFailData();
+        boolean success = CollectionUtils.isEmpty(failData);
+        if (!success) {
+            data.clear();
+            data.addAll(failData);
+        }
+        String error = writer.getError().toString();
+        flushService.asyncWrite(metaId, event, success, data, error);
+    }
+
+    protected Meta getMeta(String metaId) {
+        Assert.hasText(metaId, "Meta id can not be empty.");
+        Meta meta = cacheService.get(metaId, Meta.class);
+        Assert.notNull(meta, "Meta can not be null.");
+        return meta;
+    }
+
+}

+ 37 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/FlushStrategy.java

@@ -0,0 +1,37 @@
+package org.dbsyncer.parser.strategy;
+
+import org.dbsyncer.common.model.Result;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 记录同步数据策略
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/9/29 22:38
+ */
+public interface FlushStrategy {
+
+    /**
+     * 记录全量同步数据
+     *
+     * @param metaId
+     * @param writer
+     * @param event
+     * @param data
+     */
+    void flushFullData(String metaId, Result writer, String event, List<Map> data);
+
+    /**
+     * 记录增量同步数据
+     *
+     * @param metaId
+     * @param writer
+     * @param event
+     * @param data
+     */
+    void flushIncrementData(String metaId, Result writer, String event, List<Map> data);
+
+}

+ 23 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableFullFlushStrategy.java

@@ -0,0 +1,23 @@
+package org.dbsyncer.parser.strategy.impl;
+
+import org.dbsyncer.common.model.Result;
+import org.dbsyncer.parser.strategy.AbstractFlushStrategy;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 不记录全量数据, 只记录增量同步数据
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2021/11/18 21:49
+ */
+public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
+
+    @Override
+    public void flushFullData(String metaId, Result writer, String event, List<Map> data) {
+        // 不记录全量数据
+    }
+
+}

+ 19 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableFlushStrategy.java

@@ -0,0 +1,19 @@
+package org.dbsyncer.parser.strategy.impl;
+
+import org.dbsyncer.parser.strategy.AbstractFlushStrategy;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+/**
+ * 记录全量和增量同步数据
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2021/11/18 22:21
+ */
+@Component
+@ConditionalOnProperty(prefix = "dbsyncer.parser.flush", value = "enabled", havingValue = "true",
+        matchIfMissing = false)
+public final class EnableFlushStrategy extends AbstractFlushStrategy {
+
+}

+ 3 - 0
dbsyncer-web/src/main/resources/application.properties

@@ -17,6 +17,9 @@ dbsyncer.storage.support.disk=true
 #dbsyncer.storage.support.mysql.config.username=root
 #dbsyncer.storage.support.mysql.config.username=root
 #dbsyncer.storage.support.mysql.config.password=123
 #dbsyncer.storage.support.mysql.config.password=123
 
 
+#parser
+#dbsyncer.parser.flush.enabled=true
+
 #monitor
 #monitor
 management.endpoints.web.base-path=/app
 management.endpoints.web.base-path=/app
 management.endpoints.web.exposure.include=*
 management.endpoints.web.exposure.include=*