Browse Source

修复过滤时间顺序

AE86 3 năm trước cách đây
mục cha
commit
0f2176890d

+ 11 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/QuartzFilterEnum.java

@@ -15,25 +15,27 @@ public enum QuartzFilterEnum {
     /**
      * 时间戳(开始)
      */
-    TIME_STAMP_BEGIN("$timestamp_begin$", "系统时间戳(开始)", new TimestampFilter(true)),
+    TIME_STAMP_BEGIN(1, "$timestamp_begin$", "系统时间戳(开始)", new TimestampFilter(true)),
     /**
      * 时间戳(结束)
      */
-    TIME_STAMP_END("$timestamp_end$", "系统时间戳(结束)", new TimestampFilter(false)),
+    TIME_STAMP_END(2, "$timestamp_end$", "系统时间戳(结束)", new TimestampFilter(false)),
     /**
      * 日期(开始)
      */
-    DATE_BEGIN("$date_begin$", "系统日期(开始)", new DateFilter(true)),
+    DATE_BEGIN(3, "$date_begin$", "系统日期(开始)", new DateFilter(true)),
     /**
      * 日期(结束)
      */
-    DATE_END("$date_end$", "系统日期(结束)", new DateFilter(false));
+    DATE_END(4, "$date_end$", "系统日期(结束)", new DateFilter(false));
 
+    private Integer index;
     private String type;
     private String message;
     private QuartzFilter quartzFilter;
 
-    QuartzFilterEnum(String type, String message, QuartzFilter quartzFilter) {
+    QuartzFilterEnum(Integer index, String type, String message, QuartzFilter quartzFilter) {
+        this.index = index;
         this.type = type;
         this.message = message;
         this.quartzFilter = quartzFilter;
@@ -52,6 +54,10 @@ public enum QuartzFilterEnum {
         return null;
     }
 
+    public Integer getIndex() {
+        return index;
+    }
+
     public String getType() {
         return type;
     }

+ 4 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -135,6 +135,10 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
             // 更新记录点
             point.refresh();
 
+            if (data.size() < readNum) {
+                break;
+            }
+
         }
 
         // 持久化

+ 30 - 32
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/DatabaseQuartzExtractor.java

@@ -6,7 +6,13 @@ import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.springframework.util.Assert;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -23,8 +29,25 @@ public final class DatabaseQuartzExtractor extends AbstractQuartzExtractor {
         // 检查是否存在系统参数
         final String query = command.get(ConnectorConstant.OPERTION_QUERY);
 
-        // 对“过滤条件”的对数进行排序处理,并得到参数集合
-        List<QuartzFilterEnum> filterEnums = getListQuartzFilterEnum(query);
+        /**
+         * 排序开始/结束时间,防止系统生成的开始时间大于结束时间,导致无法查询有效范围结果集
+         * <p>fixed:select * from user where end_time > $timestamp_end$ and begin_time <= $timestamp_begin$
+         * <p>normal:select * from user where begin_time > $timestamp_begin$ and end_time <= $timestamp_end$
+         */
+        AtomicBoolean reversed = new AtomicBoolean();
+        AtomicLong lastIndex = new AtomicLong();
+        List<QuartzFilterEnum> filterEnums = Stream.of(QuartzFilterEnum.values())
+                .sorted(Comparator.comparing(QuartzFilterEnum::getIndex))
+                .filter(f -> {
+                    int currentIndex = StringUtil.indexOf(query, f.getType());
+                    Assert.isTrue((currentIndex == StringUtil.lastIndexOf(query, f.getType())), String.format("系统参数%s存在多个.", f.getType()));
+                    boolean exist = StringUtil.contains(query, f.getType());
+                    if (exist && !reversed.get()) {
+                        reversed.set(lastIndex.get() > currentIndex);
+                        lastIndex.set(currentIndex);
+                    }
+                    return exist;
+                }).collect(Collectors.toList());
 
         if (CollectionUtils.isEmpty(filterEnums)) {
             return new Point(command, new ArrayList<>());
@@ -44,7 +67,7 @@ public final class DatabaseQuartzExtractor extends AbstractQuartzExtractor {
             final String key = index + type;
 
             // 开始位置
-            if(f.begin()){
+            if (f.begin()) {
                 if (!snapshot.containsKey(key)) {
                     final Object val = f.getObject();
                     point.addArg(val);
@@ -65,36 +88,11 @@ public final class DatabaseQuartzExtractor extends AbstractQuartzExtractor {
             point.setBeginValue(f.toString(val));
         }
         point.setCommand(ConnectorConstant.OPERTION_QUERY, replaceQuery);
+        if (reversed.get()) {
+            point.reverseArgs();
+        }
 
         return point;
     }
 
-    private boolean appearNotMoreThanOnce(String str, String searchStr) {
-        return StringUtil.indexOf(str, searchStr) == StringUtil.lastIndexOf(str, searchStr);
-    }
-
-    /**
-     * 对“过滤条件”的对数进行排序处理,并得到参数集合
-     * @param query
-     * @return
-     */
-    private List<QuartzFilterEnum>  getListQuartzFilterEnum(String query){
-        Map<Integer,QuartzFilterEnum> map = new TreeMap<Integer,QuartzFilterEnum>(
-                new Comparator<Integer>() {
-                    public int compare(Integer obj1, Integer obj2) {
-                        // 升序排序
-                        return obj1.compareTo(obj2);
-                    }
-                });
-        Stream.of(QuartzFilterEnum.values()).forEach(f -> {
-            Assert.isTrue(appearNotMoreThanOnce(query, f.getType()), String.format("系统参数%s存在多个.", f.getType()));
-            // 记录存在的变量参数位置
-            if (StringUtil.contains(query, f.getType())) {
-                int typeIndex = StringUtil.indexOf(query,f.getType());
-                map.put(typeIndex,f);
-            }
-        });
-        // 需要对当前参数进行排序
-        return map.size() > 0 ? new ArrayList(map.values()) : null;
-    }
 }

+ 4 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/Point.java

@@ -2,10 +2,7 @@ package org.dbsyncer.listener.quartz;
 
 import org.dbsyncer.common.util.StringUtil;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class Point {
 
@@ -66,4 +63,7 @@ public class Point {
         this.beginValue = beginValue;
     }
 
+    public void reverseArgs() {
+        Collections.reverse(args);
+    }
 }