Răsfoiți Sursa

修复查询binlog数据未按升序执行

Signed-off-by: AE86 <836391306@qq.com>
AE86 2 ani în urmă
părinte
comite
912c1cde66

+ 2 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogService.java

@@ -13,6 +13,7 @@ import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.BinlogSortEnum;
 import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.BooleanFilter;
@@ -193,6 +194,7 @@ public abstract class AbstractBinlogService<Message> implements BinlogRecorder {
             query.setIndexFieldResolverMap(fieldResolvers);
             query.setPageNum(1);
             query.setPageSize(binlogRecorderConfig.getBatchCount());
+            query.setSort(BinlogSortEnum.ASC);
             Paging paging = storageService.query(query);
             if (CollectionUtils.isEmpty(paging.getData())) {
                 return;

+ 39 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/BinlogSortEnum.java

@@ -0,0 +1,39 @@
+package org.dbsyncer.storage.enums;
+
+/**
+ * 支持的排序方式
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2023/6/1 00:07
+ */
+public enum BinlogSortEnum {
+    /**
+     * 升序
+     */
+    ASC("asc"),
+
+    /**
+     * 降序
+     */
+    DESC("desc");
+
+    BinlogSortEnum(String code) {
+        this.code = code;
+    }
+
+    String code;
+
+    public String getCode() {
+        return code;
+    }
+
+    /**
+     * 是否降序
+     *
+     * @return
+     */
+    public boolean isDesc() {
+        return this == DESC;
+    }
+}

+ 14 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Query.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.storage.query;
 
 import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.storage.enums.BinlogSortEnum;
 import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.filter.IntFilter;
@@ -34,6 +35,11 @@ public class Query {
 
     private int pageSize = 20;
 
+    /**
+     * 修改时间和创建默认降序返回
+     */
+    private BinlogSortEnum sort = BinlogSortEnum.DESC;
+
     /**
      * 返回值转换器,限Disk使用
      */
@@ -107,6 +113,14 @@ public class Query {
         this.pageSize = pageSize;
     }
 
+    public BinlogSortEnum getSort() {
+        return sort;
+    }
+
+    public void setSort(BinlogSortEnum sort) {
+        this.sort = sort;
+    }
+
     public Map<String, IndexFieldResolverEnum> getIndexFieldResolverMap() {
         return indexFieldResolverMap;
     }

+ 3 - 2
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -58,9 +58,10 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
             Shard shard = getShard(sharding);
             int pageNum = query.getPageNum() <= 0 ? 1 : query.getPageNum();
             int pageSize = query.getPageSize() <= 0 ? 20 : query.getPageSize();
+            boolean desc = query.getSort().isDesc();
             // 根据修改时间 > 创建时间排序
-            Sort sort = new Sort(new SortField(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, SortField.Type.LONG, true),
-                    new SortField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, SortField.Type.LONG, true));
+            Sort sort = new Sort(new SortField(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, SortField.Type.LONG, desc),
+                    new SortField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, SortField.Type.LONG, desc));
             Option option = new Option();
             option.setQueryTotal(query.isQueryTotal());
             option.setIndexFieldResolverMap(query.getIndexFieldResolverMap());

+ 5 - 4
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -187,10 +187,10 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
 
     private Executor getExecutor(StorageEnum type, String sharding) {
         return tables.computeIfAbsent(sharding, (table) -> {
-            Executor dataTemplate = tables.get(type.getType());
-            Assert.notNull(dataTemplate, "未知的存储类型");
+            Executor executor = tables.get(type.getType());
+            Assert.notNull(executor, "未知的存储类型");
 
-            Executor newExecutor = new Executor(dataTemplate.getType(), dataTemplate.getFields(), dataTemplate.isSystemTable(), dataTemplate.isOrderByUpdateTime());
+            Executor newExecutor = new Executor(executor.getType(), executor.getFields(), executor.isSystemTable(), executor.isOrderByUpdateTime());
             return createTableIfNotExist(table, newExecutor);
         });
     }
@@ -227,7 +227,8 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         if (executor.isOrderByUpdateTime()) {
             sql.append(UnderlineToCamelUtils.camelToUnderline(ConfigConstant.CONFIG_MODEL_UPDATE_TIME)).append(",");
         }
-        sql.append(UnderlineToCamelUtils.camelToUnderline(ConfigConstant.CONFIG_MODEL_CREATE_TIME)).append(" desc");
+        sql.append(UnderlineToCamelUtils.camelToUnderline(ConfigConstant.CONFIG_MODEL_CREATE_TIME));
+        sql.append(" ").append(query.getSort().getCode());
         sql.append(DatabaseConstant.MYSQL_PAGE_SQL);
         args.add((query.getPageNum() - 1) * query.getPageSize());
         args.add(query.getPageSize());