Browse Source

支持控制写入增量数据开关 https://gitee.com/ghi/dbsyncer/issues/I5QO67

AE86 2 years ago
parent
commit
d38f470ab3

+ 53 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/IncrementDataConfig.java

@@ -0,0 +1,53 @@
+package org.dbsyncer.common.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/9/21 22:20
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.parser.flush.increment.data")
+public class IncrementDataConfig {
+
+    /**
+     * Whether to record successfully synchronized incremental data
+     */
+    private boolean writeSuccess;
+
+    /**
+     * Whether to record incremental data that failed to synchronize
+     */
+    private boolean writeFail;
+
+    /**
+     * Maximum length of the recorded error message
+     */
+    private int maxErrorLength;
+
+    public boolean isWriteSuccess() {
+        return writeSuccess;
+    }
+
+    public void setWriteSuccess(boolean writeSuccess) {
+        this.writeSuccess = writeSuccess;
+    }
+
+    public boolean isWriteFail() {
+        return writeFail;
+    }
+
+    public void setWriteFail(boolean writeFail) {
+        this.writeFail = writeFail;
+    }
+
+    public int getMaxErrorLength() {
+        return maxErrorLength;
+    }
+
+    public void setMaxErrorLength(int maxErrorLength) {
+        this.maxErrorLength = maxErrorLength;
+    }
+}

+ 1 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/snowflake/SnowflakeIdWorker.java

@@ -7,7 +7,7 @@ import org.springframework.stereotype.Component;
 import java.time.Instant;
 
 @Component
-@ConfigurationProperties(prefix = "dbsyncer.common.worker")
+@ConfigurationProperties(prefix = "dbsyncer.web.worker")
 public class SnowflakeIdWorker {
 
     /**

+ 2 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseValueMapper.java

@@ -3,7 +3,6 @@ package org.dbsyncer.connector.database;
 import com.microsoft.sqlserver.jdbc.Geometry;
 import oracle.jdbc.OracleConnection;
 import oracle.spatial.geometry.JGeometry;
-import oracle.sql.STRUCT;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
 
@@ -48,15 +47,14 @@ public class DatabaseValueMapper {
         return connection.createClob();
     }
 
-    public Struct getSTRUCT(byte[] val) throws SQLException {
+    public Struct getStruct(byte[] val) throws SQLException {
         if (connection.getConnection() instanceof OracleConnection) {
             OracleConnection conn = connection.unwrap(OracleConnection.class);
             Geometry geometry = Geometry.deserialize(val);
             Double x = geometry.getX();
             Double y = geometry.getY();
             JGeometry jGeometry = new JGeometry(x, y, 0);
-            STRUCT struct = JGeometry.store(jGeometry, conn);
-            return struct;
+            return JGeometry.store(jGeometry, conn);
         }
         throw new ConnectorException(String.format("%s can not get STRUCT [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }

+ 2 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/OtherValueMapper.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.connector.schema;
 
-import oracle.sql.STRUCT;
 import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
@@ -19,7 +18,7 @@ public class OtherValueMapper extends AbstractValueMapper<Struct> {
 
     @Override
     protected Struct convert(ConnectorMapper connectorMapper, Object val) throws Exception {
-        if (val instanceof STRUCT) {
+        if (val instanceof oracle.sql.STRUCT) {
             return (Struct) val;
         }
         // SqlServer Geometry
@@ -27,7 +26,7 @@ public class OtherValueMapper extends AbstractValueMapper<Struct> {
             Object connection = connectorMapper.getConnection();
             if (connection instanceof Connection) {
                 final DatabaseValueMapper mapper = new DatabaseValueMapper((SimpleConnection) connection);
-                return mapper.getSTRUCT((byte[]) val);
+                return mapper.getStruct((byte[]) val);
             }
         }
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));

+ 9 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.config.IncrementDataConfig;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
@@ -16,14 +17,15 @@ import org.springframework.util.Assert;
  */
 public abstract class AbstractFlushStrategy implements FlushStrategy {
 
-    private static final int MAX_ERROR_LENGTH = 1000;
-
     @Autowired
     private FlushService flushService;
 
     @Autowired
     private CacheService cacheService;
 
+    @Autowired
+    private IncrementDataConfig flushDataConfig;
+
     @Override
     public void flushFullData(String metaId, Result result, String event) {
         flush(metaId, result, event);
@@ -37,11 +39,13 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     protected void flush(String metaId, Result result, String event) {
         refreshTotal(metaId, result);
 
-        if (!CollectionUtils.isEmpty(result.getFailData())) {
-            final String error = StringUtil.substring(result.getError().toString(), 0, MAX_ERROR_LENGTH);
+        if (flushDataConfig.isWriteFail() && !CollectionUtils.isEmpty(result.getFailData())) {
+            final String error = StringUtil.substring(result.getError().toString(), 0, flushDataConfig.getMaxErrorLength());
             flushService.write(metaId, event, false, result.getFailData(), error);
         }
-        if (!CollectionUtils.isEmpty(result.getSuccessData())) {
+
+        // 是否写增量数据
+        if (flushDataConfig.isWriteSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
             flushService.write(metaId, event, true, result.getSuccessData(), "");
         }
     }

+ 5 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.flush.impl;
 
 import com.alibaba.fastjson.JSONException;
+import org.dbsyncer.common.config.IncrementDataConfig;
 import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
@@ -44,6 +45,9 @@ public class FlushServiceImpl implements FlushService {
     @Autowired
     private BufferActuator storageBufferActuator;
 
+    @Autowired
+    private IncrementDataConfig flushDataConfig;
+
     @Override
     public void asyncWrite(String type, String error) {
         Map<String, Object> params = new HashMap();
@@ -81,7 +85,7 @@ public class FlushServiceImpl implements FlushService {
      * @return
      */
     private String substring(String error){
-        return StringUtil.isNotBlank(error) ? StringUtil.substring(error, 0, 2048) : error;
+        return StringUtil.substring(error, 0, flushDataConfig.getMaxErrorLength());
     }
 
 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableFlushStrategy.java

@@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
  * @date 2021/11/18 22:21
  */
 @Component
-@ConditionalOnProperty(value = "dbsyncer.parser.flush.full.enabled", havingValue = "true")
+@ConditionalOnProperty(value = "dbsyncer.parser.flush.full.data.enabled", havingValue = "true")
 public final class EnableFlushStrategy extends AbstractFlushStrategy {
 
 }

+ 16 - 13
dbsyncer-web/src/main/resources/application.properties

@@ -1,33 +1,36 @@
 server.ip=127.0.0.1
 server.port=18686
 #web
+dbsyncer.web.worker.id=1
 dbsyncer.web.login.username=admin
 dbsyncer.web.login.password=0DPiKuNIrrVmD8IUCuw1hQxNqZc=
-server.servlet.session.timeout=1800
-server.servlet.context-path=/
-#dbsyncer.common.worker.id=1
 dbsyncer.web.task.scheduler.pool-size=8
 dbsyncer.web.task.executor.core-size=10
 dbsyncer.web.task.executor.queue-capacity=1000
+server.servlet.session.timeout=1800
+server.servlet.context-path=/
 
 #parser
+dbsyncer.parser.flush.full.data.enabled=false
+dbsyncer.parser.flush.increment.data.write-success=true
+dbsyncer.parser.flush.increment.data.write-fail=true
+dbsyncer.parser.flush.increment.data.max-error-length=2048
 dbsyncer.parser.flush.buffer.actuator.speed.enabled=true
-#dbsyncer.parser.flush.buffer.actuator.writer-batch-count=100
-#dbsyncer.parser.flush.buffer.actuator.batch-count=1000
-#dbsyncer.parser.flush.buffer.actuator.queue-capacity=50000
-#dbsyncer.parser.flush.buffer.actuator.period-millisecond=300
-#dbsyncer.parser.flush.full.enabled=true
+dbsyncer.parser.flush.buffer.actuator.writer-batch-count=100
+dbsyncer.parser.flush.buffer.actuator.batch-count=1000
+dbsyncer.parser.flush.buffer.actuator.queue-capacity=50000
+dbsyncer.parser.flush.buffer.actuator.period-millisecond=300
 
 #storage
+dbsyncer.storage.binlog.recorder.batch-count=1000
+dbsyncer.storage.binlog.recorder.max-processing-seconds=120
+dbsyncer.storage.binlog.recorder.queue-capacity=10000
+dbsyncer.storage.binlog.recorder.writer-period-millisecond=500
+dbsyncer.storage.binlog.recorder.reader-period-millisecond=2000
 #dbsyncer.storage.support.mysql.enabled=true
#dbsyncer.storage.support.mysql.config.url=jdbc:mysql://127.0.0.1:3306/dbsyncer?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false
 #dbsyncer.storage.support.mysql.config.username=root
 #dbsyncer.storage.support.mysql.config.password=123
-#dbsyncer.storage.binlog.recorder.batch-count=1000
-#dbsyncer.storage.binlog.recorder.max-processing-seconds=120
-#dbsyncer.storage.binlog.recorder.queue-capacity=10000
-#dbsyncer.storage.binlog.recorder.writer-period-millisecond=500
-#dbsyncer.storage.binlog.recorder.reader-period-millisecond=2000
 
 #monitor
 management.endpoints.web.base-path=/app