Bläddra i källkod

调整覆盖写入开关

AE86 1 år sedan
förälder
incheckning
1bc375ab7e

+ 2 - 6
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MappingChecker.java

@@ -98,17 +98,13 @@ public class MappingChecker extends AbstractChecker {
         mapping.setReadNum(NumberUtil.toInt(params.get("readNum"), mapping.getReadNum()));
         mapping.setReadNum(NumberUtil.toInt(params.get("readNum"), mapping.getReadNum()));
         mapping.setBatchNum(NumberUtil.toInt(params.get("batchNum"), mapping.getBatchNum()));
         mapping.setBatchNum(NumberUtil.toInt(params.get("batchNum"), mapping.getBatchNum()));
         mapping.setThreadNum(NumberUtil.toInt(params.get("threadNum"), mapping.getThreadNum()));
         mapping.setThreadNum(NumberUtil.toInt(params.get("threadNum"), mapping.getThreadNum()));
-        if(StringUtils.equals(mapping.getModel(),ModelEnum.FULL.getCode())){
-            mapping.setEnableOnlyUpdate(!StringUtil.isNotBlank(params.get("enableOnlyUpdate4Full")));
-        }
+        String forceUpdate = StringUtils.equals(mapping.getModel(), ModelEnum.FULL.getCode()) ? "forceUpdate4Full" : "forceUpdate";
+        mapping.setForceUpdate(StringUtil.isBlank(params.get(forceUpdate)));
 
 
         // 增量配置(日志/定时)
         // 增量配置(日志/定时)
         String incrementStrategy = params.get("incrementStrategy");
         String incrementStrategy = params.get("incrementStrategy");
         Assert.hasText(incrementStrategy, "MappingChecker check params incrementStrategy is empty");
         Assert.hasText(incrementStrategy, "MappingChecker check params incrementStrategy is empty");
         String type = StringUtil.toLowerCaseFirstOne(incrementStrategy).concat("ConfigChecker");
         String type = StringUtil.toLowerCaseFirstOne(incrementStrategy).concat("ConfigChecker");
-        if(StringUtils.equals(mapping.getModel(),ModelEnum.INCREMENT.getCode())){
-            mapping.setEnableOnlyUpdate(!StringUtil.isNotBlank(params.get("enableOnlyUpdate")));
-        }
         MappingConfigChecker checker = map.get(type);
         MappingConfigChecker checker = map.get(type);
         Assert.notNull(checker, "Checker can not be null.");
         Assert.notNull(checker, "Checker can not be null.");
         checker.modify(mapping, params);
         checker.modify(mapping, params);

+ 0 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/system/SystemConfigChecker.java

@@ -49,7 +49,6 @@ public class SystemConfigChecker extends AbstractChecker {
         logger.info("params:{}", params);
         logger.info("params:{}", params);
         Assert.notEmpty(params, "Config check params is null.");
         Assert.notEmpty(params, "Config check params is null.");
         params.put("enableCDN", StringUtil.isNotBlank(params.get("enableCDN")) ? "true" : "false");
         params.put("enableCDN", StringUtil.isNotBlank(params.get("enableCDN")) ? "true" : "false");
-        params.put("enableOnlyUpdate", !StringUtil.isNotBlank(params.get("enableOnlyUpdate")) ? "true" : "false");
         params.put("enableStorageWriteFull", StringUtil.isNotBlank(params.get("enableStorageWriteFull")) ? "true" : "false");
         params.put("enableStorageWriteFull", StringUtil.isNotBlank(params.get("enableStorageWriteFull")) ? "true" : "false");
         params.put("enableStorageWriteSuccess", StringUtil.isNotBlank(params.get("enableStorageWriteSuccess")) ? "true" : "false");
         params.put("enableStorageWriteSuccess", StringUtil.isNotBlank(params.get("enableStorageWriteSuccess")) ? "true" : "false");
         params.put("enableStorageWriteFail", StringUtil.isNotBlank(params.get("enableStorageWriteFail")) ? "true" : "false");
         params.put("enableStorageWriteFail", StringUtil.isNotBlank(params.get("enableStorageWriteFail")) ? "true" : "false");

+ 1 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -148,10 +148,9 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         final ConnectorInstance tConnectorInstance = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
         final ConnectorInstance tConnectorInstance = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
         final IncrementPluginContext context = new IncrementPluginContext(sConnectorInstance, tConnectorInstance, sourceTableName, targetTableName, event, sourceDataList, targetDataList);
         final IncrementPluginContext context = new IncrementPluginContext(sConnectorInstance, tConnectorInstance, sourceTableName, targetTableName, event, sourceDataList, targetDataList);
         pluginFactory.convert(group.getPlugin(), context);
         pluginFactory.convert(group.getPlugin(), context);
-        boolean enableOnlyUpdate = mapping.isEnableOnlyUpdate()?true:profileComponent.getSystemConfig().isEnableOnlyUpdate();
 
 
         // 5、批量执行同步
         // 5、批量执行同步
-        BatchWriter batchWriter = new BatchWriter(tConnectorInstance, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, generalBufferConfig.getBufferWriterCount(),enableOnlyUpdate);
+        BatchWriter batchWriter = new BatchWriter(tConnectorInstance, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, generalBufferConfig.getBufferWriterCount(), mapping.isForceUpdate());
         Result result = parserComponent.writeBatch(context, batchWriter, getExecutor());
         Result result = parserComponent.writeBatch(context, batchWriter, getExecutor());
 
 
         // 6.发布刷新增量点事件
         // 6.发布刷新增量点事件

+ 3 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/ParserComponentImpl.java

@@ -176,8 +176,7 @@ public class ParserComponentImpl implements ParserComponent {
             pluginFactory.convert(group.getPlugin(), context);
             pluginFactory.convert(group.getPlugin(), context);
 
 
             // 5、写入目标源
             // 5、写入目标源
-            boolean enableOnlyUpdate = mapping.isEnableOnlyUpdate()?true:profileComponent.getSystemConfig().isEnableOnlyUpdate();
-            BatchWriter batchWriter = new BatchWriter(tConnectorInstance, command, tTableName, event, picker.getTargetFields(), target, batchSize,enableOnlyUpdate);
+            BatchWriter batchWriter = new BatchWriter(tConnectorInstance, command, tTableName, event, picker.getTargetFields(), target, batchSize, mapping.isForceUpdate());
             Result result = writeBatch(context, batchWriter, executor);
             Result result = writeBatch(context, batchWriter, executor);
 
 
             // 6、更新结果
             // 6、更新结果
@@ -217,7 +216,7 @@ public class ParserComponentImpl implements ParserComponent {
         int total = dataList.size();
         int total = dataList.size();
         // 单次任务
         // 单次任务
         if (total <= batchSize) {
         if (total <= batchSize) {
-            return connectorFactory.writer(batchWriter.getConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, dataList, batchWriter.isEnableOnlyUpdate()));
+            return connectorFactory.writer(batchWriter.getConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, dataList, batchWriter.isForceUpdate()));
         }
         }
 
 
         // 批量任务, 拆分
         // 批量任务, 拆分
@@ -239,7 +238,7 @@ public class ParserComponentImpl implements ParserComponent {
 
 
             executor.execute(() -> {
             executor.execute(() -> {
                 try {
                 try {
-                    Result w = connectorFactory.writer(batchWriter.getConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, data,batchWriter.isEnableOnlyUpdate()));
+                    Result w = connectorFactory.writer(batchWriter.getConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, data, batchWriter.isForceUpdate()));
                     result.addSuccessData(w.getSuccessData());
                     result.addSuccessData(w.getSuccessData());
                     result.addFailData(w.getFailData());
                     result.addFailData(w.getFailData());
                     result.getError().append(w.getError());
                     result.getError().append(w.getError());

+ 5 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/BatchWriter.java

@@ -15,10 +15,10 @@ public final class BatchWriter {
     private List<Field> fields;
     private List<Field> fields;
     private List<Map> dataList;
     private List<Map> dataList;
     private int batchSize;
     private int batchSize;
-    private boolean enableOnlyUpdate;
+    private boolean forceUpdate;
 
 
     public BatchWriter(ConnectorInstance connectorInstance, Map<String, String> command, String tableName, String event,
     public BatchWriter(ConnectorInstance connectorInstance, Map<String, String> command, String tableName, String event,
-                       List<Field> fields, List<Map> dataList, int batchSize,boolean enableOnlyUpdate) {
+                       List<Field> fields, List<Map> dataList, int batchSize, boolean forceUpdate) {
         this.connectorInstance = connectorInstance;
         this.connectorInstance = connectorInstance;
         this.command = command;
         this.command = command;
         this.tableName = tableName;
         this.tableName = tableName;
@@ -26,7 +26,7 @@ public final class BatchWriter {
         this.fields = fields;
         this.fields = fields;
         this.dataList = dataList;
         this.dataList = dataList;
         this.batchSize = batchSize;
         this.batchSize = batchSize;
-        this.enableOnlyUpdate=enableOnlyUpdate;
+        this.forceUpdate = forceUpdate;
     }
     }
 
 
     public ConnectorInstance getConnectorInstance() {
     public ConnectorInstance getConnectorInstance() {
@@ -57,7 +57,7 @@ public final class BatchWriter {
         return batchSize;
         return batchSize;
     }
     }
 
 
-    public boolean isEnableOnlyUpdate() {
-        return enableOnlyUpdate;
+    public boolean isForceUpdate() {
+        return forceUpdate;
     }
     }
 }
 }

+ 7 - 8
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Mapping.java

@@ -53,10 +53,9 @@ public class Mapping extends AbstractConfigModel {
 
 
     // 线程数
     // 线程数
     private int threadNum = 10;
     private int threadNum = 10;
-    /**
-     * 是否启用仅更新
-     */
-    private boolean enableOnlyUpdate = false;
+
+    // 覆盖写入
+    private boolean forceUpdate = true;
 
 
     public String getSourceConnectorId() {
     public String getSourceConnectorId() {
         return sourceConnectorId;
         return sourceConnectorId;
@@ -142,11 +141,11 @@ public class Mapping extends AbstractConfigModel {
         this.threadNum = threadNum;
         this.threadNum = threadNum;
     }
     }
 
 
-    public boolean isEnableOnlyUpdate() {
-        return enableOnlyUpdate;
+    public boolean isForceUpdate() {
+        return forceUpdate;
     }
     }
 
 
-    public void setEnableOnlyUpdate(boolean enableOnlyUpdate) {
-        this.enableOnlyUpdate = enableOnlyUpdate;
+    public void setForceUpdate(boolean forceUpdate) {
+        this.forceUpdate = forceUpdate;
     }
     }
 }
 }

+ 0 - 13
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/SystemConfig.java

@@ -39,11 +39,6 @@ public class SystemConfig extends ConfigModel {
      */
      */
     private boolean enableCDN;
     private boolean enableCDN;
 
 
-    /**
-     * 是否启用仅更新
-     */
-    private boolean enableOnlyUpdate = false;
-
     /**
     /**
      * 是否记录全量数据(false-关闭; true-开启)
      * 是否记录全量数据(false-关闭; true-开启)
      */
      */
@@ -96,14 +91,6 @@ public class SystemConfig extends ConfigModel {
         this.enableCDN = enableCDN;
         this.enableCDN = enableCDN;
     }
     }
 
 
-    public boolean isEnableOnlyUpdate() {
-        return enableOnlyUpdate;
-    }
-
-    public void setEnableOnlyUpdate(boolean enableOnlyUpdate) {
-        this.enableOnlyUpdate = enableOnlyUpdate;
-    }
-
     public boolean isEnableStorageWriteFull() {
     public boolean isEnableStorageWriteFull() {
         return enableStorageWriteFull;
         return enableStorageWriteFull;
     }
     }

+ 8 - 6
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/WriterBatchConfig.java

@@ -27,16 +27,18 @@ public class WriterBatchConfig {
      * 集合数据
      * 集合数据
      */
      */
     private List<Map> data;
     private List<Map> data;
+    /**
+     * 覆盖写入
+     */
+    private boolean forceUpdate;
 
 
-    private boolean enableOnlyUpdate;
-
-    public WriterBatchConfig(String tableName, String event, Map<String, String> command, List<Field> fields, List<Map> data,boolean enableOnlyUpdate) {
+    public WriterBatchConfig(String tableName, String event, Map<String, String> command, List<Field> fields, List<Map> data, boolean forceUpdate) {
         this.tableName = tableName;
         this.tableName = tableName;
         this.event = event;
         this.event = event;
         this.command = command;
         this.command = command;
         this.fields = fields;
         this.fields = fields;
         this.data = data;
         this.data = data;
-        this.enableOnlyUpdate=enableOnlyUpdate;
+        this.forceUpdate = forceUpdate;
     }
     }
 
 
     public String getTableName() {
     public String getTableName() {
@@ -59,7 +61,7 @@ public class WriterBatchConfig {
         return data;
         return data;
     }
     }
 
 
-    public boolean isEnableOnlyUpdate() {
-        return enableOnlyUpdate;
+    public boolean isForceUpdate() {
+        return forceUpdate;
     }
     }
 }
 }

+ 2 - 2
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDatabaseConnector.java

@@ -211,7 +211,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             // 2、设置参数
             // 2、设置参数
             execute = connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(executeSql, batchRows(fields, data)));
             execute = connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(executeSql, batchRows(fields, data)));
         } catch (Exception e) {
         } catch (Exception e) {
-            if(!config.isEnableOnlyUpdate()){
+            if(!config.isForceUpdate()){
               data.forEach(row -> forceUpdate(result, connectorInstance, config, pkFields, row));
               data.forEach(row -> forceUpdate(result, connectorInstance, config, pkFields, row));
             }
             }
         }
         }
@@ -223,7 +223,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
                     result.getSuccessData().add(data.get(i));
                     result.getSuccessData().add(data.get(i));
                     continue;
                     continue;
                 }
                 }
-                if(!config.isEnableOnlyUpdate()){
+                if(!config.isForceUpdate()){
                     forceUpdate(result, connectorInstance, config, pkFields, data.get(i));
                     forceUpdate(result, connectorInstance, config, pkFields, data.get(i));
                 }
                 }
             }
             }

+ 18 - 14
dbsyncer-web/src/main/resources/public/mapping/editFull.html

@@ -7,31 +7,35 @@
 
 
     <div class="form-group">
     <div class="form-group">
         <div class="row">
         <div class="row">
-            <div class="col-md-3">
-                <label class="col-sm-5 control-label text-right">批量读取<strong class="driverVerifcateRequired">*</strong></label>
-                <div class="col-sm-7">
+            <div class="col-md-4">
+                <label class="col-sm-3 control-label text-right">批量读取<strong class="driverVerifcateRequired">*</strong></label>
+                <div class="col-sm-9">
                     <input type="number" name="readNum" class="form-control" min="1" max="200000" dbsyncer-valid="require" th:value="${mapping?.readNum}">
                     <input type="number" name="readNum" class="form-control" min="1" max="200000" dbsyncer-valid="require" th:value="${mapping?.readNum}">
                 </div>
                 </div>
             </div>
             </div>
-            <div class="col-md-3">
-                <label class="col-sm-5 control-label text-right">单次写入<strong class="driverVerifcateRequired">*</strong></label>
-                <div class="col-sm-7">
+            <div class="col-md-4">
+                <label class="col-sm-3 control-label text-right">单次写入<strong class="driverVerifcateRequired">*</strong></label>
+                <div class="col-sm-9">
                     <input type="number" name="batchNum" class="form-control" min="1" max="20000" dbsyncer-valid="require" th:value="${mapping?.batchNum}">
                     <input type="number" name="batchNum" class="form-control" min="1" max="20000" dbsyncer-valid="require" th:value="${mapping?.batchNum}">
                 </div>
                 </div>
             </div>
             </div>
-            <div class="col-md-3">
-                <label class="col-sm-5 control-label text-right">线程数<strong class="driverVerifcateRequired">*</strong></label>
-                <div class="col-sm-7">
+            <div class="col-md-4">
+                <label class="col-sm-3 control-label text-right">线程数<strong class="driverVerifcateRequired">*</strong></label>
+                <div class="col-sm-9">
                     <input type="number" name="threadNum" class="form-control" min="1" max="64" dbsyncer-valid="require" th:value="${mapping?.threadNum}">
                     <input type="number" name="threadNum" class="form-control" min="1" max="64" dbsyncer-valid="require" th:value="${mapping?.threadNum}">
                 </div>
                 </div>
             </div>
             </div>
-            <div class="col-md-3">
-                <label class="col-sm-4 control-label text-right">覆盖<i class="fa fa-question-circle fa_gray" aria-hidden="true" title="[开启]-目标表不存在该条数据时, 会执行insert, 存在,会执行update; [关闭]-不生效"></i></label>
-                <div class="col-sm-8">
-
-                    <input class="banEventSwitch" name="enableOnlyUpdate4Full" th:checked="${mapping?.enableOnlyUpdate}!=true" type="checkbox">
+        </div>
+    </div>
+    <div class="form-group">
+        <div class="row">
+            <div class="col-md-4">
+                <label class="col-sm-3 control-label text-right">覆盖 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="[开启]-目标表不存在该条数据时, 会执行insert, 存在,会执行update; [关闭]-不生效"></i></label>
+                <div class="col-sm-9">
+                    <input name="forceUpdate" class="banEventSwitch" th:checked="${mapping?.forceUpdate}" type="checkbox">
                 </div>
                 </div>
             </div>
             </div>
+            <div class="col-md-8"></div>
         </div>
         </div>
     </div>
     </div>
 </div>
 </div>

+ 13 - 12
dbsyncer-web/src/main/resources/public/mapping/editIncrement.html

@@ -49,40 +49,41 @@
     <!-- 增量事件配置 -->
     <!-- 增量事件配置 -->
     <div class="form-group">
     <div class="form-group">
         <div class="row">
         <div class="row">
-            <div class="col-md-3">
+            <div class="col-md-4">
                 <label class="col-sm-3 control-label text-right">新增*</label>
                 <label class="col-sm-3 control-label text-right">新增*</label>
                 <div class="col-sm-9">
                 <div class="col-sm-9">
                     <input name="banInsert" class="banEventSwitch" th:checked="${mapping?.listener?.banInsert}" type="checkbox">
                     <input name="banInsert" class="banEventSwitch" th:checked="${mapping?.listener?.banInsert}" type="checkbox">
                 </div>
                 </div>
             </div>
             </div>
-            <div class="col-md-3">
+            <div class="col-md-4">
                 <label class="col-sm-3 control-label text-right">修改*</label>
                 <label class="col-sm-3 control-label text-right">修改*</label>
                 <div class="col-sm-9">
                 <div class="col-sm-9">
                     <input name="banUpdate" class="banEventSwitch" th:checked="${mapping?.listener?.banUpdate}" type="checkbox">
                     <input name="banUpdate" class="banEventSwitch" th:checked="${mapping?.listener?.banUpdate}" type="checkbox">
                 </div>
                 </div>
             </div>
             </div>
-            <div class="col-md-3">
+            <div class="col-md-4">
                 <label class="col-sm-3 control-label text-right">删除*</label>
                 <label class="col-sm-3 control-label text-right">删除*</label>
                 <div class="col-sm-9">
                 <div class="col-sm-9">
                     <input name="banDelete" class="banEventSwitch" th:checked="${mapping?.listener?.banDelete}" type="checkbox">
                     <input name="banDelete" class="banEventSwitch" th:checked="${mapping?.listener?.banDelete}" type="checkbox">
                 </div>
                 </div>
             </div>
             </div>
-            <div class="col-md-3">
+        </div>
+    </div>
+    <div class="form-group">
+        <div class="row">
+            <div class="col-md-4">
                 <label class="col-sm-3 control-label text-right">ddl*</label>
                 <label class="col-sm-3 control-label text-right">ddl*</label>
                 <div class="col-sm-9">
                 <div class="col-sm-9">
                     <input name="banDDL" class="banEventSwitch" th:checked="${mapping?.listener?.banDDL}" type="checkbox">
                     <input name="banDDL" class="banEventSwitch" th:checked="${mapping?.listener?.banDDL}" type="checkbox">
                 </div>
                 </div>
             </div>
             </div>
-        </div>
-    </div>
-    <div class="form-group">
-        <div class="row">
-            <div class="col-md-3">
-                <label class="col-sm-4 control-label text-right">覆盖<i class="fa fa-question-circle fa_gray" aria-hidden="true" title="[开启]-目标表不存在该条数据时, 会执行insert, 存在,会执行update; [关闭]-不生效"></i></label>
-                <div class="col-sm-8">
-                    <input name="enableOnlyUpdate" class="banEventSwitch"  th:checked="${mapping?.enableOnlyUpdate}!=true" type="checkbox">
+            <div class="col-md-4">
+                <label class="col-sm-3 control-label text-right">覆盖 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="[开启]-目标表不存在该条数据时, 会执行insert, 存在,会执行update; [关闭]-不生效"></i></label>
+                <div class="col-sm-9">
+                    <input name="forceUpdate" class="banEventSwitch" th:checked="${mapping?.forceUpdate}" type="checkbox">
                 </div>
                 </div>
             </div>
             </div>
+            <div class="col-md-4"></div>
         </div>
         </div>
     </div>
     </div>
     <!-- 定时配置 -->
     <!-- 定时配置 -->

+ 0 - 6
dbsyncer-web/src/main/resources/public/system/system.html

@@ -63,12 +63,6 @@
                         <input class="systemConfigSwitch" name="enableCDN" th:checked="${config?.enableCDN}" type="checkbox" />
                         <input class="systemConfigSwitch" name="enableCDN" th:checked="${config?.enableCDN}" type="checkbox" />
                     </div>
                     </div>
                 </div>
                 </div>
-                <div class="form-group">
-                    <label class="col-sm-4 control-label">覆盖<i class="fa fa-question-circle fa_gray" aria-hidden="true" title="[开启]-目标表不存在该条数据时, 会执行insert, 存在,会执行update; [关闭]-不生效"></i></label>
-                    <div class="col-sm-8">
-                        <input class="systemConfigSwitch" name="enableOnlyUpdate" th:checked="${config?.enableOnlyUpdate}!=true" type="checkbox" />
-                    </div>
-                </div>
                 <div class="form-group">
                 <div class="form-group">
                     <div class="text-right col-sm-4"></div>
                     <div class="text-right col-sm-4"></div>
                     <div class="text-right col-sm-8">
                     <div class="text-right col-sm-8">