AE86 3 年之前
父节点
当前提交
c99e278436

+ 7 - 7
dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java

@@ -24,7 +24,7 @@ public class RowChangedEvent {
     private List<Object> afterData;
     private Map<String, Object> before;
     private Map<String, Object> after;
-    private boolean updateRowIfInsertFailed;
+    private boolean forceUpdate;
 
     public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
         this.tableGroupIndex = tableGroupIndex;
@@ -33,12 +33,12 @@ public class RowChangedEvent {
         this.after = after;
     }
 
-    public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after, boolean updateRowIfInsertFailed) {
+    public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after, boolean forceUpdate) {
         this.tableGroupIndex = tableGroupIndex;
         this.event = event;
         this.before = before;
         this.after = after;
-        this.updateRowIfInsertFailed = updateRowIfInsertFailed;
+        this.forceUpdate = forceUpdate;
     }
 
     public RowChangedEvent(String tableName, String event, List<Object> beforeData, List<Object> afterData) {
@@ -88,12 +88,12 @@ public class RowChangedEvent {
         this.after = after;
     }
 
-    public boolean isUpdateRowIfInsertFailed() {
-        return updateRowIfInsertFailed;
+    public boolean isForceUpdate() {
+        return forceUpdate;
     }
 
-    public RowChangedEvent setUpdateRowIfInsertFailed(boolean updateRowIfInsertFailed) {
-        this.updateRowIfInsertFailed = updateRowIfInsertFailed;
+    public RowChangedEvent setForceUpdate(boolean forceUpdate) {
+        this.forceUpdate = forceUpdate;
         return this;
     }
 

+ 8 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterSingleConfig.java

@@ -26,17 +26,17 @@ public class WriterSingleConfig extends WriterConfig {
     private boolean retry;
 
     /**
-     * 插入失败转更新
+     * 更新失败转插入
      */
-    private boolean updateRowIfInsertFailed;
+    private boolean forceUpdate;
 
-    public WriterSingleConfig(List<Field> fields, Map<String, String> command, String event, Map<String, Object> data, String table, boolean updateRowIfInsertFailed) {
+    public WriterSingleConfig(List<Field> fields, Map<String, String> command, String event, Map<String, Object> data, String table, boolean forceUpdate) {
         setCommand(command);
         setFields(fields);
         setData(data);
         setEvent(event);
         setTable(table);
-        this.updateRowIfInsertFailed = updateRowIfInsertFailed;
+        this.forceUpdate = forceUpdate;
     }
 
     public Map<String, Object> getData() {
@@ -74,12 +74,12 @@ public class WriterSingleConfig extends WriterConfig {
         this.retry = retry;
     }
 
-    public boolean isUpdateRowIfInsertFailed() {
-        return updateRowIfInsertFailed;
+    public boolean isForceUpdate() {
+        return forceUpdate;
     }
 
-    public WriterSingleConfig setUpdateRowIfInsertFailed(boolean updateRowIfInsertFailed) {
-        this.updateRowIfInsertFailed = updateRowIfInsertFailed;
+    public WriterSingleConfig setForceUpdate(boolean forceUpdate) {
+        this.forceUpdate = forceUpdate;
         return this;
     }
 }

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -182,7 +182,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             );
         } catch (Exception e) {
             // 记录错误数据
-            if(!config.isUpdateRowIfInsertFailed()){
+            if(!config.isForceUpdate()){
                 result.getFailData().add(data);
                 result.getFail().set(1);
                 result.getError().append("SQL:").append(sql).append(System.lineSeparator())
@@ -200,7 +200,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
                     fields.remove(fields.size() - 1);
                     config.setEvent(ConnectorConstant.OPERTION_INSERT);
                     config.setRetry(true);
-                    logger.warn("{}表执行{}失败, 尝试执行{}", config.getTable(), event, config.getEvent());
+                    logger.warn("{}表执行{}失败, 尝试执行{}, {}", config.getTable(), event, config.getEvent(), data);
                     return writer(connectorMapper, config);
                 }
                 return result;

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -107,7 +107,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
             Object event = null;
             for (Map<String, Object> row : data) {
                 if(StringUtil.isBlank(eventFieldName)){
-                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_MAP, row, true));
+                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row, true));
                     continue;
                 }
 

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -319,7 +319,7 @@ public class ParserFactory implements Parser {
         pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
 
         // 4、写入目标源
-        Result writer = connectorFactory.writer(tConnectorMapper, new WriterSingleConfig(picker.getTargetFields(), tableGroup.getCommand(), event, target, rowChangedEvent.getTableName(), rowChangedEvent.isUpdateRowIfInsertFailed()));
+        Result writer = connectorFactory.writer(tConnectorMapper, new WriterSingleConfig(picker.getTargetFields(), tableGroup.getCommand(), event, target, rowChangedEvent.getTableName(), rowChangedEvent.isForceUpdate()));
 
         // 5、更新结果
         flush(metaId, writer, event, picker.getTargetMapList());