Prechádzať zdrojové kódy

支持pg游标查询 https://gitee.com/ghi/dbsyncer/issues/I5RWUX

AE86 2 rokov pred
rodič
commit
55e9318b4c

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java

@@ -7,11 +7,11 @@ public class ReaderConfig {
 
 
     private Map<String, String> command;
     private Map<String, String> command;
     private List<Object> args;
     private List<Object> args;
-    private String cursor;
+    private Object cursor;
     private int pageIndex;
     private int pageIndex;
     private int pageSize;
     private int pageSize;
 
 
-    public ReaderConfig(Map<String,String> command, List<Object> args, String cursor, int pageIndex, int pageSize) {
+    public ReaderConfig(Map<String,String> command, List<Object> args, Object cursor, int pageIndex, int pageSize) {
         this.command = command;
         this.command = command;
         this.args = args;
         this.args = args;
         this.cursor = cursor;
         this.cursor = cursor;
@@ -27,7 +27,7 @@ public class ReaderConfig {
         return args;
         return args;
     }
     }
 
 
-    public String getCursor() {
+    public Object getCursor() {
         return cursor;
         return cursor;
     }
     }
 
 

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -110,7 +110,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     @Override
     @Override
     public Result reader(DatabaseConnectorMapper connectorMapper, ReaderConfig config) {
     public Result reader(DatabaseConnectorMapper connectorMapper, ReaderConfig config) {
         // 1、获取select SQL
         // 1、获取select SQL
-        String queryKey = enableCursor() && StringUtil.isBlank(config.getCursor()) ? ConnectorConstant.OPERTION_QUERY_CURSOR : SqlBuilderEnum.QUERY.getName();
+        String queryKey = enableCursor() && null == config.getCursor() ? ConnectorConstant.OPERTION_QUERY_CURSOR : SqlBuilderEnum.QUERY.getName();
         String querySql = config.getCommand().get(queryKey);
         String querySql = config.getCommand().get(queryKey);
         Assert.hasText(querySql, "查询语句不能为空.");
         Assert.hasText(querySql, "查询语句不能为空.");
 
 

+ 6 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -21,7 +21,7 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
         String queryFilter = config.getSqlBuilderConfig().getQueryFilter();
         String queryFilter = config.getSqlBuilderConfig().getQueryFilter();
         if (StringUtil.isNotBlank(queryFilter)) {
         if (StringUtil.isNotBlank(queryFilter)) {
             querySql.append(" AND ");
             querySql.append(" AND ");
-        }else {
+        } else {
             querySql.append(" WHERE ");
             querySql.append(" WHERE ");
         }
         }
         querySql.append(quotation).append(pk).append(quotation).append(" > ? ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
         querySql.append(quotation).append(pk).append(quotation).append(" > ? ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
@@ -29,7 +29,7 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     }
     }
 
 
     @Override
     @Override
-    public String getPageCursorSql(PageSql config){
+    public String getPageCursorSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
         final String quotation = buildSqlWithQuotation();
         final String pk = config.getPk();
         final String pk = config.getPk();
         // select * from test.`my_user` order by `id` limit ?
         // select * from test.`my_user` order by `id` limit ?
@@ -40,11 +40,11 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
     @Override
     @Override
     public Object[] getPageArgs(ReaderConfig config) {
     public Object[] getPageArgs(ReaderConfig config) {
         int pageSize = config.getPageSize();
         int pageSize = config.getPageSize();
-        String cursor = config.getCursor();
-        if (StringUtil.isBlank(cursor)) {
-            return new Object[] {pageSize};
+        Object cursor = config.getCursor();
+        if (null == cursor) {
+            return new Object[]{pageSize};
         }
         }
-        return new Object[] {cursor, pageSize};
+        return new Object[]{cursor, pageSize};
     }
     }
 
 
     @Override
     @Override

+ 34 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -4,7 +4,6 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
-import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.PageSql;
@@ -12,21 +11,42 @@ import org.dbsyncer.connector.model.Table;
 
 
 public final class PostgreSQLConnector extends AbstractDatabaseConnector {
 public final class PostgreSQLConnector extends AbstractDatabaseConnector {
 
 
+    @Override
+    protected String buildSqlWithQuotation() {
+        return "\"";
+    }
+
     @Override
     @Override
     public String getPageSql(PageSql config) {
     public String getPageSql(PageSql config) {
-        return config.getQuerySql() + DatabaseConstant.POSTGRESQL_PAGE_SQL;
+        final String quotation = buildSqlWithQuotation();
+        final String pk = config.getPk();
+        StringBuilder querySql = new StringBuilder(config.getQuerySql());
+        String queryFilter = config.getSqlBuilderConfig().getQueryFilter();
+        if (StringUtil.isNotBlank(queryFilter)) {
+            querySql.append(" AND ");
+        } else {
+            querySql.append(" WHERE ");
+        }
+        querySql.append(quotation).append(pk).append(quotation).append(" > ? ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
+        return querySql.toString();
     }
     }
 
 
     @Override
     @Override
-    public Object[] getPageArgs(ReaderConfig config) {
-        int pageSize = config.getPageSize();
-        int pageIndex = config.getPageIndex();
-        return new Object[]{pageSize, (pageIndex - 1) * pageSize};
+    public String getPageCursorSql(PageSql config) {
+        final String quotation = buildSqlWithQuotation();
+        final String pk = config.getPk();
+        StringBuilder querySql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
+        return querySql.toString();
     }
     }
 
 
     @Override
     @Override
-    protected String buildSqlWithQuotation() {
-        return "\"";
+    public Object[] getPageArgs(ReaderConfig config) {
+        int pageSize = config.getPageSize();
+        Object cursor = config.getCursor();
+        if (null == cursor) {
+            return new Object[]{pageSize};
+        }
+        return new Object[]{cursor, pageSize};
     }
     }
 
 
     @Override
     @Override
@@ -40,4 +60,10 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
         DatabaseConfig cfg = (DatabaseConfig) commandConfig.getConnectorConfig();
         DatabaseConfig cfg = (DatabaseConfig) commandConfig.getConnectorConfig();
         return String.format("SELECT N_LIVE_TUP FROM PG_STAT_USER_TABLES WHERE SCHEMANAME='%s' AND RELNAME='%s'", cfg.getSchema(), table.getName());
         return String.format("SELECT N_LIVE_TUP FROM PG_STAT_USER_TABLES WHERE SCHEMANAME='%s' AND RELNAME='%s'", cfg.getSchema(), table.getName());
     }
     }
+
+    @Override
+    protected boolean enableCursor() {
+        return true;
+    }
+
 }
 }

+ 5 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -98,7 +98,9 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         Meta meta = manager.getMeta(task.getId());
         Meta meta = manager.getMeta(task.getId());
         Map<String, String> snapshot = meta.getSnapshot();
         Map<String, String> snapshot = meta.getSnapshot();
         task.setPageIndex(NumberUtil.toInt(snapshot.get(ParserEnum.PAGE_INDEX.getCode()), ParserEnum.PAGE_INDEX.getDefaultValue()));
         task.setPageIndex(NumberUtil.toInt(snapshot.get(ParserEnum.PAGE_INDEX.getCode()), ParserEnum.PAGE_INDEX.getDefaultValue()));
-        task.setCursor(snapshot.get(ParserEnum.CURSOR.getCode()));
+        // 反序列化游标值类型(通常为数字或字符串类型)
+        String cursorValue = snapshot.get(ParserEnum.CURSOR.getCode());
+        task.setCursor(NumberUtil.isCreatable(cursorValue) ? NumberUtil.toLong(cursorValue) : cursorValue);
         task.setTableGroupIndex(NumberUtil.toInt(snapshot.get(ParserEnum.TABLE_GROUP_INDEX.getCode()), ParserEnum.TABLE_GROUP_INDEX.getDefaultValue()));
         task.setTableGroupIndex(NumberUtil.toInt(snapshot.get(ParserEnum.TABLE_GROUP_INDEX.getCode()), ParserEnum.TABLE_GROUP_INDEX.getDefaultValue()));
         flush(task);
         flush(task);
 
 
@@ -109,7 +111,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
             }
             }
             parser.execute(task, mapping, list.get(i), executorService);
             parser.execute(task, mapping, list.get(i), executorService);
             task.setPageIndex(ParserEnum.PAGE_INDEX.getDefaultValue());
             task.setPageIndex(ParserEnum.PAGE_INDEX.getDefaultValue());
-            task.setCursor("");
+            task.setCursor(null);
             task.setTableGroupIndex(++i);
             task.setTableGroupIndex(++i);
             flush(task);
             flush(task);
         }
         }
@@ -134,7 +136,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         meta.setEndTime(task.getEndTime());
         meta.setEndTime(task.getEndTime());
         Map<String, String> snapshot = meta.getSnapshot();
         Map<String, String> snapshot = meta.getSnapshot();
         snapshot.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(task.getPageIndex()));
         snapshot.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(task.getPageIndex()));
-        snapshot.put(ParserEnum.CURSOR.getCode(), task.getCursor());
+        snapshot.put(ParserEnum.CURSOR.getCode(), String.valueOf(task.getCursor()));
         snapshot.put(ParserEnum.TABLE_GROUP_INDEX.getCode(), String.valueOf(task.getTableGroupIndex()));
         snapshot.put(ParserEnum.TABLE_GROUP_INDEX.getCode(), String.valueOf(task.getTableGroupIndex()));
         manager.editMeta(meta);
         manager.editMeta(meta);
     }
     }

+ 3 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -276,7 +276,7 @@ public class ParserFactory implements Parser {
             pluginFactory.convert(group.getPlugin(), data, target);
             pluginFactory.convert(group.getPlugin(), data, target);
 
 
             // 5、写入目标源
             // 5、写入目标源
-            BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, sTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
+            BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, tTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
             Result writer = writeBatch(batchWriter, executorService);
             Result writer = writeBatch(batchWriter, executorService);
 
 
             // 6、更新结果
             // 6、更新结果
@@ -425,12 +425,8 @@ public class ParserFactory implements Parser {
      * @param pk
      * @param pk
      * @return
      * @return
      */
      */
-    private String getLastCursor(List<Map> data, String pk) {
-        if(CollectionUtils.isEmpty(data)){
-            return "";
-        }
-        Object value = data.get(data.size() - 1).get(pk);
-        return value == null ? "" : String.valueOf(value);
+    private Object getLastCursor(List<Map> data, String pk) {
+        return CollectionUtils.isEmpty(data) ? null : data.get(data.size() - 1).get(pk);
     }
     }
 
 
 }
 }

+ 3 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Task.java

@@ -10,7 +10,7 @@ public class Task {
 
 
     private int pageIndex;
     private int pageIndex;
 
 
-    private String cursor;
+    private Object cursor;
 
 
     private long beginTime;
     private long beginTime;
 
 
@@ -56,11 +56,11 @@ public class Task {
         this.pageIndex = pageIndex;
         this.pageIndex = pageIndex;
     }
     }
 
 
-    public String getCursor() {
+    public Object getCursor() {
         return cursor;
         return cursor;
     }
     }
 
 
-    public void setCursor(String cursor) {
+    public void setCursor(Object cursor) {
         this.cursor = cursor;
         this.cursor = cursor;
     }
     }