AE86 il y a 3 ans
Parent
commit
d75dda5508

+ 21 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/PageArgConfig.java

@@ -0,0 +1,21 @@
+package org.dbsyncer.connector.config;
+
+public class PageArgConfig {
+
+    private String querySql;
+
+    private Object[] args;
+
+    public PageArgConfig(String querySql, Object[] args) {
+        this.querySql = querySql;
+        this.args = args;
+    }
+
+    public String getQuerySql() {
+        return querySql;
+    }
+
+    public Object[] getArgs() {
+        return args;
+    }
+}

+ 21 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/PageSqlBuilderConfig.java

@@ -0,0 +1,21 @@
+package org.dbsyncer.connector.config;
+
+public class PageSqlBuilderConfig {
+
+    private String querySql;
+
+    private SqlBuilderConfig config;
+
+    public PageSqlBuilderConfig(SqlBuilderConfig config, String querySql) {
+        this.config = config;
+        this.querySql = querySql;
+    }
+
+    public SqlBuilderConfig getConfig() {
+        return config;
+    }
+
+    public String getQuerySql() {
+        return querySql;
+    }
+}

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java

@@ -18,6 +18,11 @@ public class SqlBuilderConfig {
     // 引号
     private String      quotation;
 
+    public SqlBuilderConfig(String name, String pk) {
+        this.tableName = name;
+        this.pk = pk;
+    }
+
     public SqlBuilderConfig(Database database, String tableName, String pk, List<Field> fields, String queryFilter,
                             String quotation) {
         this.database = database;

+ 16 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/constant/DatabaseConstant.java

@@ -21,12 +21,26 @@ public class DatabaseConstant {
 
     //*********************************** SqlServer **************************************//
     /**
-     * SqlServer分页语句开始
+     * SqlServer分页语句
+     *
+     * <pre>select w.* from my_org a,
+     * (
+     *     select top 4 t.* from
+     * 		(
+     * 			select top 10 s.* from my_org s order by id
+     * 		) t
+     * ) w where a.id = w.id order by a.id
+     * </pre>
+     */
+    public static final String SQLSERVER_PAGE_SQL = "SELECT W.* FROM %s A, (SELECT TOP ? T.* FROM (SELECT TOP ? S.* FROM %s S ORDER BY %s) T) W WHERE A.%s = W.%s ORDER BY A.%s";
+
+    /**
+     * SqlServer分页语句开始(2012版本支持)
      */
     public static final String SQLSERVER_PAGE_SQL_START = " ORDER BY ";
 
     /**
-     * SqlServer分页语句结束
+     * SqlServer分页语句结束(2012版本支持)
      */
     public static final String SQLSERVER_PAGE_SQL_END = " OFFSET ? ROWS FETCH NEXT ? ROWS ONLY";
 

+ 6 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -168,10 +168,11 @@ public abstract class AbstractDatabaseConnector implements Database {
             jdbcTemplate = getJdbcTemplate(cfg);
 
             // 3、设置参数
-            Collections.addAll(config.getArgs(), getPageArgs(config.getPageIndex(), config.getPageSize()));
+            PageArgConfig pageArgConfig = prepareSetArgs(querySql, config.getPageIndex(), config.getPageSize());
+            Collections.addAll(config.getArgs(), pageArgConfig.getArgs());
 
             // 4、执行SQL
-            List<Map<String, Object>> list = jdbcTemplate.queryForList(querySql, config.getArgs().toArray());
+            List<Map<String, Object>> list = jdbcTemplate.queryForList(pageArgConfig.getQuerySql(), config.getArgs().toArray());
 
             // 5、返回结果集
             return new Result(new ArrayList<>(list));
@@ -381,7 +382,9 @@ public abstract class AbstractDatabaseConnector implements Database {
         }
         String quotation = buildSqlWithQuotation();
         String pk = DatabaseUtil.findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
-        map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(querySql, pk));
+        SqlBuilderConfig sqlBuilderConfig = new SqlBuilderConfig(table.getName(), pk);
+        PageSqlBuilderConfig config = new PageSqlBuilderConfig(sqlBuilderConfig, querySql);
+        map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(config));
 
         // 获取查询总数SQL
         StringBuilder queryCount = new StringBuilder();

+ 7 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/Database.java

@@ -2,6 +2,8 @@ package org.dbsyncer.connector.database;
 
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.PageArgConfig;
+import org.dbsyncer.connector.config.PageSqlBuilderConfig;
 import org.springframework.jdbc.core.JdbcTemplate;
 
 public interface Database extends Connector {
@@ -13,19 +15,19 @@ public interface Database extends Connector {
     /**
      * 获取分页SQL
      *
-     * @param querySQL
-     * @param pk
+     * @param config
      * @return
      */
-    String getPageSql(String querySQL, String pk);
+    String getPageSql(PageSqlBuilderConfig config);
 
     /**
-     * 获取分页参数
+     * 预设置分页参数
      *
+     * @param sql
      * @param pageIndex
      * @param pageSize
      * @return
      */
-    Object[] getPageArgs(int pageIndex, int pageSize);
+    PageArgConfig prepareSetArgs(String sql, int pageIndex, int pageSize);
 
 }

+ 2 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQuery.java

@@ -2,6 +2,7 @@ package org.dbsyncer.connector.database.sqlbuilder;
 
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.config.PageSqlBuilderConfig;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
 import org.dbsyncer.connector.database.Database;
@@ -19,7 +20,7 @@ public class SqlBuilderQuery extends AbstractSqlBuilder {
     public String buildSql(SqlBuilderConfig config) {
         // 分页语句
         Database database = config.getDatabase();
-        return database.getPageSql(buildQuerySql(config), config.getPk());
+        return database.getPageSql(new PageSqlBuilderConfig(config, buildQuerySql(config)));
     }
 
     @Override

+ 6 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -1,6 +1,8 @@
 package org.dbsyncer.connector.mysql;
 
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.PageArgConfig;
+import org.dbsyncer.connector.config.PageSqlBuilderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 
@@ -12,13 +14,13 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public String getPageSql(String querySQL, String pk) {
-        return querySQL + DatabaseConstant.MYSQL_PAGE_SQL;
+    public String getPageSql(PageSqlBuilderConfig config) {
+        return config.getQuerySql() + DatabaseConstant.MYSQL_PAGE_SQL;
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
-        return new Object[]{(pageIndex - 1) * pageSize, pageSize};
+    public PageArgConfig prepareSetArgs(String sql, int pageIndex, int pageSize) {
+        return new PageArgConfig(sql, new Object[] {(pageIndex - 1) * pageSize, pageSize});
     }
 
 }

+ 7 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -1,6 +1,8 @@
 package org.dbsyncer.connector.oracle;
 
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.PageArgConfig;
+import org.dbsyncer.connector.config.PageSqlBuilderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 
@@ -12,17 +14,17 @@ public final class OracleConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public String getPageSql(String querySQL, String pk) {
-        return DatabaseConstant.ORACLE_PAGE_SQL_START + querySQL + DatabaseConstant.ORACLE_PAGE_SQL_END;
+    public String getPageSql(PageSqlBuilderConfig config) {
+        return DatabaseConstant.ORACLE_PAGE_SQL_START + config.getQuerySql() + DatabaseConstant.ORACLE_PAGE_SQL_END;
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
-        return new Object[]{pageIndex * pageSize, (pageIndex - 1) * pageSize};
+    public PageArgConfig prepareSetArgs(String sql, int pageIndex, int pageSize) {
+        return new PageArgConfig(sql, new Object[] {pageIndex * pageSize, (pageIndex - 1) * pageSize});
     }
 
     @Override
-    protected String buildSqlWithQuotation(){
+    protected String buildSqlWithQuotation() {
         return "\"";
     }
 }

+ 5 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLMysqlConnector.java

@@ -1,9 +1,6 @@
 package org.dbsyncer.connector.sql;
 
-import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.MetaInfo;
+import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 
@@ -18,13 +15,13 @@ public final class DQLMysqlConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public String getPageSql(String querySQL, String pk) {
-        return querySQL + DatabaseConstant.MYSQL_PAGE_SQL;
+    public String getPageSql(PageSqlBuilderConfig config) {
+        return config.getQuerySql() + DatabaseConstant.MYSQL_PAGE_SQL;
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
-        return new Object[] {(pageIndex - 1) * pageSize, pageSize};
+    public PageArgConfig prepareSetArgs(String sql, int pageIndex, int pageSize) {
+        return new PageArgConfig(sql, new Object[] {(pageIndex - 1) * pageSize, pageSize});
     }
 
     @Override

+ 7 - 10
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java

@@ -1,9 +1,6 @@
 package org.dbsyncer.connector.sql;
 
-import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.MetaInfo;
+import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 
@@ -18,13 +15,13 @@ public final class DQLOracleConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public String getPageSql(String querySQL, String pk) {
-        return DatabaseConstant.ORACLE_PAGE_SQL_START + querySQL + DatabaseConstant.ORACLE_PAGE_SQL_END;
+    public String getPageSql(PageSqlBuilderConfig config) {
+        return DatabaseConstant.ORACLE_PAGE_SQL_START + config.getQuerySql() + DatabaseConstant.ORACLE_PAGE_SQL_END;
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
-        return new Object[]{pageIndex * pageSize, (pageIndex - 1) * pageSize};
+    public PageArgConfig prepareSetArgs(String sql, int pageIndex, int pageSize) {
+        return new PageArgConfig(sql, new Object[] {pageIndex * pageSize, (pageIndex - 1) * pageSize});
     }
 
     @Override
@@ -39,11 +36,11 @@ public final class DQLOracleConnector extends AbstractDatabaseConnector {
 
     @Override
     public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
-        return super.getDqlSourceCommand(commandConfig,false);
+        return super.getDqlSourceCommand(commandConfig, false);
     }
 
     @Override
-    protected String buildSqlWithQuotation(){
+    protected String buildSqlWithQuotation() {
         return "\"";
     }
 

+ 13 - 12
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java

@@ -2,10 +2,7 @@ package org.dbsyncer.connector.sql;
 
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.MetaInfo;
+import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.slf4j.Logger;
@@ -24,17 +21,21 @@ public final class DQLSqlServerConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public String getPageSql(String querySQL, String pk) {
-        if(StringUtils.isBlank(pk)){
-            logger.error("Table primary key can not be empty.");
-            throw new ConnectorException("Table primary key can not be empty.");
+    public String getPageSql(PageSqlBuilderConfig config) {
+        String pk = config.getConfig().getPk();
+        String tableName = config.getConfig().getTableName();
+        if (StringUtils.isBlank(pk) || StringUtils.isBlank(tableName)) {
+            logger.error("Table primary key and name can not be empty.");
+            throw new ConnectorException("Table primary key and name can not be empty.");
         }
-        return new StringBuilder(querySQL).append(DatabaseConstant.SQLSERVER_PAGE_SQL_START).append(pk).append(DatabaseConstant.SQLSERVER_PAGE_SQL_END).toString();
+        return String.format(DatabaseConstant.SQLSERVER_PAGE_SQL, tableName, tableName, pk, pk, pk, pk);
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
-        return new Object[]{(pageIndex - 1) * pageSize, pageSize};
+    public PageArgConfig prepareSetArgs(String sql, int pageIndex, int pageSize) {
+        sql = sql.replaceFirst("\\?", String.valueOf(pageSize));
+        sql = sql.replaceFirst("\\?", String.valueOf((pageIndex - 1) * pageSize + 1));
+        return new PageArgConfig(sql, new Object[] {});
     }
 
     @Override
@@ -49,7 +50,7 @@ public final class DQLSqlServerConnector extends AbstractDatabaseConnector {
 
     @Override
     public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
-        return super.getDqlSourceCommand(commandConfig,false);
+        return super.getDqlSourceCommand(commandConfig, false);
     }
 
 }

+ 16 - 12
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -2,9 +2,7 @@ package org.dbsyncer.connector.sqlserver;
 
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
@@ -24,17 +22,22 @@ public final class SqlServerConnector extends AbstractDatabaseConnector implemen
     }
 
     @Override
-    public String getPageSql(String querySQL, String pk) {
-        if(StringUtils.isBlank(pk)){
-            logger.error("Table primary key can not be empty.");
-            throw new ConnectorException("Table primary key can not be empty.");
+    public String getPageSql(PageSqlBuilderConfig config) {
+        String pk = config.getConfig().getPk();
+        String tableName = config.getConfig().getTableName();
+        if (StringUtils.isBlank(pk) || StringUtils.isBlank(tableName)) {
+            logger.error("Table primary key and name can not be empty.");
+            throw new ConnectorException("Table primary key and name can not be empty.");
         }
-        return new StringBuilder(querySQL).append(DatabaseConstant.SQLSERVER_PAGE_SQL_START).append(pk).append(DatabaseConstant.SQLSERVER_PAGE_SQL_END).toString();
+        return String.format(DatabaseConstant.SQLSERVER_PAGE_SQL, tableName, tableName, pk, pk, pk, pk);
     }
 
     @Override
-    public Object[] getPageArgs(int pageIndex, int pageSize) {
-        return new Object[]{(pageIndex - 1) * pageSize, pageSize};
+    public PageArgConfig prepareSetArgs(String sql, int pageIndex, int pageSize) {
+        // FIXME 分页有问题
+        sql = sql.replaceFirst("\\?", String.valueOf(pageSize));
+        sql = sql.replaceFirst("\\?", String.valueOf((pageIndex - 1) * pageSize + 1));
+        return new PageArgConfig(sql, new Object[] {});
     }
 
     @Override
@@ -53,9 +56,10 @@ public final class SqlServerConnector extends AbstractDatabaseConnector implemen
         StringBuilder queryCount = new StringBuilder();
         if (StringUtils.isNotBlank(queryFilterSql)) {
             queryCount.append("SELECT COUNT(*) FROM ").append(table.getName()).append(queryFilterSql);
-        }else{
+        } else {
             // 从存储过程查询(定时更新总数,可能存在误差)
-            queryCount.append("SELECT ROWS FROM SYSINDEXES WHERE ID = OBJECT_ID('").append("DBO.").append(table.getName()).append("') AND INDID IN (0, 1)");
+            queryCount.append("SELECT ROWS FROM SYSINDEXES WHERE ID = OBJECT_ID('").append("DBO.").append(table.getName()).append(
+                    "') AND INDID IN (0, 1)");
         }
         map.put(ConnectorConstant.OPERTION_QUERY_COUNT, queryCount.toString());
         return map;

+ 41 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -339,6 +339,47 @@ public class SqlServerExtractor extends AbstractExtractor {
         return apply;
     }
 
+    enum TableOperation {
+        /**
+         * 插入
+         */
+        INSERT(2),
+        /**
+         * 更新(旧值)
+         */
+        UPDATE_BEFORE(3),
+        /**
+         * 更新(新值)
+         */
+        UPDATE_AFTER(4),
+        /**
+         * 删除
+         */
+        DELETE(1);
+
+        private final int code;
+
+        TableOperation(int code) {
+            this.code = code;
+        }
+
+        public static boolean isInsert(int code) {
+            return INSERT.getCode() == code;
+        }
+
+        public static boolean isUpdate(int code) {
+            return UPDATE_AFTER.getCode() == code;
+        }
+
+        public static boolean isDelete(int code) {
+            return DELETE.getCode() == code;
+        }
+
+        public int getCode() {
+            return code;
+        }
+    }
+
     final class Worker extends Thread {
 
         @Override