Procházet zdrojové kódy

优化mysql全量分页性能

AE86 před 2 roky
rodič
revize
e180e84d81
21 změnil soubory, kde provedl 304 přidání a 137 odebrání
  1. 16 19
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java
  2. 1 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java
  3. 5 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/constant/ConnectorConstant.java
  4. 115 84
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  5. 13 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/Database.java
  6. 22 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryCursor.java
  7. 5 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/SqlBuilderEnum.java
  8. 36 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java
  9. 6 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java
  10. 6 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java
  11. 5 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLMysqlConnector.java
  12. 5 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java
  13. 5 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLPostgreSQLConnector.java
  14. 4 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java
  15. 4 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java
  16. 3 0
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java
  17. 23 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  18. 5 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/ParserEnum.java
  19. 10 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java
  20. 10 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Task.java
  21. 5 2
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

+ 16 - 19
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java

@@ -7,49 +7,46 @@ public class ReaderConfig {
 
     private Map<String, String> command;
     private List<Object> args;
+    private String cursor;
     private int pageIndex;
     private int pageSize;
 
-    public ReaderConfig(Map<String,String> command, List<Object> args, int pageIndex, int pageSize) {
+    public ReaderConfig(Map<String, String> command, List<Object> args, int pageIndex, int pageSize) {
         this.command = command;
         this.args = args;
         this.pageIndex = pageIndex;
         this.pageSize = pageSize;
     }
 
-    public Map<String, String> getCommand() {
-        return command;
+    public ReaderConfig(Map<String,String> command, List<Object> args, String cursor, int pageIndex, int pageSize) {
+        this.command = command;
+        this.args = args;
+        this.cursor = cursor;
+        this.pageIndex = pageIndex;
+        this.pageSize = pageSize;
     }
 
-    public ReaderConfig setCommand(Map<String, String> command) {
-        this.command = command;
-        return this;
+    public Map<String, String> getCommand() {
+        return command;
     }
 
     public List<Object> getArgs() {
         return args;
     }
 
-    public ReaderConfig setArgs(List<Object> args) {
-        this.args = args;
-        return this;
+    public String getCursor() {
+        return cursor;
     }
 
-    public int getPageIndex() {
-        return pageIndex;
+    public void setCursor(String cursor) {
+        this.cursor = cursor;
     }
 
-    public ReaderConfig setPageIndex(int pageIndex) {
-        this.pageIndex = pageIndex;
-        return this;
+    public int getPageIndex() {
+        return pageIndex;
     }
 
     public int getPageSize() {
         return pageSize;
     }
-
-    public ReaderConfig setPageSize(int pageSize) {
-        this.pageSize = pageSize;
-        return this;
-    }
 }

+ 1 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java

@@ -8,7 +8,6 @@ import java.util.List;
 public class SqlBuilderConfig {
 
     private Database database;
-    private CommandConfig commandConfig;
     // 架构名
     private String schema;
     // 表名
@@ -22,9 +21,8 @@ public class SqlBuilderConfig {
     // 引号
     private String quotation;
 
-    public SqlBuilderConfig(Database database, CommandConfig commandConfig, String schema, String tableName, String pk, List<Field> fields, String queryFilter, String quotation) {
+    public SqlBuilderConfig(Database database, String schema, String tableName, String pk, List<Field> fields, String queryFilter, String quotation) {
         this.database = database;
-        this.commandConfig = commandConfig;
         this.schema = schema;
         this.tableName = tableName;
         this.pk = pk;
@@ -37,10 +35,6 @@ public class SqlBuilderConfig {
         return database;
     }
 
-    public CommandConfig getCommandConfig() {
-        return commandConfig;
-    }
-
     public String getSchema() {
         return schema;
     }

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/constant/ConnectorConstant.java

@@ -27,6 +27,11 @@ public class ConnectorConstant {
      */
     public static final String OPERTION_QUERY = "QUERY";
 
+    /**
+     * 查询游标
+     */
+    public static final String OPERTION_QUERY_CURSOR = "QUERY_CURSOR";
+
     /**
      * 查询过滤条件
      */

+ 115 - 84
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -87,15 +87,15 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
     @Override
     public Result reader(DatabaseConnectorMapper connectorMapper, ReaderConfig config) {
         // 1、获取select SQL
-        String querySql = config.getCommand().get(SqlBuilderEnum.QUERY.getName());
+        String queryKey = enableCursor() && StringUtil.isBlank(config.getCursor()) ? ConnectorConstant.OPERTION_QUERY_CURSOR : SqlBuilderEnum.QUERY.getName();
+        String querySql = config.getCommand().get(queryKey);
         Assert.hasText(querySql, "查询语句不能为空.");
 
         // 2、设置参数
-        Collections.addAll(config.getArgs(), getPageArgs(config.getPageIndex(), config.getPageSize()));
+        Collections.addAll(config.getArgs(), getPageArgs(config));
 
         // 3、执行SQL
-        List<Map<String, Object>> list = connectorMapper.execute(
-                databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
+        List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
 
         // 4、返回结果集
         return new Result(list);
@@ -174,6 +174,10 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         Map<String, String> map = new HashMap<>();
         String schema = getSchema((DatabaseConfig) commandConfig.getConnectorConfig(), quotation);
         map.put(ConnectorConstant.OPERTION_QUERY, buildSql(ConnectorConstant.OPERTION_QUERY, commandConfig, schema, queryFilterSql));
+        // 是否支持游标
+        if(enableCursor()){
+            map.put(ConnectorConstant.OPERTION_QUERY_CURSOR, buildSql(ConnectorConstant.OPERTION_QUERY_CURSOR, commandConfig, schema, queryFilterSql));
+        }
         // 获取查询总数SQL
         map.put(ConnectorConstant.OPERTION_QUERY_COUNT, getQueryCountSql(commandConfig, schema, quotation, queryFilterSql));
         return map;
@@ -204,6 +208,15 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return map;
     }
 
+    /**
+     * 是否支持游标查询
+     *
+     * @return
+     */
+    protected boolean enableCursor(){
+        return false;
+    }
+
     /**
      * 健康检查
      *
@@ -237,6 +250,34 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return schema.toString();
     }
 
+    /**
+     * 返回主键名称
+     *
+     * @param commandConfig
+     * @param quotation
+     * @return
+     */
+    protected String findOriginalTablePrimaryKey(CommandConfig commandConfig, String quotation) {
+        Table table = commandConfig.getOriginalTable();
+        if (null != table) {
+            List<Field> column = table.getColumn();
+            if (!CollectionUtils.isEmpty(column)) {
+                for (Field c : column) {
+                    if (c.isPk()) {
+                        return new StringBuilder(quotation).append(c.getName()).append(quotation).toString();
+                    }
+                }
+            }
+        }
+
+        DatabaseConfig cfg = (DatabaseConfig) commandConfig.getConnectorConfig();
+        if (StringUtil.isBlank(cfg.getPrimaryKey())) {
+            throw new ConnectorException("Table primary key can not be empty.");
+        }
+
+        return new StringBuilder(quotation).append(cfg.getPrimaryKey()).append(quotation).toString();
+    }
+
     /**
      * 获取表列表
      *
@@ -252,6 +293,74 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return Collections.EMPTY_LIST;
     }
 
+    /**
+     * 获取数据库表元数据信息
+     *
+     * @param databaseTemplate
+     * @param metaSql          查询元数据
+     * @param schema           架构名
+     * @param tableName        表名
+     * @return
+     */
+    protected MetaInfo getMetaInfo(DatabaseTemplate databaseTemplate, String metaSql, String schema, String tableName) throws SQLException {
+        SqlRowSet sqlRowSet = databaseTemplate.queryForRowSet(metaSql);
+        ResultSetWrappingSqlRowSet rowSet = (ResultSetWrappingSqlRowSet) sqlRowSet;
+        SqlRowSetMetaData metaData = rowSet.getMetaData();
+
+        // 查询表字段信息
+        int columnCount = metaData.getColumnCount();
+        if (1 > columnCount) {
+            throw new ConnectorException("查询表字段不能为空.");
+        }
+        List<Field> fields = new ArrayList<>(columnCount);
+        Map<String, List<String>> tables = new HashMap<>();
+        try {
+            Connection connection = databaseTemplate.getConnection();
+            DatabaseMetaData md = connection.getMetaData();
+            final String catalog = connection.getCatalog();
+            schema = StringUtil.isNotBlank(schema) ? schema : null;
+            String name = null;
+            String label = null;
+            String typeName = null;
+            String table = null;
+            int columnType;
+            boolean pk;
+            for (int i = 1; i <= columnCount; i++) {
+                table = StringUtil.isNotBlank(tableName) ? tableName : metaData.getTableName(i);
+                if (null == tables.get(table)) {
+                    tables.putIfAbsent(table, findTablePrimaryKeys(md, catalog, schema, table));
+                }
+                name = metaData.getColumnName(i);
+                label = metaData.getColumnLabel(i);
+                typeName = metaData.getColumnTypeName(i);
+                columnType = metaData.getColumnType(i);
+                pk = isPk(tables, table, name);
+                fields.add(new Field(label, typeName, columnType, pk));
+            }
+        } finally {
+            tables.clear();
+        }
+        return new MetaInfo().setColumn(fields);
+    }
+
+    /**
+     * 获取查询游标SQL
+     *
+     * @param commandConfig
+     * @return
+     */
+    protected String getQueryCursorSql(CommandConfig commandConfig) {
+        final String table = commandConfig.getTable().getName();
+        final String quotation = buildSqlWithQuotation();
+        final String pk = findOriginalTablePrimaryKey(commandConfig, quotation);
+        final String schema = getSchema((DatabaseConfig) commandConfig.getConnectorConfig(), quotation);
+
+        // select `id` from test.`my_user` order by `id` limit ?
+        StringBuilder queryCursor = new StringBuilder();
+        queryCursor.append("SELECT").append(pk).append(" FROM ").append(schema).append(table).append(" ORDER BY ").append(pk).append(" LIMIT 1");
+        return queryCursor.toString();
+    }
+
     /**
      * 获取查询总数SQL
      *
@@ -349,7 +458,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
      * @param queryFilterSQL
      * @return
      */
-    protected String buildSql(String type, CommandConfig commandConfig, String schema, String queryFilterSQL) {
+    private String buildSql(String type, CommandConfig commandConfig, String schema, String queryFilterSQL) {
         Table table = commandConfig.getTable();
         if (null == table) {
             logger.error("Table can not be null.");
@@ -388,88 +497,10 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
             pk = findOriginalTablePrimaryKey(commandConfig, "");
         }
 
-        SqlBuilderConfig config = new SqlBuilderConfig(this, commandConfig, schema, tableName, pk, fields, queryFilterSQL, buildSqlWithQuotation());
+        SqlBuilderConfig config = new SqlBuilderConfig(this, schema, tableName, pk, fields, queryFilterSQL, buildSqlWithQuotation());
         return SqlBuilderEnum.getSqlBuilder(type).buildSql(config);
     }
 
-    /**
-     * 获取数据库表元数据信息
-     *
-     * @param databaseTemplate
-     * @param metaSql          查询元数据
-     * @param schema           架构名
-     * @param tableName        表名
-     * @return
-     */
-    protected MetaInfo getMetaInfo(DatabaseTemplate databaseTemplate, String metaSql, String schema, String tableName) throws SQLException {
-        SqlRowSet sqlRowSet = databaseTemplate.queryForRowSet(metaSql);
-        ResultSetWrappingSqlRowSet rowSet = (ResultSetWrappingSqlRowSet) sqlRowSet;
-        SqlRowSetMetaData metaData = rowSet.getMetaData();
-
-        // 查询表字段信息
-        int columnCount = metaData.getColumnCount();
-        if (1 > columnCount) {
-            throw new ConnectorException("查询表字段不能为空.");
-        }
-        List<Field> fields = new ArrayList<>(columnCount);
-        Map<String, List<String>> tables = new HashMap<>();
-        try {
-            Connection connection = databaseTemplate.getConnection();
-            DatabaseMetaData md = connection.getMetaData();
-            final String catalog = connection.getCatalog();
-            schema = StringUtil.isNotBlank(schema) ? schema : null;
-            String name = null;
-            String label = null;
-            String typeName = null;
-            String table = null;
-            int columnType;
-            boolean pk;
-            for (int i = 1; i <= columnCount; i++) {
-                table = StringUtil.isNotBlank(tableName) ? tableName : metaData.getTableName(i);
-                if (null == tables.get(table)) {
-                    tables.putIfAbsent(table, findTablePrimaryKeys(md, catalog, schema, table));
-                }
-                name = metaData.getColumnName(i);
-                label = metaData.getColumnLabel(i);
-                typeName = metaData.getColumnTypeName(i);
-                columnType = metaData.getColumnType(i);
-                pk = isPk(tables, table, name);
-                fields.add(new Field(label, typeName, columnType, pk));
-            }
-        } finally {
-            tables.clear();
-        }
-        return new MetaInfo().setColumn(fields);
-    }
-
-    /**
-     * 返回主键名称
-     *
-     * @param commandConfig
-     * @param quotation
-     * @return
-     */
-    protected String findOriginalTablePrimaryKey(CommandConfig commandConfig, String quotation) {
-        Table table = commandConfig.getOriginalTable();
-        if (null != table) {
-            List<Field> column = table.getColumn();
-            if (!CollectionUtils.isEmpty(column)) {
-                for (Field c : column) {
-                    if (c.isPk()) {
-                        return new StringBuilder(quotation).append(c.getName()).append(quotation).toString();
-                    }
-                }
-            }
-        }
-
-        DatabaseConfig cfg = (DatabaseConfig) commandConfig.getConnectorConfig();
-        if (StringUtil.isBlank(cfg.getPrimaryKey())) {
-            throw new ConnectorException("Table primary key can not be empty.");
-        }
-
-        return new StringBuilder(quotation).append(cfg.getPrimaryKey()).append(quotation).toString();
-    }
-
     /**
      * 返回表主键
      *

+ 13 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/Database.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.database;
 
+import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.model.PageSql;
 
 public interface Database {
@@ -12,13 +13,22 @@ public interface Database {
      */
     String getPageSql(PageSql config);
 
+    /**
+     * 获取分页游标SQL
+     *
+     * @param pageSql
+     * @return
+     */
+    default String getPageCursorSql(PageSql pageSql) {
+        return "";
+    }
+
     /**
      * 获取分页SQL
      *
-     * @param pageIndex
-     * @param pageSize
+     * @param config
      * @return
      */
-    Object[] getPageArgs(int pageIndex, int pageSize);
+    Object[] getPageArgs(ReaderConfig config);
 
 }

+ 22 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryCursor.java

@@ -0,0 +1,22 @@
+package org.dbsyncer.connector.database.sqlbuilder;
+
+import org.dbsyncer.connector.config.SqlBuilderConfig;
+import org.dbsyncer.connector.database.Database;
+import org.dbsyncer.connector.model.PageSql;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/8/9 0:03
+ */
+public class SqlBuilderQueryCursor extends SqlBuilderQuery {
+
+    @Override
+    public String buildSql(SqlBuilderConfig config) {
+        // 分页语句
+        Database database = config.getDatabase();
+        PageSql pageSql = new PageSql(config, buildQuerySql(config), config.getPk());
+        return database.getPageCursorSql(pageSql);
+    }
+
+}

+ 5 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/SqlBuilderEnum.java

@@ -4,7 +4,6 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.sqlbuilder.*;
-import org.dbsyncer.connector.database.sqlbuilder.SqlBuilder;
 
 /**
  * @author AE86
@@ -28,7 +27,11 @@ public enum SqlBuilderEnum {
     /**
      * 查询SQL生成器
      */
-    QUERY(ConnectorConstant.OPERTION_QUERY, new SqlBuilderQuery());
+    QUERY(ConnectorConstant.OPERTION_QUERY, new SqlBuilderQuery()),
+    /**
+     * 查询游标SQL生成器
+     */
+    QUERY_CURSOR(ConnectorConstant.OPERTION_QUERY_CURSOR, new SqlBuilderQueryCursor());
 
     /**
      * SQL构造器名称

+ 36 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.connector.mysql;
 
-import org.dbsyncer.connector.constant.DatabaseConstant;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.model.PageSql;
@@ -17,16 +18,47 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
 
     @Override
     public String getPageSql(PageSql config) {
-        return config.getQuerySql() + DatabaseConstant.MYSQL_PAGE_SQL;
+        final String quotation = buildSqlWithQuotation();
+        final String pk = config.getPk();
+        // select * from test.`my_user` where `id` > ? order by `id` limit ?
+        StringBuilder querySql = new StringBuilder(config.getQuerySql());
+        String queryFilter = config.getSqlBuilderConfig().getQueryFilter();
+        if (StringUtil.isNotBlank(queryFilter)) {
+            querySql.append(" AND ");
+        }else {
+            querySql.append(" WHERE ");
+        }
+        querySql.append(" ").append(quotation).append(pk).append(quotation).append(" > ? ORDER BY ").append(quotation).append(pk).append(quotation).append(" limit ?");
+        return querySql.toString();
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
-        return new Object[] {(pageIndex - 1) * pageSize, pageSize};
+    public String getPageCursorSql(PageSql config){
+        final String quotation = buildSqlWithQuotation();
+        final String pk = config.getPk();
+        // select * from test.`my_user` order by `id` limit ?
+        StringBuilder querySql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ").append(quotation).append(pk).append(quotation).append(" limit ?");
+        return querySql.toString();
+    }
+
+    @Override
+    public Object[] getPageArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        String cursor = config.getCursor();
+        if (StringUtil.isBlank(cursor)) {
+            return new Object[]{pageSize};
+        }
+        return new Object[]{cursor, pageSize};
+    }
+
+    @Override
+    protected boolean enableCursor() {
+        return true;
     }
 
     @Override
     public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
         return super.getTable(connectorMapper, "show tables");
     }
+
 }

+ 6 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -4,11 +4,12 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.model.PageSql;
-import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
+import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.model.Table;
 
 import java.util.Collections;
 import java.util.List;
@@ -33,7 +34,9 @@ public final class OracleConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
+    public Object[] getPageArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        int pageIndex = config.getPageIndex();
         return new Object[]{pageIndex * pageSize, (pageIndex - 1) * pageSize};
     }
 

+ 6 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -4,12 +4,13 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.model.PageSql;
-import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.enums.TableTypeEnum;
+import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.model.Table;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -40,7 +41,9 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
+    public Object[] getPageArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        int pageIndex = config.getPageIndex();
         return new Object[]{pageSize, (pageIndex - 1) * pageSize};
     }
 

+ 5 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLMysqlConnector.java

@@ -1,8 +1,9 @@
 package org.dbsyncer.connector.sql;
 
 import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
+import org.dbsyncer.connector.model.PageSql;
 
 import java.util.Map;
 
@@ -14,7 +15,9 @@ public final class DQLMysqlConnector extends AbstractDQLConnector {
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
+    public Object[] getPageArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        int pageIndex = config.getPageIndex();
         return new Object[]{(pageIndex - 1) * pageSize, pageSize};
     }
 

+ 5 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java

@@ -1,7 +1,8 @@
 package org.dbsyncer.connector.sql;
 
-import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
+import org.dbsyncer.connector.model.PageSql;
 
 public final class DQLOracleConnector extends AbstractDQLConnector {
 
@@ -11,7 +12,9 @@ public final class DQLOracleConnector extends AbstractDQLConnector {
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
+    public Object[] getPageArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        int pageIndex = config.getPageIndex();
         return new Object[]{pageIndex * pageSize, (pageIndex - 1) * pageSize};
     }
 

+ 5 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLPostgreSQLConnector.java

@@ -1,7 +1,8 @@
 package org.dbsyncer.connector.sql;
 
-import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
+import org.dbsyncer.connector.model.PageSql;
 
 public final class DQLPostgreSQLConnector extends AbstractDQLConnector {
 
@@ -11,7 +12,9 @@ public final class DQLPostgreSQLConnector extends AbstractDQLConnector {
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
+    public Object[] getPageArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        int pageIndex = config.getPageIndex();
         return new Object[]{pageSize, (pageIndex - 1) * pageSize};
     }
 

+ 4 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.sql;
 
+import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.model.PageSql;
 import org.slf4j.Logger;
@@ -15,7 +16,9 @@ public final class DQLSqlServerConnector extends AbstractDQLConnector {
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
+    public Object[] getPageArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        int pageIndex = config.getPageIndex();
         return new Object[]{(pageIndex - 1) * pageSize + 1, pageIndex * pageSize};
     }
 

+ 4 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -3,6 +3,7 @@ package org.dbsyncer.connector.sqlserver;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
@@ -25,7 +26,9 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
+    public Object[] getPageArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        int pageIndex = config.getPageIndex();
         return new Object[] {(pageIndex - 1) * pageSize + 1, pageIndex * pageSize};
     }
 

+ 3 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -80,6 +80,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         Meta meta = manager.getMeta(task.getId());
         Map<String, String> snapshot = meta.getSnapshot();
         task.setPageIndex(NumberUtil.toInt(snapshot.get(ParserEnum.PAGE_INDEX.getCode()), ParserEnum.PAGE_INDEX.getDefaultValue()));
+        task.setCursor(snapshot.get(ParserEnum.CURSOR.getCode()));
         task.setTableGroupIndex(NumberUtil.toInt(snapshot.get(ParserEnum.TABLE_GROUP_INDEX.getCode()), ParserEnum.TABLE_GROUP_INDEX.getDefaultValue()));
         flush(task);
 
@@ -90,6 +91,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
             }
             parser.execute(task, mapping, list.get(i), executorService);
             task.setPageIndex(ParserEnum.PAGE_INDEX.getDefaultValue());
+            task.setCursor("");
             task.setTableGroupIndex(++i);
             flush(task);
         }
@@ -97,6 +99,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         // 记录结束时间
         task.setEndTime(Instant.now().toEpochMilli());
         task.setPageIndex(ParserEnum.PAGE_INDEX.getDefaultValue());
+        task.setCursor("");
         task.setTableGroupIndex(ParserEnum.TABLE_GROUP_INDEX.getDefaultValue());
         flush(task);
     }

+ 23 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -78,6 +78,7 @@ public class ParserFactory implements Parser {
     @Qualifier("taskExecutor")
     private Executor taskExecutor;
 
+    @Qualifier("webApplicationContext")
     @Autowired
     private ApplicationContext applicationContext;
 
@@ -244,11 +245,14 @@ public class ParserFactory implements Parser {
         Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
         // 获取同步字段
         Picker picker = new Picker(fieldMapping);
+        String pk = picker.getSourcePrimaryKeyName();
 
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();
         ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
         ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
+        String cursor = task.getCursor();
+        int pageIndex = task.getPageIndex();
 
         for (; ; ) {
             if (!task.isRunning()) {
@@ -257,7 +261,7 @@ public class ParserFactory implements Parser {
             }
 
             // 1、获取数据源数据
-            Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), task.getPageIndex(), pageSize));
+            Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), cursor, pageIndex, pageSize));
             List<Map> data = reader.getSuccessData();
             if (CollectionUtils.isEmpty(data)) {
                 logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
@@ -278,7 +282,9 @@ public class ParserFactory implements Parser {
             Result writer = writeBatch(batchWriter, executorService);
 
             // 6、更新结果
-            task.setPageIndex(task.getPageIndex() + 1);
+            cursor = getLastCursor(data, pk);
+            task.setPageIndex(pageIndex + 1);
+            task.setCursor(cursor);
             flush(task, writer);
 
             // 7、判断尾页
@@ -415,4 +421,19 @@ public class ParserFactory implements Parser {
         return getConnector(connectorId).getConfig();
     }
 
+    /**
+     * 获取最新游标值
+     *
+     * @param data
+     * @param pk
+     * @return
+     */
+    private String getLastCursor(List<Map> data, String pk) {
+        if(CollectionUtils.isEmpty(data)){
+            return "";
+        }
+        Object value = data.get(data.size() - 1).get(pk);
+        return value == null ? "" : String.valueOf(value);
+    }
+
 }

+ 5 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/enums/ParserEnum.java

@@ -9,6 +9,11 @@ package org.dbsyncer.parser.enums;
  */
 public enum ParserEnum {
 
+    /**
+     * 游标
+     */
+    CURSOR("cursor", 0),
+
     /**
      * 页数
      */

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -2,6 +2,7 @@ package org.dbsyncer.parser.model;
 
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.parser.ParserException;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -63,6 +64,15 @@ public class Picker {
         }
     }
 
+    public String getSourcePrimaryKeyName() {
+        for (Field f : sourceFields) {
+            if (f.isPk()) {
+                return f.getName();
+            }
+        }
+        throw new ParserException("主键为空");
+    }
+
     public List<Field> getTargetFields() {
         return targetFields;
     }

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Task.java

@@ -10,6 +10,8 @@ public class Task {
 
     private int pageIndex;
 
+    private String cursor;
+
     private long beginTime;
 
     private long endTime;
@@ -54,6 +56,14 @@ public class Task {
         this.pageIndex = pageIndex;
     }
 
+    public String getCursor() {
+        return cursor;
+    }
+
+    public void setCursor(String cursor) {
+        this.cursor = cursor;
+    }
+
     public long getBeginTime() {
         return beginTime;
     }

+ 5 - 2
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -34,7 +34,10 @@ import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
 import java.io.*;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -299,7 +302,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         }
 
         List<Field> fields = executor.getFieldPairs().stream().map(p -> new Field(p.columnName, p.labelName)).collect(Collectors.toList());
-        final SqlBuilderConfig config = new SqlBuilderConfig(connector, null, "", table, ConfigConstant.CONFIG_MODEL_ID, fields, "", "");
+        final SqlBuilderConfig config = new SqlBuilderConfig(connector, "", table, ConfigConstant.CONFIG_MODEL_ID, fields, "", "");
 
         String query = SqlBuilderEnum.QUERY.getSqlBuilder().buildQuerySql(config);
         String insert = SqlBuilderEnum.INSERT.getSqlBuilder().buildSql(config);