AE86 2 rokov pred
rodič
commit
3f510d1b79

+ 7 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java

@@ -7,13 +7,15 @@ public class ReaderConfig {
 
     private Map<String, String> command;
     private List<Object> args;
+    private boolean supportedCursor;
     private Object[] cursors;
     private int pageIndex;
     private int pageSize;
 
-    public ReaderConfig(Map<String,String> command, List<Object> args, Object[] cursors, int pageIndex, int pageSize) {
+    public ReaderConfig(Map<String,String> command, List<Object> args, boolean supportedCursor, Object[] cursors, int pageIndex, int pageSize) {
         this.command = command;
         this.args = args;
+        this.supportedCursor = supportedCursor;
         this.cursors = cursors;
         this.pageIndex = pageIndex;
         this.pageSize = pageSize;
@@ -38,4 +40,8 @@ public class ReaderConfig {
     public int getPageSize() {
         return pageSize;
     }
+
+    public boolean isSupportedCursor() {
+        return supportedCursor;
+    }
 }

+ 2 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -124,7 +124,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     @Override
     public Result reader(DatabaseConnectorMapper connectorMapper, ReaderConfig config) {
         // 1、获取select SQL
-        String queryKey = enableCursor() && null != config.getCursors() ? ConnectorConstant.OPERTION_QUERY_CURSOR : ConnectorConstant.OPERTION_QUERY;
+        boolean supportedCursor = enableCursor() && config.isSupportedCursor() && null != config.getCursors();
+        String queryKey = supportedCursor ? ConnectorConstant.OPERTION_QUERY_CURSOR : ConnectorConstant.OPERTION_QUERY;
         String querySql = config.getCommand().get(queryKey);
         Assert.hasText(querySql, "查询语句不能为空.");
 

+ 26 - 10
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -7,12 +7,16 @@ import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.util.PrimaryKeyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
 
 public final class MysqlConnector extends AbstractDatabaseConnector {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
     @Override
     public String buildSqlWithQuotation() {
         return "`";
@@ -37,6 +41,12 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
         final SqlBuilderConfig sqlBuilderConfig = config.getSqlBuilderConfig();
         Map<String, Integer> typeAliases = PrimaryKeyUtil.findPrimaryKeyType(sqlBuilderConfig.getFields());
 
+        // 不支持游标查询
+        if (!PrimaryKeyUtil.isSupportedCursor(typeAliases, primaryKeys)) {
+            logger.warn("不支持游标查询,主键包含非数字类型");
+            return "";
+        }
+
         // select * from test.`my_user` where `id` > ? and `uid` > ? order by `id`,`uid` limit ?,?
         StringBuilder sql = new StringBuilder(config.getQuerySql());
         boolean skipFirst = false;
@@ -45,7 +55,7 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
             skipFirst = true;
             sql.append(" WHERE ");
         }
-        PrimaryKeyUtil.buildCursorSql(sql, primaryKeys, quotation, " AND ", typeAliases, skipFirst);
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " > ? ", skipFirst);
         sql.append(" ORDER BY ");
         PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
         sql.append(DatabaseConstant.MYSQL_PAGE_SQL);
@@ -55,16 +65,22 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     @Override
     public Object[] getPageArgs(ReaderConfig config) {
         int pageSize = config.getPageSize();
-        Object[] cursors = config.getCursors();
-        if (null == cursors) {
-            return new Object[]{0, pageSize};
+        // 通过游标分页
+        if (config.isSupportedCursor()) {
+            Object[] cursors = config.getCursors();
+            if (null == cursors) {
+                return new Object[]{0, pageSize};
+            }
+            int cursorsLen = cursors.length;
+            Object[] newCursors = new Object[cursorsLen + 2];
+            System.arraycopy(cursors, 0, newCursors, 0, cursorsLen);
+            newCursors[cursorsLen] = 0;
+            newCursors[cursorsLen + 1] = pageSize;
+            return newCursors;
         }
-        int cursorsLen = cursors.length;
-        Object[] newCursors = new Object[cursorsLen + 2];
-        System.arraycopy(cursors, 0, newCursors, 0, cursorsLen);
-        newCursors[cursorsLen] = 0;
-        newCursors[cursorsLen + 1] = pageSize;
-        return newCursors;
+
+        // 普通分页
+        return new Object[]{(config.getPageIndex() - 1) * pageSize, pageSize};
     }
 
     @Override

+ 22 - 31
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PrimaryKeyUtil.java

@@ -83,44 +83,34 @@ public abstract class PrimaryKeyUtil {
     }
 
     /**
-     * 游标主键要包含数字类型,否则会导致分页失效
+     * 游标主键必须为数字类型,否则会导致分页失效
      *
-     * @param sql
-     * @param primaryKeys
-     * @param quotation
-     * @param join
      * @param typeAliases
-     * @param skipFirst
+     * @param primaryKeys
+     * @return
      */
-    public static void buildCursorSql(StringBuilder sql, List<String> primaryKeys, String quotation, String join, Map<String, Integer> typeAliases, boolean skipFirst) {
-        AtomicBoolean added = new AtomicBoolean();
-        AtomicBoolean supportedCursor = new AtomicBoolean();
-        primaryKeys.forEach(pk -> {
-            if (!typeAliases.containsKey(pk)) {
-                throw new ConnectorException(String.format("Can't find type of primary key %s", pk));
-            }
-
-            // skip first pk
-            if (!skipFirst || added.get()) {
-                if (StringUtil.isNotBlank(join)) {
-                    sql.append(join);
-                }
-            }
-            sql.append(quotation).append(pk).append(quotation);
+    public static boolean isSupportedCursor(Map<String, Integer> typeAliases, List<String> primaryKeys) {
+        if (CollectionUtils.isEmpty(typeAliases) || CollectionUtils.isEmpty(primaryKeys)) {
+            return false;
+        }
 
+        for (String pk : primaryKeys) {
             Integer pkType = typeAliases.get(pk);
-            if (pkType == Types.NUMERIC || pkType == Types.BIGINT || pkType == Types.INTEGER || pkType == Types.TINYINT || pkType == Types.SMALLINT) {
-                sql.append(" > ? ");
-                supportedCursor.set(true);
-            } else {
-                sql.append(" = ? ");
+            if (!isSupportedCursorType(pkType)) {
+                return false;
             }
-            added.set(true);
-        });
-
-        if (!supportedCursor.get()) {
-            throw new ConnectorException("不支持游标查询,主键至少要有一个为数字类型");
         }
+        return true;
+    }
+
+    /**
+     * 是否支持游标类型(数字)
+     *
+     * @param type
+     * @return
+     */
+    private static boolean isSupportedCursorType(Integer type) {
+        return type == Types.NUMERIC || type == Types.BIGINT || type == Types.INTEGER || type == Types.TINYINT || type == Types.SMALLINT;
     }
 
     /**
@@ -159,4 +149,5 @@ public abstract class PrimaryKeyUtil {
         }
         return cursors;
     }
+
 }

+ 7 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -98,6 +98,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
     private void execute(TableGroupCommand tableGroupCommand, int index) {
         final Map<String, String> command = tableGroupCommand.getCommand();
         final List<String> primaryKeys = tableGroupCommand.getPrimaryKeys();
+        boolean supportedCursor = StringUtil.isNotBlank(command.get(ConnectorConstant.OPERTION_QUERY_CURSOR));
 
         // 检查增量点
         ConnectorMapper connectionMapper = connectorFactory.connect(connectorConfig);
@@ -106,9 +107,11 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
         Object[] cursors = PrimaryKeyUtil.getLastCursors(snapshot.get(index + CURSOR));
 
         while (running) {
-            Result reader = connectorFactory.reader(connectionMapper, new ReaderConfig(point.getCommand(), point.getArgs(), cursors, pageIndex++, READ_NUM));
+            ReaderConfig readerConfig = new ReaderConfig(point.getCommand(), point.getArgs(), supportedCursor, cursors, pageIndex++, READ_NUM);
+            Result reader = connectorFactory.reader(connectionMapper, readerConfig);
             List<Map> data = reader.getSuccessData();
             if (CollectionUtils.isEmpty(data)) {
+                cursors = new Object[0];
                 break;
             }
 
@@ -139,6 +142,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
             point.refresh();
 
             if (data.size() < READ_NUM) {
+                cursors = new Object[0];
                 break;
             }
         }
@@ -146,6 +150,8 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
         // 持久化
         if (point.refreshed()) {
             snapshot.putAll(point.getPosition());
+        }
+        if (supportedCursor) {
             snapshot.put(index + CURSOR, StringUtil.join(cursors, ","));
         }
 

+ 4 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -9,6 +9,7 @@ import org.dbsyncer.common.spi.ConnectorMapper;
 import org.dbsyncer.common.spi.ConvertContext;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
@@ -250,7 +251,7 @@ public class ParserFactory implements Parser {
         // 获取同步字段
         Picker picker = new Picker(fieldMapping);
         List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(tableGroup.getSourceTable());
-
+        boolean supportedCursor = StringUtil.isNotBlank(command.get(ConnectorConstant.OPERTION_QUERY_CURSOR));
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();
         final ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
@@ -265,7 +266,8 @@ public class ParserFactory implements Parser {
             }
 
             // 1、获取数据源数据
-            Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), task.getCursors(), task.getPageIndex(), pageSize));
+            ReaderConfig readerConfig = new ReaderConfig(command, new ArrayList<>(), supportedCursor, task.getCursors(), task.getPageIndex(), pageSize);
+            Result reader = connectorFactory.reader(sConnectorMapper, readerConfig);
             List<Map> source = reader.getSuccessData();
             if (CollectionUtils.isEmpty(source)) {
                 logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);