Browse Source

重试后,更正成功失败数

AE86 2 years ago
parent
commit
537b1fee83

+ 7 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -13,6 +13,7 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.monitor.Monitor;
 import org.dbsyncer.parser.flush.BufferActuator;
+import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.WriterRequest;
@@ -25,6 +26,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 
 import javax.annotation.Resource;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -145,6 +147,11 @@ public class DataSyncServiceImpl implements DataSyncService {
             }
             writerBufferActuator.offer(new WriterRequest(tableGroupId, event, binlogData));
             monitor.removeData(metaId, messageId);
+            // 更新失败数
+            Meta meta = cacheService.get(metaId, Meta.class);
+            Assert.notNull(meta, "Meta can not be null.");
+            meta.getFail().decrementAndGet();
+            meta.setUpdateTime(Instant.now().toEpochMilli());
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage());
         }

+ 9 - 13
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleOtherValueMapper.java

@@ -8,7 +8,6 @@ import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
 
-import java.sql.Connection;
 import java.sql.Struct;
 
 /**
@@ -22,7 +21,7 @@ import java.sql.Struct;
  * @version 1.0.0
  * @date 2022/12/22 22:59
  */
-public class OracleOtherValueMapper extends AbstractValueMapper<Struct> {
+public final class OracleOtherValueMapper extends AbstractValueMapper<Struct> {
 
     @Override
     protected boolean skipConvert(Object val) {
@@ -33,17 +32,14 @@ public class OracleOtherValueMapper extends AbstractValueMapper<Struct> {
     protected Struct convert(ConnectorMapper connectorMapper, Object val) throws Exception {
         // SqlServer Geometry
         if (val instanceof byte[]) {
-            Object connection = connectorMapper.getConnection();
-            if (connection instanceof Connection) {
-                SimpleConnection simpleConnection = (SimpleConnection) connection;
-                OracleConnection conn = simpleConnection.unwrap(OracleConnection.class);
-                // TODO 兼容Oracle STRUCT 字节数组
-                Geometry geometry = Geometry.deserialize((byte[]) val);
-                Double x = geometry.getX();
-                Double y = geometry.getY();
-                JGeometry jGeometry = new JGeometry(x, y, 0);
-                return JGeometry.store(jGeometry, conn);
-            }
+            SimpleConnection connection = (SimpleConnection) connectorMapper.getConnection();
+            OracleConnection conn = connection.unwrap(OracleConnection.class);
+            // TODO 兼容Oracle STRUCT 字节数组
+            Geometry geometry = Geometry.deserialize((byte[]) val);
+            Double x = geometry.getX();
+            Double y = geometry.getY();
+            JGeometry jGeometry = new JGeometry(x, y, 0);
+            return JGeometry.store(jGeometry, conn);
         }
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLOtherValueMapper.java

@@ -29,7 +29,7 @@ import org.postgis.binary.BinaryWriter;
  * @version 1.0.0
  * @date 2022/12/22 22:59
  */
-public class PostgreSQLOtherValueMapper extends AbstractValueMapper<byte[]> {
+public final class PostgreSQLOtherValueMapper extends AbstractValueMapper<byte[]> {
 
     @Override
     protected boolean skipConvert(Object val) {