Explorar el Código

支持转换sqlserver 关键字 https://gitee.com/ghi/dbsyncer/issues/I7R9Y3

Signed-off-by: AE86 <836391306@qq.com>
AE86 hace 1 año
padre
commit
34a90885bf
Se han modificado 24 ficheros con 472 adiciones y 308 borrados
  1. 10 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/CollectionUtils.java
  2. 7 8
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java
  3. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/constant/ConnectorConstant.java
  4. 104 158
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  5. 67 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/Database.java
  6. 9 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderDelete.java
  7. 13 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderInsert.java
  8. 13 10
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQuery.java
  9. 41 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryCount.java
  10. 8 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryCursor.java
  11. 33 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryExist.java
  12. 12 6
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderUpdate.java
  13. 17 13
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/SqlBuilderEnum.java
  14. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java
  15. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java
  16. 14 14
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/PageSql.java
  17. 9 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java
  18. 13 11
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java
  19. 11 9
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java
  20. 4 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java
  21. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java
  22. 43 15
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java
  23. 39 32
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PrimaryKeyUtil.java
  24. 1 1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

+ 10 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/CollectionUtils.java

@@ -1,7 +1,10 @@
 package org.dbsyncer.common.util;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 
 public abstract class CollectionUtils {
 
@@ -16,4 +19,11 @@ public abstract class CollectionUtils {
     public static boolean isEmpty(Map<?, ?> map) {
         return map == null || map.isEmpty();
     }
+
+    public static <T> HashSet<T> newHashSet(T... elements) {
+        Objects.requireNonNull(elements);
+        HashSet<T> set = new HashSet(elements.length);
+        Collections.addAll(set, elements);
+        return set;
+    }
 }

+ 7 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java

@@ -8,27 +8,30 @@ import java.util.List;
 public class SqlBuilderConfig {
 
     private Database database;
+
     // 架构名
     private String schema;
+
     // 表名
     private String tableName;
+
     // 主键列表
     private List<String> primaryKeys;
+
     // 字段
     private List<Field> fields;
+
     // 过滤条件
     private String queryFilter;
-    // 引号
-    private String quotation;
 
-    public SqlBuilderConfig(Database database, String schema, String tableName, List<String> primaryKeys, List<Field> fields, String queryFilter, String quotation) {
+    public SqlBuilderConfig(Database database, String schema, String tableName, List<String> primaryKeys,
+                            List<Field> fields, String queryFilter) {
         this.database = database;
         this.schema = schema;
         this.tableName = tableName;
         this.primaryKeys = primaryKeys;
         this.fields = fields;
         this.queryFilter = queryFilter;
-        this.quotation = quotation;
     }
 
     public Database getDatabase() {
@@ -54,8 +57,4 @@ public class SqlBuilderConfig {
     public String getQueryFilter() {
         return queryFilter;
     }
-
-    public String getQuotation() {
-        return quotation;
-    }
 }

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/constant/ConnectorConstant.java

@@ -45,6 +45,6 @@ public class ConnectorConstant {
     /**
      * 查询数据行是否存在
      */
-    public static final String OPERTION_QUERY_COUNT_EXIST = "QUERY_COUNT_EXIST";
+    public static final String OPERTION_QUERY_EXIST = "QUERY_COUNT_EXIST";
 
 }

+ 104 - 158
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -157,7 +157,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             throw new ConnectorException("writer data can not be empty.");
         }
         List<Field> fields = new ArrayList<>(config.getFields());
-        List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeys(config);
+        List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
         // Update / Delete
         if (!isInsert(event)) {
             if (isDelete(event)) {
@@ -193,126 +193,102 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
     @Override
     public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
+        Table table = commandConfig.getTable();
+        List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(table);
+        if (CollectionUtils.isEmpty(primaryKeys)) {
+            return new HashMap<>();
+        }
+
+        String tableName = table.getName();
+        if (StringUtil.isBlank(tableName)) {
+            logger.error("数据源表不能为空.");
+            throw new ConnectorException("数据源表不能为空.");
+        }
+
+        // 架构名
+        String schema = getSchemaWithQuotation((DatabaseConfig) commandConfig.getConnectorConfig());
+        // 同步字段
+        List<Field> column = filterColumn(table.getColumn());
         // 获取过滤SQL
-        final String queryFilterSql = getQueryFilterSql(commandConfig.getFilter());
-        final String quotation = buildSqlWithQuotation();
+        final String queryFilterSql = getQueryFilterSql(commandConfig);
+        SqlBuilderConfig config = new SqlBuilderConfig(this, schema, tableName, primaryKeys, column, queryFilterSql);
 
         // 获取查询SQL
         Map<String, String> map = new HashMap<>();
-        String schema = getSchema((DatabaseConfig) commandConfig.getConnectorConfig(), quotation);
-        map.put(ConnectorConstant.OPERTION_QUERY, buildSql(ConnectorConstant.OPERTION_QUERY, commandConfig, schema, queryFilterSql));
+        buildSql(map, SqlBuilderEnum.QUERY, config);
         // 是否支持游标
         if (enableCursor()) {
-            map.put(ConnectorConstant.OPERTION_QUERY_CURSOR, buildSql(ConnectorConstant.OPERTION_QUERY_CURSOR, commandConfig, schema, queryFilterSql));
+            buildSql(map, SqlBuilderEnum.QUERY_CURSOR, config);
         }
         // 获取查询总数SQL
-        map.put(ConnectorConstant.OPERTION_QUERY_COUNT, getQueryCountSql(commandConfig, schema, quotation, queryFilterSql));
+        map.put(ConnectorConstant.OPERTION_QUERY_COUNT, getQueryCountSql(commandConfig, primaryKeys, schema, queryFilterSql));
         return map;
     }
 
     @Override
     public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
-        String quotation = buildSqlWithQuotation();
-        String schema = getSchema((DatabaseConfig) commandConfig.getConnectorConfig(), quotation);
-
-        // 获取增删改SQL
-        Map<String, String> map = new HashMap<>();
-        String insert = SqlBuilderEnum.INSERT.getName();
-        map.put(insert, buildSql(insert, commandConfig, schema, null));
-
-        String update = SqlBuilderEnum.UPDATE.getName();
-        map.put(update, buildSql(update, commandConfig, schema, null));
-
-        String delete = SqlBuilderEnum.DELETE.getName();
-        map.put(delete, buildSql(delete, commandConfig, schema, null));
-
-        // 获取查询数据行是否存在
-        String tableName = commandConfig.getTable().getName();
-        List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(commandConfig.getTable());
-        if (!CollectionUtils.isEmpty(primaryKeys)) {
-            StringBuilder queryCount = new StringBuilder("SELECT COUNT(1) FROM ").append(schema).append(quotation).append(tableName).append(quotation).append(" WHERE ");
-            // id = ? AND uid = ?
-            PrimaryKeyUtil.buildSql(queryCount, primaryKeys, quotation, " AND ", " = ? ", true);
+        Table table = commandConfig.getTable();
+        List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(table);
+        if (CollectionUtils.isEmpty(primaryKeys)) {
+            return new HashMap<>();
+        }
 
-            String queryCountExist = ConnectorConstant.OPERTION_QUERY_COUNT_EXIST;
-            map.put(queryCountExist, queryCount.toString());
+        String tableName = table.getName();
+        if (StringUtil.isBlank(tableName)) {
+            logger.error("目标源表不能为空.");
+            throw new ConnectorException("目标源表不能为空.");
         }
-        return map;
-    }
 
-    /**
-     * 查询语句表名和字段带上引号(默认不加)
-     *
-     * @return
-     */
-    public String buildSqlWithQuotation() {
-        return "";
-    }
+        // 架构名
+        String schema = getSchemaWithQuotation((DatabaseConfig) commandConfig.getConnectorConfig());
+        // 同步字段
+        List<Field> column = filterColumn(table.getColumn());
+        SqlBuilderConfig config = new SqlBuilderConfig(this, schema, tableName, primaryKeys, column, null);
 
-    /**
-     * 是否使用游标查询
-     *
-     * @return
-     */
-    protected boolean enableCursor() {
-        return false;
+        // 获取增删改SQL
+        Map<String, String> map = new HashMap<>();
+        buildSql(map, SqlBuilderEnum.INSERT, config);
+        buildSql(map, SqlBuilderEnum.UPDATE, config);
+        buildSql(map, SqlBuilderEnum.DELETE, config);
+        buildSql(map, SqlBuilderEnum.QUERY_EXIST, config);
+        return map;
     }
 
     /**
-     * 是否支持游标配置
+     * 获取架构名
      *
      * @param config
      * @return
      */
-    protected boolean isSupportedCursor(PageSql config) {
-        final List<String> primaryKeys = config.getPrimaryKeys();
-        final SqlBuilderConfig sqlBuilderConfig = config.getSqlBuilderConfig();
-        final Map<String, Integer> typeAliases = PrimaryKeyUtil.findPrimaryKeyType(sqlBuilderConfig.getFields());
-        return PrimaryKeyUtil.isSupportedCursor(typeAliases, primaryKeys);
+    protected String getSchema(DatabaseConfig config) {
+        return config.getSchema();
     }
 
     /**
-     * 如果满足游标,追加主键排序
+     * 获取架构名
      *
      * @param config
-     * @param sql
-     */
-    protected void appendOrderByPkIfSupportedCursor(PageSql config, StringBuilder sql) {
-        if (isSupportedCursor(config)) {
-            sql.append(" ORDER BY ");
-            final List<String> primaryKeys = config.getPrimaryKeys();
-            final String quotation = config.getQuotation();
-            PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
-        }
-    }
-
-    /**
-     * 健康检查
-     *
      * @return
      */
-    protected String getValidationQuery() {
-        return "select 1";
-    }
-
-    /**
-     * 获取条件值引号
-     *
-     * @param value
-     * @return
-     */
-    protected String buildSqlFilterWithQuotation(String value) {
-        return "'";
+    protected String getSchemaWithQuotation(DatabaseConfig config) {
+        StringBuilder schema = new StringBuilder();
+        if (StringUtil.isNotBlank(config.getSchema())) {
+            String quotation = buildSqlWithQuotation();
+            schema.append(quotation).append(config.getSchema()).append(quotation).append(".").toString();
+        }
+        return schema.toString();
     }
 
     /**
-     * 获取架构名
+     * 满足游标查询条件,追加主键排序
      *
      * @param config
-     * @return
+     * @param sql
      */
-    protected String getSchema(DatabaseConfig config) {
-        return config.getSchema();
+    protected void appendOrderByPk(PageSql config, StringBuilder sql) {
+        sql.append(" ORDER BY ");
+        final String quotation = buildSqlWithQuotation();
+        PrimaryKeyUtil.buildSql(sql, config.getPrimaryKeys(), quotation, ",", "", true);
     }
 
     /**
@@ -369,50 +345,45 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
      * 获取查询总数SQL
      *
      * @param commandConfig
+     * @param primaryKeys
      * @param schema
-     * @param quotation
      * @param queryFilterSql
      * @return
      */
-    protected String getQueryCountSql(CommandConfig commandConfig, String schema, String quotation, String queryFilterSql) {
-        String table = commandConfig.getTable().getName();
-        List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(commandConfig.getTable());
-        StringBuilder sql = new StringBuilder();
-        if (!CollectionUtils.isEmpty(primaryKeys)) {
-            sql.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(schema).append(quotation).append(table).append(quotation);
-            if (StringUtil.isNotBlank(queryFilterSql)) {
-                sql.append(queryFilterSql);
-            }
-            sql.append(" GROUP BY ");
-            // id,uid
-            PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
-            sql.append(") DBSYNCER_T");
-        }
-        return sql.toString();
+    protected String getQueryCountSql(CommandConfig commandConfig, List<String> primaryKeys, String schema, String queryFilterSql) {
+        Table table = commandConfig.getTable();
+        String tableName = table.getName();
+        SqlBuilderConfig config = new SqlBuilderConfig(this, schema, tableName, primaryKeys, table.getColumn(), queryFilterSql);
+        return SqlBuilderEnum.QUERY_COUNT.getSqlBuilder().buildSql(config);
     }
 
     /**
      * 获取查询条件SQL
      *
-     * @param filter
+     * @param commandConfig
      * @return
      */
-    protected String getQueryFilterSql(List<Filter> filter) {
+    protected String getQueryFilterSql(CommandConfig commandConfig) {
+        List<Filter> filter = commandConfig.getFilter();
         if (CollectionUtils.isEmpty(filter)) {
             return "";
         }
+        Table table = commandConfig.getTable();
+        Map<String, Field> fieldMap = new HashMap<>();
+        table.getColumn().forEach(field -> fieldMap.put(field.getName(), field));
+
         // 过滤条件SQL
         StringBuilder sql = new StringBuilder();
 
         // 拼接并且SQL
-        String addSql = getFilterSql(OperationEnum.AND.getName(), filter);
+        String addSql = getFilterSql(fieldMap, OperationEnum.AND.getName(), filter);
         // 如果Add条件存在
         if (StringUtil.isNotBlank(addSql)) {
             sql.append(addSql);
         }
 
         // 拼接或者SQL
-        String orSql = getFilterSql(OperationEnum.OR.getName(), filter);
+        String orSql = getFilterSql(fieldMap, OperationEnum.OR.getName(), filter);
         // 如果Or条件和Add条件都存在
         if (StringUtil.isNotBlank(orSql) && StringUtil.isNotBlank(addSql)) {
             sql.append(" OR ");
@@ -428,18 +399,25 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     }
 
     /**
-     * 获取架构
+     * 去重列
      *
-     * @param config
-     * @param quotation
+     * @param column
      * @return
      */
-    protected String getSchema(DatabaseConfig config, String quotation) {
-        StringBuilder schema = new StringBuilder();
-        if (StringUtil.isNotBlank(config.getSchema())) {
-            schema.append(quotation).append(config.getSchema()).append(quotation).append(".");
+    private List<Field> filterColumn(List<Field> column) {
+        Set<String> mark = new HashSet<>();
+        List<Field> fields = new ArrayList<>();
+        for (Field c : column) {
+            String name = c.getName();
+            if (StringUtil.isBlank(name)) {
+                throw new ConnectorException("The field name can not be empty.");
+            }
+            if (!mark.contains(name)) {
+                fields.add(c);
+                mark.add(name);
+            }
         }
-        return schema.toString();
+        return fields;
     }
 
     /**
@@ -472,11 +450,12 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     /**
      * 根据过滤条件获取查询SQL
      *
+     * @param fieldMap
      * @param queryOperator and/or
      * @param filter
      * @return
      */
-    private String getFilterSql(String queryOperator, List<Filter> filter) {
+    private String getFilterSql(Map<String, Field> fieldMap, String queryOperator, List<Filter> filter) {
         List<Filter> list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList());
         if (CollectionUtils.isEmpty(list)) {
             return "";
@@ -490,8 +469,12 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         String quotation = buildSqlWithQuotation();
         for (int i = 0; i < size; i++) {
             c = list.get(i);
+            Field field = fieldMap.get(c.getName());
+            Assert.notNull(field, "条件字段无效.");
             // "USER" = 'zhangsan'
-            sql.append(quotation).append(c.getName()).append(quotation);
+            sql.append(quotation);
+            sql.append(buildFieldName(field));
+            sql.append(quotation);
             sql.append(" ").append(c.getFilter()).append(" ");
             // 如果使用了函数则不加引号
             String filterValueQuotation = buildSqlFilterWithQuotation(c.getValue());
@@ -505,51 +488,14 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     }
 
     /**
-     * 获取查询SQL
+     * 生成SQL
      *
-     * @param type           {@link SqlBuilderEnum}
-     * @param commandConfig
-     * @param schema
-     * @param queryFilterSQL
-     * @return
+     * @param map
+     * @param builderType
+     * @param config
      */
-    private String buildSql(String type, CommandConfig commandConfig, String schema, String queryFilterSQL) {
-        Table table = commandConfig.getTable();
-        if (null == table) {
-            logger.error("Table can not be null.");
-            throw new ConnectorException("Table can not be null.");
-        }
-        List<Field> column = table.getColumn();
-        if (CollectionUtils.isEmpty(column)) {
-            return null;
-        }
-        Set<String> mark = new HashSet<>();
-        List<Field> fields = new ArrayList<>();
-        for (Field c : column) {
-            String name = c.getName();
-            if (StringUtil.isBlank(name)) {
-                throw new ConnectorException("The field name can not be empty.");
-            }
-            if (!mark.contains(name)) {
-                fields.add(c);
-                mark.add(name);
-            }
-        }
-        if (CollectionUtils.isEmpty(fields)) {
-            logger.error("The fields can not be empty.");
-            throw new ConnectorException("The fields can not be empty.");
-        }
-        String tableName = table.getName();
-        if (StringUtil.isBlank(tableName)) {
-            logger.error("Table name can not be empty.");
-            throw new ConnectorException("Table name can not be empty.");
-        }
-        List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(commandConfig.getTable());
-        if (!CollectionUtils.isEmpty(primaryKeys)) {
-            SqlBuilderConfig config = new SqlBuilderConfig(this, schema, tableName, primaryKeys, fields, queryFilterSQL, buildSqlWithQuotation());
-            return SqlBuilderEnum.getSqlBuilder(type).buildSql(config);
-        }
-        return "";
+    private void buildSql(Map<String, String> map, SqlBuilderEnum builderType, SqlBuilderConfig config) {
+        map.put(builderType.getName(), builderType.getSqlBuilder().buildSql(config));
     }
 
     /**
@@ -594,7 +540,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
                              Map row) {
         if (isUpdate(config.getEvent()) || isInsert(config.getEvent())) {
             // 存在执行覆盖更新,否则写入
-            final String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
+            final String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_EXIST);
             int size = pkFields.size();
             Object[] args = new Object[size];
             for (int i = 0; i < size; i++) {

+ 67 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/Database.java

@@ -1,12 +1,65 @@
 package org.dbsyncer.connector.database;
 
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
+import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.PageSql;
 
+import java.util.List;
+
 public interface Database {
 
+    /**
+     * 查询语句表名和字段带上引号(默认不加)
+     *
+     * @return
+     */
+    default String buildSqlWithQuotation() {
+        return StringUtil.EMPTY;
+    }
+
+    /**
+     * 获取条件值引号
+     *
+     * @param value
+     * @return
+     */
+    default String buildSqlFilterWithQuotation(String value) {
+        return "'";
+    }
+
+    /**
+     * 获取表名称(可自定义处理系统关键字,函数名)
+     *
+     * @param tableName
+     * @return
+     */
+    default String buildTableName(String tableName) {
+        return tableName;
+    }
+
+    /**
+     * 获取字段名称(可自定义处理系统关键字,函数名)
+     *
+     * @param field
+     * @return
+     */
+    default String buildFieldName(Field field) {
+        return field.getName();
+    }
+
+    /**
+     * 获取主键字段名称(可自定义处理系统关键字,函数名)
+     *
+     * @param primaryKeys
+     * @return
+     */
+    default List<String> buildPrimaryKeys(List<String> primaryKeys) {
+        return primaryKeys;
+    }
+
     /**
      * 获取分页SQL
      *
@@ -39,17 +92,26 @@ public interface Database {
      * @param config
      * @return
      */
-    default Object[] getPageCursorArgs(ReaderConfig config){
+    default Object[] getPageCursorArgs(ReaderConfig config) {
         throw new ConnectorException("Unsupported override method getPageCursorArgs:" + getClass().getName());
     }
 
     /**
-     * 获取字段名称(可重写实现系统关键字,函数名处理)
+     * 健康检查
      *
-     * @param field
      * @return
      */
-    default String buildFieldName(Field field){
-        return field.getName();
+    default String getValidationQuery() {
+        return "select 1";
+    }
+
+    /**
+     * 是否使用游标查询
+     *
+     * @return
+     */
+    default boolean enableCursor() {
+        return false;
     }
+
 }

+ 9 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderDelete.java

@@ -2,6 +2,7 @@ package org.dbsyncer.connector.database.sqlbuilder;
 
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
+import org.dbsyncer.connector.database.Database;
 import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
 import java.util.List;
@@ -15,11 +16,16 @@ public class SqlBuilderDelete extends AbstractSqlBuilder {
 
     @Override
     public String buildSql(SqlBuilderConfig config) {
+        Database database = config.getDatabase();
+        String quotation = database.buildSqlWithQuotation();
         String tableName = config.getTableName();
-        String quotation = config.getQuotation();
-        List<String> primaryKeys = config.getPrimaryKeys();
+        List<String> primaryKeys = database.buildPrimaryKeys(config.getPrimaryKeys());
         // DELETE FROM "USER" WHERE "ID"=? AND "UID" = ?
-        StringBuilder sql = new StringBuilder().append("DELETE FROM ").append(config.getSchema()).append(quotation).append(tableName).append(quotation).append(" WHERE ");
+        StringBuilder sql = new StringBuilder().append("DELETE FROM ").append(config.getSchema());
+        sql.append(quotation);
+        sql.append(database.buildTableName(tableName));
+        sql.append(quotation);
+        sql.append(" WHERE ");
         PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " = ? ", true);
         return sql.toString();
     }

+ 13 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderInsert.java

@@ -1,8 +1,9 @@
 package org.dbsyncer.connector.database.sqlbuilder;
 
-import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
+import org.dbsyncer.connector.database.Database;
+import org.dbsyncer.connector.model.Field;
 
 import java.util.List;
 
@@ -15,18 +16,19 @@ public class SqlBuilderInsert extends AbstractSqlBuilder {
 
     @Override
     public String buildSql(SqlBuilderConfig config) {
-        String tableName = config.getTableName();
+        Database database = config.getDatabase();
+        String quotation = database.buildSqlWithQuotation();
         List<Field> fields = config.getFields();
-        String quotation = config.getQuotation();
 
-        StringBuilder sql = new StringBuilder();
         StringBuilder fs = new StringBuilder();
         StringBuilder vs = new StringBuilder();
         int size = fields.size();
         int end = size - 1;
         for (int i = 0; i < size; i++) {
             // "USERNAME"
-            fs.append(quotation).append(fields.get(i).getName()).append(quotation);
+            fs.append(quotation);
+            fs.append(database.buildFieldName(fields.get(i)));
+            fs.append(quotation);
             vs.append("?");
             //如果不是最后一个字段
             if (i < end) {
@@ -35,8 +37,12 @@ public class SqlBuilderInsert extends AbstractSqlBuilder {
             }
         }
         // INSERT INTO "USER"("USERNAME","AGE") VALUES (?,?)
-        sql.insert(0, "INSERT INTO ").append(config.getSchema()).append(quotation).append(tableName).append(quotation).append("(").append(fs).append(") VALUES (")
-                .append(vs).append(")");
+        StringBuilder sql = new StringBuilder("INSERT INTO ");
+        sql.append(config.getSchema());
+        sql.append(quotation);
+        sql.append(database.buildTableName(config.getTableName()));
+        sql.append(quotation);
+        sql.append("(").append(fs).append(") VALUES (").append(vs).append(")");
         return sql.toString();
     }
 

+ 13 - 10
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQuery.java

@@ -1,11 +1,11 @@
 package org.dbsyncer.connector.database.sqlbuilder;
 
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.model.Field;
-import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
 import org.dbsyncer.connector.database.Database;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.PageSql;
 
 import java.util.List;
 
@@ -19,20 +19,21 @@ public class SqlBuilderQuery extends AbstractSqlBuilder {
     @Override
     public String buildSql(SqlBuilderConfig config) {
         // 分页语句
-        Database database = config.getDatabase();
-        PageSql pageSql = new PageSql(config, buildQuerySql(config), config.getQuotation(), config.getPrimaryKeys());
-        return database.getPageSql(pageSql);
+        List<String> primaryKeys = config.getPrimaryKeys();
+        String queryFilter = config.getQueryFilter();
+        List<Field> fields = config.getFields();
+        PageSql pageSql = new PageSql(buildQuerySql(config), queryFilter, primaryKeys, fields);
+        return config.getDatabase().getPageSql(pageSql);
     }
 
     @Override
     public String buildQuerySql(SqlBuilderConfig config) {
-        String tableName = config.getTableName();
+        Database database = config.getDatabase();
+        String quotation = database.buildSqlWithQuotation();
         List<Field> fields = config.getFields();
-        String quotation = config.getQuotation();
         String queryFilter = config.getQueryFilter();
-        Database database = config.getDatabase();
 
-        StringBuilder sql = new StringBuilder();
+        StringBuilder sql = new StringBuilder("SELECT ");
         int size = fields.size();
         int end = size - 1;
         Field field = null;
@@ -57,7 +58,9 @@ public class SqlBuilderQuery extends AbstractSqlBuilder {
             }
         }
         // SELECT "ID","NAME" FROM "USER"
-        sql.insert(0, "SELECT ").append(" FROM ").append(config.getSchema()).append(quotation).append(tableName).append(quotation);
+        sql.append(" FROM ").append(config.getSchema()).append(quotation);
+        sql.append(database.buildTableName(config.getTableName()));
+        sql.append(quotation);
         // 解析查询条件
         if (StringUtil.isNotBlank(queryFilter)) {
             sql.append(queryFilter);

+ 41 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryCount.java

@@ -0,0 +1,41 @@
+package org.dbsyncer.connector.database.sqlbuilder;
+
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.config.SqlBuilderConfig;
+import org.dbsyncer.connector.database.Database;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
+
+import java.util.List;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2023/8/8 0:03
+ */
+public class SqlBuilderQueryCount extends SqlBuilderQuery {
+
+    @Override
+    public String buildSql(SqlBuilderConfig config) {
+        Database database = config.getDatabase();
+        String quotation = database.buildSqlWithQuotation();
+        String tableName = config.getTableName();
+        List<String> primaryKeys = database.buildPrimaryKeys(config.getPrimaryKeys());
+        String schema = config.getSchema();
+        String queryFilter = config.getQueryFilter();
+
+        StringBuilder sql = new StringBuilder();
+        sql.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(schema);
+        sql.append(quotation);
+        sql.append(database.buildTableName(tableName));
+        sql.append(quotation);
+        if (StringUtil.isNotBlank(queryFilter)) {
+            sql.append(queryFilter);
+        }
+        sql.append(" GROUP BY ");
+        // id,uid
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        sql.append(") DBSYNCER_T");
+        return sql.toString();
+    }
+
+}

+ 8 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryCursor.java

@@ -2,8 +2,11 @@ package org.dbsyncer.connector.database.sqlbuilder;
 
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.Database;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.PageSql;
 
+import java.util.List;
+
 /**
  * @author AE86
  * @version 1.0.0
@@ -15,8 +18,11 @@ public class SqlBuilderQueryCursor extends SqlBuilderQuery {
     public String buildSql(SqlBuilderConfig config) {
         // 分页语句
         Database database = config.getDatabase();
-        PageSql pageSql = new PageSql(config, buildQuerySql(config), config.getQuotation(), config.getPrimaryKeys());
-        return database.getPageCursorSql(pageSql);
+        String queryFilter = config.getQueryFilter();
+        List<String> primaryKeys = database.buildPrimaryKeys(config.getPrimaryKeys());
+        List<Field> fields = config.getFields();
+        PageSql pageSql = new PageSql(buildQuerySql(config), queryFilter, primaryKeys, fields);
+        return config.getDatabase().getPageCursorSql(pageSql);
     }
 
 }

+ 33 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryExist.java

@@ -0,0 +1,33 @@
+package org.dbsyncer.connector.database.sqlbuilder;
+
+import org.dbsyncer.connector.config.SqlBuilderConfig;
+import org.dbsyncer.connector.database.Database;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
+
+import java.util.List;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2023/8/8 0:03
+ */
+public class SqlBuilderQueryExist extends SqlBuilderQuery {
+
+    @Override
+    public String buildSql(SqlBuilderConfig config) {
+        Database database = config.getDatabase();
+        String quotation = database.buildSqlWithQuotation();
+        String tableName = config.getTableName();
+        String schema = config.getSchema();
+        List<String> primaryKeys = database.buildPrimaryKeys(config.getPrimaryKeys());
+        StringBuilder sql = new StringBuilder("SELECT COUNT(1) FROM ");
+        sql.append(schema).append(quotation);
+        sql.append(database.buildTableName(tableName));
+        sql.append(quotation);
+        sql.append(" WHERE ");
+        // id = ? AND uid = ?
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " = ? ", true);
+        return sql.toString();
+    }
+
+}

+ 12 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderUpdate.java

@@ -3,11 +3,11 @@ package org.dbsyncer.connector.database.sqlbuilder;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
+import org.dbsyncer.connector.database.Database;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
 import java.util.List;
-import java.util.Set;
 
 /**
  * @author AE86
@@ -18,13 +18,16 @@ public class SqlBuilderUpdate extends AbstractSqlBuilder {
 
     @Override
     public String buildSql(SqlBuilderConfig config) {
-        String tableName = config.getTableName();
+        Database database = config.getDatabase();
+        String quotation = database.buildSqlWithQuotation();
         List<Field> fields = config.getFields();
-        String quotation = config.getQuotation();
+
         StringBuilder sql = new StringBuilder();
-        List<String> primaryKeys = config.getPrimaryKeys();
+        sql.append("UPDATE ").append(config.getSchema());
+        sql.append(quotation);
+        sql.append(database.buildTableName(config.getTableName()));
+        sql.append(quotation).append(" SET ");
         int size = fields.size();
-        sql.append("UPDATE ").append(config.getSchema()).append(quotation).append(tableName).append(quotation).append(" SET ");
         for (int i = 0; i < size; i++) {
             // skip pk
             if (fields.get(i).isPk()) {
@@ -32,7 +35,9 @@ public class SqlBuilderUpdate extends AbstractSqlBuilder {
             }
 
             // "USERNAME"=?
-            sql.append(quotation).append(fields.get(i).getName()).append(quotation).append("=?");
+            sql.append(quotation);
+            sql.append(database.buildFieldName(fields.get(i)));
+            sql.append(quotation).append("=?");
             if (i < size - 1) {
                 sql.append(",");
             }
@@ -46,6 +51,7 @@ public class SqlBuilderUpdate extends AbstractSqlBuilder {
 
         // UPDATE "USER" SET "USERNAME"=?,"AGE"=? WHERE "ID"=? AND "UID" = ?
         sql.append(" WHERE ");
+        List<String> primaryKeys = database.buildPrimaryKeys(config.getPrimaryKeys());
         PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " = ? ", true);
         return sql.toString();
     }

+ 17 - 13
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/SqlBuilderEnum.java

@@ -1,9 +1,14 @@
 package org.dbsyncer.connector.enums;
 
-import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.constant.ConnectorConstant;
-import org.dbsyncer.connector.database.sqlbuilder.*;
+import org.dbsyncer.connector.database.sqlbuilder.SqlBuilder;
+import org.dbsyncer.connector.database.sqlbuilder.SqlBuilderDelete;
+import org.dbsyncer.connector.database.sqlbuilder.SqlBuilderInsert;
+import org.dbsyncer.connector.database.sqlbuilder.SqlBuilderQuery;
+import org.dbsyncer.connector.database.sqlbuilder.SqlBuilderQueryCount;
+import org.dbsyncer.connector.database.sqlbuilder.SqlBuilderQueryExist;
+import org.dbsyncer.connector.database.sqlbuilder.SqlBuilderQueryCursor;
+import org.dbsyncer.connector.database.sqlbuilder.SqlBuilderUpdate;
 
 /**
  * @author AE86
@@ -31,7 +36,15 @@ public enum SqlBuilderEnum {
     /**
      * 查询游标SQL生成器
      */
-    QUERY_CURSOR(ConnectorConstant.OPERTION_QUERY_CURSOR, new SqlBuilderQueryCursor());
+    QUERY_CURSOR(ConnectorConstant.OPERTION_QUERY_CURSOR, new SqlBuilderQueryCursor()),
+    /**
+     * 查询总数
+     */
+    QUERY_COUNT(ConnectorConstant.OPERTION_QUERY_COUNT, new SqlBuilderQueryCount()),
+    /**
+     * 查询行数据是否存在
+     */
+    QUERY_EXIST(ConnectorConstant.OPERTION_QUERY_EXIST, new SqlBuilderQueryExist());
 
     /**
      * SQL构造器名称
@@ -48,15 +61,6 @@ public enum SqlBuilderEnum {
         this.sqlBuilder = sqlBuilder;
     }
 
-    public static SqlBuilder getSqlBuilder(String name) throws ConnectorException {
-        for (SqlBuilderEnum e : SqlBuilderEnum.values()) {
-            if (StringUtil.equals(name, e.getName())) {
-                return e.getSqlBuilder();
-            }
-        }
-        throw new ConnectorException(String.format("SqlBuilder name \"%s\" does not exist.", name));
-    }
-
     public String getName() {
         return name;
     }

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -199,7 +199,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
 
         final Result result = new Result();
         final ESConfig cfg = connectorMapper.getConfig();
-        final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeys(config);
+        final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
         try {
             BulkRequest request = new BulkRequest();
             // 默认取第一个主键

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -85,7 +85,7 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
 
         Result result = new Result();
         final KafkaConfig cfg = connectorMapper.getConfig();
-        final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeys(config);
+        final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
         try {
             String topic = cfg.getTopic();
             // 默认取第一个主键

+ 14 - 14
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/PageSql.java

@@ -1,39 +1,39 @@
 package org.dbsyncer.connector.model;
 
-import org.dbsyncer.connector.config.SqlBuilderConfig;
-
 import java.util.List;
 
 public class PageSql {
 
-    private SqlBuilderConfig sqlBuilderConfig;
-
     private String querySql;
 
-    private String quotation;
+    // 过滤条件
+    private String queryFilter;
 
     private List<String> primaryKeys;
 
-    public PageSql(SqlBuilderConfig sqlBuilderConfig, String querySql, String quotation, List<String> primaryKeys) {
-        this.sqlBuilderConfig = sqlBuilderConfig;
+    // 字段
+    private List<Field> fields;
+
+    public PageSql(String querySql, String queryFilter, List<String> primaryKeys, List<Field> fields) {
         this.querySql = querySql;
-        this.quotation = quotation;
+        this.queryFilter = queryFilter;
         this.primaryKeys = primaryKeys;
-    }
-
-    public SqlBuilderConfig getSqlBuilderConfig() {
-        return sqlBuilderConfig;
+        this.fields = fields;
     }
 
     public String getQuerySql() {
         return querySql;
     }
 
-    public String getQuotation() {
-        return quotation;
+    public String getQueryFilter() {
+        return queryFilter;
     }
 
     public List<String> getPrimaryKeys() {
         return primaryKeys;
     }
+
+    public List<Field> getFields() {
+        return fields;
+    }
 }

+ 9 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -24,7 +24,9 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     public String getPageSql(PageSql config) {
         // select * from test.`my_user` where `id` > ? and `uid` > ? order by `id`,`uid` limit ?,?
         StringBuilder sql = new StringBuilder(config.getQuerySql());
-        appendOrderByPkIfSupportedCursor(config, sql);
+        if (PrimaryKeyUtil.isSupportedCursor(config.getFields())) {
+            appendOrderByPk(config, sql);
+        }
         sql.append(DatabaseConstant.MYSQL_PAGE_SQL);
         return sql.toString();
     }
@@ -32,23 +34,23 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     @Override
     public String getPageCursorSql(PageSql config) {
         // 不支持游标查询
-        if (!isSupportedCursor(config)) {
+        if (!PrimaryKeyUtil.isSupportedCursor(config.getFields())) {
             logger.debug("不支持游标查询,主键包含非数字类型");
-            return "";
+            return StringUtil.EMPTY;
         }
 
         // select * from test.`my_user` where `id` > ? and `uid` > ? order by `id`,`uid` limit ?,?
         StringBuilder sql = new StringBuilder(config.getQuerySql());
         boolean skipFirst = false;
         // 没有过滤条件
-        if (StringUtil.isBlank(config.getSqlBuilderConfig().getQueryFilter())) {
+        if (StringUtil.isBlank(config.getQueryFilter())) {
             skipFirst = true;
             sql.append(" WHERE ");
         }
-        final String quotation = config.getQuotation();
+        final String quotation = buildSqlWithQuotation();
         final List<String> primaryKeys = config.getPrimaryKeys();
         PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " > ? ", skipFirst);
-        appendOrderByPkIfSupportedCursor(config, sql);
+        appendOrderByPk(config, sql);
         sql.append(DatabaseConstant.MYSQL_PAGE_SQL);
         return sql.toString();
     }
@@ -76,7 +78,7 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    protected boolean enableCursor() {
+    public boolean enableCursor() {
         return true;
     }
 

+ 13 - 11
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -42,7 +42,9 @@ public final class OracleConnector extends AbstractDatabaseConnector {
         StringBuilder sql = new StringBuilder();
         sql.append(DatabaseConstant.ORACLE_PAGE_SQL_START);
         sql.append(config.getQuerySql());
-        appendOrderByPkIfSupportedCursor(config, sql);
+        if (PrimaryKeyUtil.isSupportedCursor(config.getFields())) {
+            appendOrderByPk(config, sql);
+        }
         sql.append(DatabaseConstant.ORACLE_PAGE_SQL_END);
         return sql.toString();
     }
@@ -50,9 +52,9 @@ public final class OracleConnector extends AbstractDatabaseConnector {
     @Override
     public String getPageCursorSql(PageSql config) {
         // 不支持游标查询
-        if (!isSupportedCursor(config)) {
+        if (!PrimaryKeyUtil.isSupportedCursor(config.getFields())) {
             logger.debug("不支持游标查询,主键包含非数字类型");
-            return "";
+            return StringUtil.EMPTY;
         }
 
         // SELECT * FROM (SELECT A.*, ROWNUM RN FROM (select * from test."my_user" where "id" > ? and "uid" > ? order by "id","uid")A WHERE ROWNUM <= ?) WHERE RN > ?
@@ -61,14 +63,14 @@ public final class OracleConnector extends AbstractDatabaseConnector {
         sql.append(config.getQuerySql());
         boolean skipFirst = false;
         // 没有过滤条件
-        if (StringUtil.isBlank(config.getSqlBuilderConfig().getQueryFilter())) {
+        if (StringUtil.isBlank(config.getQueryFilter())) {
             skipFirst = true;
             sql.append(" WHERE ");
         }
-        final String quotation = config.getQuotation();
+        final String quotation = buildSqlWithQuotation();
         final List<String> primaryKeys = config.getPrimaryKeys();
         PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " > ? ", skipFirst);
-        appendOrderByPkIfSupportedCursor(config, sql);
+        appendOrderByPk(config, sql);
         sql.append(DatabaseConstant.ORACLE_PAGE_SQL_END);
         return sql.toString();
     }
@@ -96,12 +98,12 @@ public final class OracleConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    protected boolean enableCursor() {
+    public boolean enableCursor() {
         return true;
     }
 
     @Override
-    protected String buildSqlFilterWithQuotation(String value) {
+    public String buildSqlFilterWithQuotation(String value) {
         if (StringUtil.isNotBlank(value)) {
             String val = value.toLowerCase();
             // 支持Oracle系统函数, Example: to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')
@@ -114,15 +116,15 @@ public final class OracleConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    protected String getValidationQuery() {
+    public String getValidationQuery() {
         return "select 1 from dual";
     }
 
     @Override
-    protected String getQueryCountSql(CommandConfig commandConfig, String schema, String quotation, String queryFilterSql) {
+    protected String getQueryCountSql(CommandConfig commandConfig, List<String> primaryKeys, String schema, String queryFilterSql) {
         final Table table = commandConfig.getTable();
         if (StringUtil.isNotBlank(queryFilterSql) || TableTypeEnum.isView(table.getType())) {
-            return super.getQueryCountSql(commandConfig, schema, quotation, queryFilterSql);
+            return super.getQueryCountSql(commandConfig, primaryKeys, schema, queryFilterSql);
         }
 
         // 从系统表查询

+ 11 - 9
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -34,7 +34,9 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
     public String getPageSql(PageSql config) {
         // select * from test."my_user" where "id" > ? and "uid" > ? order by "id","uid" limit ? OFFSET ?
         StringBuilder sql = new StringBuilder(config.getQuerySql());
-        appendOrderByPkIfSupportedCursor(config, sql);
+        if (PrimaryKeyUtil.isSupportedCursor(config.getFields())) {
+            appendOrderByPk(config, sql);
+        }
         sql.append(DatabaseConstant.POSTGRESQL_PAGE_SQL);
         return sql.toString();
     }
@@ -42,23 +44,23 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
     @Override
     public String getPageCursorSql(PageSql config) {
         // 不支持游标查询
-        if (!isSupportedCursor(config)) {
+        if (!PrimaryKeyUtil.isSupportedCursor(config.getFields())) {
             logger.debug("不支持游标查询,主键包含非数字类型");
-            return "";
+            return StringUtil.EMPTY;
         }
 
         // select * from test."my_user" where "id" > ? and "uid" > ? order by "id","uid" limit ? OFFSET ?
         StringBuilder sql = new StringBuilder(config.getQuerySql());
         boolean skipFirst = false;
         // 没有过滤条件
-        if (StringUtil.isBlank(config.getSqlBuilderConfig().getQueryFilter())) {
+        if (StringUtil.isBlank(config.getQueryFilter())) {
             skipFirst = true;
             sql.append(" WHERE ");
         }
         final List<String> primaryKeys = config.getPrimaryKeys();
-        final String quotation = config.getQuotation();
+        final String quotation = buildSqlWithQuotation();
         PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " > ? ", skipFirst);
-        appendOrderByPkIfSupportedCursor(config, sql);
+        appendOrderByPk(config, sql);
         sql.append(DatabaseConstant.POSTGRESQL_PAGE_SQL);
         return sql.toString();
     }
@@ -86,10 +88,10 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    protected String getQueryCountSql(CommandConfig commandConfig, String schema, String quotation, String queryFilterSql) {
+    protected String getQueryCountSql(CommandConfig commandConfig, List<String> primaryKeys, String schema, String queryFilterSql) {
         final Table table = commandConfig.getTable();
         if (StringUtil.isNotBlank(queryFilterSql) || TableTypeEnum.isView(table.getType())) {
-            return super.getQueryCountSql(commandConfig, schema, quotation, queryFilterSql);
+            return super.getQueryCountSql(commandConfig, primaryKeys, schema, queryFilterSql);
         }
 
         // 从系统表查询
@@ -98,7 +100,7 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    protected boolean enableCursor() {
+    public boolean enableCursor() {
         return true;
     }
 

+ 4 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java

@@ -72,7 +72,7 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
      */
     protected Map<String, String> getDqlSourceCommand(CommandConfig commandConfig, boolean groupByPK) {
         // 获取过滤SQL
-        String queryFilterSql = getQueryFilterSql(commandConfig.getFilter());
+        String queryFilterSql = getQueryFilterSql(commandConfig);
         Table table = commandConfig.getTable();
         Map<String, String> map = new HashMap<>();
         List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(table);
@@ -87,8 +87,8 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
         if (StringUtil.isNotBlank(queryFilterSql)) {
             querySql += queryFilterSql;
         }
-        String quotation = buildSqlWithQuotation();
-        map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(new PageSql(null, querySql, quotation, primaryKeys)));
+        PageSql pageSql = new PageSql(querySql, StringUtil.EMPTY, primaryKeys, table.getColumn());
+        map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(pageSql));
 
         // 获取查询总数SQL
         StringBuilder queryCount = new StringBuilder();
@@ -98,6 +98,7 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
         if (groupByPK) {
             queryCount.append(" GROUP BY ");
             // id,id2
+            String quotation = buildSqlWithQuotation();
             PrimaryKeyUtil.buildSql(queryCount, primaryKeys, quotation, ",", "", true);
         }
         queryCount.append(") DBSYNCER_T");

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java

@@ -24,7 +24,7 @@ public final class DQLOracleConnector extends AbstractDQLConnector {
     }
 
     @Override
-    protected String getValidationQuery() {
+    public String getValidationQuery() {
         return "select 1 from dual";
     }
 

+ 43 - 15
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -15,6 +15,7 @@ import org.dbsyncer.connector.model.Table;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -33,7 +34,7 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
     /**
      * 系统关键字段名
      */
-    private final String SYS_FIELD_EXPRESSION = "(convert)";
+    private final Set<String> SYS_FIELDS = CollectionUtils.newHashSet("convert", "user", "type", "version", "close", "bulk", "source");
 
     @Override
     public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
@@ -54,20 +55,11 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
     public Object[] getPageArgs(ReaderConfig config) {
         int pageSize = config.getPageSize();
         int pageIndex = config.getPageIndex();
-        return new Object[]{(pageIndex - 1) * pageSize + 1, pageIndex * pageSize};
+        return new Object[] {(pageIndex - 1) * pageSize + 1, pageIndex * pageSize};
     }
 
     @Override
-    public String buildFieldName(Field field) {
-        // 处理系统关键字
-        if (containsKeyword(SYS_FIELD_EXPRESSION, field.getName())) {
-            return new StringBuilder("[").append(field.getName()).append("]").toString();
-        }
-        return field.getName();
-    }
-
-    @Override
-    protected String buildSqlFilterWithQuotation(String value) {
+    public String buildSqlFilterWithQuotation(String value) {
         // 支持SqlServer系统函数, Example: (select CONVERT(varchar(10),GETDATE(),120))
         if (containsKeyword(SYS_EXPRESSION, value)) {
             return StringUtil.EMPTY;
@@ -76,16 +68,35 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    protected String getQueryCountSql(CommandConfig commandConfig, String schema, String quotation, String queryFilterSql) {
+    public String buildTableName(String tableName) {
+        return containsKeyword(tableName) ? convertKey(tableName) : tableName;
+    }
+
+    @Override
+    public String buildFieldName(Field field) {
+        return containsKeyword(field.getName()) ? convertKey(field.getName()) : field.getName();
+    }
+
+    @Override
+    public List<String> buildPrimaryKeys(List<String> primaryKeys) {
+        if (CollectionUtils.isEmpty(primaryKeys)) {
+            return primaryKeys;
+        }
+        return primaryKeys.stream().map(pk -> SYS_FIELDS.contains(pk) ? convertKey(pk) : pk).collect(Collectors.toList());
+    }
+
+    @Override
+    protected String getQueryCountSql(CommandConfig commandConfig, List<String> primaryKeys, String schema, String queryFilterSql) {
         // 视图或有过滤条件,走默认方式
         final Table table = commandConfig.getTable();
         if (StringUtil.isNotBlank(queryFilterSql) || TableTypeEnum.isView(table.getType())) {
-            return new StringBuilder("SELECT COUNT(1) FROM ").append(schema).append(quotation).append(table.getName()).append(quotation).append(queryFilterSql).toString();
+            return super.getQueryCountSql(commandConfig, primaryKeys, schema, queryFilterSql);
         }
 
         DatabaseConfig cfg = (DatabaseConfig) commandConfig.getConnectorConfig();
         // 从存储过程查询(定时更新总数,可能存在误差)
-        return String.format("select rows from sysindexes where id = object_id('%s.%s') and indid in (0, 1)", cfg.getSchema(), table.getName());
+        return String.format("select rows from sysindexes where id = object_id('%s.%s') and indid in (0, 1)", cfg.getSchema(),
+                table.getName());
     }
 
     private List<Table> getTables(DatabaseConnectorMapper connectorMapper, String sql, TableTypeEnum type) {
@@ -96,6 +107,23 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
         return new ArrayList<>();
     }
 
+    private String convertKey(String key) {
+        return new StringBuilder("[").append(key).append("]").toString();
+    }
+
+    /**
+     * 是否包含系统关键字
+     *
+     * @param val
+     * @return
+     */
+    private boolean containsKeyword(String val) {
+        if (StringUtil.isNotBlank(val)) {
+            return SYS_FIELDS.contains(val.toLowerCase());
+        }
+        return false;
+    }
+
     /**
      * 是否包含系统关键字
      *

+ 39 - 32
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PrimaryKeyUtil.java

@@ -12,10 +12,13 @@ import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 public abstract class PrimaryKeyUtil {
 
@@ -31,39 +34,37 @@ public abstract class PrimaryKeyUtil {
         }
 
         // 获取表同步的主键字段
-        List<String> primaryKeys = new ArrayList<>();
-        if (!CollectionUtils.isEmpty(table.getColumn())) {
-            table.getColumn().forEach(c -> {
-                if (c.isPk()) {
-                    primaryKeys.add(c.getName());
-                }
-            });
-        }
+        List<String> primaryKeys = findPrimaryKeys(table.getColumn());
 
         // 如果存在表字段映射关系,没有配置主键则抛出异常提示
         if (!CollectionUtils.isEmpty(table.getColumn()) && CollectionUtils.isEmpty(primaryKeys)) {
             throw new ConnectorException(String.format("目标表 %s 缺少主键.", table.getName()));
         }
-        return Collections.unmodifiableList(primaryKeys);
+        return primaryKeys;
+    }
+
+    /**
+     * 返回主键名称
+     *
+     * @param fields
+     * @return
+     */
+    public static List<String> findPrimaryKeys(List<Field> fields) {
+        return findPrimaryKeyFields(fields).stream().map(f -> f.getName()).collect(Collectors.toList());
     }
 
     /**
-     * 返回主键集合
+     * 返回主键属性字段集合
      *
      * @param config
      * @return
      */
-    public static List<Field> findConfigPrimaryKeys(WriterBatchConfig config) {
+    public static List<Field> findConfigPrimaryKeyFields(WriterBatchConfig config) {
         if (null == config) {
             throw new ConnectorException("The config is null.");
         }
 
-        List<Field> list = new ArrayList<>();
-        for (Field f : config.getFields()) {
-            if (f.isPk()) {
-                list.add(f);
-            }
-        }
+        List<Field> list = findPrimaryKeyFields(config.getFields());
         if (CollectionUtils.isEmpty(list)) {
             throw new ConnectorException("主键为空");
         }
@@ -71,22 +72,23 @@ public abstract class PrimaryKeyUtil {
     }
 
     /**
-     * 返回主键字段类型
+     * 返回主键属性字段集合
      *
      * @param fields
      * @return
      */
-    public static Map<String, Integer> findPrimaryKeyType(List<Field> fields) {
-        Map<String, Integer> map = new HashMap<>();
+    public static List<Field> findPrimaryKeyFields(List<Field> fields) {
+        List<Field> list = new ArrayList<>();
         if (!CollectionUtils.isEmpty(fields)) {
-            fields.forEach(field -> {
-                if (field.isPk()) {
-                    map.put(field.getName(), field.getType());
+            Set<String> mark = new HashSet<>();
+            fields.forEach(f -> {
+                if (f.isPk() && !mark.contains(f.getName())) {
+                    list.add(f);
+                    mark.add(f.getName());
                 }
             });
         }
-
-        return Collections.unmodifiableMap(map);
+        return Collections.unmodifiableList(list);
     }
 
     public static void buildSql(StringBuilder sql, List<String> primaryKeys, String quotation, String join, String value, boolean skipFirst) {
@@ -109,22 +111,27 @@ public abstract class PrimaryKeyUtil {
     /**
      * 游标主键必须为数字类型,否则会导致分页失效
      *
-     * @param typeAliases
-     * @param primaryKeys
+     * @param fields
      * @return
      */
-    public static boolean isSupportedCursor(Map<String, Integer> typeAliases, List<String> primaryKeys) {
-        if (CollectionUtils.isEmpty(typeAliases) || CollectionUtils.isEmpty(primaryKeys)) {
+    public static boolean isSupportedCursor(List<Field> fields) {
+        if (CollectionUtils.isEmpty(fields)) {
             return false;
         }
 
-        for (String pk : primaryKeys) {
-            Integer pkType = typeAliases.get(pk);
+        Map<String, Integer> typeAliases = new HashMap<>();
+        fields.forEach(field -> {
+            if (field.isPk()) {
+                typeAliases.put(field.getName(), field.getType());
+            }
+        });
+
+        for (Integer pkType : typeAliases.values()) {
             if (!isSupportedCursorType(pkType)) {
                 return false;
             }
         }
-        return true;
+        return !CollectionUtils.isEmpty(typeAliases);
     }
 
     /**

+ 1 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -404,7 +404,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         List<Field> fields = executor.getFields();
         List<String> primaryKeys = new ArrayList<>();
         primaryKeys.add(ConfigConstant.CONFIG_MODEL_ID);
-        final SqlBuilderConfig config = new SqlBuilderConfig(connector, "", table, primaryKeys, fields, "", "");
+        final SqlBuilderConfig config = new SqlBuilderConfig(connector, "", table, primaryKeys, fields, "");
 
         String query = SqlBuilderEnum.QUERY.getSqlBuilder().buildQuerySql(config);
         String insert = SqlBuilderEnum.INSERT.getSqlBuilder().buildSql(config);