Forráskód Böngészése

修复全量覆盖写入

Signed-off-by: AE86 <836391306@qq.com>
AE86 2 éve
szülő
commit
2ff859fcef

+ 5 - 15
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -553,21 +553,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
     private void forceUpdate(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField,
                              Map row) {
-        // 不存在转insert
-        if (isUpdate(config.getEvent())) {
-            String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
-            if (!existRow(connectorMapper, queryCount, row.get(pkField.getName()))) {
-                logger.warn("{}表执行{}失败, 尝试执行{}, {}", config.getTableName(), config.getEvent(), ConnectorConstant.OPERTION_INSERT, row);
-                writer(result, connectorMapper, config, pkField, row, ConnectorConstant.OPERTION_INSERT);
-            }
-            return;
-        }
-
-        // 存在转update
-        if (isInsert(config.getEvent())) {
-            logger.warn("{}表执行{}失败, 尝试执行{}, {}", config.getTableName(), config.getEvent(), ConnectorConstant.OPERTION_UPDATE, row);
-            writer(result, connectorMapper, config, pkField, row, ConnectorConstant.OPERTION_UPDATE);
-        }
+        // 存在执行覆盖更新,否则写入
+        final String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
+        final String event = existRow(connectorMapper, queryCount, row.get(pkField.getName())) ? ConnectorConstant.OPERTION_UPDATE : ConnectorConstant.OPERTION_INSERT;
+        logger.warn("{}表执行{}失败, 重新执行{}, {}", config.getTableName(), config.getEvent(), event, row);
+        writer(result, connectorMapper, config, pkField, row, event);
     }
 
     private void writer(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField, Map row,

+ 2 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/BinaryValueMapper.java

@@ -4,17 +4,15 @@ import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 
-import java.math.BigDecimal;
-
 /**
  * @author AE86
  * @version 1.0.0
  * @date 2022/8/24 23:43
  */
-public class BinaryValueMapper extends AbstractValueMapper<BigDecimal> {
+public class BinaryValueMapper extends AbstractValueMapper<byte[]> {
 
     @Override
-    protected BigDecimal convert(ConnectorMapper connectorMapper, Object val) {
+    protected byte[] convert(ConnectorMapper connectorMapper, Object val) {
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 1 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/PreloadTemplate.java

@@ -83,7 +83,6 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
             total += paging.getTotal();
             pageNum++;
         }
-
         logger.info("PreLoad {}:{}", modelType, total);
     }
 
@@ -168,4 +167,4 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
         launch();
     }
 
-}
+}

+ 1 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/DateHandler.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.parser.convert.handler;
 
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.convert.Handler;
 
 import java.sql.Date;
@@ -17,6 +16,6 @@ public class DateHandler implements Handler {
 
     @Override
     public Object handle(String args, Object value) {
-        return null == value || StringUtil.isBlank(String.valueOf(value)) ? Date.valueOf(LocalDate.now()) : value;
+        return Date.valueOf(LocalDate.now());
     }
 }

+ 1 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/TimestampHandler.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.parser.convert.handler;
 
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.convert.Handler;
 
 import java.sql.Timestamp;
@@ -17,6 +16,6 @@ public class TimestampHandler implements Handler {
 
     @Override
     public Object handle(String args, Object value) {
-        return null == value || StringUtil.isBlank(String.valueOf(value)) ? new Timestamp(Instant.now().toEpochMilli()) : value;
+        return new Timestamp(Instant.now().toEpochMilli());
     }
 }

+ 5 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableFullFlushStrategy.java

@@ -5,6 +5,8 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.parser.flush.AbstractFlushStrategy;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
 /**
@@ -16,6 +18,8 @@ import org.springframework.beans.factory.annotation.Autowired;
  */
 public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
     @Autowired
     private LogService logService;
 
@@ -25,6 +29,7 @@ public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
         refreshTotal(metaId, result);
 
         if (!CollectionUtils.isEmpty(result.getFailData())) {
+            logger.error(result.getError().toString());
             LogType logType = LogType.TableGroupLog.FULL_FAILED;
             logService.log(logType, "%s:%s", logType.getMessage(), result.getError().toString());
         }