AE86 пре 2 година
родитељ
комит
d06b7ce387

+ 9 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Filter.java

@@ -31,6 +31,15 @@ public class Filter {
      */
     private String value;
 
+    public Filter() {
+    }
+
+    public Filter(String name, FilterEnum filterEnum, Object value) {
+        this.name = name;
+        this.filter = filterEnum.getName();
+        this.value = String.valueOf(value);
+    }
+
     public String getName() {
         return name;
     }

+ 14 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogService.java

@@ -7,18 +7,24 @@ import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.connector.enums.OperationEnum;
+import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageEnum;
+import org.dbsyncer.storage.query.BooleanQuery;
 import org.dbsyncer.storage.query.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import javax.annotation.PostConstruct;
+import java.sql.Timestamp;
 import java.time.Instant;
+import java.time.LocalDateTime;
 import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.Lock;
@@ -151,7 +157,14 @@ public abstract class AbstractBinlogService<Message> implements BinlogRecorder {
             //  TODO 查询[待处理] 或 [处理中 & 处理超时]
             Query query = new Query();
             query.setType(StorageEnum.BINLOG);
-            query.addFilter(ConfigConstant.BINLOG_STATUS, String.valueOf(BinlogConstant.READY));
+            Filter ready = new Filter(ConfigConstant.BINLOG_STATUS, FilterEnum.EQUAL, BinlogConstant.READY);
+            Filter processing = new Filter(ConfigConstant.BINLOG_STATUS, FilterEnum.EQUAL, BinlogConstant.PROCESSING);
+            long maxProcessingSeconds = Timestamp.valueOf(LocalDateTime.now().minusSeconds(binlogRecorderConfig.getMaxProcessingSeconds())).getTime();
+            Filter processingTimeout = new Filter(ConfigConstant.CONFIG_MODEL_CREATE_TIME, FilterEnum.LT, maxProcessingSeconds);
+            BooleanQuery booleanQuery = new BooleanQuery()
+                    .add(new BooleanQuery().add(ready), OperationEnum.OR)
+                    .add(new BooleanQuery().add(processing).add(processingTimeout));
+            query.setBooleanQuery(booleanQuery);
             query.setPageNum(1);
             query.setPageSize(binlogRecorderConfig.getBatchCount());
             Paging paging = storageService.query(query);

+ 51 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/BooleanQuery.java

@@ -0,0 +1,51 @@
+package org.dbsyncer.storage.query;
+
+
+import org.dbsyncer.connector.enums.OperationEnum;
+import org.dbsyncer.connector.model.Filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BooleanQuery {
+
+    private final List<BooleanQuery> clauses = new ArrayList<>();
+
+    private final List<Filter> filters = new ArrayList<>();
+
+    private OperationEnum operationEnum;
+
+    public BooleanQuery add(BooleanQuery booleanQuery) {
+        return add(booleanQuery, OperationEnum.AND);
+    }
+
+    public BooleanQuery add(BooleanQuery booleanQuery, OperationEnum operationEnum) {
+        clauses.add(booleanQuery);
+        booleanQuery.setOperationEnum(operationEnum);
+        return this;
+    }
+
+    public BooleanQuery add(Filter filter) {
+        filter.setOperation(OperationEnum.AND.getName());
+        filters.add(filter);
+        return this;
+    }
+
+    public BooleanQuery or(Filter filter) {
+        filter.setOperation(OperationEnum.OR.getName());
+        filters.add(filter);
+        return this;
+    }
+
+    public List<Filter> getFilters() {
+        return filters;
+    }
+
+    public OperationEnum getOperationEnum() {
+        return operationEnum;
+    }
+
+    public void setOperationEnum(OperationEnum operationEnum) {
+        this.operationEnum = operationEnum;
+    }
+}

+ 10 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Query.java

@@ -21,6 +21,8 @@ public class Query {
 
     private List<Param> params;
 
+    private BooleanQuery booleanQuery;
+
     private int pageNum = 1;
 
     private int pageSize = 20;
@@ -81,6 +83,14 @@ public class Query {
         this.params = params;
     }
 
+    public BooleanQuery getBooleanQuery() {
+        return booleanQuery;
+    }
+
+    public void setBooleanQuery(BooleanQuery booleanQuery) {
+        this.booleanQuery = booleanQuery;
+    }
+
     public int getPageNum() {
         return pageNum;
     }