Pārlūkot izejas kodu

修复游标查询

AE86 2 gadi atpakaļ
vecāks
revīzija
bc8cfd3d98

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -124,7 +124,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     @Override
     public Result reader(DatabaseConnectorMapper connectorMapper, ReaderConfig config) {
         // 1、获取select SQL
-        String queryKey = enableCursor() && null == config.getCursors() ? ConnectorConstant.OPERTION_QUERY_CURSOR : SqlBuilderEnum.QUERY.getName();
+        String queryKey = enableCursor() && null != config.getCursors() ? ConnectorConstant.OPERTION_QUERY_CURSOR : ConnectorConstant.OPERTION_QUERY;
         String querySql = config.getCommand().get(queryKey);
         Assert.hasText(querySql, "查询语句不能为空.");
 

+ 16 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -2,12 +2,14 @@ package org.dbsyncer.connector.mysql;
 
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.ReaderConfig;
+import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
 import java.util.List;
+import java.util.Map;
 
 public final class MysqlConnector extends AbstractDatabaseConnector {
 
@@ -32,10 +34,18 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     public String getPageCursorSql(PageSql config) {
         final String quotation = config.getQuotation();
         final List<String> primaryKeys = config.getPrimaryKeys();
+        final SqlBuilderConfig sqlBuilderConfig = config.getSqlBuilderConfig();
+        Map<String, Integer> typeAliases = PrimaryKeyUtil.findPrimaryKeyType(sqlBuilderConfig.getFields());
+
         // select * from test.`my_user` where `id` > ? and `uid` > ? order by `id`,`uid` limit ?,?
         StringBuilder sql = new StringBuilder(config.getQuerySql());
-        boolean hasFilter = StringUtil.isNotBlank(config.getSqlBuilderConfig().getQueryFilter());
-        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " > ? ", !hasFilter);
+        boolean skipFirst = false;
+        // 没有过滤条件
+        if (StringUtil.isBlank(sqlBuilderConfig.getQueryFilter())) {
+            skipFirst = true;
+            sql.append(" WHERE ");
+        }
+        PrimaryKeyUtil.buildCursorSql(sql, primaryKeys, quotation, " AND ", typeAliases, skipFirst);
         sql.append(" ORDER BY ");
         PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
         sql.append(DatabaseConstant.MYSQL_PAGE_SQL);
@@ -47,12 +57,13 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
         int pageSize = config.getPageSize();
         Object[] cursors = config.getCursors();
         if (null == cursors) {
-            return new Object[]{pageSize};
+            return new Object[]{0, pageSize};
         }
         int cursorsLen = cursors.length;
-        Object[] newCursors = new Object[cursorsLen + 1];
+        Object[] newCursors = new Object[cursorsLen + 2];
         System.arraycopy(cursors, 0, newCursors, 0, cursorsLen);
-        newCursors[cursorsLen] = pageSize;
+        newCursors[cursorsLen] = 0;
+        newCursors[cursorsLen + 1] = pageSize;
         return newCursors;
     }
 

+ 62 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PrimaryKeyUtil.java

@@ -7,8 +7,10 @@ import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Table;
 
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +46,25 @@ public abstract class PrimaryKeyUtil {
         return Collections.unmodifiableList(primaryKeys);
     }
 
+    /**
+     * 返回主键字段类型
+     *
+     * @param fields
+     * @return
+     */
+    public static Map<String, Integer> findPrimaryKeyType(List<Field> fields) {
+        Map<String, Integer> map = new HashMap<>();
+        if (!CollectionUtils.isEmpty(fields)) {
+            fields.forEach(field -> {
+                if (field.isPk()) {
+                    map.put(field.getName(), field.getType());
+                }
+            });
+        }
+
+        return Collections.unmodifiableMap(map);
+    }
+
     public static void buildSql(StringBuilder sql, List<String> primaryKeys, String quotation, String join, String value, boolean skipFirst) {
         AtomicBoolean added = new AtomicBoolean();
         primaryKeys.forEach(pk -> {
@@ -61,6 +82,47 @@ public abstract class PrimaryKeyUtil {
         });
     }
 
+    /**
+     * 游标主键要包含数字类型,否则会导致分页失效
+     *
+     * @param sql
+     * @param primaryKeys
+     * @param quotation
+     * @param join
+     * @param typeAliases
+     * @param skipFirst
+     */
+    public static void buildCursorSql(StringBuilder sql, List<String> primaryKeys, String quotation, String join, Map<String, Integer> typeAliases, boolean skipFirst) {
+        AtomicBoolean added = new AtomicBoolean();
+        AtomicBoolean supportedCursor = new AtomicBoolean();
+        primaryKeys.forEach(pk -> {
+            if (!typeAliases.containsKey(pk)) {
+                throw new ConnectorException(String.format("Can't find type of primary key %s", pk));
+            }
+
+            // skip first pk
+            if (!skipFirst || added.get()) {
+                if (StringUtil.isNotBlank(join)) {
+                    sql.append(join);
+                }
+            }
+            sql.append(quotation).append(pk).append(quotation);
+
+            Integer pkType = typeAliases.get(pk);
+            if (pkType == Types.NUMERIC || pkType == Types.BIGINT || pkType == Types.INTEGER || pkType == Types.TINYINT || pkType == Types.SMALLINT) {
+                sql.append(" > ? ");
+                supportedCursor.set(true);
+            } else {
+                sql.append(" = ? ");
+            }
+            added.set(true);
+        });
+
+        if (!supportedCursor.get()) {
+            throw new ConnectorException("不支持游标查询,主键至少要有一个为数字类型");
+        }
+    }
+
     /**
      * 获取最新游标值
      *