Browse Source

fix cursor

AE86 2 years ago
parent
commit
f4e9f3b35f

+ 1 - 20
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -29,8 +29,7 @@ import java.sql.*;
 import java.util.*;
 import java.util.stream.Collectors;
 
-public abstract class AbstractDatabaseConnector extends AbstractConnector
-        implements Connector<DatabaseConnectorMapper, DatabaseConfig>, Database {
+public abstract class AbstractDatabaseConnector extends AbstractConnector implements Connector<DatabaseConnectorMapper, DatabaseConfig>, Database {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -343,24 +342,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return new MetaInfo().setColumn(fields);
     }
 
-    /**
-     * 获取查询游标SQL
-     *
-     * @param commandConfig
-     * @return
-     */
-    protected String getQueryCursorSql(CommandConfig commandConfig) {
-        final String table = commandConfig.getTable().getName();
-        final String quotation = buildSqlWithQuotation();
-        final String pk = findOriginalTablePrimaryKey(commandConfig, quotation);
-        final String schema = getSchema((DatabaseConfig) commandConfig.getConnectorConfig(), quotation);
-
-        // select `id` from test.`my_user` order by `id` limit ?
-        StringBuilder queryCursor = new StringBuilder();
-        queryCursor.append("SELECT").append(pk).append(" FROM ").append(schema).append(table).append(" ORDER BY ").append(pk).append(" LIMIT 1");
-        return queryCursor.toString();
-    }
-
     /**
      * 获取查询总数SQL
      *

+ 4 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -28,7 +28,7 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
         }else {
             querySql.append(" WHERE ");
         }
-        querySql.append(" ").append(quotation).append(pk).append(quotation).append(" > ? ORDER BY ").append(quotation).append(pk).append(quotation).append(" limit ?");
+        querySql.append(quotation).append(pk).append(quotation).append(" > ? ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
         return querySql.toString();
     }
 
@@ -37,7 +37,7 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
         final String quotation = buildSqlWithQuotation();
         final String pk = config.getPk();
         // select * from test.`my_user` order by `id` limit ?
-        StringBuilder querySql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ").append(quotation).append(pk).append(quotation).append(" limit ?");
+        StringBuilder querySql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
         return querySql.toString();
     }
 
@@ -46,9 +46,9 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
         int pageSize = config.getPageSize();
         String cursor = config.getCursor();
         if (StringUtil.isBlank(cursor)) {
-            return new Object[]{pageSize};
+            return new Object[] {pageSize};
         }
-        return new Object[]{cursor, pageSize};
+        return new Object[] {cursor, pageSize};
     }
 
     @Override

+ 1 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -112,6 +112,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         meta.setEndTime(task.getEndTime());
         Map<String, String> snapshot = meta.getSnapshot();
         snapshot.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(task.getPageIndex()));
+        snapshot.put(ParserEnum.CURSOR.getCode(), task.getCursor());
         snapshot.put(ParserEnum.TABLE_GROUP_INDEX.getCode(), String.valueOf(task.getTableGroupIndex()));
         manager.editMeta(meta);
     }