Quellcode durchsuchen

新增同步数据开关到系统配置页面

AE86 vor 1 Jahr
Ursprung
Commit
44143847d3

+ 3 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/system/SystemConfigChecker.java

@@ -49,6 +49,9 @@ public class SystemConfigChecker extends AbstractChecker {
         logger.info("params:{}", params);
         Assert.notEmpty(params, "Config check params is null.");
         params.put("enableCDN", StringUtil.isNotBlank(params.get("enableCDN")) ? "true" : "false");
+        params.put("enableStorageWriteFull", StringUtil.isNotBlank(params.get("enableStorageWriteFull")) ? "true" : "false");
+        params.put("enableStorageWriteSuccess", StringUtil.isNotBlank(params.get("enableStorageWriteSuccess")) ? "true" : "false");
+        params.put("enableStorageWriteFail", StringUtil.isNotBlank(params.get("enableStorageWriteFail")) ? "true" : "false");
 
         SystemConfig systemConfig = profileComponent.getSystemConfig();
         Assert.notNull(systemConfig, "配置文件为空.");

+ 3 - 38
dbsyncer-common/src/main/java/org/dbsyncer/common/config/StorageConfig.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.common.config;
 
 import org.dbsyncer.common.util.ThreadPoolUtil;
@@ -32,21 +35,6 @@ public class StorageConfig extends BufferActuatorConfig {
      */
     private int threadQueueCapacity = 500;
 
-    /**
-     * 是否记录同步成功数据
-     */
-    private boolean writeSuccess;
-
-    /**
-     * 是否记录同步失败数据
-     */
-    private boolean writeFail;
-
-    /**
-     * 最大记录异常信息长度
-     */
-    private int maxErrorLength;
-
     @Bean(name = "storageExecutor", destroyMethod = "shutdown")
     public ThreadPoolTaskExecutor storageExecutor() {
         return ThreadPoolUtil.newThreadPoolTaskExecutor(threadCoreSize, maxThreadSize, threadQueueCapacity, 30, "StorageExecutor-");
@@ -76,27 +64,4 @@ public class StorageConfig extends BufferActuatorConfig {
         this.threadQueueCapacity = threadQueueCapacity;
     }
 
-    public boolean isWriteSuccess() {
-        return writeSuccess;
-    }
-
-    public void setWriteSuccess(boolean writeSuccess) {
-        this.writeSuccess = writeSuccess;
-    }
-
-    public boolean isWriteFail() {
-        return writeFail;
-    }
-
-    public void setWriteFail(boolean writeFail) {
-        this.writeFail = writeFail;
-    }
-
-    public int getMaxErrorLength() {
-        return maxErrorLength;
-    }
-
-    public void setMaxErrorLength(int maxErrorLength) {
-        this.maxErrorLength = maxErrorLength;
-    }
 }

+ 0 - 23
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserStrategyConfiguration.java

@@ -1,23 +0,0 @@
-package org.dbsyncer.parser;
-
-import org.dbsyncer.parser.strategy.FlushStrategy;
-import org.dbsyncer.parser.strategy.impl.DisableFullFlushStrategy;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2021/11/18 21:36
- */
-@Configuration
-public class ParserStrategyConfiguration {
-
-    @Bean
-    @ConditionalOnMissingBean
-    public FlushStrategy flushStrategy() {
-        return new DisableFullFlushStrategy();
-    }
-
-}

+ 13 - 12
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -1,13 +1,17 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.parser.flush.impl;
 
-import org.dbsyncer.common.config.StorageConfig;
 import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.model.StorageRequest;
+import org.dbsyncer.parser.model.SystemConfig;
+import org.dbsyncer.sdk.constant.ConfigConstant;
 import org.dbsyncer.sdk.enums.StorageEnum;
 import org.dbsyncer.sdk.spi.StorageService;
-import org.dbsyncer.sdk.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.dbsyncer.storage.impl.SnowflakeIdWorker;
 import org.dbsyncer.storage.util.BinlogMessageUtil;
@@ -46,10 +50,10 @@ public class FlushServiceImpl implements FlushService {
     private BufferActuator storageBufferActuator;
 
     @Resource
-    private StorageConfig storageConfig;
+    private Executor storageExecutor;
 
     @Resource
-    private Executor storageExecutor;
+    private ProfileComponent profileComponent;
 
     @Override
     public void asyncWrite(String type, String error) {
@@ -57,7 +61,7 @@ public class FlushServiceImpl implements FlushService {
             Map<String, Object> params = new HashMap();
             params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
             params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
-            params.put(ConfigConstant.CONFIG_MODEL_JSON, substring(error));
+            params.put(ConfigConstant.CONFIG_MODEL_JSON, StringUtil.substring(error, 0, getSystemConfig().getMaxStorageErrorLength()));
             params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
             storageService.add(StorageEnum.LOG, params);
         });
@@ -73,7 +77,7 @@ public class FlushServiceImpl implements FlushService {
             row.put(ConfigConstant.DATA_TABLE_GROUP_ID, tableGroupId);
             row.put(ConfigConstant.DATA_TARGET_TABLE_NAME, targetTableGroupName);
             row.put(ConfigConstant.DATA_EVENT, event);
-            row.put(ConfigConstant.DATA_ERROR, substring(error));
+            row.put(ConfigConstant.DATA_ERROR, error);
             row.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
             try {
                 byte[] bytes = BinlogMessageUtil.toBinlogMap(r).toByteArray();
@@ -81,19 +85,16 @@ public class FlushServiceImpl implements FlushService {
             } catch (Exception e) {
                 logger.warn("可能存在Blob或inputStream大文件类型, 无法序列化:{}", r);
             }
-
             storageBufferActuator.offer(new StorageRequest(metaId, row));
         });
     }
 
     /**
-     * 限制记录异常信息长度
+     * TODO 加缓存过期
      *
-     * @param error
      * @return
      */
-    private String substring(String error) {
-        return StringUtil.substring(error, 0, storageConfig.getMaxErrorLength());
+    private SystemConfig getSystemConfig() {
+        return profileComponent.getSystemConfig();
     }
-
 }

+ 45 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushStrategyImpl.java

@@ -1,12 +1,22 @@
-package org.dbsyncer.parser.flush;
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
+package org.dbsyncer.parser.flush.impl;
 
-import org.dbsyncer.common.config.StorageConfig;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.CacheService;
+import org.dbsyncer.parser.LogService;
+import org.dbsyncer.parser.LogType;
+import org.dbsyncer.parser.ProfileComponent;
+import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.model.SystemConfig;
 import org.dbsyncer.parser.strategy.FlushStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import javax.annotation.Resource;
@@ -17,7 +27,10 @@ import java.time.Instant;
  * @version 1.0.0
  * @date 2021/11/18 22:22
  */
-public abstract class AbstractFlushStrategy implements FlushStrategy {
+@Component
+public final class FlushStrategyImpl implements FlushStrategy {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Resource
     private FlushService flushService;
@@ -26,10 +39,26 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     private CacheService cacheService;
 
     @Resource
-    private StorageConfig storageConfig;
+    private ProfileComponent profileComponent;
+
+    @Resource
+    private LogService logService;
 
     @Override
     public void flushFullData(String metaId, Result result, String event) {
+        // 不记录全量数据, 只记录增量同步数据, 将异常记录到系统日志中
+        if (!getSystemConfig().isEnableStorageWriteFull()) {
+            // 不记录全量数据,只统计成功失败总数
+            refreshTotal(metaId, result);
+
+            if (!CollectionUtils.isEmpty(result.getFailData())) {
+                logger.error(result.getError().toString());
+                LogType logType = LogType.TableGroupLog.FULL_FAILED;
+                logService.log(logType, "%s:%s:%s", result.getTargetTableGroupName(), logType.getMessage(), result.getError().toString());
+            }
+            return;
+        }
+
         flush(metaId, result, event);
     }
 
@@ -52,15 +81,24 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
         refreshTotal(metaId, result);
 
         // 是否写失败数据
-        if (storageConfig.isWriteFail() && !CollectionUtils.isEmpty(result.getFailData())) {
-            final String error = StringUtil.substring(result.getError().toString(), 0, storageConfig.getMaxErrorLength());
+        if (getSystemConfig().isEnableStorageWriteFail() && !CollectionUtils.isEmpty(result.getFailData())) {
+            final String error = StringUtil.substring(result.getError().toString(), 0, getSystemConfig().getMaxStorageErrorLength());
             flushService.asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, false, result.getFailData(), error);
         }
 
         // 是否写成功数据
-        if (storageConfig.isWriteSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
+        if (getSystemConfig().isEnableStorageWriteSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
             flushService.asyncWrite(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, true, result.getSuccessData(), "");
         }
     }
 
+    /**
+     * TODO 加缓存过期
+     *
+     * @return
+     */
+    private SystemConfig getSystemConfig() {
+        return profileComponent.getSystemConfig();
+    }
+
 }

+ 3 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.common.QueueOverflowException;

+ 56 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/SystemConfig.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
+ */
 package org.dbsyncer.parser.model;
 
 import org.dbsyncer.sdk.constant.ConfigConstant;
@@ -31,10 +34,30 @@ public class SystemConfig extends ConfigModel {
     private int refreshIntervalSeconds = 5;
 
     /**
-     * 是否启用CDN加速访问静态资源(false-禁用;true-启动)
+     * 是否启用CDN加速访问静态资源(false-关闭; true-开启)
      */
     private boolean enableCDN;
 
+    /**
+     * 是否记录全量数据(false-关闭; true-开启)
+     */
+    private boolean enableStorageWriteFull;
+
+    /**
+     * 是否记录同步成功数据(false-关闭; true-开启)
+     */
+    private boolean enableStorageWriteSuccess;
+
+    /**
+     * 是否记录同步失败数据(false-关闭; true-开启)
+     */
+    private boolean enableStorageWriteFail = true;
+
+    /**
+     * 记录同步失败日志最大长度
+     */
+    private int maxStorageErrorLength = 2048;
+
     public int getExpireDataDays() {
         return expireDataDays;
     }
@@ -66,4 +89,36 @@ public class SystemConfig extends ConfigModel {
     public void setEnableCDN(boolean enableCDN) {
         this.enableCDN = enableCDN;
     }
+
+    public boolean isEnableStorageWriteFull() {
+        return enableStorageWriteFull;
+    }
+
+    public void setEnableStorageWriteFull(boolean enableStorageWriteFull) {
+        this.enableStorageWriteFull = enableStorageWriteFull;
+    }
+
+    public boolean isEnableStorageWriteSuccess() {
+        return enableStorageWriteSuccess;
+    }
+
+    public void setEnableStorageWriteSuccess(boolean enableStorageWriteSuccess) {
+        this.enableStorageWriteSuccess = enableStorageWriteSuccess;
+    }
+
+    public boolean isEnableStorageWriteFail() {
+        return enableStorageWriteFail;
+    }
+
+    public void setEnableStorageWriteFail(boolean enableStorageWriteFail) {
+        this.enableStorageWriteFail = enableStorageWriteFail;
+    }
+
+    public int getMaxStorageErrorLength() {
+        return maxStorageErrorLength;
+    }
+
+    public void setMaxStorageErrorLength(int maxStorageErrorLength) {
+        this.maxStorageErrorLength = maxStorageErrorLength;
+    }
 }

+ 0 - 39
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableFullFlushStrategy.java

@@ -1,39 +0,0 @@
-package org.dbsyncer.parser.strategy.impl;
-
-import org.dbsyncer.common.model.Result;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.parser.flush.AbstractFlushStrategy;
-import org.dbsyncer.parser.LogService;
-import org.dbsyncer.parser.LogType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Resource;
-
-/**
- * 不记录全量数据, 只记录增量同步数据, 将异常记录到系统日志中
- *
- * @author AE86
- * @version 1.0.0
- * @date 2021/11/18 21:49
- */
-public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    @Resource
-    private LogService logService;
-
-    @Override
-    public void flushFullData(String metaId, Result result, String event) {
-        // 不记录全量数据,只统计成功失败总数
-        refreshTotal(metaId, result);
-
-        if (!CollectionUtils.isEmpty(result.getFailData())) {
-            logger.error(result.getError().toString());
-            LogType logType = LogType.TableGroupLog.FULL_FAILED;
-            logService.log(logType, "%s:%s:%s", result.getTargetTableGroupName(), logType.getMessage(), result.getError().toString());
-        }
-    }
-
-}

+ 0 - 18
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableFlushStrategy.java

@@ -1,18 +0,0 @@
-package org.dbsyncer.parser.strategy.impl;
-
-import org.dbsyncer.parser.flush.AbstractFlushStrategy;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Component;
-
-/**
- * 记录全量和增量同步数据
- *
- * @author AE86
- * @version 1.0.0
- * @date 2021/11/18 22:21
- */
-@Component
-@ConditionalOnProperty(value = "dbsyncer.storage.write.full.enabled", havingValue = "true")
-public final class EnableFlushStrategy extends AbstractFlushStrategy {
-
-}

+ 0 - 8
dbsyncer-web/src/main/resources/application.properties

@@ -68,14 +68,6 @@ dbsyncer.storage.buffer-pull-count=20000
 dbsyncer.storage.buffer-queue-capacity=100000
 # [StorageBufferActuator]定时消费缓存队列间隔(毫秒)
 dbsyncer.storage.buffer-period-millisecond=300
-# 是否记录全量数据(false-关闭; true-开启)
-dbsyncer.storage.write.full.enabled=false
-# 是否记录同步成功数据(false-关闭; true-开启)
-dbsyncer.storage.write-success=true
-# 是否记录同步失败数据(false-关闭; true-开启)
-dbsyncer.storage.write-fail=true
-# 记录同步失败日志最大长度
-dbsyncer.storage.max-error-length=2048
 
 #plugin
 # 是否开启邮箱通知功能(false-关闭; true-开启)

+ 25 - 1
dbsyncer-web/src/main/resources/public/system/system.html

@@ -33,10 +33,34 @@
                         <input type="number" class="form-control" min="1" max="60" dbsyncer-valid="require" name="refreshIntervalSeconds" th:value="${config?.refreshIntervalSeconds}"/>
                     </div>
                 </div>
+                <div class="form-group">
+                    <label class="col-sm-4 control-label">记录同步成功数据</label>
+                    <div class="col-sm-8">
+                        <input class="systemConfigSwitch" name="enableStorageWriteSuccess" th:checked="${config?.enableStorageWriteSuccess}" type="checkbox" />
+                    </div>
+                </div>
+                <div class="form-group">
+                    <label class="col-sm-4 control-label">记录同步失败数据</label>
+                    <div class="col-sm-8">
+                        <input class="systemConfigSwitch" name="enableStorageWriteFail" th:checked="${config?.enableStorageWriteFail}" type="checkbox" />
+                    </div>
+                </div>
+                <div class="form-group">
+                    <label class="col-sm-4 control-label">记录同步失败日志长度 <strong class="driverVerifcateRequired">*</strong></label>
+                    <div class="col-sm-8">
+                        <input type="number" class="form-control" min="1024" max="8192" dbsyncer-valid="require" name="maxStorageErrorLength" th:value="${config?.maxStorageErrorLength}"/>
+                    </div>
+                </div>
+                <div class="form-group">
+                    <label class="col-sm-4 control-label">记录全量数据 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="不推荐在生产环境下开启,可在源库数据量较少时使用,一般用于测试"></i></label>
+                    <div class="col-sm-8">
+                        <input class="systemConfigSwitch" name="enableStorageWriteFull" th:checked="${config?.enableStorageWriteFull}" type="checkbox" />
+                    </div>
+                </div>
                 <div class="form-group">
                     <label class="col-sm-4 control-label">CDN静态资源</label>
                     <div class="col-sm-8">
-                        <input id="enableCDNSwitch" name="enableCDN" th:checked="${config?.enableCDN}" type="checkbox" />
+                        <input class="systemConfigSwitch" name="enableCDN" th:checked="${config?.enableCDN}" type="checkbox" />
                     </div>
                 </div>
                 <div class="form-group">

+ 1 - 1
dbsyncer-web/src/main/resources/static/js/system/index.js

@@ -10,7 +10,7 @@ function submit(data) {
 }
 
 $(function () {
-    $('#enableCDNSwitch').bootstrapSwitch({
+    $('.systemConfigSwitch').bootstrapSwitch({
         onText: "Yes",
         offText: "No",
         onColor: "success",