소스 검색

符合主键

Signed-off-by: AE86 <836391306@qq.com>
AE86 2 년 전
부모
커밋
3ad356bca4
23개의 변경된 파일118개의 추가작업 그리고 90개의 파일을 삭제
  1. 11 9
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java
  2. 4 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java
  3. 10 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java
  4. 3 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java
  5. 41 17
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  6. 2 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderDelete.java
  7. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderUpdate.java
  8. 5 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java
  9. 5 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/PageSql.java
  10. 4 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Table.java
  11. 3 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java
  12. 3 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java
  13. 4 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java
  14. 2 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java
  15. 1 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java
  16. 8 9
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PrimaryKeyUtil.java
  17. 2 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractDatabaseExtractor.java
  18. 1 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/DqlOracleExtractor.java
  19. 1 1
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java
  20. 4 4
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/TableGroupCommand.java
  21. 1 1
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  22. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  23. 1 3
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

+ 11 - 9
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -26,7 +26,7 @@ import org.springframework.util.Assert;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -139,18 +139,20 @@ public class TableGroupChecker extends AbstractChecker {
         MetaInfo metaInfo = manager.getMetaInfo(connectorId, tableName);
         Assert.notNull(metaInfo, "无法获取连接器表信息:" + tableName);
         // 自定义主键
-        Set<String> primaryKeys = new HashSet<>();
+        List<String> primaryKeys = new ArrayList<>();
         if (StringUtil.isNotBlank(primaryKeyStr)) {
             String[] pks = StringUtil.split(primaryKeyStr, ",");
-            primaryKeys.addAll(Arrays.asList(pks));
+            Set<String> keys = new LinkedHashSet<>(Arrays.asList(pks));
+            primaryKeys.addAll(keys);
         }
         if (!CollectionUtils.isEmpty(primaryKeys) && !CollectionUtils.isEmpty(metaInfo.getColumn())) {
-            for (Field field : metaInfo.getColumn()) {
-                if (primaryKeys.contains(field.getName())) {
-                    field.setPk(true);
-                    break;
-                }
-            }
+            primaryKeys.forEach(pk ->
+                metaInfo.getColumn().forEach(field -> {
+                    if(StringUtil.equalsIgnoreCase(field.getName(), pk)){
+                        field.setPk(true);
+                    }
+                })
+            );
         }
         return new Table(tableName, metaInfo.getTableType(), primaryKeys, metaInfo.getColumn(), metaInfo.getSql());
     }

+ 4 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java

@@ -8,6 +8,10 @@ public abstract class StringUtil {
         return StringUtils.equals(cs1, cs2);
     }
 
+    public static boolean equalsIgnoreCase(CharSequence cs1, CharSequence cs2) {
+        return StringUtils.equalsIgnoreCase(cs1, cs2);
+    }
+
     public static boolean isBlank(CharSequence cs) {
         return StringUtils.isBlank(cs);
     }

+ 10 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java

@@ -11,6 +11,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -70,7 +71,7 @@ public abstract class AbstractConnector {
         for (Map row : config.getData()) {
             // 根据目标字段类型转换值
             for (Field f : config.getFields()) {
-                if(null == f){
+                if (null == f) {
                     continue;
                 }
                 // 根据字段类型转换值
@@ -88,13 +89,18 @@ public abstract class AbstractConnector {
         }
     }
 
-    protected Field getPrimaryKeyField(List<Field> fields) {
+    protected List<Field> getPrimaryKeys(List<Field> fields) {
+        List<Field> list = new ArrayList<>();
+
         for (Field f : fields) {
             if (f.isPk()) {
-                return f;
+                list.add(f);
             }
         }
-        throw new ConnectorException("主键为空");
+        if (CollectionUtils.isEmpty(list)) {
+            throw new ConnectorException("主键为空");
+        }
+        return list;
     }
 
     protected boolean isUpdate(String event) {

+ 3 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java

@@ -4,7 +4,6 @@ import org.dbsyncer.connector.database.Database;
 import org.dbsyncer.connector.model.Field;
 
 import java.util.List;
-import java.util.Set;
 
 public class SqlBuilderConfig {
 
@@ -14,7 +13,7 @@ public class SqlBuilderConfig {
     // 表名
     private String tableName;
     // 主键列表
-    private Set<String> primaryKeys;
+    private List<String> primaryKeys;
     // 字段
     private List<Field> fields;
     // 过滤条件
@@ -22,7 +21,7 @@ public class SqlBuilderConfig {
     // 引号
     private String quotation;
 
-    public SqlBuilderConfig(Database database, String schema, String tableName, Set<String> primaryKeys, List<Field> fields, String queryFilter, String quotation) {
+    public SqlBuilderConfig(Database database, String schema, String tableName, List<String> primaryKeys, List<Field> fields, String queryFilter, String quotation) {
         this.database = database;
         this.schema = schema;
         this.tableName = tableName;
@@ -44,7 +43,7 @@ public class SqlBuilderConfig {
         return tableName;
     }
 
-    public Set<String> getPrimaryKeys() {
+    public List<String> getPrimaryKeys() {
         return primaryKeys;
     }
 

+ 41 - 17
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -152,15 +153,15 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             throw new ConnectorException("writer data can not be empty.");
         }
         List<Field> fields = new ArrayList<>(config.getFields());
-        Field pkField = getPrimaryKeyField(config.getFields());
+        List<Field> pkFields = getPrimaryKeys(config.getFields());
         // Update / Delete
         if (!isInsert(event)) {
             if (isDelete(event)) {
                 fields.clear();
             } else if (isUpdate(event)) {
-                fields.remove(pkField);
+                removeFieldWithPk(fields, pkFields);
             }
-            fields.add(pkField);
+            fields.addAll(pkFields);
         }
 
         Result result = new Result();
@@ -170,7 +171,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             execute = connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(executeSql, batchRows(fields, data)));
         } catch (Exception e) {
             logger.error(e.getMessage());
-            data.forEach(row -> forceUpdate(result, connectorMapper, config, pkField, row));
+            data.forEach(row -> forceUpdate(result, connectorMapper, config, pkFields, row));
         }
 
         if (null != execute) {
@@ -180,7 +181,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
                     result.getSuccessData().add(data.get(i));
                     continue;
                 }
-                forceUpdate(result, connectorMapper, config, pkField, data.get(i));
+                forceUpdate(result, connectorMapper, config, pkFields, data.get(i));
             }
         }
         return result;
@@ -223,7 +224,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
         // 获取查询数据行是否存在
         String tableName = commandConfig.getTable().getName();
-        Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
+        List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
         StringBuilder queryCount = new StringBuilder("SELECT COUNT(1) FROM ").append(schema).append(quotation).append(tableName).append(quotation).append(" WHERE ");
         // id = ? AND uid = ?
         PrimaryKeyUtil.buildSql(queryCount, primaryKeys, quotation, " AND ", " = ? ", true);
@@ -341,7 +342,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
      */
     protected String getQueryCountSql(CommandConfig commandConfig, String schema, String quotation, String queryFilterSql) {
         String table = commandConfig.getTable().getName();
-        Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
+        List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
         StringBuilder sql = new StringBuilder();
         sql.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(schema).append(quotation).append(table).append(quotation);
         if (StringUtil.isNotBlank(queryFilterSql)) {
@@ -492,7 +493,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             logger.error("Table name can not be empty.");
             throw new ConnectorException("Table name can not be empty.");
         }
-        Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
+        List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
         SqlBuilderConfig config = new SqlBuilderConfig(this, schema, tableName, primaryKeys, fields, queryFilterSQL, buildSqlWithQuotation());
         return SqlBuilderEnum.getSqlBuilder(type).buildSql(config);
     }
@@ -550,18 +551,24 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return args;
     }
 
-    private void forceUpdate(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField,
+    private void forceUpdate(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, List<Field> pkFields,
                              Map row) {
         if (isUpdate(config.getEvent()) || isInsert(config.getEvent())) {
             // 存在执行覆盖更新,否则写入
             final String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
-            final String event = existRow(connectorMapper, queryCount, row.get(pkField.getName())) ? ConnectorConstant.OPERTION_UPDATE : ConnectorConstant.OPERTION_INSERT;
+            int size = pkFields.size();
+            Object[] args = new Object[size];
+            for (int i = 0; i < size; i++) {
+                args[i] = row.get(pkFields.get(i).getName());
+            }
+            final String event = existRow(connectorMapper, queryCount, args) ? ConnectorConstant.OPERTION_UPDATE
+                    : ConnectorConstant.OPERTION_INSERT;
             logger.warn("{}表执行{}失败, 重新执行{}, {}", config.getTableName(), config.getEvent(), event, row);
-            writer(result, connectorMapper, config, pkField, row, event);
+            writer(result, connectorMapper, config, pkFields, row, event);
         }
     }
 
-    private void writer(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField, Map row,
+    private void writer(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, List<Field> pkFields, Map row,
                         String event) {
         // 1、获取 SQL
         String sql = config.getCommand().get(event);
@@ -572,9 +579,9 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             if (isDelete(event)) {
                 fields.clear();
             } else if (isUpdate(event)) {
-                fields.remove(pkField);
+                fields.remove(pkFields);
             }
-            fields.add(pkField);
+            fields.addAll(pkFields);
         }
 
         try {
@@ -593,12 +600,12 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         }
     }
 
-    private boolean existRow(DatabaseConnectorMapper connectorMapper, String sql, Object value) {
+    private boolean existRow(DatabaseConnectorMapper connectorMapper, String sql, Object[] args) {
         int rowNum = 0;
         try {
-            rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, new Object[]{value}, Integer.class));
+            rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, Integer.class, args));
         } catch (Exception e) {
-            logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, value);
+            logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, args);
         }
         return rowNum > 0;
     }
@@ -608,4 +615,21 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return !CollectionUtils.isEmpty(pk) && pk.contains(name);
     }
 
+    private void removeFieldWithPk(List<Field> fields, List<Field> pkFields) {
+        if (CollectionUtils.isEmpty(fields) || CollectionUtils.isEmpty(pkFields)) {
+            return;
+        }
+
+        pkFields.forEach(pkField -> {
+            Iterator<Field> iterator = fields.iterator();
+            while (iterator.hasNext()) {
+                Field next = iterator.next();
+                if (next != null && StringUtil.equals(next.getName(), pkField.getName())) {
+                    iterator.remove();
+                    break;
+                }
+            }
+        });
+    }
+
 }

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderDelete.java

@@ -4,7 +4,7 @@ import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
 import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
-import java.util.Set;
+import java.util.List;
 
 /**
  * @author AE86
@@ -17,7 +17,7 @@ public class SqlBuilderDelete extends AbstractSqlBuilder {
     public String buildSql(SqlBuilderConfig config) {
         String tableName = config.getTableName();
         String quotation = config.getQuotation();
-        Set<String> primaryKeys = config.getPrimaryKeys();
+        List<String> primaryKeys = config.getPrimaryKeys();
         // DELETE FROM "USER" WHERE "ID"=? AND "UID" = ?
         StringBuilder sql = new StringBuilder().append("DELETE FROM ").append(config.getSchema()).append(quotation).append(tableName).append(quotation).append(" WHERE ");
         PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " = ? ", true);

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderUpdate.java

@@ -22,7 +22,7 @@ public class SqlBuilderUpdate extends AbstractSqlBuilder {
         List<Field> fields = config.getFields();
         String quotation = config.getQuotation();
         StringBuilder sql = new StringBuilder();
-        Set<String> primaryKeys = config.getPrimaryKeys();
+        List<String> primaryKeys = config.getPrimaryKeys();
         int size = fields.size();
         sql.append("UPDATE ").append(config.getSchema()).append(quotation).append(tableName).append(quotation).append(" SET ");
         for (int i = 0; i < size; i++) {

+ 5 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -188,11 +188,12 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
 
         final Result result = new Result();
         final ESConfig cfg = connectorMapper.getConfig();
-        final Field pkField = getPrimaryKeyField(config.getFields());
-        final String primaryKeyName = pkField.getName();
+        final List<Field> pkFields = getPrimaryKeys(config.getFields());
         try {
             BulkRequest request = new BulkRequest();
-            data.forEach(row -> addRequest(request, cfg.getIndex(), cfg.getType(), config.getEvent(), String.valueOf(row.get(primaryKeyName)), row));
+            // 默认取第一个主键
+            final String pk = pkFields.get(0).getName();
+            data.forEach(row -> addRequest(request, cfg.getIndex(), cfg.getType(), config.getEvent(), String.valueOf(row.get(pk)), row));
 
             BulkResponse response = connectorMapper.getConnection().bulk(request, RequestOptions.DEFAULT);
             RestStatus restStatus = response.status();
@@ -232,7 +233,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
     public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
         Table table = commandConfig.getTable();
         if (!CollectionUtils.isEmpty(table.getColumn())) {
-            getPrimaryKeyField(table.getColumn());
+            getPrimaryKeys(table.getColumn());
         }
         return Collections.EMPTY_MAP;
     }

+ 5 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/PageSql.java

@@ -2,7 +2,7 @@ package org.dbsyncer.connector.model;
 
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 
-import java.util.Set;
+import java.util.List;
 
 public class PageSql {
 
@@ -12,15 +12,15 @@ public class PageSql {
 
     private String quotation;
 
-    private Set<String> primaryKeys;
+    private List<String> primaryKeys;
 
-    public PageSql(String querySql, String quotation, Set<String> primaryKeys) {
+    public PageSql(String querySql, String quotation, List<String> primaryKeys) {
         this.querySql = querySql;
         this.quotation = quotation;
         this.primaryKeys = primaryKeys;
     }
 
-    public PageSql(SqlBuilderConfig sqlBuilderConfig, String querySql, Set<String> primaryKeys) {
+    public PageSql(SqlBuilderConfig sqlBuilderConfig, String querySql, List<String> primaryKeys) {
         this.sqlBuilderConfig = sqlBuilderConfig;
         this.querySql = querySql;
         this.primaryKeys = primaryKeys;
@@ -38,7 +38,7 @@ public class PageSql {
         return quotation;
     }
 
-    public Set<String> getPrimaryKeys() {
+    public List<String> getPrimaryKeys() {
         return primaryKeys;
     }
 }

+ 4 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Table.java

@@ -3,7 +3,6 @@ package org.dbsyncer.connector.model;
 import org.dbsyncer.connector.enums.TableTypeEnum;
 
 import java.util.List;
-import java.util.Set;
 
 /**
  * @author AE86
@@ -25,7 +24,7 @@ public class Table {
     /**
      * 主键列表
      */
-    private Set<String> primaryKeys;
+    private List<String> primaryKeys;
 
     /**
      * 属性字段
@@ -52,7 +51,7 @@ public class Table {
         this(name, type, null, null, null);
     }
 
-    public Table(String name, String type, Set<String> primaryKeys, List<Field> column, String sql) {
+    public Table(String name, String type, List<String> primaryKeys, List<Field> column, String sql) {
         this.name = name;
         this.type = type;
         this.primaryKeys = primaryKeys;
@@ -76,11 +75,11 @@ public class Table {
         this.type = type;
     }
 
-    public Set<String> getPrimaryKeys() {
+    public List<String> getPrimaryKeys() {
         return primaryKeys;
     }
 
-    public void setPrimaryKeys(Set<String> primaryKeys) {
+    public void setPrimaryKeys(List<String> primaryKeys) {
         this.primaryKeys = primaryKeys;
     }
 

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -6,7 +6,7 @@ import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
-import java.util.Set;
+import java.util.List;
 
 public final class MysqlConnector extends AbstractDatabaseConnector {
 
@@ -18,7 +18,7 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     @Override
     public String getPageSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
-        final Set<String> primaryKeys = config.getPrimaryKeys();
+        final List<String> primaryKeys = config.getPrimaryKeys();
         // select * from test.`my_user` where `id` > ? and `uid` > ? order by `id`,`uid` limit ?
         StringBuilder sql = new StringBuilder(config.getQuerySql());
         boolean blank = StringUtil.isBlank(config.getSqlBuilderConfig().getQueryFilter());
@@ -33,7 +33,7 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     @Override
     public String getPageCursorSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
-        final Set<String> primaryKeys = config.getPrimaryKeys();
+        final List<String> primaryKeys = config.getPrimaryKeys();
         // select * from test.`my_user` order by `id`,`uid` limit ?
         StringBuilder sql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ");
         PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -11,7 +11,7 @@ import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
 import java.sql.Types;
-import java.util.Set;
+import java.util.List;
 
 public final class PostgreSQLConnector extends AbstractDatabaseConnector {
 
@@ -27,7 +27,7 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
     @Override
     public String getPageSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
-        final Set<String> primaryKeys = config.getPrimaryKeys();
+        final List<String> primaryKeys = config.getPrimaryKeys();
         // select * from test.`my_user` where `id` > ? and `uid` > ? order by `id`,`uid` limit ?
         StringBuilder sql = new StringBuilder(config.getQuerySql());
         boolean blank = StringUtil.isBlank(config.getSqlBuilderConfig().getQueryFilter());
@@ -43,7 +43,7 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
     @Override
     public String getPageCursorSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
-        final Set<String> primaryKeys = config.getPrimaryKeys();
+        final List<String> primaryKeys = config.getPrimaryKeys();
         // select * from test.`my_user` order by `id`,`uid` limit ?
         StringBuilder sql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ");
         PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);

+ 4 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java

@@ -20,7 +20,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * @author AE86
@@ -35,9 +34,9 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
         List<SqlTable> sqlTables = cfg.getSqlTables();
         List<Table> tables = new ArrayList<>();
         if (!CollectionUtils.isEmpty(sqlTables)) {
-            sqlTables.forEach(s -> {
-                tables.add(new Table(s.getSqlName(), TableTypeEnum.TABLE.getCode(), Collections.EMPTY_SET, Collections.EMPTY_LIST, s.getSql()));
-            });
+            sqlTables.forEach(s ->
+                tables.add(new Table(s.getSqlName(), TableTypeEnum.TABLE.getCode(), Collections.EMPTY_LIST, Collections.EMPTY_LIST, s.getSql()))
+            );
         }
         return tables;
     }
@@ -75,7 +74,7 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
         // 获取过滤SQL
         String queryFilterSql = getQueryFilterSql(commandConfig.getFilter());
         Table table = commandConfig.getTable();
-        Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
+        List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
 
         // 获取查询SQL
         Map<String, String> map = new HashMap<>();

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java

@@ -5,14 +5,14 @@ import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
-import java.util.Set;
+import java.util.List;
 
 public final class DQLSqlServerConnector extends AbstractDQLConnector {
 
     @Override
     public String getPageSql(PageSql config) {
         String quotation = config.getQuotation();
-        Set<String> primaryKeys = config.getPrimaryKeys();
+        List<String> primaryKeys = config.getPrimaryKeys();
         StringBuilder orderBy = new StringBuilder();
         PrimaryKeyUtil.buildSql(orderBy, primaryKeys, quotation, " AND ", " = ? ", true);
         return String.format(DatabaseConstant.SQLSERVER_PAGE_SQL, orderBy.toString(), config.getQuerySql());

+ 1 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -15,7 +15,6 @@ import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 public final class SqlServerConnector extends AbstractDatabaseConnector {
@@ -35,7 +34,7 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
     @Override
     public String getPageSql(PageSql config) {
         String quotation = config.getQuotation();
-        Set<String> primaryKeys = config.getPrimaryKeys();
+        List<String> primaryKeys = config.getPrimaryKeys();
         StringBuilder orderBy = new StringBuilder();
         PrimaryKeyUtil.buildSql(orderBy, primaryKeys, quotation, " AND ", " = ? ", true);
         return String.format(DatabaseConstant.SQLSERVER_PAGE_SQL, orderBy.toString(), config.getQuerySql());

+ 8 - 9
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PrimaryKeyUtil.java

@@ -7,12 +7,11 @@ import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Table;
 
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class PrimaryKeyUtil {
@@ -23,22 +22,22 @@ public abstract class PrimaryKeyUtil {
      * @param table
      * @return
      */
-    public static Set<String> findOriginalTablePrimaryKey(Table table) {
+    public static List<String> findOriginalTablePrimaryKey(Table table) {
         if (null == table) {
             throw new ConnectorException("The table is null.");
         }
 
         // 获取自定义主键
         if (!CollectionUtils.isEmpty(table.getPrimaryKeys())) {
-            return Collections.unmodifiableSet(table.getPrimaryKeys());
+            return Collections.unmodifiableList(table.getPrimaryKeys());
         }
 
         // 获取表原始主键
-        Set<String> primaryKeys = new HashSet<>();
+        List<String> primaryKeys = new ArrayList<>();
         List<Field> column = table.getColumn();
         if (!CollectionUtils.isEmpty(column)) {
             for (Field c : column) {
-                if (c.isPk()) {
+                if (c.isPk() && !primaryKeys.contains(c.getName())) {
                     primaryKeys.add(c.getName());
                 }
             }
@@ -47,10 +46,10 @@ public abstract class PrimaryKeyUtil {
         if (CollectionUtils.isEmpty(primaryKeys)) {
             throw new ConnectorException(String.format("The primary key of table '%s' is null.", table.getName()));
         }
-        return Collections.unmodifiableSet(primaryKeys);
+        return Collections.unmodifiableList(primaryKeys);
     }
 
-    public static void buildSql(StringBuilder sql, Set<String> primaryKeys, String quotation, String join, String value, boolean skipFirst) {
+    public static void buildSql(StringBuilder sql, List<String> primaryKeys, String quotation, String join, String value, boolean skipFirst) {
         AtomicBoolean added = new AtomicBoolean();
         primaryKeys.forEach(pk -> {
             // skip first pk
@@ -74,7 +73,7 @@ public abstract class PrimaryKeyUtil {
      * @param primaryKeys
      * @return
      */
-    public static Object[] getLastCursors(List<Map> data, Set<String> primaryKeys) {
+    public static Object[] getLastCursors(List<Map> data, List<String> primaryKeys) {
         if (CollectionUtils.isEmpty(data)) {
             return null;
         }

+ 2 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractDatabaseExtractor.java

@@ -16,7 +16,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -73,7 +72,7 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
         for (Table t : sourceTable) {
             String sql = t.getSql();
             String sqlName = t.getName();
-            Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(t);
+            List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(t);
             String tableName = tableMap.get(sqlName);
             Assert.hasText(sql, "The sql is null.");
             Assert.hasText(tableName, "The tableName is null.");
@@ -102,7 +101,7 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
      * @param primaryKeys
      * @return
      */
-    protected Integer[] getPrimaryKeyIndexArray(List<Field> column, Set<String> primaryKeys) {
+    protected Integer[] getPrimaryKeyIndexArray(List<Field> column, List<String> primaryKeys) {
         List<Integer> indexList = new ArrayList<>();
         for (Field f : column) {
             if (primaryKeys.contains(f.getName())) {

+ 1 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/DqlOracleExtractor.java

@@ -4,7 +4,6 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.connector.model.Field;
 
 import java.util.List;
-import java.util.Set;
 
 /**
  * @author AE86
@@ -25,7 +24,7 @@ public class DqlOracleExtractor extends OracleExtractor {
     }
 
     @Override
-    protected Integer[] getPrimaryKeyIndexArray(List<Field> column, Set<String> primaryKeys) {
+    protected Integer[] getPrimaryKeyIndexArray(List<Field> column, List<String> primaryKeys) {
         // ROW_ID
         return new Integer[]{0};
     }

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -97,7 +97,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
     private void execute(TableGroupCommand tableGroupCommand, int index) {
         final Map<String, String> command = tableGroupCommand.getCommand();
-        final Set<String> primaryKeys = tableGroupCommand.getPrimaryKeys();
+        final List<String> primaryKeys = tableGroupCommand.getPrimaryKeys();
 
         // 检查增量点
         ConnectorMapper connectionMapper = connectorFactory.connect(connectorConfig);

+ 4 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/TableGroupCommand.java

@@ -1,20 +1,20 @@
 package org.dbsyncer.listener.quartz;
 
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public class TableGroupCommand {
 
-    private Set<String> primaryKeys;
+    private List<String> primaryKeys;
 
     private Map<String, String> command;
 
-    public TableGroupCommand(Set<String> primaryKeys, Map<String, String> command) {
+    public TableGroupCommand(List<String> primaryKeys, Map<String, String> command) {
         this.primaryKeys = primaryKeys;
         this.command = command;
     }
 
-    public Set<String> getPrimaryKeys() {
+    public List<String> getPrimaryKeys() {
         return primaryKeys;
     }
 

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -146,7 +146,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         if (ListenerTypeEnum.isTiming(listenerType)) {
             AbstractQuartzExtractor quartzExtractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
             quartzExtractor.setCommands(list.stream().map(t -> {
-                Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(t.getSourceTable());
+                List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(t.getSourceTable());
                 return new TableGroupCommand(primaryKeys, t.getCommand());
             }).collect(Collectors.toList()));
             quartzExtractor.register(new QuartzListener(mapping, list));

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -249,7 +249,7 @@ public class ParserFactory implements Parser {
         Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
         // 获取同步字段
         Picker picker = new Picker(fieldMapping);
-        Set<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(tableGroup.getSourceTable());
+        List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(tableGroup.getSourceTable());
 
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();

+ 1 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -39,10 +39,8 @@ import java.io.InputStreamReader;
 import java.sql.SQLSyntaxErrorException;
 import java.sql.Types;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -403,7 +401,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         }
 
         List<Field> fields = executor.getFields();
-        Set<String> primaryKeys = new HashSet<>();
+        List<String> primaryKeys = new ArrayList<>();
         primaryKeys.add(ConfigConstant.CONFIG_MODEL_ID);
         final SqlBuilderConfig config = new SqlBuilderConfig(connector, "", table, primaryKeys, fields, "", "");