瀏覽代碼

增加配置项--仅更新(即只更新已存在的数据行,不存在的数据行可以设置不需要新增)

bble 1 年之前
父節點
當前提交
95c04cbfc4

+ 7 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/SystemConfigService.java

@@ -57,4 +57,11 @@ public interface SystemConfigService {
      * @return
      */
     boolean isEnableCDN();
+
+    /**
+     * 是否启用仅更新
+     *
+     * @return
+     */
+    boolean isEnableOnlyUpdate();
 }

+ 1 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/system/SystemConfigChecker.java

@@ -49,6 +49,7 @@ public class SystemConfigChecker extends AbstractChecker {
         logger.info("params:{}", params);
         Assert.notEmpty(params, "Config check params is null.");
         params.put("enableCDN", StringUtil.isNotBlank(params.get("enableCDN")) ? "true" : "false");
+        params.put("enableOnlyUpdate", StringUtil.isNotBlank(params.get("enableOnlyUpdate")) ? "true" : "false");
         params.put("enableStorageWriteFull", StringUtil.isNotBlank(params.get("enableStorageWriteFull")) ? "true" : "false");
         params.put("enableStorageWriteSuccess", StringUtil.isNotBlank(params.get("enableStorageWriteSuccess")) ? "true" : "false");
         params.put("enableStorageWriteFail", StringUtil.isNotBlank(params.get("enableStorageWriteFail")) ? "true" : "false");

+ 5 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/SystemConfigServiceImpl.java

@@ -103,6 +103,11 @@ public class SystemConfigServiceImpl implements SystemConfigService {
         return getSystemConfig().isEnableCDN();
     }
 
+    @Override
+    public boolean isEnableOnlyUpdate(){
+        return getSystemConfig().isEnableOnlyUpdate();
+    }
+
     private SystemConfig getSystemConfig() {
         SystemConfig config = profileComponent.getSystemConfig();
         if (null != config) {

+ 2 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -148,9 +148,10 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         final ConnectorInstance tConnectorInstance = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
         final IncrementPluginContext context = new IncrementPluginContext(sConnectorInstance, tConnectorInstance, sourceTableName, targetTableName, event, sourceDataList, targetDataList);
         pluginFactory.convert(group.getPlugin(), context);
+        boolean enableOnlyUpdate = profileComponent.getSystemConfig().isEnableOnlyUpdate();
 
         // 5、批量执行同步
-        BatchWriter batchWriter = new BatchWriter(tConnectorInstance, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, generalBufferConfig.getBufferWriterCount());
+        BatchWriter batchWriter = new BatchWriter(tConnectorInstance, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, generalBufferConfig.getBufferWriterCount(),enableOnlyUpdate);
         Result result = parserComponent.writeBatch(context, batchWriter, getExecutor());
 
         // 6.发布刷新增量点事件

+ 4 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/ParserComponentImpl.java

@@ -176,7 +176,8 @@ public class ParserComponentImpl implements ParserComponent {
             pluginFactory.convert(group.getPlugin(), context);
 
             // 5、写入目标源
-            BatchWriter batchWriter = new BatchWriter(tConnectorInstance, command, tTableName, event, picker.getTargetFields(), target, batchSize);
+            boolean enableOnlyUpdate = profileComponent.getSystemConfig().isEnableOnlyUpdate();
+            BatchWriter batchWriter = new BatchWriter(tConnectorInstance, command, tTableName, event, picker.getTargetFields(), target, batchSize,enableOnlyUpdate);
             Result result = writeBatch(context, batchWriter, executor);
 
             // 6、更新结果
@@ -216,7 +217,7 @@ public class ParserComponentImpl implements ParserComponent {
         int total = dataList.size();
         // 单次任务
         if (total <= batchSize) {
-            return connectorFactory.writer(batchWriter.getConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, dataList));
+            return connectorFactory.writer(batchWriter.getConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, dataList,batchWriter.isEnableOnlyUpdate()));
         }
 
         // 批量任务, 拆分
@@ -238,7 +239,7 @@ public class ParserComponentImpl implements ParserComponent {
 
             executor.execute(() -> {
                 try {
-                    Result w = connectorFactory.writer(batchWriter.getConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, data));
+                    Result w = connectorFactory.writer(batchWriter.getConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, data,batchWriter.isEnableOnlyUpdate()));
                     result.addSuccessData(w.getSuccessData());
                     result.addFailData(w.getFailData());
                     result.getError().append(w.getError());

+ 6 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/BatchWriter.java

@@ -15,9 +15,10 @@ public final class BatchWriter {
     private List<Field> fields;
     private List<Map> dataList;
     private int batchSize;
+    private boolean enableOnlyUpdate;
 
     public BatchWriter(ConnectorInstance connectorInstance, Map<String, String> command, String tableName, String event,
-                       List<Field> fields, List<Map> dataList, int batchSize) {
+                       List<Field> fields, List<Map> dataList, int batchSize,boolean enableOnlyUpdate) {
         this.connectorInstance = connectorInstance;
         this.command = command;
         this.tableName = tableName;
@@ -25,6 +26,7 @@ public final class BatchWriter {
         this.fields = fields;
         this.dataList = dataList;
         this.batchSize = batchSize;
+        this.enableOnlyUpdate=enableOnlyUpdate;
     }
 
     public ConnectorInstance getConnectorInstance() {
@@ -55,4 +57,7 @@ public final class BatchWriter {
         return batchSize;
     }
 
+    public boolean isEnableOnlyUpdate() {
+        return enableOnlyUpdate;
+    }
 }

+ 14 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/SystemConfig.java

@@ -1,6 +1,7 @@
 /**
  * DBSyncer Copyright 2020-2023 All Rights Reserved.
  */
+
 package org.dbsyncer.parser.model;
 
 import org.dbsyncer.sdk.constant.ConfigConstant;
@@ -38,6 +39,11 @@ public class SystemConfig extends ConfigModel {
      */
     private boolean enableCDN;
 
+    /**
+     * 是否启用仅更新
+     */
+    private boolean enableOnlyUpdate = false;
+
     /**
      * 是否记录全量数据(false-关闭; true-开启)
      */
@@ -90,6 +96,14 @@ public class SystemConfig extends ConfigModel {
         this.enableCDN = enableCDN;
     }
 
+    public boolean isEnableOnlyUpdate() {
+        return enableOnlyUpdate;
+    }
+
+    public void setEnableOnlyUpdate(boolean enableOnlyUpdate) {
+        this.enableOnlyUpdate = enableOnlyUpdate;
+    }
+
     public boolean isEnableStorageWriteFull() {
         return enableStorageWriteFull;
     }

+ 7 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/WriterBatchConfig.java

@@ -28,12 +28,15 @@ public class WriterBatchConfig {
      */
     private List<Map> data;
 
-    public WriterBatchConfig(String tableName, String event, Map<String, String> command, List<Field> fields, List<Map> data) {
+    private boolean enableOnlyUpdate;
+
+    public WriterBatchConfig(String tableName, String event, Map<String, String> command, List<Field> fields, List<Map> data,boolean enableOnlyUpdate) {
         this.tableName = tableName;
         this.event = event;
         this.command = command;
         this.fields = fields;
         this.data = data;
+        this.enableOnlyUpdate=enableOnlyUpdate;
     }
 
     public String getTableName() {
@@ -56,4 +59,7 @@ public class WriterBatchConfig {
         return data;
     }
 
+    public boolean isEnableOnlyUpdate() {
+        return enableOnlyUpdate;
+    }
 }

+ 9 - 2
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDatabaseConnector.java

@@ -193,6 +193,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             logger.error("writer data can not be empty.");
             throw new SdkException("writer data can not be empty.");
         }
+        logger.info("enableOnlyUpdate is {}", config.isEnableOnlyUpdate());
         List<Field> fields = new ArrayList<>(config.getFields());
         List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
         // Update / Delete
@@ -211,7 +212,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             // 2、设置参数
             execute = connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(executeSql, batchRows(fields, data)));
         } catch (Exception e) {
-            data.forEach(row -> forceUpdate(result, connectorInstance, config, pkFields, row));
+            data.forEach(row -> {
+                if(!config.isEnableOnlyUpdate()){
+                    forceUpdate(result, connectorInstance, config, pkFields, row);
+                }
+            });
         }
 
         if (null != execute) {
@@ -221,7 +226,9 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
                     result.getSuccessData().add(data.get(i));
                     continue;
                 }
-                forceUpdate(result, connectorInstance, config, pkFields, data.get(i));
+                if(!config.isEnableOnlyUpdate()){
+                    forceUpdate(result, connectorInstance, config, pkFields, data.get(i));
+                }
             }
         }
         return result;

+ 6 - 0
dbsyncer-web/src/main/resources/public/system/system.html

@@ -63,6 +63,12 @@
                         <input class="systemConfigSwitch" name="enableCDN" th:checked="${config?.enableCDN}" type="checkbox" />
                     </div>
                 </div>
+                <div class="form-group">
+                    <label class="col-sm-4 control-label">仅更新(仅更新存在的记录)</label>
+                    <div class="col-sm-8">
+                        <input class="systemConfigSwitch" name="enableOnlyUpdate" th:checked="${config?.enableOnlyUpdate}" type="checkbox" />
+                    </div>
+                </div>
                 <div class="form-group">
                     <div class="text-right col-sm-4"></div>
                     <div class="text-right col-sm-8">