Browse Source

!84 merge
Merge pull request !84 from AE86/V_1.0.0_Beta

AE86 2 years ago
parent
commit
3e862799dd
43 changed files with 394 additions and 134 deletions
  1. 1 1
      dbsyncer-biz/pom.xml
  2. 1 1
      dbsyncer-cache/pom.xml
  3. 1 1
      dbsyncer-cluster/pom.xml
  4. 1 1
      dbsyncer-common/pom.xml
  5. 53 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/config/IncrementDataConfig.java
  6. 1 1
      dbsyncer-common/src/main/java/org/dbsyncer/common/snowflake/SnowflakeIdWorker.java
  7. 1 1
      dbsyncer-connector/pom.xml
  8. 31 38
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java
  9. 11 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractValueMapper.java
  10. 3 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java
  11. 2 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  12. 2 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseValueMapper.java
  13. 6 6
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java
  14. 5 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java
  15. 39 12
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java
  16. 5 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/BinaryValueMapper.java
  17. 6 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/DoubleValueMapper.java
  18. 28 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/GeometryValueMapper.java
  19. 5 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/LongVarBinaryValueMapper.java
  20. 2 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/OtherValueMapper.java
  21. 4 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/TinyintValueMapper.java
  22. 5 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/VarBinaryValueMapper.java
  23. 11 6
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java
  24. 72 0
      dbsyncer-connector/src/main/test/ConnectionTest.java
  25. 1 1
      dbsyncer-listener/pom.xml
  26. 1 1
      dbsyncer-manager/pom.xml
  27. 11 3
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java
  28. 1 1
      dbsyncer-monitor/pom.xml
  29. 1 1
      dbsyncer-parser/pom.xml
  30. 6 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/AbstractWriterBinlog.java
  31. 3 7
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  32. 9 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java
  33. 5 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  34. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java
  35. 13 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Task.java
  36. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableFlushStrategy.java
  37. 1 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableWriterBufferActuatorStrategy.java
  38. 1 1
      dbsyncer-plugin/pom.xml
  39. 1 1
      dbsyncer-storage/pom.xml
  40. 7 1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogColumnValue.java
  41. 1 1
      dbsyncer-web/pom.xml
  42. 17 14
      dbsyncer-web/src/main/resources/application.properties
  43. 16 1
      pom.xml

+ 1 - 1
dbsyncer-biz/pom.xml

@@ -5,7 +5,7 @@
 	<parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.0-Beta</version>
+        <version>1.2.1-RC</version>
     </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-biz</artifactId>

+ 1 - 1
dbsyncer-cache/pom.xml

@@ -4,7 +4,7 @@
 	<parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.0-Beta</version>
+        <version>1.2.1-RC</version>
     </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-cache</artifactId>

+ 1 - 1
dbsyncer-cluster/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.0-Beta</version>
+        <version>1.2.1-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-cluster</artifactId>

+ 1 - 1
dbsyncer-common/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.0-Beta</version>
+        <version>1.2.1-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-common</artifactId>

+ 53 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/config/IncrementDataConfig.java

@@ -0,0 +1,53 @@
+package org.dbsyncer.common.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/9/21 22:20
+ */
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.parser.flush.increment.data")
+public class IncrementDataConfig {
+
+    /**
+     * 是否记录同步成功数据
+     */
+    private boolean writerSuccess;
+
+    /**
+     * 是否记录同步失败数据
+     */
+    private boolean writerFail;
+
+    /**
+     * 最大记录异常信息长度
+     */
+    private int maxErrorLength;
+
+    public boolean isWriterSuccess() {
+        return writerSuccess;
+    }
+
+    public void setWriterSuccess(boolean writerSuccess) {
+        this.writerSuccess = writerSuccess;
+    }
+
+    public boolean isWriterFail() {
+        return writerFail;
+    }
+
+    public void setWriterFail(boolean writerFail) {
+        this.writerFail = writerFail;
+    }
+
+    public int getMaxErrorLength() {
+        return maxErrorLength;
+    }
+
+    public void setMaxErrorLength(int maxErrorLength) {
+        this.maxErrorLength = maxErrorLength;
+    }
+}

+ 1 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/snowflake/SnowflakeIdWorker.java

@@ -7,7 +7,7 @@ import org.springframework.stereotype.Component;
 import java.time.Instant;
 
 @Component
-@ConfigurationProperties(prefix = "dbsyncer.common.worker")
+@ConfigurationProperties(prefix = "dbsyncer.web.worker")
 public class SnowflakeIdWorker {
 
     /**

+ 1 - 1
dbsyncer-connector/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.0-Beta</version>
+        <version>1.2.1-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-connector</artifactId>

+ 31 - 38
dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java

@@ -14,49 +14,40 @@ import java.util.Map;
 
 public abstract class AbstractConnector {
 
-    private static final Map<Integer, ValueMapper> values = new LinkedHashMap<>();
+    protected static final Map<Integer, ValueMapper> valueMappers = new LinkedHashMap<>();
 
     static {
         // 常用类型
-        values.putIfAbsent(Types.VARCHAR, new VarcharValueMapper());
-        values.putIfAbsent(Types.INTEGER, new IntegerValueMapper());
-        values.putIfAbsent(Types.BIGINT, new BigintValueMapper());
-        values.putIfAbsent(Types.TIMESTAMP, new TimestampValueMapper());
-        values.putIfAbsent(Types.DATE, new DateValueMapper());
+        valueMappers.putIfAbsent(Types.VARCHAR, new VarcharValueMapper());
+        valueMappers.putIfAbsent(Types.INTEGER, new IntegerValueMapper());
+        valueMappers.putIfAbsent(Types.BIGINT, new BigintValueMapper());
+        valueMappers.putIfAbsent(Types.TIMESTAMP, new TimestampValueMapper());
+        valueMappers.putIfAbsent(Types.DATE, new DateValueMapper());
 
         // 较少使用
-        values.putIfAbsent(Types.CHAR, new CharValueMapper());
-        values.putIfAbsent(Types.NCHAR, new NCharValueMapper());
-        values.putIfAbsent(Types.NVARCHAR, new NVarcharValueMapper());
-        values.putIfAbsent(Types.LONGVARCHAR, new LongVarcharValueMapper());
-        values.putIfAbsent(Types.NUMERIC, new NumberValueMapper());
-        values.putIfAbsent(Types.BINARY, new BinaryValueMapper());
+        valueMappers.putIfAbsent(Types.CHAR, new CharValueMapper());
+        valueMappers.putIfAbsent(Types.NCHAR, new NCharValueMapper());
+        valueMappers.putIfAbsent(Types.NVARCHAR, new NVarcharValueMapper());
+        valueMappers.putIfAbsent(Types.LONGVARCHAR, new LongVarcharValueMapper());
+        valueMappers.putIfAbsent(Types.NUMERIC, new NumberValueMapper());
+        valueMappers.putIfAbsent(Types.BINARY, new BinaryValueMapper());
 
         // 很少使用
-        values.putIfAbsent(Types.SMALLINT, new SmallintValueMapper());
-        values.putIfAbsent(Types.TINYINT, new TinyintValueMapper());
-        values.putIfAbsent(Types.TIME, new TimeValueMapper());
-        values.putIfAbsent(Types.DECIMAL, new DecimalValueMapper());
-        values.putIfAbsent(Types.DOUBLE, new DoubleValueMapper());
-        values.putIfAbsent(Types.FLOAT, new FloatValueMapper());
-        values.putIfAbsent(Types.BIT, new BitValueMapper());
-        values.putIfAbsent(Types.BLOB, new BlobValueMapper());
-        values.putIfAbsent(Types.CLOB, new ClobValueMapper());
-        values.putIfAbsent(Types.NCLOB, new NClobValueMapper());
-        values.putIfAbsent(Types.ROWID, new RowIdValueMapper());
-        values.putIfAbsent(Types.REAL, new RealValueMapper());
-        values.putIfAbsent(Types.VARBINARY, new VarBinaryValueMapper());
-        values.putIfAbsent(Types.LONGVARBINARY, new LongVarBinaryValueMapper());
-        values.putIfAbsent(Types.OTHER, new OtherValueMapper());
-    }
-
-    /**
-     * 获取值转换配置
-     *
-     * @return
-     */
-    protected Map<Integer, ValueMapper> getValueMapper() {
-        return values;
+        valueMappers.putIfAbsent(Types.SMALLINT, new SmallintValueMapper());
+        valueMappers.putIfAbsent(Types.TINYINT, new TinyintValueMapper());
+        valueMappers.putIfAbsent(Types.TIME, new TimeValueMapper());
+        valueMappers.putIfAbsent(Types.DECIMAL, new DecimalValueMapper());
+        valueMappers.putIfAbsent(Types.DOUBLE, new DoubleValueMapper());
+        valueMappers.putIfAbsent(Types.FLOAT, new FloatValueMapper());
+        valueMappers.putIfAbsent(Types.BIT, new BitValueMapper());
+        valueMappers.putIfAbsent(Types.BLOB, new BlobValueMapper());
+        valueMappers.putIfAbsent(Types.CLOB, new ClobValueMapper());
+        valueMappers.putIfAbsent(Types.NCLOB, new NClobValueMapper());
+        valueMappers.putIfAbsent(Types.ROWID, new RowIdValueMapper());
+        valueMappers.putIfAbsent(Types.REAL, new RealValueMapper());
+        valueMappers.putIfAbsent(Types.VARBINARY, new VarBinaryValueMapper());
+        valueMappers.putIfAbsent(Types.LONGVARBINARY, new LongVarBinaryValueMapper());
+        valueMappers.putIfAbsent(Types.OTHER, new OtherValueMapper());
     }
 
     /**
@@ -71,12 +62,14 @@ public abstract class AbstractConnector {
         }
 
         // 获取字段映射规则
-        final Map<Integer, ValueMapper> mappers = getValueMapper();
         for (Map row : config.getData()) {
             // 根据目标字段类型转换值
             for (Field f : config.getFields()) {
+                if(null == f){
+                    continue;
+                }
                 // 根据字段类型转换值
-                final ValueMapper valueMapper = mappers.get(f.getType());
+                final ValueMapper valueMapper = valueMappers.get(f.getType());
                 if (null != valueMapper) {
                     // 当数据类型不同时,转换值类型
                     row.put(f.getName(), valueMapper.convertValue(connectorMapper, row.get(f.getName())));

+ 11 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractValueMapper.java

@@ -22,9 +22,19 @@ public abstract class AbstractValueMapper<T> implements ValueMapper {
      */
     protected abstract T convert(ConnectorMapper connectorMapper, Object val) throws Exception;
 
+    /**
+     * 获取默认值
+     *
+     * @param val
+     * @return
+     */
+    protected Object getDefaultVal(Object val) throws Exception {
+        return val;
+    }
+
     @Override
     public Object convertValue(ConnectorMapper connectorMapper, Object val) throws Exception {
         // 当数据类型不同时,返回转换值
-        return null != val && !val.getClass().equals(parameterClazz) ? convert(connectorMapper, val) : val;
+        return null != val && !val.getClass().equals(parameterClazz) ? convert(connectorMapper, val) : getDefaultVal(val);
     }
 }

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java

@@ -7,11 +7,11 @@ public class ReaderConfig {
 
     private Map<String, String> command;
     private List<Object> args;
-    private String cursor;
+    private Object cursor;
     private int pageIndex;
     private int pageSize;
 
-    public ReaderConfig(Map<String,String> command, List<Object> args, String cursor, int pageIndex, int pageSize) {
+    public ReaderConfig(Map<String,String> command, List<Object> args, Object cursor, int pageIndex, int pageSize) {
         this.command = command;
         this.args = args;
         this.cursor = cursor;
@@ -27,7 +27,7 @@ public class ReaderConfig {
         return args;
     }
 
-    public String getCursor() {
+    public Object getCursor() {
         return cursor;
     }
 

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -110,7 +110,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     @Override
     public Result reader(DatabaseConnectorMapper connectorMapper, ReaderConfig config) {
         // 1、获取select SQL
-        String queryKey = enableCursor() && StringUtil.isBlank(config.getCursor()) ? ConnectorConstant.OPERTION_QUERY_CURSOR : SqlBuilderEnum.QUERY.getName();
+        String queryKey = enableCursor() && null == config.getCursor() ? ConnectorConstant.OPERTION_QUERY_CURSOR : SqlBuilderEnum.QUERY.getName();
         String querySql = config.getCommand().get(queryKey);
         Assert.hasText(querySql, "查询语句不能为空.");
 
@@ -165,7 +165,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         if (null != execute) {
             int batchSize = execute.length;
             for (int i = 0; i < batchSize; i++) {
-                if (execute[i] == 0) {
+                if (execute[i] == 0 || execute[i] == -2) {
                     forceUpdate(result, connectorMapper, config, pkField, data.get(i));
                     continue;
                 }

+ 2 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseValueMapper.java

@@ -3,7 +3,6 @@ package org.dbsyncer.connector.database;
 import com.microsoft.sqlserver.jdbc.Geometry;
 import oracle.jdbc.OracleConnection;
 import oracle.spatial.geometry.JGeometry;
-import oracle.sql.STRUCT;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
 
@@ -48,15 +47,14 @@ public class DatabaseValueMapper {
         return connection.createClob();
     }
 
-    public Struct getSTRUCT(byte[] val) throws SQLException {
+    public Struct getStruct(byte[] val) throws SQLException {
         if (connection.getConnection() instanceof OracleConnection) {
             OracleConnection conn = connection.unwrap(OracleConnection.class);
             Geometry geometry = Geometry.deserialize(val);
             Double x = geometry.getX();
             Double y = geometry.getY();
             JGeometry jGeometry = new JGeometry(x, y, 0);
-            STRUCT struct = JGeometry.store(jGeometry, conn);
-            return struct;
+            return JGeometry.store(jGeometry, conn);
         }
         throw new ConnectorException(String.format("%s can not get STRUCT [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }

+ 6 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -21,7 +21,7 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
         String queryFilter = config.getSqlBuilderConfig().getQueryFilter();
         if (StringUtil.isNotBlank(queryFilter)) {
             querySql.append(" AND ");
-        }else {
+        } else {
             querySql.append(" WHERE ");
         }
         querySql.append(quotation).append(pk).append(quotation).append(" > ? ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
@@ -29,7 +29,7 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public String getPageCursorSql(PageSql config){
+    public String getPageCursorSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
         final String pk = config.getPk();
         // select * from test.`my_user` order by `id` limit ?
@@ -40,11 +40,11 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     @Override
     public Object[] getPageArgs(ReaderConfig config) {
         int pageSize = config.getPageSize();
-        String cursor = config.getCursor();
-        if (StringUtil.isBlank(cursor)) {
-            return new Object[] {pageSize};
+        Object cursor = config.getCursor();
+        if (null == cursor) {
+            return new Object[]{pageSize};
         }
-        return new Object[] {cursor, pageSize};
+        return new Object[]{cursor, pageSize};
     }
 
     @Override

+ 5 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -6,7 +6,9 @@ import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
+import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.model.Table;
 
 public final class OracleConnector extends AbstractDatabaseConnector {
 
@@ -34,15 +36,14 @@ public final class OracleConnector extends AbstractDatabaseConnector {
 
     @Override
     protected String getQueryCountSql(CommandConfig commandConfig, String schema, String quotation, String queryFilterSql) {
-        // 有过滤条件,走默认方式
-        if (StringUtil.isNotBlank(queryFilterSql)) {
+        final Table table = commandConfig.getTable();
+        if (StringUtil.isNotBlank(queryFilterSql) || TableTypeEnum.isView(table.getType())) {
             return super.getQueryCountSql(commandConfig, schema, quotation, queryFilterSql);
         }
 
         // 从系统表查询
-        final String table = commandConfig.getTable().getName();
         DatabaseConfig cfg = (DatabaseConfig) commandConfig.getConnectorConfig();
-        return String.format("SELECT NUM_ROWS FROM ALL_TABLES WHERE OWNER = '%s' AND TABLE_NAME = '%s'", cfg.getUsername().toUpperCase(), table);
+        return String.format("SELECT NUM_ROWS FROM ALL_TABLES WHERE OWNER = '%s' AND TABLE_NAME = '%s'", cfg.getUsername().toUpperCase(), table.getName());
     }
 
     @Override

+ 39 - 12
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -4,39 +4,66 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
-import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
+import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.model.Table;
 
 public final class PostgreSQLConnector extends AbstractDatabaseConnector {
 
+    @Override
+    protected String buildSqlWithQuotation() {
+        return "\"";
+    }
+
     @Override
     public String getPageSql(PageSql config) {
-        return config.getQuerySql() + DatabaseConstant.POSTGRESQL_PAGE_SQL;
+        final String quotation = buildSqlWithQuotation();
+        final String pk = config.getPk();
+        StringBuilder querySql = new StringBuilder(config.getQuerySql());
+        String queryFilter = config.getSqlBuilderConfig().getQueryFilter();
+        if (StringUtil.isNotBlank(queryFilter)) {
+            querySql.append(" AND ");
+        } else {
+            querySql.append(" WHERE ");
+        }
+        querySql.append(quotation).append(pk).append(quotation).append(" > ? ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
+        return querySql.toString();
     }
 
     @Override
-    public Object[] getPageArgs(ReaderConfig config) {
-        int pageSize = config.getPageSize();
-        int pageIndex = config.getPageIndex();
-        return new Object[]{pageSize, (pageIndex - 1) * pageSize};
+    public String getPageCursorSql(PageSql config) {
+        final String quotation = buildSqlWithQuotation();
+        final String pk = config.getPk();
+        StringBuilder querySql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
+        return querySql.toString();
     }
 
     @Override
-    protected String buildSqlWithQuotation() {
-        return "\"";
+    public Object[] getPageArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        Object cursor = config.getCursor();
+        if (null == cursor) {
+            return new Object[]{pageSize};
+        }
+        return new Object[]{cursor, pageSize};
     }
 
     @Override
     protected String getQueryCountSql(CommandConfig commandConfig, String schema, String quotation, String queryFilterSql) {
-        // 有过滤条件,走默认方式
-        if (StringUtil.isNotBlank(queryFilterSql)) {
+        final Table table = commandConfig.getTable();
+        if (StringUtil.isNotBlank(queryFilterSql) || TableTypeEnum.isView(table.getType())) {
             return super.getQueryCountSql(commandConfig, schema, quotation, queryFilterSql);
         }
 
         // 从系统表查询
-        final String table = commandConfig.getTable().getName();
         DatabaseConfig cfg = (DatabaseConfig) commandConfig.getConnectorConfig();
-        return String.format("SELECT N_LIVE_TUP FROM PG_STAT_USER_TABLES WHERE SCHEMANAME='%s' AND RELNAME='%s'", cfg.getSchema(), table);
+        return String.format("SELECT N_LIVE_TUP FROM PG_STAT_USER_TABLES WHERE SCHEMANAME='%s' AND RELNAME='%s'", cfg.getSchema(), table.getName());
     }
+
+    @Override
+    protected boolean enableCursor() {
+        return true;
+    }
+
 }

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/BinaryValueMapper.java

@@ -11,6 +11,11 @@ import org.dbsyncer.connector.ConnectorMapper;
  */
 public class BinaryValueMapper extends AbstractValueMapper<byte[]> {
 
+    @Override
+    protected Object getDefaultVal(Object val) {
+        return null != val ? val : new byte[0];
+    }
+
     @Override
     protected byte[] convert(ConnectorMapper connectorMapper, Object val) {
         if(val instanceof String){

+ 6 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/DoubleValueMapper.java

@@ -4,6 +4,8 @@ import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 
+import java.math.BigDecimal;
+
 /**
  * @author AE86
  * @version 1.0.0
@@ -13,6 +15,10 @@ public class DoubleValueMapper extends AbstractValueMapper<Double> {
 
     @Override
     protected Double convert(ConnectorMapper connectorMapper, Object val) {
+        if (val instanceof BigDecimal) {
+            BigDecimal bigDecimal = (BigDecimal) val;
+            return bigDecimal.doubleValue();
+        }
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 28 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/GeometryValueMapper.java

@@ -0,0 +1,28 @@
+package org.dbsyncer.connector.schema;
+
+import com.microsoft.sqlserver.jdbc.Geometry;
+import com.microsoft.sqlserver.jdbc.SQLServerException;
+import org.dbsyncer.connector.AbstractValueMapper;
+import org.dbsyncer.connector.ConnectorException;
+import org.dbsyncer.connector.ConnectorMapper;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/8/24 23:43
+ */
+public class GeometryValueMapper extends AbstractValueMapper<Geometry> {
+
+    @Override
+    protected Object getDefaultVal(Object val) throws SQLServerException {
+        return null != val ? val : Geometry.point(0, 0, 0);
+    }
+
+    @Override
+    protected Geometry convert(ConnectorMapper connectorMapper, Object val) throws SQLServerException {
+        if (val instanceof byte[]) {
+            return Geometry.deserialize((byte[]) val);
+        }
+        throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
+    }
+}

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/LongVarBinaryValueMapper.java

@@ -11,6 +11,11 @@ import org.dbsyncer.connector.ConnectorMapper;
  */
 public class LongVarBinaryValueMapper extends AbstractValueMapper<byte[]> {
 
+    @Override
+    protected Object getDefaultVal(Object val) {
+        return null != val ? val : new byte[0];
+    }
+
     @Override
     protected byte[] convert(ConnectorMapper connectorMapper, Object val) {
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));

+ 2 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/OtherValueMapper.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.connector.schema;
 
-import oracle.sql.STRUCT;
 import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
@@ -19,7 +18,7 @@ public class OtherValueMapper extends AbstractValueMapper<Struct> {
 
     @Override
     protected Struct convert(ConnectorMapper connectorMapper, Object val) throws Exception {
-        if (val instanceof STRUCT) {
+        if (val instanceof oracle.sql.STRUCT) {
             return (Struct) val;
         }
         // SqlServer Geometry
@@ -27,7 +26,7 @@ public class OtherValueMapper extends AbstractValueMapper<Struct> {
             Object connection = connectorMapper.getConnection();
             if (connection instanceof Connection) {
                 final DatabaseValueMapper mapper = new DatabaseValueMapper((SimpleConnection) connection);
-                return mapper.getSTRUCT((byte[]) val);
+                return mapper.getStruct((byte[]) val);
             }
         }
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));

+ 4 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/TinyintValueMapper.java

@@ -17,12 +17,14 @@ public class TinyintValueMapper extends AbstractValueMapper<Integer> {
             Short s = (Short) val;
             return new Integer(s);
         }
-
         if (val instanceof Boolean) {
             Boolean b = (Boolean) val;
             return new Integer(b ? 1 : 0);
         }
-
+        if (val instanceof String) {
+            String s = (String) val;
+            return new Integer(s);
+        }
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/VarBinaryValueMapper.java

@@ -11,6 +11,11 @@ import org.dbsyncer.connector.ConnectorMapper;
  */
 public class VarBinaryValueMapper extends AbstractValueMapper<byte[]> {
 
+    @Override
+    protected Object getDefaultVal(Object val) {
+        return null != val ? val : new byte[0];
+    }
+
     @Override
     protected byte[] convert(ConnectorMapper connectorMapper, Object val) {
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));

+ 11 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -11,7 +11,9 @@ import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.schema.GeometryValueMapper;
 
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -22,6 +24,10 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
 
     private static final String QUERY_TABLE = "select name from sys.tables where schema_id = schema_id('%s') and is_ms_shipped = 0";
 
+    static {
+        valueMappers.put(Types.VARBINARY, new GeometryValueMapper());
+    }
+
     @Override
     public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
         DatabaseConfig config = connectorMapper.getConfig();
@@ -44,16 +50,15 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
 
     @Override
     protected String getQueryCountSql(CommandConfig commandConfig, String schema, String quotation, String queryFilterSql) {
-        // 有过滤条件,走默认方式
-        if (StringUtil.isNotBlank(queryFilterSql)) {
-            String table = commandConfig.getTable().getName();
-            return new StringBuilder("SELECT COUNT(1) FROM ").append(schema).append(quotation).append(table).append(quotation).append(queryFilterSql).toString();
+        // 视图或有过滤条件,走默认方式
+        final Table table = commandConfig.getTable();
+        if (StringUtil.isNotBlank(queryFilterSql) || TableTypeEnum.isView(table.getType())) {
+            return new StringBuilder("SELECT COUNT(1) FROM ").append(schema).append(quotation).append(table.getName()).append(quotation).append(queryFilterSql).toString();
         }
 
-        String table = commandConfig.getTable().getName();
         DatabaseConfig cfg = (DatabaseConfig) commandConfig.getConnectorConfig();
         // 从存储过程查询(定时更新总数,可能存在误差)
-        return String.format("select rows from sysindexes where id = object_id('%s.%s') and indid in (0, 1)", cfg.getSchema(), table);
+        return String.format("select rows from sysindexes where id = object_id('%s.%s') and indid in (0, 1)", cfg.getSchema(), table.getName());
     }
 
     private List<Table> getTables(DatabaseConnectorMapper connectorMapper, String sql, TableTypeEnum type) {

+ 72 - 0
dbsyncer-connector/src/main/test/ConnectionTest.java

@@ -1,4 +1,5 @@
 import oracle.jdbc.OracleConnection;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
@@ -12,6 +13,7 @@ import org.springframework.jdbc.core.BatchPreparedStatementSetter;
 
 import java.nio.charset.Charset;
 import java.sql.*;
+import java.time.Instant;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
@@ -103,6 +105,76 @@ public class ConnectionTest {
         logger.info("test end");
     }
 
+    @Test
+    public void testBatchInsert() {
+        final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(createPostgresConfig());
+
+        long begin = Instant.now().toEpochMilli();
+        final int threadSize = 10;
+        final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
+
+        // 模拟1000w条数据
+        List<Object[]> dataList = new ArrayList<>();
+        for (int i = 1; i <= 10000001; i++) {
+            Object[] args = new Object[5];
+            args[0] = i;
+            args[1] = "mathaaaaaaaaaaaaaaaaaa";
+            args[2] = 9999;
+            args[3] = 8888;
+            args[4] = "888899999999999999999999999999蓉儿UN是代付款房间里的解放东路来得及分类的恢复力度菲欧";
+            dataList.add(args);
+
+            if (i % 10000 == 0) {
+                System.out.println(i + "-----------------正在处理");
+                batchInsert(connectorMapper, pool, dataList, 1000);
+                dataList.clear();
+            }
+        }
+
+        if(!CollectionUtils.isEmpty(dataList)){
+            System.out.println("-----------------正在处理剩余数据");
+            batchInsert(connectorMapper, pool, dataList, 1000);
+        }
+
+        pool.shutdown();
+        logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
+    }
+
+    private void batchInsert(DatabaseConnectorMapper connectorMapper, ExecutorService pool, List<Object[]> dataList, int batchSize) {
+        int total = dataList.size();
+        int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
+        final String sql = "insert into t_course2 (id,course,score,user_id,memo) VALUES (?,?,?,?,?)";
+        final CountDownLatch latch = new CountDownLatch(taskSize);
+        int fromIndex = 0;
+        int toIndex = batchSize;
+        for (int i = 0; i < taskSize; i++) {
+            final List<Object[]> data;
+            if (toIndex > total) {
+                toIndex = fromIndex + (total % batchSize);
+                data = dataList.subList(fromIndex, toIndex);
+            } else {
+                data = dataList.subList(fromIndex, toIndex);
+                fromIndex += batchSize;
+                toIndex += batchSize;
+            }
+
+            pool.submit(() -> {
+                try {
+                    connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, data));
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        }
+    }
+
     @Test
     public void testReadSchema() {
         getTables(createOracleConfig(), "test", "AE86", "MY_ORG");

+ 1 - 1
dbsyncer-listener/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.0-Beta</version>
+        <version>1.2.1-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-listener</artifactId>

+ 1 - 1
dbsyncer-manager/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.0-Beta</version>
+        <version>1.2.1-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-manager</artifactId>

+ 11 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -98,7 +98,9 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         Meta meta = manager.getMeta(task.getId());
         Map<String, String> snapshot = meta.getSnapshot();
         task.setPageIndex(NumberUtil.toInt(snapshot.get(ParserEnum.PAGE_INDEX.getCode()), ParserEnum.PAGE_INDEX.getDefaultValue()));
-        task.setCursor(snapshot.get(ParserEnum.CURSOR.getCode()));
+        // 反序列化游标值类型(通常为数字或字符串类型)
+        String cursorValue = snapshot.get(ParserEnum.CURSOR.getCode());
+        task.setCursor(NumberUtil.isCreatable(cursorValue) ? NumberUtil.toLong(cursorValue) : cursorValue);
         task.setTableGroupIndex(NumberUtil.toInt(snapshot.get(ParserEnum.TABLE_GROUP_INDEX.getCode()), ParserEnum.TABLE_GROUP_INDEX.getDefaultValue()));
         flush(task);
 
@@ -109,7 +111,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
             }
             parser.execute(task, mapping, list.get(i), executorService);
             task.setPageIndex(ParserEnum.PAGE_INDEX.getDefaultValue());
-            task.setCursor("");
+            task.setCursor(null);
             task.setTableGroupIndex(++i);
             flush(task);
         }
@@ -117,6 +119,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         // 记录结束时间
         task.setEndTime(Instant.now().toEpochMilli());
         task.setTableGroupIndex(ParserEnum.TABLE_GROUP_INDEX.getDefaultValue());
+        task.setFinished(true);
         flush(task);
     }
 
@@ -130,11 +133,16 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
             meta.getTotal().set(finished);
         }
 
+        // 同步实际完成总数(读取的系统表存在误差,执行的过程中,总数可能有变化)
+        if(task.isFinished() && meta.getTotal().get() != finished){
+            meta.getTotal().set(finished);
+        }
+
         meta.setBeginTime(task.getBeginTime());
         meta.setEndTime(task.getEndTime());
         Map<String, String> snapshot = meta.getSnapshot();
         snapshot.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(task.getPageIndex()));
-        snapshot.put(ParserEnum.CURSOR.getCode(), task.getCursor());
+        snapshot.put(ParserEnum.CURSOR.getCode(), String.valueOf(task.getCursor()));
         snapshot.put(ParserEnum.TABLE_GROUP_INDEX.getCode(), String.valueOf(task.getTableGroupIndex()));
         manager.editMeta(meta);
     }

+ 1 - 1
dbsyncer-monitor/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.0-Beta</version>
+        <version>1.2.1-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-monitor</artifactId>

+ 1 - 1
dbsyncer-parser/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.0-Beta</version>
+        <version>1.2.1-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-parser</artifactId>

+ 6 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/AbstractWriterBinlog.java

@@ -4,6 +4,7 @@ import com.google.protobuf.ByteString;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
@@ -64,10 +65,10 @@ public abstract class AbstractWriterBinlog extends AbstractBinlogRecorder<Writer
         final TableGroup tableGroup = cacheService.get(message.getTableGroupId(), TableGroup.class);
 
         // 2、反序列数据
+        Map<String, Object> data = new HashMap<>();
         try {
             final Picker picker = new Picker(tableGroup.getFieldMapping());
             final Map<String, Field> fieldMap = picker.getTargetFieldMap();
-            Map<String, Object> data = new HashMap<>();
             message.getData().getRowMap().forEach((k, v) -> {
                 if (fieldMap.containsKey(k)) {
                     data.put(k, BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v));
@@ -75,7 +76,10 @@ public abstract class AbstractWriterBinlog extends AbstractBinlogRecorder<Writer
             });
             return new WriterRequest(messageId, message.getTableGroupId(), message.getEvent().name(), data);
         } catch (Exception e) {
-            logger.error(e.getMessage());
+            Table sTable = tableGroup.getSourceTable();
+            Table tTable = tableGroup.getTargetTable();
+            logger.error("messageId:{}, sTable:{}, tTable:{}, event:{}, data:{}", messageId, sTable.getName(), tTable.getName(), message.getEvent().name(), data);
+            logger.error(messageId, e);
         }
         return null;
     }

+ 3 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -276,7 +276,7 @@ public class ParserFactory implements Parser {
             pluginFactory.convert(group.getPlugin(), data, target);
 
             // 5、写入目标源
-            BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, sTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
+            BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, tTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
             Result writer = writeBatch(batchWriter, executorService);
 
             // 6、更新结果
@@ -425,12 +425,8 @@ public class ParserFactory implements Parser {
      * @param pk
      * @return
      */
-    private String getLastCursor(List<Map> data, String pk) {
-        if(CollectionUtils.isEmpty(data)){
-            return "";
-        }
-        Object value = data.get(data.size() - 1).get(pk);
-        return value == null ? "" : String.valueOf(value);
+    private Object getLastCursor(List<Map> data, String pk) {
+        return CollectionUtils.isEmpty(data) ? null : data.get(data.size() - 1).get(pk);
     }
 
 }

+ 9 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.config.IncrementDataConfig;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
@@ -16,14 +17,15 @@ import org.springframework.util.Assert;
  */
 public abstract class AbstractFlushStrategy implements FlushStrategy {
 
-    private static final int MAX_ERROR_LENGTH = 1000;
-
     @Autowired
     private FlushService flushService;
 
     @Autowired
     private CacheService cacheService;
 
+    @Autowired
+    private IncrementDataConfig flushDataConfig;
+
     @Override
     public void flushFullData(String metaId, Result result, String event) {
         flush(metaId, result, event);
@@ -37,11 +39,13 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     protected void flush(String metaId, Result result, String event) {
         refreshTotal(metaId, result);
 
-        if (!CollectionUtils.isEmpty(result.getFailData())) {
-            final String error = StringUtil.substring(result.getError().toString(), 0, MAX_ERROR_LENGTH);
+        if (flushDataConfig.isWriterFail() && !CollectionUtils.isEmpty(result.getFailData())) {
+            final String error = StringUtil.substring(result.getError().toString(), 0, flushDataConfig.getMaxErrorLength());
             flushService.write(metaId, event, false, result.getFailData(), error);
         }
-        if (!CollectionUtils.isEmpty(result.getSuccessData())) {
+
+        // 是否写增量数据
+        if (flushDataConfig.isWriterSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
             flushService.write(metaId, event, true, result.getSuccessData(), "");
         }
     }

+ 5 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.flush.impl;
 
 import com.alibaba.fastjson.JSONException;
+import org.dbsyncer.common.config.IncrementDataConfig;
 import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
@@ -44,6 +45,9 @@ public class FlushServiceImpl implements FlushService {
     @Autowired
     private BufferActuator storageBufferActuator;
 
+    @Autowired
+    private IncrementDataConfig flushDataConfig;
+
     @Override
     public void asyncWrite(String type, String error) {
         Map<String, Object> params = new HashMap();
@@ -81,7 +85,7 @@ public class FlushServiceImpl implements FlushService {
      * @return
      */
     private String substring(String error){
-        return StringUtil.isNotBlank(error) ? StringUtil.substring(error, 0, 2048) : error;
+        return StringUtil.substring(error, 0, flushDataConfig.getMaxErrorLength());
     }
 
 }

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -78,10 +78,10 @@ public class Picker {
     }
 
     public List<Field> getTargetFields() {
-        return targetFields;
+        return targetFields.stream().filter(f -> null != f).collect(Collectors.toList());
     }
 
     public Map<String, Field> getTargetFieldMap() {
-        return targetFields.stream().collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
+        return getTargetFields().stream().collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
     }
 }

+ 13 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Task.java

@@ -10,12 +10,14 @@ public class Task {
 
     private int pageIndex;
 
-    private String cursor;
+    private Object cursor;
 
     private long beginTime;
 
     private long endTime;
 
+    private boolean finished;
+
     public Task() {
     }
 
@@ -56,11 +58,11 @@ public class Task {
         this.pageIndex = pageIndex;
     }
 
-    public String getCursor() {
+    public Object getCursor() {
         return cursor;
     }
 
-    public void setCursor(String cursor) {
+    public void setCursor(Object cursor) {
         this.cursor = cursor;
     }
 
@@ -80,6 +82,14 @@ public class Task {
         this.endTime = endTime;
     }
 
+    public boolean isFinished() {
+        return finished;
+    }
+
+    public void setFinished(boolean finished) {
+        this.finished = finished;
+    }
+
     public enum StateEnum {
         /**
          * 运行

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableFlushStrategy.java

@@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
  * @date 2021/11/18 22:21
  */
 @Component
-@ConditionalOnProperty(value = "dbsyncer.parser.flush.full.enabled", havingValue = "true")
+@ConditionalOnProperty(value = "dbsyncer.parser.flush.full.data.enabled", havingValue = "true")
 public final class EnableFlushStrategy extends AbstractFlushStrategy {
 
 }

+ 1 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/EnableWriterBufferActuatorStrategy.java

@@ -32,6 +32,7 @@ public final class EnableWriterBufferActuatorStrategy extends AbstractWriterBinl
     public void execute(String tableGroupId, String event, Map<String, Object> data) {
         if (getQueue().size() >= limit) {
             super.flush(tableGroupId, event, data);
+            return;
         }
         writerBufferActuator.offer(new WriterRequest(tableGroupId, event, data));
     }

+ 1 - 1
dbsyncer-plugin/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.0-Beta</version>
+        <version>1.2.1-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-plugin</artifactId>

+ 1 - 1
dbsyncer-storage/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.0-Beta</version>
+        <version>1.2.1-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-storage</artifactId>

+ 7 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogColumnValue.java

@@ -38,8 +38,14 @@ public class BinlogColumnValue extends AbstractColumnValue<ByteString> {
 
     @Override
     public Integer asInteger() {
+        byte[] bytes = asByteArray();
+        if (bytes.length == 2) {
+            Short aShort = asShort();
+            return new Integer(aShort);
+        }
+
         buffer.clear();
-        buffer.put(asByteArray(), 0, 4);
+        buffer.put(bytes, 0, 4);
         buffer.flip();
         return buffer.asIntBuffer().get();
     }

+ 1 - 1
dbsyncer-web/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.0-Beta</version>
+        <version>1.2.1-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-web</artifactId>

+ 17 - 14
dbsyncer-web/src/main/resources/application.properties

@@ -1,33 +1,36 @@
 server.ip=127.0.0.1
 server.port=18686
 #web
+dbsyncer.web.worker.id=1
 dbsyncer.web.login.username=admin
 dbsyncer.web.login.password=0DPiKuNIrrVmD8IUCuw1hQxNqZc=
-server.servlet.session.timeout=1800
-server.servlet.context-path=/
-#dbsyncer.common.worker.id=1
 dbsyncer.web.task.scheduler.pool-size=8
 dbsyncer.web.task.executor.core-size=10
 dbsyncer.web.task.executor.queue-capacity=1000
+server.servlet.session.timeout=1800
+server.servlet.context-path=/
 
 #parser
+dbsyncer.parser.flush.full.data.enabled=false
+dbsyncer.parser.flush.increment.data.writer-success=true
+dbsyncer.parser.flush.increment.data.writer-fail=true
+dbsyncer.parser.flush.increment.data.max-error-length=2048
 dbsyncer.parser.flush.buffer.actuator.speed.enabled=true
-#dbsyncer.parser.flush.buffer.actuator.writer-batch-count=100
-#dbsyncer.parser.flush.buffer.actuator.batch-count=1000
-#dbsyncer.parser.flush.buffer.actuator.queue-capacity=50000
-#dbsyncer.parser.flush.buffer.actuator.period-millisecond=300
-#dbsyncer.parser.flush.full.enabled=true
+dbsyncer.parser.flush.buffer.actuator.writer-batch-count=100
+dbsyncer.parser.flush.buffer.actuator.batch-count=1000
+dbsyncer.parser.flush.buffer.actuator.queue-capacity=50000
+dbsyncer.parser.flush.buffer.actuator.period-millisecond=300
 
 #storage
+dbsyncer.storage.binlog.recorder.batch-count=1000
+dbsyncer.storage.binlog.recorder.max-processing-seconds=120
+dbsyncer.storage.binlog.recorder.queue-capacity=10000
+dbsyncer.storage.binlog.recorder.writer-period-millisecond=500
+dbsyncer.storage.binlog.recorder.reader-period-millisecond=2000
 #dbsyncer.storage.support.mysql.enabled=true
 #dbsyncer.storage.support.mysql.config.url=jdbc:mysql://127.0.0.1:3306/dbsyncer?rewriteBatchedStatements=true&seUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false
 #dbsyncer.storage.support.mysql.config.username=root
 #dbsyncer.storage.support.mysql.config.password=123
-#dbsyncer.storage.binlog.recorder.batch-count=1000
-#dbsyncer.storage.binlog.recorder.max-processing-seconds=120
-#dbsyncer.storage.binlog.recorder.queue-capacity=10000
-#dbsyncer.storage.binlog.recorder.writer-period-millisecond=500
-#dbsyncer.storage.binlog.recorder.reader-period-millisecond=2000
 
 #monitor
 management.endpoints.web.base-path=/app
@@ -35,7 +38,7 @@ management.endpoints.web.exposure.include=*
 management.endpoint.health.show-details=always
 management.health.elasticsearch.enabled=false
 info.app.name=DBSyncer
-info.app.version=1.2.0-Beta
+info.app.version=1.2.1-RC
 info.app.copyright=&copy;2022 ${info.app.name}(${info.app.version})<footer>Designed By <a href='https://gitee.com/ghi/dbsyncer' target='_blank' >AE86</a></footer>
 
 #All < Trace < Debug < Info < Warn < Error < Fatal < OFF

+ 16 - 1
pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>org.ghi</groupId>
     <artifactId>dbsyncer</artifactId>
-    <version>1.2.0-Beta</version>
+    <version>1.2.1-RC</version>
     <packaging>pom</packaging>
     <name>dbsyncer</name>
     <url>https://gitee.com/ghi/dbsyncer</url>
@@ -72,6 +72,21 @@
             </snapshots>
         </repository>
 
+        <!-- atlassian仓库 -->
+        <repository>
+            <id>atlassian-public</id>
+            <url>https://packages.atlassian.com/mvn/maven-external/</url>
+            <snapshots>
+                <enabled>true</enabled>
+                <updatePolicy>never</updatePolicy>
+                <checksumPolicy>warn</checksumPolicy>
+            </snapshots>
+            <releases>
+                <enabled>true</enabled>
+                <checksumPolicy>warn</checksumPolicy>
+            </releases>
+        </repository>
+
     </repositories>
 
     <!-- 统一管理第三方依赖jar -->