Browse Source

支持oracle游标

AE86 2 years ago
parent
commit
29730cbba0

+ 31 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -20,6 +20,7 @@ import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.dbsyncer.connector.util.PrimaryKeyUtil;
@@ -130,7 +131,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         Assert.hasText(querySql, "查询语句不能为空.");
 
         // 2、设置参数
-        Collections.addAll(config.getArgs(), getPageArgs(config));
+        Collections.addAll(config.getArgs(), config.isSupportedCursor() ? getPageCursorArgs(config) : getPageArgs(config));
 
         // 3、执行SQL
         List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
@@ -249,7 +250,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     }
 
     /**
-     * 是否支持游标查询
+     * 是否使用游标查询
      *
      * @return
      */
@@ -257,6 +258,34 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return false;
     }
 
+    /**
+     * 是否支持游标配置
+     *
+     * @param config
+     * @return
+     */
+    protected boolean isSupportedCursor(PageSql config) {
+        final List<String> primaryKeys = config.getPrimaryKeys();
+        final SqlBuilderConfig sqlBuilderConfig = config.getSqlBuilderConfig();
+        final Map<String, Integer> typeAliases = PrimaryKeyUtil.findPrimaryKeyType(sqlBuilderConfig.getFields());
+        return PrimaryKeyUtil.isSupportedCursor(typeAliases, primaryKeys);
+    }
+
+    /**
+     * 如果满足游标,追加主键排序
+     *
+     * @param config
+     * @param sql
+     */
+    protected void appendOrderByPkIfSupportedCursor(PageSql config, StringBuilder sql) {
+        if (isSupportedCursor(config)) {
+            sql.append(" ORDER BY ");
+            final List<String> primaryKeys = config.getPrimaryKeys();
+            final String quotation = config.getQuotation();
+            PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        }
+    }
+
     /**
      * 健康检查
      *

+ 12 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/Database.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.database;
 
+import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.model.PageSql;
 
@@ -24,11 +25,21 @@ public interface Database {
     }
 
     /**
-     * 获取分页SQL
+     * 获取分页参数
      *
      * @param config
      * @return
      */
     Object[] getPageArgs(ReaderConfig config);
 
+    /**
+     * 获取游标分页参数
+     *
+     * @param config
+     * @return
+     */
+    default Object[] getPageCursorArgs(ReaderConfig config){
+        throw new ConnectorException("Unsupported override method getPageCursorArgs:" + getClass().getName());
+    }
+
 }

+ 22 - 30
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -2,7 +2,6 @@ package org.dbsyncer.connector.mysql;
 
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.ReaderConfig;
-import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.model.PageSql;
@@ -11,7 +10,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Map;
 
 public final class MysqlConnector extends AbstractDatabaseConnector {
 
@@ -24,25 +22,17 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
 
     @Override
     public String getPageSql(PageSql config) {
-        final String quotation = config.getQuotation();
-        final List<String> primaryKeys = config.getPrimaryKeys();
         // select * from test.`my_user` where `id` > ? and `uid` > ? order by `id`,`uid` limit ?,?
         StringBuilder sql = new StringBuilder(config.getQuerySql());
-        sql.append(" ORDER BY ");
-        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        appendOrderByPkIfSupportedCursor(config, sql);
         sql.append(DatabaseConstant.MYSQL_PAGE_SQL);
         return sql.toString();
     }
 
     @Override
     public String getPageCursorSql(PageSql config) {
-        final String quotation = config.getQuotation();
-        final List<String> primaryKeys = config.getPrimaryKeys();
-        final SqlBuilderConfig sqlBuilderConfig = config.getSqlBuilderConfig();
-        Map<String, Integer> typeAliases = PrimaryKeyUtil.findPrimaryKeyType(sqlBuilderConfig.getFields());
-
         // 不支持游标查询
-        if (!PrimaryKeyUtil.isSupportedCursor(typeAliases, primaryKeys)) {
+        if (!isSupportedCursor(config)) {
             logger.warn("不支持游标查询,主键包含非数字类型");
             return "";
         }
@@ -51,13 +41,14 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
         StringBuilder sql = new StringBuilder(config.getQuerySql());
         boolean skipFirst = false;
         // 没有过滤条件
-        if (StringUtil.isBlank(sqlBuilderConfig.getQueryFilter())) {
+        if (StringUtil.isBlank(config.getSqlBuilderConfig().getQueryFilter())) {
             skipFirst = true;
             sql.append(" WHERE ");
         }
+        final String quotation = config.getQuotation();
+        final List<String> primaryKeys = config.getPrimaryKeys();
         PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " > ? ", skipFirst);
-        sql.append(" ORDER BY ");
-        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        appendOrderByPkIfSupportedCursor(config, sql);
         sql.append(DatabaseConstant.MYSQL_PAGE_SQL);
         return sql.toString();
     }
@@ -65,22 +56,23 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     @Override
     public Object[] getPageArgs(ReaderConfig config) {
         int pageSize = config.getPageSize();
-        // 通过游标分页
-        if (config.isSupportedCursor()) {
-            Object[] cursors = config.getCursors();
-            if (null == cursors) {
-                return new Object[]{0, pageSize};
-            }
-            int cursorsLen = cursors.length;
-            Object[] newCursors = new Object[cursorsLen + 2];
-            System.arraycopy(cursors, 0, newCursors, 0, cursorsLen);
-            newCursors[cursorsLen] = 0;
-            newCursors[cursorsLen + 1] = pageSize;
-            return newCursors;
-        }
+        int pageIndex = config.getPageIndex();
+        return new Object[]{(pageIndex - 1) * pageSize, pageSize};
+    }
 
-        // 普通分页
-        return new Object[]{(config.getPageIndex() - 1) * pageSize, pageSize};
+    @Override
+    public Object[] getPageCursorArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        Object[] cursors = config.getCursors();
+        if (null == cursors) {
+            return new Object[]{0, pageSize};
+        }
+        int cursorsLen = cursors.length;
+        Object[] newCursors = new Object[cursorsLen + 2];
+        System.arraycopy(cursors, 0, newCursors, 0, cursorsLen);
+        newCursors[cursorsLen] = 0;
+        newCursors[cursorsLen + 1] = pageSize;
+        return newCursors;
     }
 
     @Override

+ 61 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -9,18 +9,61 @@ import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.Types;
+import java.util.List;
 
 public final class OracleConnector extends AbstractDatabaseConnector {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
     public OracleConnector() {
         VALUE_MAPPERS.put(Types.OTHER, new OracleOtherValueMapper());
     }
 
+    @Override
+    public String buildSqlWithQuotation() {
+        return "\"";
+    }
+
     @Override
     public String getPageSql(PageSql config) {
-        return DatabaseConstant.ORACLE_PAGE_SQL_START + config.getQuerySql() + DatabaseConstant.ORACLE_PAGE_SQL_END;
+        // SELECT * FROM (SELECT A.*, ROWNUM RN FROM (select * from test."my_user" where "id" > ? and "uid" > ? order by "id","uid")A WHERE ROWNUM <= ?) WHERE RN > ?
+        StringBuilder sql = new StringBuilder();
+        sql.append(DatabaseConstant.ORACLE_PAGE_SQL_START);
+        sql.append(config.getQuerySql());
+        appendOrderByPkIfSupportedCursor(config, sql);
+        sql.append(DatabaseConstant.ORACLE_PAGE_SQL_END);
+        return sql.toString();
+    }
+
+    @Override
+    public String getPageCursorSql(PageSql config) {
+        // 不支持游标查询
+        if (!isSupportedCursor(config)) {
+            logger.warn("不支持游标查询,主键包含非数字类型");
+            return "";
+        }
+
+        // SELECT * FROM (SELECT A.*, ROWNUM RN FROM (select * from test."my_user" where "id" > ? and "uid" > ? order by "id","uid")A WHERE ROWNUM <= ?) WHERE RN > ?
+        StringBuilder sql = new StringBuilder();
+        sql.append(DatabaseConstant.ORACLE_PAGE_SQL_START);
+        sql.append(config.getQuerySql());
+        boolean skipFirst = false;
+        // 没有过滤条件
+        if (StringUtil.isBlank(config.getSqlBuilderConfig().getQueryFilter())) {
+            skipFirst = true;
+            sql.append(" WHERE ");
+        }
+        final String quotation = config.getQuotation();
+        final List<String> primaryKeys = config.getPrimaryKeys();
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " > ? ", skipFirst);
+        appendOrderByPkIfSupportedCursor(config, sql);
+        sql.append(DatabaseConstant.MYSQL_PAGE_SQL);
+        return sql.toString();
     }
 
     @Override
@@ -31,8 +74,23 @@ public final class OracleConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public String buildSqlWithQuotation() {
-        return "\"";
+    public Object[] getPageCursorArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        Object[] cursors = config.getCursors();
+        if (null == cursors) {
+            return new Object[]{pageSize, 0};
+        }
+        int cursorsLen = cursors.length;
+        Object[] newCursors = new Object[cursorsLen + 2];
+        System.arraycopy(cursors, 0, newCursors, 0, cursorsLen);
+        newCursors[cursorsLen] = pageSize;
+        newCursors[cursorsLen + 1] = 0;
+        return newCursors;
+    }
+
+    @Override
+    protected boolean enableCursor() {
+        return true;
     }
 
     @Override

+ 22 - 30
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -4,7 +4,6 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
-import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.enums.TableTypeEnum;
@@ -16,7 +15,6 @@ import org.slf4j.LoggerFactory;
 
 import java.sql.Types;
 import java.util.List;
-import java.util.Map;
 
 public final class PostgreSQLConnector extends AbstractDatabaseConnector {
 
@@ -33,25 +31,17 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
 
     @Override
     public String getPageSql(PageSql config) {
-        final String quotation = config.getQuotation();
-        final List<String> primaryKeys = config.getPrimaryKeys();
         // select * from test."my_user" where "id" > ? and "uid" > ? order by "id","uid" limit ? OFFSET ?
         StringBuilder sql = new StringBuilder(config.getQuerySql());
-        sql.append(" ORDER BY ");
-        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        appendOrderByPkIfSupportedCursor(config, sql);
         sql.append(DatabaseConstant.POSTGRESQL_PAGE_SQL);
         return sql.toString();
     }
 
     @Override
     public String getPageCursorSql(PageSql config) {
-        final String quotation = config.getQuotation();
-        final List<String> primaryKeys = config.getPrimaryKeys();
-        final SqlBuilderConfig sqlBuilderConfig = config.getSqlBuilderConfig();
-        Map<String, Integer> typeAliases = PrimaryKeyUtil.findPrimaryKeyType(sqlBuilderConfig.getFields());
-
         // 不支持游标查询
-        if (!PrimaryKeyUtil.isSupportedCursor(typeAliases, primaryKeys)) {
+        if (!isSupportedCursor(config)) {
             logger.warn("不支持游标查询,主键包含非数字类型");
             return "";
         }
@@ -60,36 +50,38 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
         StringBuilder sql = new StringBuilder(config.getQuerySql());
         boolean skipFirst = false;
         // 没有过滤条件
-        if (StringUtil.isBlank(sqlBuilderConfig.getQueryFilter())) {
+        if (StringUtil.isBlank(config.getSqlBuilderConfig().getQueryFilter())) {
             skipFirst = true;
             sql.append(" WHERE ");
         }
+        final List<String> primaryKeys = config.getPrimaryKeys();
+        final String quotation = config.getQuotation();
         PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " > ? ", skipFirst);
-        sql.append(" ORDER BY ");
-        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        appendOrderByPkIfSupportedCursor(config, sql);
         sql.append(DatabaseConstant.POSTGRESQL_PAGE_SQL);
         return sql.toString();
     }
 
     @Override
     public Object[] getPageArgs(ReaderConfig config) {
+        int pageIndex = config.getPageIndex();
         int pageSize = config.getPageSize();
-        // 通过游标分页
-        if (config.isSupportedCursor()) {
-            Object[] cursors = config.getCursors();
-            if (null == cursors) {
-                return new Object[]{pageSize, 0};
-            }
-            int cursorsLen = cursors.length;
-            Object[] newCursors = new Object[cursorsLen + 2];
-            System.arraycopy(cursors, 0, newCursors, 0, cursorsLen);
-            newCursors[cursorsLen] = pageSize;
-            newCursors[cursorsLen + 1] = 0;
-            return newCursors;
-        }
+        return new Object[]{pageSize, (pageIndex - 1) * pageSize};
+    }
 
-        // 普通分页
-        return new Object[]{pageSize, (config.getPageIndex() - 1) * pageSize};
+    @Override
+    public Object[] getPageCursorArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        Object[] cursors = config.getCursors();
+        if (null == cursors) {
+            return new Object[]{pageSize, 0};
+        }
+        int cursorsLen = cursors.length;
+        Object[] newCursors = new Object[cursorsLen + 2];
+        System.arraycopy(cursors, 0, newCursors, 0, cursorsLen);
+        newCursors[cursorsLen] = pageSize;
+        newCursors[cursorsLen + 1] = 0;
+        return newCursors;
     }
 
     @Override