|
@@ -7,8 +7,11 @@ import org.dbsyncer.listener.enums.QuartzFilterEnum;
|
|
|
import org.springframework.util.Assert;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Comparator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
@@ -25,10 +28,27 @@ public final class DatabaseQuartzExtractor extends AbstractQuartzExtractor {
|
|
|
protected Point checkLastPoint(Map<String, String> command, int index) {
|
|
|
// 检查是否存在系统参数
|
|
|
final String query = command.get(ConnectorConstant.OPERTION_QUERY);
|
|
|
- List<QuartzFilterEnum> filterEnums = Stream.of(QuartzFilterEnum.values()).filter(f -> {
|
|
|
- Assert.isTrue(appearNotMoreThanOnce(query, f.getType()), String.format("系统参数%s存在多个.", f.getType()));
|
|
|
- return StringUtil.contains(query, f.getType());
|
|
|
- }).collect(Collectors.toList());
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 排序开始/结束时间,防止系统生成的开始时间大于结束时间,导致无法查询有效范围结果集
|
|
|
+ * <p>fixed:select * from user where end_time > $timestamp_end$ and begin_time <= $timestamp_begin$
|
|
|
+ * <p>normal:select * from user where begin_time > $timestamp_begin$ and end_time <= $timestamp_end$
|
|
|
+ */
|
|
|
+ AtomicBoolean reversed = new AtomicBoolean();
|
|
|
+ AtomicLong lastIndex = new AtomicLong();
|
|
|
+ List<QuartzFilterEnum> filterEnums = Stream.of(QuartzFilterEnum.values())
|
|
|
+ .sorted(Comparator.comparing(QuartzFilterEnum::getIndex))
|
|
|
+ .filter(f -> {
|
|
|
+ int currentIndex = StringUtil.indexOf(query, f.getType());
|
|
|
+ Assert.isTrue((currentIndex == StringUtil.lastIndexOf(query, f.getType())), String.format("系统参数%s存在多个.", f.getType()));
|
|
|
+ boolean exist = StringUtil.contains(query, f.getType());
|
|
|
+ if (exist && !reversed.get()) {
|
|
|
+ reversed.set(lastIndex.get() > currentIndex);
|
|
|
+ lastIndex.set(currentIndex);
|
|
|
+ }
|
|
|
+ return exist;
|
|
|
+ }).collect(Collectors.toList());
|
|
|
+
|
|
|
if (CollectionUtils.isEmpty(filterEnums)) {
|
|
|
return new Point(command, new ArrayList<>());
|
|
|
}
|
|
@@ -47,7 +67,7 @@ public final class DatabaseQuartzExtractor extends AbstractQuartzExtractor {
|
|
|
final String key = index + type;
|
|
|
|
|
|
// 开始位置
|
|
|
- if(f.begin()){
|
|
|
+ if (f.begin()) {
|
|
|
if (!snapshot.containsKey(key)) {
|
|
|
final Object val = f.getObject();
|
|
|
point.addArg(val);
|
|
@@ -68,11 +88,11 @@ public final class DatabaseQuartzExtractor extends AbstractQuartzExtractor {
|
|
|
point.setBeginValue(f.toString(val));
|
|
|
}
|
|
|
point.setCommand(ConnectorConstant.OPERTION_QUERY, replaceQuery);
|
|
|
+ if (reversed.get()) {
|
|
|
+ point.reverseArgs();
|
|
|
+ }
|
|
|
|
|
|
return point;
|
|
|
}
|
|
|
|
|
|
- private boolean appearNotMoreThanOnce(String str, String searchStr) {
|
|
|
- return StringUtil.indexOf(str, searchStr) == StringUtil.lastIndexOf(str, searchStr);
|
|
|
- }
|
|
|
}
|