AE86 пре 2 година
родитељ
комит
033a570eb8
28 измењених фајлова са 331 додато и 214 уклоњено
  1. 12 4
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java
  2. 5 9
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java
  3. 7 6
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java
  4. 27 23
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  5. 8 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderDelete.java
  6. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQuery.java
  7. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryCursor.java
  8. 7 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderUpdate.java
  9. 16 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/PageSql.java
  10. 13 16
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Table.java
  11. 27 19
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java
  12. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java
  13. 27 17
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java
  14. 11 9
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java
  15. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java
  16. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLPostgreSQLConnector.java
  17. 8 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java
  18. 7 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java
  19. 70 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PrimaryKeyUtil.java
  20. 42 31
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractDatabaseExtractor.java
  21. 3 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/DqlOracleExtractor.java
  22. 7 14
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java
  23. 6 5
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/TableGroupCommand.java
  24. 5 3
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java
  25. 2 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  26. 6 16
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  27. 5 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Task.java
  28. 5 1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

+ 12 - 4
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -24,10 +24,13 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * @author AE86
@@ -132,19 +135,24 @@ public class TableGroupChecker extends AbstractChecker {
         checker.dealIncrementStrategy(mapping, tableGroup);
     }
 
-    private Table getTable(String connectorId, String tableName, String primaryKey) {
+    private Table getTable(String connectorId, String tableName, String primaryKeyStr) {
         MetaInfo metaInfo = manager.getMetaInfo(connectorId, tableName);
         Assert.notNull(metaInfo, "无法获取连接器表信息:" + tableName);
         // 自定义主键
-        if (StringUtil.isNotBlank(primaryKey) && !CollectionUtils.isEmpty(metaInfo.getColumn())) {
+        Set<String> primaryKeys = new HashSet<>();
+        if (StringUtil.isNotBlank(primaryKeyStr)) {
+            String[] pks = StringUtil.split(primaryKeyStr, ",");
+            primaryKeys.addAll(Arrays.asList(pks));
+        }
+        if (!CollectionUtils.isEmpty(primaryKeys) && !CollectionUtils.isEmpty(metaInfo.getColumn())) {
             for (Field field : metaInfo.getColumn()) {
-                if (StringUtil.equals(field.getName(), primaryKey)) {
+                if (primaryKeys.contains(field.getName())) {
                     field.setPk(true);
                     break;
                 }
             }
         }
-        return new Table(tableName, metaInfo.getTableType(), primaryKey, metaInfo.getColumn(), metaInfo.getSql());
+        return new Table(tableName, metaInfo.getTableType(), primaryKeys, metaInfo.getColumn(), metaInfo.getSql());
     }
 
     private void checkRepeatedTable(String mappingId, String sourceTable, String targetTable) {

+ 5 - 9
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java

@@ -7,14 +7,14 @@ public class ReaderConfig {
 
     private Map<String, String> command;
     private List<Object> args;
-    private Object cursor;
+    private Object[] cursors;
     private int pageIndex;
     private int pageSize;
 
-    public ReaderConfig(Map<String,String> command, List<Object> args, Object cursor, int pageIndex, int pageSize) {
+    public ReaderConfig(Map<String,String> command, List<Object> args, Object[] cursors, int pageIndex, int pageSize) {
         this.command = command;
         this.args = args;
-        this.cursor = cursor;
+        this.cursors = cursors;
         this.pageIndex = pageIndex;
         this.pageSize = pageSize;
     }
@@ -27,12 +27,8 @@ public class ReaderConfig {
         return args;
     }
 
-    public Object getCursor() {
-        return cursor;
-    }
-
-    public void setCursor(String cursor) {
-        this.cursor = cursor;
+    public Object[] getCursors() {
+        return cursors;
     }
 
     public int getPageIndex() {

+ 7 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java

@@ -4,6 +4,7 @@ import org.dbsyncer.connector.database.Database;
 import org.dbsyncer.connector.model.Field;
 
 import java.util.List;
+import java.util.Set;
 
 public class SqlBuilderConfig {
 
@@ -12,8 +13,8 @@ public class SqlBuilderConfig {
     private String schema;
     // 表名
     private String tableName;
-    // 主键
-    private String pk;
+    // 主键列表
+    private Set<String> primaryKeys;
     // 字段
     private List<Field> fields;
     // 过滤条件
@@ -21,11 +22,11 @@ public class SqlBuilderConfig {
     // 引号
     private String quotation;
 
-    public SqlBuilderConfig(Database database, String schema, String tableName, String pk, List<Field> fields, String queryFilter, String quotation) {
+    public SqlBuilderConfig(Database database, String schema, String tableName, Set<String> primaryKeys, List<Field> fields, String queryFilter, String quotation) {
         this.database = database;
         this.schema = schema;
         this.tableName = tableName;
-        this.pk = pk;
+        this.primaryKeys = primaryKeys;
         this.fields = fields;
         this.queryFilter = queryFilter;
         this.quotation = quotation;
@@ -43,8 +44,8 @@ public class SqlBuilderConfig {
         return tableName;
     }
 
-    public String getPk() {
-        return pk;
+    public Set<String> getPrimaryKeys() {
+        return primaryKeys;
     }
 
     public List<Field> getFields() {

+ 27 - 23
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -121,7 +121,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     @Override
     public Result reader(DatabaseConnectorMapper connectorMapper, ReaderConfig config) {
         // 1、获取select SQL
-        String queryKey = enableCursor() && null == config.getCursor() ? ConnectorConstant.OPERTION_QUERY_CURSOR : SqlBuilderEnum.QUERY.getName();
+        String queryKey = enableCursor() && null == config.getCursors() ? ConnectorConstant.OPERTION_QUERY_CURSOR : SqlBuilderEnum.QUERY.getName();
         String querySql = config.getCommand().get(queryKey);
         Assert.hasText(querySql, "查询语句不能为空.");
 
@@ -223,39 +223,41 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
         // 获取查询数据行是否存在
         String tableName = commandConfig.getTable().getName();
-        String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
-        StringBuilder queryCount = new StringBuilder("SELECT COUNT(1) FROM ").append(schema).append(quotation).append(tableName).append(quotation)
-                .append(" WHERE ").append(quotation).append(pk).append(quotation).append(" = ?");
+        Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
+        StringBuilder queryCount = new StringBuilder("SELECT COUNT(1) FROM ").append(schema).append(quotation).append(tableName).append(quotation).append(" WHERE ");
+        // id = ? AND uid = ?
+        PrimaryKeyUtil.buildSql(queryCount, primaryKeys, quotation, " AND ", " = ? ", true);
+
         String queryCountExist = ConnectorConstant.OPERTION_QUERY_COUNT_EXIST;
         map.put(queryCountExist, queryCount.toString());
         return map;
     }
 
     /**
-     * 是否支持游标查询
+     * 查询语句表名和字段带上引号(默认不加)
      *
      * @return
      */
-    protected boolean enableCursor() {
-        return false;
+    public String buildSqlWithQuotation() {
+        return "";
     }
 
     /**
-     * 健康检查
+     * 是否支持游标查询
      *
      * @return
      */
-    protected String getValidationQuery() {
-        return "select 1";
+    protected boolean enableCursor() {
+        return false;
     }
 
     /**
-     * 查询语句表名和字段带上引号(默认不加)
+     * 健康检
      *
      * @return
      */
-    protected String buildSqlWithQuotation() {
-        return "";
+    protected String getValidationQuery() {
+        return "select 1";
     }
 
     /**
@@ -339,14 +341,17 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
      */
     protected String getQueryCountSql(CommandConfig commandConfig, String schema, String quotation, String queryFilterSql) {
         String table = commandConfig.getTable().getName();
-        String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
-        StringBuilder queryCount = new StringBuilder();
-        queryCount.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(schema).append(quotation).append(table).append(quotation);
+        Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
+        StringBuilder sql = new StringBuilder();
+        sql.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(schema).append(quotation).append(table).append(quotation);
         if (StringUtil.isNotBlank(queryFilterSql)) {
-            queryCount.append(queryFilterSql);
+            sql.append(queryFilterSql);
         }
-        queryCount.append(" GROUP BY ").append(quotation).append(pk).append(quotation).append(") DBSYNCER_T");
-        return queryCount.toString();
+        sql.append(" GROUP BY ");
+        // id,uid
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        sql.append(") DBSYNCER_T");
+        return sql.toString();
     }
 
     /**
@@ -487,9 +492,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             logger.error("Table name can not be empty.");
             throw new ConnectorException("Table name can not be empty.");
         }
-        String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
-
-        SqlBuilderConfig config = new SqlBuilderConfig(this, schema, tableName, pk, fields, queryFilterSQL, buildSqlWithQuotation());
+        Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
+        SqlBuilderConfig config = new SqlBuilderConfig(this, schema, tableName, primaryKeys, fields, queryFilterSQL, buildSqlWithQuotation());
         return SqlBuilderEnum.getSqlBuilder(type).buildSql(config);
     }
 
@@ -548,7 +552,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
     private void forceUpdate(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField,
                              Map row) {
-        if(isUpdate(config.getEvent()) || isInsert(config.getEvent())){
+        if (isUpdate(config.getEvent()) || isInsert(config.getEvent())) {
             // 存在执行覆盖更新,否则写入
             final String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
             final String event = existRow(connectorMapper, queryCount, row.get(pkField.getName())) ? ConnectorConstant.OPERTION_UPDATE : ConnectorConstant.OPERTION_INSERT;

+ 8 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderDelete.java

@@ -2,6 +2,9 @@ package org.dbsyncer.connector.database.sqlbuilder;
 
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
+
+import java.util.Set;
 
 /**
  * @author AE86
@@ -14,9 +17,11 @@ public class SqlBuilderDelete extends AbstractSqlBuilder {
     public String buildSql(SqlBuilderConfig config) {
         String tableName = config.getTableName();
         String quotation = config.getQuotation();
-        // DELETE FROM "USER" WHERE "ID"=?
-        return new StringBuilder().append("DELETE FROM ").append(config.getSchema()).append(quotation).append(tableName).append(quotation).append(" WHERE ").append(quotation).append(config.getPk()).append(quotation)
-                .append("=?").toString();
+        Set<String> primaryKeys = config.getPrimaryKeys();
+        // DELETE FROM "USER" WHERE "ID"=? AND "UID" = ?
+        StringBuilder sql = new StringBuilder().append("DELETE FROM ").append(config.getSchema()).append(quotation).append(tableName).append(quotation).append(" WHERE ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " = ? ", true);
+        return sql.toString();
     }
 
 }

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQuery.java

@@ -20,7 +20,7 @@ public class SqlBuilderQuery extends AbstractSqlBuilder {
     public String buildSql(SqlBuilderConfig config) {
         // 分页语句
         Database database = config.getDatabase();
-        PageSql pageSql = new PageSql(config, buildQuerySql(config), config.getPk());
+        PageSql pageSql = new PageSql(config, buildQuerySql(config), config.getPrimaryKeys());
         return database.getPageSql(pageSql);
     }
 

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryCursor.java

@@ -15,7 +15,7 @@ public class SqlBuilderQueryCursor extends SqlBuilderQuery {
     public String buildSql(SqlBuilderConfig config) {
         // 分页语句
         Database database = config.getDatabase();
-        PageSql pageSql = new PageSql(config, buildQuerySql(config), config.getPk());
+        PageSql pageSql = new PageSql(config, buildQuerySql(config), config.getPrimaryKeys());
         return database.getPageCursorSql(pageSql);
     }
 

+ 7 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderUpdate.java

@@ -4,8 +4,10 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
 import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
 import java.util.List;
+import java.util.Set;
 
 /**
  * @author AE86
@@ -20,8 +22,8 @@ public class SqlBuilderUpdate extends AbstractSqlBuilder {
         List<Field> fields = config.getFields();
         String quotation = config.getQuotation();
         StringBuilder sql = new StringBuilder();
+        Set<String> primaryKeys = config.getPrimaryKeys();
         int size = fields.size();
-        int end = size - 1;
         sql.append("UPDATE ").append(config.getSchema()).append(quotation).append(tableName).append(quotation).append(" SET ");
         for (int i = 0; i < size; i++) {
             // skip pk
@@ -31,7 +33,7 @@ public class SqlBuilderUpdate extends AbstractSqlBuilder {
 
             // "USERNAME"=?
             sql.append(quotation).append(fields.get(i).getName()).append(quotation).append("=?");
-            if (i < end) {
+            if (i < size - 1) {
                 sql.append(",");
             }
         }
@@ -42,8 +44,9 @@ public class SqlBuilderUpdate extends AbstractSqlBuilder {
             sql.deleteCharAt(last);
         }
 
-        // UPDATE "USER" SET "USERNAME"=?,"AGE"=? WHERE "ID"=?
-        sql.append(" WHERE ").append(quotation).append(config.getPk()).append(quotation).append("=?");
+        // UPDATE "USER" SET "USERNAME"=?,"AGE"=? WHERE "ID"=? AND "UID" = ?
+        sql.append(" WHERE ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " = ? ", true);
         return sql.toString();
     }
 

+ 16 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/PageSql.java

@@ -2,23 +2,28 @@ package org.dbsyncer.connector.model;
 
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 
+import java.util.Set;
+
 public class PageSql {
 
     private SqlBuilderConfig sqlBuilderConfig;
 
     private String querySql;
 
-    private String pk;
+    private String quotation;
+
+    private Set<String> primaryKeys;
 
-    public PageSql(String querySql, String pk) {
+    public PageSql(String querySql, String quotation, Set<String> primaryKeys) {
         this.querySql = querySql;
-        this.pk = pk;
+        this.quotation = quotation;
+        this.primaryKeys = primaryKeys;
     }
 
-    public PageSql(SqlBuilderConfig sqlBuilderConfig, String querySql, String pk) {
+    public PageSql(SqlBuilderConfig sqlBuilderConfig, String querySql, Set<String> primaryKeys) {
         this.sqlBuilderConfig = sqlBuilderConfig;
         this.querySql = querySql;
-        this.pk = pk;
+        this.primaryKeys = primaryKeys;
     }
 
     public SqlBuilderConfig getSqlBuilderConfig() {
@@ -29,7 +34,11 @@ public class PageSql {
         return querySql;
     }
 
-    public String getPk() {
-        return pk;
+    public String getQuotation() {
+        return quotation;
+    }
+
+    public Set<String> getPrimaryKeys() {
+        return primaryKeys;
     }
 }

+ 13 - 16
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Table.java

@@ -3,6 +3,7 @@ package org.dbsyncer.connector.model;
 import org.dbsyncer.connector.enums.TableTypeEnum;
 
 import java.util.List;
+import java.util.Set;
 
 /**
  * @author AE86
@@ -22,9 +23,9 @@ public class Table {
     private String type;
 
     /**
-     * 主键
+     * 主键列表
      */
-    private String primaryKey;
+    private Set<String> primaryKeys;
 
     /**
      * 属性字段
@@ -51,10 +52,10 @@ public class Table {
         this(name, type, null, null, null);
     }
 
-    public Table(String name, String type, String primaryKey, List<Field> column, String sql) {
+    public Table(String name, String type, Set<String> primaryKeys, List<Field> column, String sql) {
         this.name = name;
         this.type = type;
-        this.primaryKey = primaryKey;
+        this.primaryKeys = primaryKeys;
         this.column = column;
         this.sql = sql;
     }
@@ -63,44 +64,40 @@ public class Table {
         return name;
     }
 
-    public Table setName(String name) {
+    public void setName(String name) {
         this.name = name;
-        return this;
     }
 
     public String getType() {
         return type;
     }
 
-    public Table setType(String type) {
+    public void setType(String type) {
         this.type = type;
-        return this;
     }
 
-    public String getPrimaryKey() {
-        return primaryKey;
+    public Set<String> getPrimaryKeys() {
+        return primaryKeys;
     }
 
-    public void setPrimaryKey(String primaryKey) {
-        this.primaryKey = primaryKey;
+    public void setPrimaryKeys(Set<String> primaryKeys) {
+        this.primaryKeys = primaryKeys;
     }
 
     public List<Field> getColumn() {
         return column;
     }
 
-    public Table setColumn(List<Field> column) {
+    public void setColumn(List<Field> column) {
         this.column = column;
-        return this;
     }
 
     public String getSql() {
         return sql;
     }
 
-    public Table setSql(String sql) {
+    public void setSql(String sql) {
         this.sql = sql;
-        return this;
     }
 
     public long getCount() {

+ 27 - 19
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -4,47 +4,55 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
+
+import java.util.Set;
 
 public final class MysqlConnector extends AbstractDatabaseConnector {
 
     @Override
-    protected String buildSqlWithQuotation() {
+    public String buildSqlWithQuotation() {
         return "`";
     }
 
     @Override
     public String getPageSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
-        final String pk = config.getPk();
-        // select * from test.`my_user` where `id` > ? order by `id` limit ?
-        StringBuilder querySql = new StringBuilder(config.getQuerySql());
-        String queryFilter = config.getSqlBuilderConfig().getQueryFilter();
-        if (StringUtil.isNotBlank(queryFilter)) {
-            querySql.append(" AND ");
-        } else {
-            querySql.append(" WHERE ");
-        }
-        querySql.append(quotation).append(pk).append(quotation).append(" > ? ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
-        return querySql.toString();
+        final Set<String> primaryKeys = config.getPrimaryKeys();
+        // select * from test.`my_user` where `id` > ? and `uid` > ? order by `id`,`uid` limit ?
+        StringBuilder sql = new StringBuilder(config.getQuerySql());
+        boolean blank = StringUtil.isBlank(config.getSqlBuilderConfig().getQueryFilter());
+        sql.append(blank ? " WHERE " : " AND ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " > ? ", blank);
+        sql.append(" ORDER BY ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        sql.append(" LIMIT ?");
+        return sql.toString();
     }
 
     @Override
     public String getPageCursorSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
-        final String pk = config.getPk();
-        // select * from test.`my_user` order by `id` limit ?
-        StringBuilder querySql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
-        return querySql.toString();
+        final Set<String> primaryKeys = config.getPrimaryKeys();
+        // select * from test.`my_user` order by `id`,`uid` limit ?
+        StringBuilder sql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        sql.append(" LIMIT ?");
+        return sql.toString();
     }
 
     @Override
     public Object[] getPageArgs(ReaderConfig config) {
         int pageSize = config.getPageSize();
-        Object cursor = config.getCursor();
-        if (null == cursor) {
+        Object[] cursors = config.getCursors();
+        if (null == cursors) {
             return new Object[]{pageSize};
         }
-        return new Object[]{cursor, pageSize};
+        int cursorsLen = cursors.length;
+        Object[] newCursors = new Object[cursorsLen + 1];
+        System.arraycopy(cursors, 0, newCursors, 0, cursorsLen);
+        newCursors[cursorsLen] = pageSize;
+        return newCursors;
     }
 
     @Override

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -31,7 +31,7 @@ public final class OracleConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    protected String buildSqlWithQuotation() {
+    public String buildSqlWithQuotation() {
         return "\"";
     }
 

+ 27 - 17
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -8,8 +8,10 @@ import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
 import java.sql.Types;
+import java.util.Set;
 
 public final class PostgreSQLConnector extends AbstractDatabaseConnector {
 
@@ -18,41 +20,49 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    protected String buildSqlWithQuotation() {
+    public String buildSqlWithQuotation() {
         return "\"";
     }
 
     @Override
     public String getPageSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
-        final String pk = config.getPk();
-        StringBuilder querySql = new StringBuilder(config.getQuerySql());
-        String queryFilter = config.getSqlBuilderConfig().getQueryFilter();
-        if (StringUtil.isNotBlank(queryFilter)) {
-            querySql.append(" AND ");
-        } else {
-            querySql.append(" WHERE ");
-        }
-        querySql.append(quotation).append(pk).append(quotation).append(" > ? ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
-        return querySql.toString();
+        final Set<String> primaryKeys = config.getPrimaryKeys();
+        // select * from test.`my_user` where `id` > ? and `uid` > ? order by `id`,`uid` limit ?
+        StringBuilder sql = new StringBuilder(config.getQuerySql());
+        boolean blank = StringUtil.isBlank(config.getSqlBuilderConfig().getQueryFilter());
+        sql.append(blank ? " WHERE " : " AND ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " > ? ", blank);
+        sql.append(" ORDER BY ");
+        // id,uid
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        sql.append(" LIMIT ?");
+        return sql.toString();
     }
 
     @Override
     public String getPageCursorSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
-        final String pk = config.getPk();
-        StringBuilder querySql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
-        return querySql.toString();
+        final Set<String> primaryKeys = config.getPrimaryKeys();
+        // select * from test.`my_user` order by `id`,`uid` limit ?
+        StringBuilder sql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        sql.append(" LIMIT ?");
+        return sql.toString();
     }
 
     @Override
     public Object[] getPageArgs(ReaderConfig config) {
         int pageSize = config.getPageSize();
-        Object cursor = config.getCursor();
-        if (null == cursor) {
+        Object[] cursors = config.getCursors();
+        if (null == cursors) {
             return new Object[]{pageSize};
         }
-        return new Object[]{cursor, pageSize};
+        int cursorsLen = cursors.length;
+        Object[] newCursors = new Object[cursorsLen + 1];
+        System.arraycopy(cursors, 0, newCursors, 0, cursorsLen);
+        newCursors[cursorsLen] = pageSize;
+        return newCursors;
     }
 
     @Override

+ 11 - 9
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java

@@ -8,6 +8,7 @@ import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
+import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.SqlTable;
@@ -15,9 +16,11 @@ import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * @author AE86
@@ -33,9 +36,7 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
         List<Table> tables = new ArrayList<>();
         if (!CollectionUtils.isEmpty(sqlTables)) {
             sqlTables.forEach(s -> {
-                Table table = new Table(s.getTable());
-                table.setSql(s.getSql());
-                tables.add(table);
+                tables.add(new Table(s.getSqlName(), TableTypeEnum.TABLE.getCode(), Collections.EMPTY_SET, Collections.EMPTY_LIST, s.getSql()));
             });
         }
         return tables;
@@ -47,11 +48,11 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
+    public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String sqlName) {
         DatabaseConfig cfg = connectorMapper.getConfig();
         List<SqlTable> sqlTables = cfg.getSqlTables();
         for (SqlTable s : sqlTables) {
-            if (s.getTable().equals(tableName)) {
+            if (StringUtil.equals(s.getSqlName(), sqlName)) {
                 String sql = s.getSql().toUpperCase();
                 sql = sql.replace("\t", " ");
                 sql = sql.replace("\r", " ");
@@ -74,7 +75,7 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
         // 获取过滤SQL
         String queryFilterSql = getQueryFilterSql(commandConfig.getFilter());
         Table table = commandConfig.getTable();
-        String primaryKey = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
+        Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
 
         // 获取查询SQL
         Map<String, String> map = new HashMap<>();
@@ -85,8 +86,7 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
             querySql += queryFilterSql;
         }
         String quotation = buildSqlWithQuotation();
-        String pk = new StringBuilder(quotation).append(primaryKey).append(quotation).toString();
-        map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(new PageSql(querySql, pk)));
+        map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(new PageSql(querySql, quotation, primaryKeys)));
 
         // 获取查询总数SQL
         StringBuilder queryCount = new StringBuilder();
@@ -94,7 +94,9 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
 
         // Mysql
         if (groupByPK) {
-            queryCount.append(" GROUP BY ").append(pk);
+            queryCount.append(" GROUP BY ");
+            // id,id2
+            PrimaryKeyUtil.buildSql(queryCount, primaryKeys, quotation, ",", "", true);
         }
         queryCount.append(") DBSYNCER_T");
         map.put(ConnectorConstant.OPERTION_QUERY_COUNT, queryCount.toString());

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java

@@ -19,7 +19,7 @@ public final class DQLOracleConnector extends AbstractDQLConnector {
     }
 
     @Override
-    protected String buildSqlWithQuotation() {
+    public String buildSqlWithQuotation() {
         return "\"";
     }
 

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLPostgreSQLConnector.java

@@ -19,7 +19,7 @@ public final class DQLPostgreSQLConnector extends AbstractDQLConnector {
     }
 
     @Override
-    protected String buildSqlWithQuotation() {
+    public String buildSqlWithQuotation() {
         return "\"";
     }
 }

+ 8 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java

@@ -3,16 +3,19 @@ package org.dbsyncer.connector.sql;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.model.PageSql;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
-public final class DQLSqlServerConnector extends AbstractDQLConnector {
+import java.util.Set;
 
-    private final Logger logger = LoggerFactory.getLogger(getClass());
+public final class DQLSqlServerConnector extends AbstractDQLConnector {
 
     @Override
     public String getPageSql(PageSql config) {
-        return String.format(DatabaseConstant.SQLSERVER_PAGE_SQL, config.getPk(), config.getQuerySql());
+        String quotation = config.getQuotation();
+        Set<String> primaryKeys = config.getPrimaryKeys();
+        StringBuilder orderBy = new StringBuilder();
+        PrimaryKeyUtil.buildSql(orderBy, primaryKeys, quotation, " AND ", " = ? ", true);
+        return String.format(DatabaseConstant.SQLSERVER_PAGE_SQL, orderBy.toString(), config.getQuerySql());
     }
 
     @Override

+ 7 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -11,9 +11,11 @@ import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public final class SqlServerConnector extends AbstractDatabaseConnector {
@@ -32,7 +34,11 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
 
     @Override
     public String getPageSql(PageSql config) {
-        return String.format(DatabaseConstant.SQLSERVER_PAGE_SQL, config.getPk(), config.getQuerySql());
+        String quotation = config.getQuotation();
+        Set<String> primaryKeys = config.getPrimaryKeys();
+        StringBuilder orderBy = new StringBuilder();
+        PrimaryKeyUtil.buildSql(orderBy, primaryKeys, quotation, " AND ", " = ? ", true);
+        return String.format(DatabaseConstant.SQLSERVER_PAGE_SQL, orderBy.toString(), config.getQuerySql());
     }
 
     @Override

+ 70 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PrimaryKeyUtil.java

@@ -1,12 +1,19 @@
 package org.dbsyncer.connector.util;
 
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Table;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class PrimaryKeyUtil {
 
@@ -16,28 +23,84 @@ public abstract class PrimaryKeyUtil {
      * @param table
      * @return
      */
-    public static String findOriginalTablePrimaryKey(Table table) {
+    public static Set<String> findOriginalTablePrimaryKey(Table table) {
         if (null == table) {
-            return null;
+            throw new ConnectorException("The table is null.");
         }
 
         // 获取自定义主键
-        String pk = table.getPrimaryKey();
-        if (StringUtil.isNotBlank(pk)) {
-            return pk;
+        if (!CollectionUtils.isEmpty(table.getPrimaryKeys())) {
+            return Collections.unmodifiableSet(table.getPrimaryKeys());
         }
 
         // 获取表原始主键
+        Set<String> primaryKeys = new HashSet<>();
         List<Field> column = table.getColumn();
         if (!CollectionUtils.isEmpty(column)) {
             for (Field c : column) {
                 if (c.isPk()) {
-                    return c.getName();
+                    primaryKeys.add(c.getName());
                 }
             }
         }
 
-        throw new ConnectorException(String.format("The primary key of table '%s' is null.", table.getName()));
+        if (CollectionUtils.isEmpty(primaryKeys)) {
+            throw new ConnectorException(String.format("The primary key of table '%s' is null.", table.getName()));
+        }
+        return Collections.unmodifiableSet(primaryKeys);
+    }
+
+    public static void buildSql(StringBuilder sql, Set<String> primaryKeys, String quotation, String join, String value, boolean skipFirst) {
+        AtomicBoolean added = new AtomicBoolean();
+        primaryKeys.forEach(pk -> {
+            // skip first pk
+            if (!skipFirst || added.get()) {
+                if (StringUtil.isNotBlank(join)) {
+                    sql.append(join);
+                }
+            }
+            sql.append(quotation).append(pk).append(quotation);
+            if (StringUtil.isNotBlank(value)) {
+                sql.append(value);
+            }
+            added.set(true);
+        });
     }
 
+    /**
+     * 获取最新游标值
+     *
+     * @param data
+     * @param primaryKeys
+     * @return
+     */
+    public static Object[] getLastCursors(List<Map> data, Set<String> primaryKeys) {
+        if (CollectionUtils.isEmpty(data)) {
+            return null;
+        }
+        Map last = data.get(data.size() - 1);
+        Object[] cursors = new Object[primaryKeys.size()];
+        Iterator<String> iterator = primaryKeys.iterator();
+        int i = 0;
+        while (iterator.hasNext()) {
+            String pk = iterator.next();
+            cursors[i] = last.get(pk);
+            i++;
+        }
+        return cursors;
+    }
+
+    public static Object[] getLastCursors(String str) {
+        Object[] cursors = null;
+        if (StringUtil.isNotBlank(str)) {
+            String[] split = StringUtil.split(str, ",");
+            int length = split.length;
+            cursors = new Object[length];
+            for (int i = 0; i < length; i++) {
+                String val = split[i];
+                cursors[i] = NumberUtil.isCreatable(val) ? NumberUtil.toLong(val) : val;
+            }
+        }
+        return cursors;
+    }
 }

+ 42 - 31
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractDatabaseExtractor.java

@@ -4,6 +4,7 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.MetaInfo;
@@ -11,8 +12,11 @@ import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.springframework.util.Assert;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -60,29 +64,33 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
      */
     protected void postProcessDqlBeforeInitialization() {
         DatabaseConnectorMapper mapper = (DatabaseConnectorMapper) connectorFactory.connect(connectorConfig);
-        for (Table table : sourceTable) {
-            String sql = table.getSql();
-            String tableName = table.getName();
-            String primaryKey = PrimaryKeyUtil.findOriginalTablePrimaryKey(table);
+        AbstractDatabaseConnector connector = (AbstractDatabaseConnector) connectorFactory.getConnector(mapper);
+        String quotation = connector.buildSqlWithQuotation();
+
+        Map<String, String> tableMap = new HashMap<>();
+        mapper.getConfig().getSqlTables().forEach(s -> tableMap.put(s.getSqlName(), s.getTable()));
+
+        for (Table t : sourceTable) {
+            String sql = t.getSql();
+            String sqlName = t.getName();
+            Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(t);
+            String tableName = tableMap.get(sqlName);
             Assert.hasText(sql, "The sql is null.");
             Assert.hasText(tableName, "The tableName is null.");
-            Assert.hasText(primaryKey, "The primaryKey is null.");
 
-            MetaInfo metaInfo = connectorFactory.getMetaInfo(mapper, tableName);
+            MetaInfo metaInfo = connectorFactory.getMetaInfo(mapper, sqlName);
             final List<Field> column = metaInfo.getColumn();
-            Assert.notEmpty(column, String.format("The column of table name '%s' is empty.", tableName));
+            Assert.notEmpty(column, String.format("The column of table name '%s' is empty.", sqlName));
 
             sql = sql.toUpperCase().replace("\t", " ");
             sql = sql.replace("\r", " ");
             sql = sql.replace("\n", " ");
-            StringBuilder querySql = new StringBuilder(table.getSql());
-            if (StringUtil.contains(sql, " WHERE ")) {
-                querySql.append(" AND ");
-            } else {
-                querySql.append(" WHERE ");
-            }
-            querySql.append(primaryKey).append("=?");
-            DqlMapper dqlMapper = new DqlMapper(mapper, querySql.toString(), tableName, column, getPKIndex(column, primaryKey));
+
+            StringBuilder querySql = new StringBuilder(sql);
+            boolean notContainsWhere = !StringUtil.contains(sql, " WHERE ");
+            querySql.append(notContainsWhere ? " WHERE " : " AND ");
+            PrimaryKeyUtil.buildSql(querySql, primaryKeys, quotation, " AND ", " = ? ", notContainsWhere);
+            DqlMapper dqlMapper = new DqlMapper(mapper, querySql.toString(), column, getPrimaryKeyIndexArray(column, primaryKeys));
             dqlMap.putIfAbsent(tableName, dqlMapper);
         }
     }
@@ -91,26 +99,31 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
      * 获取主表主键索引
      *
      * @param column
-     * @param primaryKey
+     * @param primaryKeys
      * @return
      */
-    protected int getPKIndex(List<Field> column, String primaryKey) {
-        int pkIndex = 0;
-        boolean findPkIndex = false;
+    protected Integer[] getPrimaryKeyIndexArray(List<Field> column, Set<String> primaryKeys) {
+        List<Integer> indexList = new ArrayList<>();
         for (Field f : column) {
-            if (f.getName().equals(primaryKey)) {
-                pkIndex = column.indexOf(f);
-                findPkIndex = true;
-                break;
+            if (primaryKeys.contains(f.getName())) {
+                indexList.add(column.indexOf(f));
             }
         }
-        Assert.isTrue(findPkIndex, "The primaryKey is invalid.");
-        return pkIndex;
+        Assert.isTrue(!CollectionUtils.isEmpty(indexList), "The primaryKeys is invalid.");
+        Integer[] indexArray = (Integer[]) indexList.toArray();
+        return indexArray;
     }
 
     private List<Object> queryDqlData(DqlMapper dqlMapper, List<Object> data) {
         if (!CollectionUtils.isEmpty(data)) {
-            Map<String, Object> row = dqlMapper.mapper.execute(databaseTemplate -> databaseTemplate.queryForMap(dqlMapper.sql, data.get(dqlMapper.pkIndex)));
+            Map<String, Object> row = dqlMapper.mapper.execute(databaseTemplate -> {
+                int size = dqlMapper.primaryKeyIndexArray.length;
+                Object[] args = new Object[size];
+                for (int i = 0; i < size; i++) {
+                    args[i] = data.get(dqlMapper.primaryKeyIndexArray[i]);
+                }
+                return databaseTemplate.queryForMap(dqlMapper.sql, args);
+            });
             if (!CollectionUtils.isEmpty(row)) {
                 data.clear();
                 dqlMapper.column.forEach(field -> data.add(row.get(field.getName())));
@@ -122,15 +135,13 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
     final class DqlMapper {
         DatabaseConnectorMapper mapper;
         String sql;
-        String tableName;
         List<Field> column;
-        int pkIndex;
+        Integer[] primaryKeyIndexArray;
 
-        public DqlMapper(DatabaseConnectorMapper mapper, String sql, String tableName, List<Field> column, int pkIndex) {
+        public DqlMapper(DatabaseConnectorMapper mapper, String sql, List<Field> column, Integer[] primaryKeyIndexArray) {
             this.mapper = mapper;
-            this.tableName = tableName;
             this.column = column;
-            this.pkIndex = pkIndex;
+            this.primaryKeyIndexArray = primaryKeyIndexArray;
             this.sql = sql;
         }
     }

+ 3 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/DqlOracleExtractor.java

@@ -4,6 +4,7 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.connector.model.Field;
 
 import java.util.List;
+import java.util.Set;
 
 /**
  * @author AE86
@@ -24,9 +25,9 @@ public class DqlOracleExtractor extends OracleExtractor {
     }
 
     @Override
-    protected int getPKIndex(List<Field> column, String primaryKey) {
+    protected Integer[] getPrimaryKeyIndexArray(List<Field> column, Set<String> primaryKeys) {
         // ROW_ID
-        return 0;
+        return new Integer[]{0};
     }
 
 }

+ 7 - 14
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -9,6 +9,7 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,15 +97,16 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
     private void execute(TableGroupCommand tableGroupCommand, int index) {
         final Map<String, String> command = tableGroupCommand.getCommand();
-        final String pk = tableGroupCommand.getPk();
+        final Set<String> primaryKeys = tableGroupCommand.getPrimaryKeys();
 
         // 检查增量点
         ConnectorMapper connectionMapper = connectorFactory.connect(connectorConfig);
         Point point = checkLastPoint(command, index);
         int pageIndex = 1;
-        String cursor = snapshot.get(index + CURSOR);
+        Object[] cursors = PrimaryKeyUtil.getLastCursors(snapshot.get(index + CURSOR));
+
         while (running) {
-            Result reader = connectorFactory.reader(connectionMapper, new ReaderConfig(point.getCommand(), point.getArgs(), cursor, pageIndex++, READ_NUM));
+            Result reader = connectorFactory.reader(connectionMapper, new ReaderConfig(point.getCommand(), point.getArgs(), cursors, pageIndex++, READ_NUM));
             List<Map> data = reader.getSuccessData();
             if (CollectionUtils.isEmpty(data)) {
                 break;
@@ -133,19 +135,18 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
             }
             // 更新记录点
-            cursor = getLastCursor(data, pk);
+            cursors = PrimaryKeyUtil.getLastCursors(data, primaryKeys);
             point.refresh();
 
             if (data.size() < READ_NUM) {
                 break;
             }
-
         }
 
         // 持久化
         if (point.refreshed()) {
             snapshot.putAll(point.getPosition());
-            snapshot.put(index + CURSOR, cursor);
+            snapshot.put(index + CURSOR, StringUtil.join(cursors, ","));
         }
 
     }
@@ -154,12 +155,4 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
         this.commands = commands;
     }
 
-    private String getLastCursor(List<Map> data, String pk) {
-        if (!CollectionUtils.isEmpty(data)) {
-            Object value = data.get(data.size() - 1).get(pk);
-            return value == null ? "" : String.valueOf(value);
-        }
-        return "";
-    }
-
 }

+ 6 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/TableGroupCommand.java

@@ -1,20 +1,21 @@
 package org.dbsyncer.listener.quartz;
 
 import java.util.Map;
+import java.util.Set;
 
 public class TableGroupCommand {
 
-    private String pk;
+    private Set<String> primaryKeys;
 
     private Map<String, String> command;
 
-    public TableGroupCommand(String pk, Map<String, String> command) {
-        this.pk = pk;
+    public TableGroupCommand(Set<String> primaryKeys, Map<String, String> command) {
+        this.primaryKeys = primaryKeys;
         this.command = command;
     }
 
-    public String getPk() {
-        return pk;
+    public Set<String> getPrimaryKeys() {
+        return primaryKeys;
     }
 
     public Map<String, String> getCommand() {

+ 5 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -1,6 +1,8 @@
 package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.enums.ParserEnum;
@@ -100,7 +102,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         task.setPageIndex(NumberUtil.toInt(snapshot.get(ParserEnum.PAGE_INDEX.getCode()), ParserEnum.PAGE_INDEX.getDefaultValue()));
         // 反序列化游标值类型(通常为数字或字符串类型)
         String cursorValue = snapshot.get(ParserEnum.CURSOR.getCode());
-        task.setCursor(NumberUtil.isCreatable(cursorValue) ? NumberUtil.toLong(cursorValue) : cursorValue);
+        task.setCursors(PrimaryKeyUtil.getLastCursors(cursorValue));
         task.setTableGroupIndex(NumberUtil.toInt(snapshot.get(ParserEnum.TABLE_GROUP_INDEX.getCode()), ParserEnum.TABLE_GROUP_INDEX.getDefaultValue()));
         flush(task);
 
@@ -111,7 +113,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
                 break;
             }
             task.setPageIndex(ParserEnum.PAGE_INDEX.getDefaultValue());
-            task.setCursor(null);
+            task.setCursors(null);
             task.setTableGroupIndex(++i);
             flush(task);
         }
@@ -136,7 +138,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         meta.setEndTime(task.getEndTime());
         Map<String, String> snapshot = meta.getSnapshot();
         snapshot.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(task.getPageIndex()));
-        snapshot.put(ParserEnum.CURSOR.getCode(), String.valueOf(task.getCursor()));
+        snapshot.put(ParserEnum.CURSOR.getCode(), StringUtil.join(task.getCursors(), ","));
         snapshot.put(ParserEnum.TABLE_GROUP_INDEX.getCode(), String.valueOf(task.getTableGroupIndex()));
         manager.editConfigModel(meta);
     }

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -146,8 +146,8 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         if (ListenerTypeEnum.isTiming(listenerType)) {
             AbstractQuartzExtractor quartzExtractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
             quartzExtractor.setCommands(list.stream().map(t -> {
-                String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(t.getSourceTable());
-                return new TableGroupCommand(pk, t.getCommand());
+                Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(t.getSourceTable());
+                return new TableGroupCommand(primaryKeys, t.getCommand());
             }).collect(Collectors.toList()));
             quartzExtractor.register(new QuartzListener(mapping, list));
             extractor = quartzExtractor;

+ 6 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -52,6 +52,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -153,8 +154,8 @@ public class ParserFactory implements Parser {
         AbstractConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
         Table sourceTable = tableGroup.getSourceTable();
         Table targetTable = tableGroup.getTargetTable();
-        Table sTable = new Table(sourceTable.getName(), sourceTable.getType(), sourceTable.getPrimaryKey(), new ArrayList<>(), sourceTable.getSql());
-        Table tTable = new Table(targetTable.getName(), targetTable.getType(), targetTable.getPrimaryKey(), new ArrayList<>(), sourceTable.getSql());
+        Table sTable = new Table(sourceTable.getName(), sourceTable.getType(), sourceTable.getPrimaryKeys(), new ArrayList<>(), sourceTable.getSql());
+        Table tTable = new Table(targetTable.getName(), targetTable.getType(), targetTable.getPrimaryKeys(), new ArrayList<>(), sourceTable.getSql());
         List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
         if (!CollectionUtils.isEmpty(fieldMapping)) {
             fieldMapping.forEach(m -> {
@@ -248,7 +249,7 @@ public class ParserFactory implements Parser {
         Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
         // 获取同步字段
         Picker picker = new Picker(fieldMapping);
-        String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(tableGroup.getSourceTable());
+        Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(tableGroup.getSourceTable());
 
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();
@@ -264,7 +265,7 @@ public class ParserFactory implements Parser {
             }
 
             // 1、获取数据源数据
-            Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), task.getCursor(), task.getPageIndex(), pageSize));
+            Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), task.getCursors(), task.getPageIndex(), pageSize));
             List<Map> source = reader.getSuccessData();
             if (CollectionUtils.isEmpty(source)) {
                 logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
@@ -291,7 +292,7 @@ public class ParserFactory implements Parser {
 
             // 7、更新结果
             task.setPageIndex(task.getPageIndex() + 1);
-            task.setCursor(getLastCursor(source, pk));
+            task.setCursors(PrimaryKeyUtil.getLastCursors(source, primaryKeys));
             result.setTableGroupId(tableGroup.getId());
             result.setTargetTableGroupName(tTableName);
             flush(task, result);
@@ -426,15 +427,4 @@ public class ParserFactory implements Parser {
         return getConnector(connectorId).getConfig();
     }
 
-    /**
-     * 获取最新游标值
-     *
-     * @param source
-     * @param pk
-     * @return
-     */
-    private Object getLastCursor(List<Map> source, String pk) {
-        return CollectionUtils.isEmpty(source) ? null : source.get(source.size() - 1).get(pk);
-    }
-
 }

+ 5 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Task.java

@@ -10,7 +10,7 @@ public class Task {
 
     private int pageIndex;
 
-    private Object cursor;
+    private Object[] cursors;
 
     private long beginTime;
 
@@ -56,12 +56,12 @@ public class Task {
         this.pageIndex = pageIndex;
     }
 
-    public Object getCursor() {
-        return cursor;
+    public Object[] getCursors() {
+        return cursors;
     }
 
-    public void setCursor(Object cursor) {
-        this.cursor = cursor;
+    public void setCursors(Object[] cursors) {
+        this.cursors = cursors;
     }
 
     public long getBeginTime() {

+ 5 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -39,8 +39,10 @@ import java.io.InputStreamReader;
 import java.sql.SQLSyntaxErrorException;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -401,7 +403,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         }
 
         List<Field> fields = executor.getFields();
-        final SqlBuilderConfig config = new SqlBuilderConfig(connector, "", table, ConfigConstant.CONFIG_MODEL_ID, fields, "", "");
+        Set<String> primaryKeys = new HashSet<>();
+        primaryKeys.add(ConfigConstant.CONFIG_MODEL_ID);
+        final SqlBuilderConfig config = new SqlBuilderConfig(connector, "", table, primaryKeys, fields, "", "");
 
         String query = SqlBuilderEnum.QUERY.getSqlBuilder().buildQuerySql(config);
         String insert = SqlBuilderEnum.INSERT.getSqlBuilder().buildSql(config);