Forráskód Böngészése

支持切换mysql存储缓存任务

AE86 2 éve
szülő
commit
5583ff9217
22 módosított fájl, 524 hozzáadás és 560 törlés
  1. 16 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/FilterEnum.java
  2. 0 9
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Filter.java
  3. 4 3
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java
  4. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/AbstractWriterBinlog.java
  5. 0 241
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java
  6. 43 29
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogService.java
  7. 18 23
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/BinlogConstant.java
  8. 1 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java
  9. 98 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Option.java
  10. 2 3
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java
  11. 33 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/AbstractFilter.java
  12. 13 14
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/BooleanFilter.java
  13. 0 93
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Option.java
  14. 0 36
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Param.java
  15. 30 34
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Query.java
  16. 28 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/filter/IntFilter.java
  17. 24 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/filter/LongFilter.java
  18. 25 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/filter/StringFilter.java
  19. 78 21
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java
  20. 96 29
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java
  21. 5 23
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/DocumentUtil.java
  22. 8 0
      dbsyncer-storage/src/main/resources/dbsyncer_binlog.sql

+ 16 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/FilterEnum.java

@@ -71,6 +71,22 @@ public enum FilterEnum {
         this.compareFilter = compareFilter;
         this.compareFilter = compareFilter;
     }
     }
 
 
+    /**
+     * 获取表达式
+     *
+     * @param name
+     * @return
+     * @throws ConnectorException
+     */
+    public static FilterEnum getFilterEnum(String name) throws ConnectorException {
+        for (FilterEnum e : FilterEnum.values()) {
+            if (StringUtil.equals(name, e.getName())) {
+                return e;
+            }
+        }
+        throw new ConnectorException(String.format("FilterEnum name \"%s\" does not exist.", name));
+    }
+
     /**
     /**
      * 获取比较器
      * 获取比较器
      *
      *

+ 0 - 9
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Filter.java

@@ -31,15 +31,6 @@ public class Filter {
      */
      */
     private String value;
     private String value;
 
 
-    public Filter() {
-    }
-
-    public Filter(String name, FilterEnum filterEnum, Object value) {
-        this.name = name;
-        this.filter = filterEnum.getName();
-        this.value = String.valueOf(value);
-    }
-
     public String getName() {
     public String getName() {
         return name;
         return name;
     }
     }

+ 4 - 3
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -4,6 +4,7 @@ import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.Manager;
@@ -98,7 +99,7 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
             query.addFilter(ConfigConstant.DATA_ERROR, error, true);
             query.addFilter(ConfigConstant.DATA_ERROR, error, true);
         }
         }
         // 查询是否成功, 默认查询失败
         // 查询是否成功, 默认查询失败
-        query.addFilter(ConfigConstant.DATA_SUCCESS, StringUtil.isNotBlank(success) ? success : StorageDataStatusEnum.FAIL.getCode(), false, true);
+        query.addFilter(ConfigConstant.DATA_SUCCESS, StringUtil.isNotBlank(success) ? NumberUtil.toInt(success) : StorageDataStatusEnum.FAIL.getValue());
         query.setMetaId(metaId);
         query.setMetaId(metaId);
         return manager.queryData(query);
         return manager.queryData(query);
     }
     }
@@ -196,7 +197,7 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
      * @return
      * @return
      */
      */
     private long getMappingSuccess(List<Meta> metaAll) {
     private long getMappingSuccess(List<Meta> metaAll) {
-        return queryMappingMetricCount(metaAll, (query) -> query.addFilter(ConfigConstant.DATA_SUCCESS, StorageDataStatusEnum.SUCCESS.getCode(), false, true));
+        return queryMappingMetricCount(metaAll, (query) -> query.addFilter(ConfigConstant.DATA_SUCCESS, StorageDataStatusEnum.SUCCESS.getValue()));
     }
     }
 
 
     /**
     /**
@@ -206,7 +207,7 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
      * @return
      * @return
      */
      */
     private long getMappingFail(List<Meta> metaAll) {
     private long getMappingFail(List<Meta> metaAll) {
-        return queryMappingMetricCount(metaAll, (query) -> query.addFilter(ConfigConstant.DATA_SUCCESS, StorageDataStatusEnum.FAIL.getCode(), false, true));
+        return queryMappingMetricCount(metaAll, (query) -> query.addFilter(ConfigConstant.DATA_SUCCESS, StorageDataStatusEnum.FAIL.getValue()));
     }
     }
 
 
     /**
     /**

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/AbstractWriterBinlog.java

@@ -9,7 +9,7 @@ import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.model.WriterRequest;
-import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
+import org.dbsyncer.storage.binlog.AbstractBinlogService;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.binlog.proto.EventEnum;
 import org.dbsyncer.storage.binlog.proto.EventEnum;
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Queue;
 
 
-public abstract class AbstractWriterBinlog extends AbstractBinlogRecorder<WriterRequest> {
+public abstract class AbstractWriterBinlog extends AbstractBinlogService<WriterRequest> {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 

+ 0 - 241
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -1,241 +0,0 @@
-package org.dbsyncer.storage.binlog;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.IntPoint;
-import org.apache.lucene.document.LongPoint;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.BooleanClause;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
-import org.apache.lucene.util.BytesRef;
-import org.dbsyncer.common.config.BinlogRecorderConfig;
-import org.dbsyncer.common.model.Paging;
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
-import org.dbsyncer.common.scheduled.ScheduledTaskService;
-import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.storage.binlog.proto.BinlogMessage;
-import org.dbsyncer.storage.constant.BinlogConstant;
-import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
-import org.dbsyncer.storage.lucene.Shard;
-import org.dbsyncer.storage.query.Option;
-import org.dbsyncer.storage.util.DocumentUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import javax.annotation.PostConstruct;
-import java.io.File;
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/6/8 0:53
- */
-public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder, DisposableBean {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    private static final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar).append("data").append(File.separatorChar).append("data").append(File.separatorChar).toString();
-
-    @Autowired
-    private ScheduledTaskService scheduledTaskService;
-
-    @Autowired
-    private SnowflakeIdWorker snowflakeIdWorker;
-
-    @Autowired
-    private BinlogRecorderConfig binlogRecorderConfig;
-
-    private static Queue<BinlogMessage> queue;
-
-    private static Shard shard;
-
-    private WriterTask writerTask = new WriterTask();
-
-    private ReaderTask readerTask = new ReaderTask();
-
-    @PostConstruct
-    private void init() throws IOException {
-        queue = new LinkedBlockingQueue(binlogRecorderConfig.getQueueCapacity());
-        shard = new Shard(PATH + getTaskName());
-        scheduledTaskService.start(binlogRecorderConfig.getWriterPeriodMillisecond(), writerTask);
-        scheduledTaskService.start(binlogRecorderConfig.getReaderPeriodMillisecond(), readerTask);
-    }
-
-    /**
-     * 反序列化消息
-     *
-     * @param message
-     * @return
-     */
-    protected abstract Message deserialize(String messageId, BinlogMessage message);
-
-    @Override
-    public void flush(BinlogMessage message) {
-        queue.offer(message);
-    }
-
-    @Override
-    public void destroy() throws IOException {
-        shard.close();
-    }
-
-    @Override
-    public void complete(List<String> messageIds) {
-        if (!CollectionUtils.isEmpty(messageIds)) {
-            try {
-                int size = messageIds.size();
-                Term[] terms = new Term[size];
-                for (int i = 0; i < size; i++) {
-                    terms[i] = new Term(BinlogConstant.BINLOG_ID, messageIds.get(i));
-                }
-                shard.deleteBatch(terms);
-            } catch (IOException e) {
-                logger.error(e.getMessage());
-            }
-        }
-    }
-
-    /**
-     * 合并缓存队列任务到磁盘
-     */
-    final class WriterTask implements ScheduledTaskJob {
-
-        @Override
-        public void run() {
-            if (queue.isEmpty()) {
-                return;
-            }
-
-            List<Document> tasks = new ArrayList<>();
-            int count = 0;
-            long now = Instant.now().toEpochMilli();
-            while (!queue.isEmpty() && count < binlogRecorderConfig.getBatchCount()) {
-                BinlogMessage message = queue.poll();
-                if (null != message) {
-                    tasks.add(DocumentUtil.convertBinlog2Doc(String.valueOf(snowflakeIdWorker.nextId()), BinlogConstant.READY, new BytesRef(message.toByteArray()), now));
-                }
-                count++;
-            }
-
-            if (!CollectionUtils.isEmpty(tasks)) {
-                try {
-                    shard.insertBatch(tasks);
-                } catch (IOException e) {
-                    logger.error(e.getMessage());
-                }
-            }
-        }
-    }
-
-    /**
-     * 从磁盘读取日志到任务队列
-     */
-    final class ReaderTask implements ScheduledTaskJob {
-
-        private final Lock lock = new ReentrantLock(true);
-
-        private volatile boolean running;
-
-        @Override
-        public void run() {
-            // 读取任务数 >= 1/2缓存同步队列容量则继续等待
-            if (running || binlogRecorderConfig.getBatchCount() + getQueue().size() >= getQueueCapacity() / 2) {
-                return;
-            }
-
-            final Lock binlogLock = lock;
-            boolean locked = false;
-            try {
-                locked = binlogLock.tryLock();
-                if (locked) {
-                    running = true;
-                    doParse();
-                }
-            } catch (Exception e) {
-                logger.error(e.getMessage());
-            } finally {
-                if (locked) {
-                    running = false;
-                    binlogLock.unlock();
-                }
-            }
-        }
-
-        private void doParse() throws IOException {
-            //  查询[待处理] 或 [处理中 & 处理超时]
-            long maxProcessingSeconds = Timestamp.valueOf(LocalDateTime.now().minusSeconds(binlogRecorderConfig.getMaxProcessingSeconds())).getTime();
-            BooleanQuery query = new BooleanQuery.Builder()
-                    .add(new BooleanQuery.Builder()
-                            .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST)
-                            .build(), BooleanClause.Occur.SHOULD)
-                    .add(new BooleanQuery.Builder()
-                            .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.PROCESSING), BooleanClause.Occur.MUST)
-                            .add(LongPoint.newRangeQuery(BinlogConstant.BINLOG_TIME, Long.MIN_VALUE, maxProcessingSeconds), BooleanClause.Occur.MUST)
-                            .build(), BooleanClause.Occur.SHOULD)
-                    .build();
-            Option option = new Option(query);
-            option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_STATUS, IndexFieldResolverEnum.INT);
-            option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
-            option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_TIME, IndexFieldResolverEnum.LONG);
-
-            // 优先处理最早记录
-            Sort sort = new Sort(new SortField(BinlogConstant.BINLOG_TIME, SortField.Type.LONG));
-            Paging paging = shard.query(option, 1, binlogRecorderConfig.getBatchCount(), sort);
-            if (CollectionUtils.isEmpty(paging.getData())) {
-                return;
-            }
-
-            List<Map> list = (List<Map>) paging.getData();
-            final int size = list.size();
-            final List<Message> messages = new ArrayList<>(size);
-            final List<Document> updateDocs = new ArrayList<>(size);
-            final Term[] deleteIds = new Term[size];
-            boolean existProcessing = false;
-            for (int i = 0; i < size; i++) {
-                Map row = list.get(i);
-                String id = (String) row.get(BinlogConstant.BINLOG_ID);
-                Integer status = (Integer) row.get(BinlogConstant.BINLOG_STATUS);
-                BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
-                if (BinlogConstant.PROCESSING == status) {
-                    existProcessing = true;
-                }
-                deleteIds[i] = new Term(BinlogConstant.BINLOG_ID, id);
-                String newId = String.valueOf(snowflakeIdWorker.nextId());
-                try {
-                    Message message = deserialize(newId, BinlogMessage.parseFrom(ref.bytes));
-                    if (null != message) {
-                        messages.add(message);
-                        updateDocs.add(DocumentUtil.convertBinlog2Doc(newId, BinlogConstant.PROCESSING, ref, Instant.now().toEpochMilli()));
-                    }
-                } catch (InvalidProtocolBufferException e) {
-                    logger.error(e.getMessage());
-                }
-            }
-            if (existProcessing) {
-                logger.warn("存在超时未处理数据,正在重试,建议优化配置参数[max-processing-seconds={}].", binlogRecorderConfig.getMaxProcessingSeconds());
-            }
-
-            // 如果在更新消息状态的过程中服务被中止,为保证数据的安全性,重启后消息可能会同步2次)
-            shard.insertBatch(updateDocs);
-            shard.deleteBatch(deleteIds);
-            getQueue().addAll(messages);
-        }
-    }
-
-}

+ 43 - 29
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogService.java

@@ -9,14 +9,16 @@ import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
-import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
-import org.dbsyncer.storage.query.BooleanQuery;
+import org.dbsyncer.storage.query.BooleanFilter;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.query.Query;
+import org.dbsyncer.storage.query.filter.IntFilter;
+import org.dbsyncer.storage.query.filter.LongFilter;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -95,27 +97,31 @@ public abstract class AbstractBinlogService<Message> implements BinlogRecorder {
                 return;
                 return;
             }
             }
 
 
-            List<Map> tasks = new ArrayList<>();
-            int count = 0;
-            long now = Instant.now().toEpochMilli();
-            Map task = null;
-            while (!queue.isEmpty() && count < binlogRecorderConfig.getBatchCount()) {
-                BinlogMessage message = queue.poll();
-                if (null != message) {
-                    task = new HashMap();
-                    task.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
-                    task.put(ConfigConstant.BINLOG_STATUS, BinlogConstant.READY);
-                    task.put(ConfigConstant.CONFIG_MODEL_JSON, message.toByteArray());
-                    task.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
-                    tasks.add(task);
+            try {
+                List<Map> tasks = new ArrayList<>();
+                int count = 0;
+                long now = Instant.now().toEpochMilli();
+                Map task = null;
+                while (!queue.isEmpty() && count < binlogRecorderConfig.getBatchCount()) {
+                    BinlogMessage message = queue.poll();
+                    if (null != message) {
+                        task = new HashMap();
+                        task.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
+                        task.put(ConfigConstant.BINLOG_STATUS, BinlogConstant.READY);
+                        task.put(ConfigConstant.BINLOG_DATA, message.toByteArray());
+                        task.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
+                        tasks.add(task);
+                    }
+                    count++;
                 }
                 }
-                count++;
-            }
 
 
-            if (!CollectionUtils.isEmpty(tasks)) {
-                storageService.addBatch(StorageEnum.BINLOG, tasks);
+                if (!CollectionUtils.isEmpty(tasks)) {
+                    storageService.addBatch(StorageEnum.BINLOG, tasks);
+                }
+                tasks = null;
+            } catch (Exception e) {
+                logger.error(e.getMessage());
             }
             }
-            tasks = null;
         }
         }
     }
     }
 
 
@@ -154,17 +160,25 @@ public abstract class AbstractBinlogService<Message> implements BinlogRecorder {
         }
         }
 
 
         private void doParse() {
         private void doParse() {
-            //  TODO 查询[待处理] 或 [处理中 & 处理超时]
+            // 查询[待处理] 或 [处理中 & 处理超时]
             Query query = new Query();
             Query query = new Query();
             query.setType(StorageEnum.BINLOG);
             query.setType(StorageEnum.BINLOG);
-            Filter ready = new Filter(ConfigConstant.BINLOG_STATUS, FilterEnum.EQUAL, BinlogConstant.READY);
-            Filter processing = new Filter(ConfigConstant.BINLOG_STATUS, FilterEnum.EQUAL, BinlogConstant.PROCESSING);
+
+            IntFilter ready = new IntFilter(ConfigConstant.BINLOG_STATUS, FilterEnum.EQUAL, BinlogConstant.READY);
+            IntFilter processing = new IntFilter(ConfigConstant.BINLOG_STATUS, FilterEnum.EQUAL, BinlogConstant.PROCESSING);
             long maxProcessingSeconds = Timestamp.valueOf(LocalDateTime.now().minusSeconds(binlogRecorderConfig.getMaxProcessingSeconds())).getTime();
             long maxProcessingSeconds = Timestamp.valueOf(LocalDateTime.now().minusSeconds(binlogRecorderConfig.getMaxProcessingSeconds())).getTime();
-            Filter processingTimeout = new Filter(ConfigConstant.CONFIG_MODEL_CREATE_TIME, FilterEnum.LT, maxProcessingSeconds);
-            BooleanQuery booleanQuery = new BooleanQuery()
-                    .add(new BooleanQuery().add(ready), OperationEnum.OR)
-                    .add(new BooleanQuery().add(processing).add(processingTimeout));
-            query.setBooleanQuery(booleanQuery);
+            LongFilter processingTimeout = new LongFilter(ConfigConstant.CONFIG_MODEL_CREATE_TIME, FilterEnum.LT, maxProcessingSeconds);
+            BooleanFilter booleanFilter = new BooleanFilter()
+                    .add(new BooleanFilter().add(ready), OperationEnum.OR)
+                    .add(new BooleanFilter().add(processing).add(processingTimeout), OperationEnum.OR);
+            query.setBooleanFilter(booleanFilter);
+
+            // 指定返回值类型
+            Map<String, IndexFieldResolverEnum> fieldResolvers = new LinkedHashMap<>();
+            fieldResolvers.put(ConfigConstant.BINLOG_STATUS, IndexFieldResolverEnum.INT);
+            fieldResolvers.put(ConfigConstant.CONFIG_MODEL_JSON, IndexFieldResolverEnum.BINARY);
+            fieldResolvers.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, IndexFieldResolverEnum.LONG);
+            query.setIndexFieldResolverMap(fieldResolvers);
             query.setPageNum(1);
             query.setPageNum(1);
             query.setPageSize(binlogRecorderConfig.getBatchCount());
             query.setPageSize(binlogRecorderConfig.getBatchCount());
             Paging paging = storageService.query(query);
             Paging paging = storageService.query(query);
@@ -181,7 +195,7 @@ public abstract class AbstractBinlogService<Message> implements BinlogRecorder {
                 Map row = list.get(i);
                 Map row = list.get(i);
                 String id = (String) row.get(ConfigConstant.CONFIG_MODEL_ID);
                 String id = (String) row.get(ConfigConstant.CONFIG_MODEL_ID);
                 Integer status = (Integer) row.get(ConfigConstant.BINLOG_STATUS);
                 Integer status = (Integer) row.get(ConfigConstant.BINLOG_STATUS);
-                byte[] bytes = (byte[]) row.get(ConfigConstant.CONFIG_MODEL_JSON);
+                byte[] bytes = (byte[]) row.get(ConfigConstant.BINLOG_DATA);
                 if (BinlogConstant.PROCESSING == status) {
                 if (BinlogConstant.PROCESSING == status) {
                     existProcessing = true;
                     existProcessing = true;
                 }
                 }

+ 18 - 23
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/BinlogConstant.java

@@ -1,24 +1,19 @@
-package org.dbsyncer.storage.constant;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/7/13 22:14
- */
-public class BinlogConstant {
-
-    /**
-     * 属性
-     */
-    public static final String BINLOG_ID = "id";
-    public static final String BINLOG_STATUS = "s";
-    public static final String BINLOG_CONTENT = "c";
-    public static final String BINLOG_TIME = "t";
-
-    /**
-     * 状态类型
-     */
-    public static final int READY = 0;
-    public static final int PROCESSING = 1;
-
+package org.dbsyncer.storage.constant;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/7/13 22:14
+ */
+public class BinlogConstant {
+
+    /**
+     * 待同步
+     */
+    public static final int READY = 0;
+    /**
+     * 同步中
+     */
+    public static final int PROCESSING = 1;
+
 }
 }

+ 1 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java

@@ -41,5 +41,6 @@ public class ConfigConstant {
      * Binlog
      * Binlog
      */
      */
     public static final String BINLOG_STATUS = "status";
     public static final String BINLOG_STATUS = "status";
+    public static final String BINLOG_DATA = "data";
 
 
 }
 }

+ 98 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Option.java

@@ -0,0 +1,98 @@
+package org.dbsyncer.storage.lucene;
+
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.highlight.Highlighter;
+import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
+import org.dbsyncer.storage.lucene.IndexFieldResolver;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-06-01 22:57
+ */
+public class Option {
+
+    private Query query;
+
+    private Set<String> highLightKeys;
+
+    private boolean enableHighLightSearch;
+
+    private Highlighter highlighter = null;
+
+    /**
+     * 只查总数
+     */
+    private boolean queryTotal;
+
+    /**
+     * 返回值转换器
+     */
+    private Map<String, IndexFieldResolverEnum> indexFieldResolverMap = new ConcurrentHashMap<>();
+
+    /**
+     * 指定返回的值类型
+     *
+     * @param name
+     * @return
+     */
+    public IndexFieldResolver getIndexFieldResolver(String name) {
+        IndexFieldResolverEnum indexFieldResolverEnum = indexFieldResolverMap.get(name);
+        if (null != indexFieldResolverEnum) {
+            return indexFieldResolverEnum.getIndexFieldResolver();
+        }
+        return IndexFieldResolverEnum.STRING.getIndexFieldResolver();
+    }
+
+    public Query getQuery() {
+        return query;
+    }
+
+    public void setQuery(Query query) {
+        this.query = query;
+    }
+
+    public Set<String> getHighLightKeys() {
+        return highLightKeys;
+    }
+
+    public void setHighLightKeys(Set<String> highLightKeys) {
+        this.highLightKeys = highLightKeys;
+    }
+
+    public boolean isEnableHighLightSearch() {
+        return enableHighLightSearch;
+    }
+
+    public void setEnableHighLightSearch(boolean enableHighLightSearch) {
+        this.enableHighLightSearch = enableHighLightSearch;
+    }
+
+    public Highlighter getHighlighter() {
+        return highlighter;
+    }
+
+    public void setHighlighter(Highlighter highlighter) {
+        this.highlighter = highlighter;
+    }
+
+    public boolean isQueryTotal() {
+        return queryTotal;
+    }
+
+    public void setQueryTotal(boolean queryTotal) {
+        this.queryTotal = queryTotal;
+    }
+
+    public Map<String, IndexFieldResolverEnum> getIndexFieldResolverMap() {
+        return indexFieldResolverMap;
+    }
+
+    public void setIndexFieldResolverMap(Map<String, IndexFieldResolverEnum> indexFieldResolverMap) {
+        this.indexFieldResolverMap = indexFieldResolverMap;
+    }
+}

+ 2 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -12,7 +12,6 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.FSDirectory;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.StorageException;
-import org.dbsyncer.storage.query.Option;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
@@ -131,7 +130,7 @@ public class Shard {
         final TopDocs topDocs = getTopDocs(searcher, option.getQuery(), MAX_SIZE, sort);
         final TopDocs topDocs = getTopDocs(searcher, option.getQuery(), MAX_SIZE, sort);
         Paging paging = new Paging(pageNum, pageSize);
         Paging paging = new Paging(pageNum, pageSize);
         paging.setTotal(topDocs.totalHits);
         paging.setTotal(topDocs.totalHits);
-        if(option.isQueryTotal()){
+        if (option.isQueryTotal()) {
             return paging;
             return paging;
         }
         }
 
 
@@ -197,7 +196,7 @@ public class Shard {
                 }
                 }
 
 
                 // 解析value类型
                 // 解析value类型
-                r.put(f.name(), option.getFieldResolver(f.name()).getValue(f));
+                r.put(f.name(), option.getIndexFieldResolver(f.name()).getValue(f));
             }
             }
             list.add(r);
             list.add(r);
         }
         }

+ 33 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/AbstractFilter.java

@@ -0,0 +1,33 @@
+package org.dbsyncer.storage.query;
+
+import org.apache.lucene.search.Query;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.connector.model.Filter;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/11/17 23:56
+ */
+public abstract class AbstractFilter extends Filter {
+    private boolean enableHighLightSearch;
+
+    public AbstractFilter(String name, FilterEnum filterEnum, Object value) {
+        this(name, filterEnum, value, false);
+    }
+
+    public AbstractFilter(String name, FilterEnum filterEnum, Object value, boolean enableHighLightSearch) {
+        setName(name);
+        setFilter(filterEnum.getName());
+        setValue(String.valueOf(value));
+        this.enableHighLightSearch = enableHighLightSearch;
+    }
+
+    public abstract Query newEqual();
+
+    public abstract Query newLessThan();
+
+    public boolean isEnableHighLightSearch() {
+        return enableHighLightSearch;
+    }
+}

+ 13 - 14
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/BooleanQuery.java → dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/BooleanFilter.java

@@ -2,45 +2,44 @@ package org.dbsyncer.storage.query;
 
 
 
 
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
-import org.dbsyncer.connector.model.Filter;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 
 
-public class BooleanQuery {
+public class BooleanFilter {
 
 
-    private final List<BooleanQuery> clauses = new ArrayList<>();
+    private final List<BooleanFilter> clauses = new ArrayList<>();
 
 
-    private final List<Filter> filters = new ArrayList<>();
+    private final List<AbstractFilter> filters = new ArrayList<>();
 
 
     private OperationEnum operationEnum;
     private OperationEnum operationEnum;
 
 
-    public BooleanQuery add(BooleanQuery booleanQuery) {
-        return add(booleanQuery, OperationEnum.AND);
-    }
-
-    public BooleanQuery add(BooleanQuery booleanQuery, OperationEnum operationEnum) {
-        clauses.add(booleanQuery);
-        booleanQuery.setOperationEnum(operationEnum);
+    public BooleanFilter add(BooleanFilter booleanFilter, OperationEnum operationEnum) {
+        clauses.add(booleanFilter);
+        booleanFilter.setOperationEnum(operationEnum);
         return this;
         return this;
     }
     }
 
 
-    public BooleanQuery add(Filter filter) {
+    public BooleanFilter add(AbstractFilter filter) {
         filter.setOperation(OperationEnum.AND.getName());
         filter.setOperation(OperationEnum.AND.getName());
         filters.add(filter);
         filters.add(filter);
         return this;
         return this;
     }
     }
 
 
-    public BooleanQuery or(Filter filter) {
+    public BooleanFilter or(AbstractFilter filter) {
         filter.setOperation(OperationEnum.OR.getName());
         filter.setOperation(OperationEnum.OR.getName());
         filters.add(filter);
         filters.add(filter);
         return this;
         return this;
     }
     }
 
 
-    public List<Filter> getFilters() {
+    public List<AbstractFilter> getFilters() {
         return filters;
         return filters;
     }
     }
 
 
+    public List<BooleanFilter> getClauses() {
+        return clauses;
+    }
+
     public OperationEnum getOperationEnum() {
     public OperationEnum getOperationEnum() {
         return operationEnum;
         return operationEnum;
     }
     }

+ 0 - 93
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Option.java

@@ -1,93 +0,0 @@
-package org.dbsyncer.storage.query;
-
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.highlight.Highlighter;
-import org.apache.lucene.search.highlight.QueryScorer;
-import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
-import org.dbsyncer.storage.lucene.IndexFieldResolver;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * @version 1.0.0
- * @Author AE86
- * @Date 2020-06-01 22:57
- */
-public class Option {
-
-    private Query query;
-
-    private Set<String> highLightKeys;
-
-    private boolean enableHighLightSearch;
-
-    private Highlighter highlighter = null;
-
-    /**
-     * 只查总数
-     */
-    private boolean queryTotal;
-
-    private final Map<String, IndexFieldResolverEnum> fieldResolvers = new LinkedHashMap<>();
-
-    public Option(Query query) {
-        this.query = query;
-    }
-
-    public Option(Query query, boolean queryTotal, List<Param> params) {
-        this.query = query;
-        this.queryTotal = queryTotal;
-        if (!CollectionUtils.isEmpty(params)) {
-            this.highLightKeys = params.stream()
-                    .filter(p -> p.isHighlighter())
-                    .map(p -> p.getKey())
-                    .collect(Collectors.toSet());
-        }
-        if (!CollectionUtils.isEmpty(highLightKeys)) {
-            this.enableHighLightSearch = true;
-            SimpleHTMLFormatter formatter = new SimpleHTMLFormatter("<span style='color:red'>", "</span>");
-            highlighter = new Highlighter(formatter, new QueryScorer(query));
-        }
-    }
-
-    public IndexFieldResolver getFieldResolver(String name){
-        if(fieldResolvers.containsKey(name)){
-            return fieldResolvers.get(name).getIndexFieldResolver();
-        }
-        return IndexFieldResolverEnum.STRING.getIndexFieldResolver();
-    }
-
-    public void addIndexFieldResolverEnum(String name, IndexFieldResolverEnum fieldResolver){
-        fieldResolvers.putIfAbsent(name, fieldResolver);
-    }
-
-    public Query getQuery() {
-        return query;
-    }
-
-    public boolean isQueryTotal() {
-        return queryTotal;
-    }
-
-    public void setQueryTotal(boolean queryTotal) {
-        this.queryTotal = queryTotal;
-    }
-
-    public Set<String> getHighLightKeys() {
-        return highLightKeys;
-    }
-
-    public boolean isEnableHighLightSearch() {
-        return enableHighLightSearch;
-    }
-
-    public Highlighter getHighlighter() {
-        return highlighter;
-    }
-}

+ 0 - 36
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Param.java

@@ -1,36 +0,0 @@
-package org.dbsyncer.storage.query;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2019/11/17 23:56
- */
-public class Param {
-    private String key;
-    private String value;
-    private boolean highlighter;
-    private boolean number;
-
-    public Param(String key, String value, boolean highlighter, boolean number) {
-        this.key = key;
-        this.value = value;
-        this.highlighter = highlighter;
-        this.number = number;
-    }
-
-    public String getKey() {
-        return key;
-    }
-
-    public String getValue() {
-        return value;
-    }
-
-    public boolean isHighlighter() {
-        return highlighter;
-    }
-
-    public boolean isNumber() {
-        return number;
-    }
-}

+ 30 - 34
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Query.java

@@ -1,9 +1,12 @@
 package org.dbsyncer.storage.query;
 package org.dbsyncer.storage.query;
 
 
+import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
+import org.dbsyncer.storage.query.filter.IntFilter;
+import org.dbsyncer.storage.query.filter.StringFilter;
 
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
 /**
  * @author AE86
  * @author AE86
@@ -19,44 +22,40 @@ public class Query {
 
 
     private String metaId;
     private String metaId;
 
 
-    private List<Param> params;
+    private BooleanFilter booleanFilter = new BooleanFilter();
 
 
-    private BooleanQuery booleanQuery;
+    /**
+     * 查询应用性能,不用排序查询,只用查询总量即可
+     */
+    private boolean queryTotal;
 
 
     private int pageNum = 1;
     private int pageNum = 1;
 
 
     private int pageSize = 20;
     private int pageSize = 20;
 
 
-    private boolean enableHighLightSearch;
-
     /**
     /**
-     * 查询应用性能,不用排序查询,只用查询总量即可
+     * 返回值转换器,限Disk使用
      */
      */
-    private boolean queryTotal;
+    private Map<String, IndexFieldResolverEnum> indexFieldResolverMap = new ConcurrentHashMap<>();
 
 
     public Query() {
     public Query() {
-        this.params = new ArrayList<>();
     }
     }
 
 
     public Query(int pageNum, int pageSize) {
     public Query(int pageNum, int pageSize) {
         this.pageNum = pageNum;
         this.pageNum = pageNum;
         this.pageSize = pageSize;
         this.pageSize = pageSize;
-        this.params = new ArrayList<>();
     }
     }
 
 
-    public void addFilter(String key, String value) {
-        addFilter(key, value, false, false);
+    public void addFilter(String name, String value) {
+        booleanFilter.add(new StringFilter(name, value, false));
     }
     }
 
 
-    public void addFilter(String key, String value, boolean highlighter) {
-        addFilter(key, value, highlighter, false);
+    public void addFilter(String name, String value, boolean enableHighLightSearch) {
+        booleanFilter.add(new StringFilter(name, value, enableHighLightSearch));
     }
     }
 
 
-    public void addFilter(String key, String value, boolean highlighter, boolean number) {
-        params.add(new Param(key, value, highlighter, number));
-        if (highlighter) {
-            enableHighLightSearch = highlighter;
-        }
+    public void addFilter(String name, int value) {
+        booleanFilter.add(new IntFilter(name, value));
     }
     }
 
 
     public StorageEnum getType() {
     public StorageEnum getType() {
@@ -75,20 +74,20 @@ public class Query {
         this.metaId = metaId;
         this.metaId = metaId;
     }
     }
 
 
-    public List<Param> getParams() {
-        return params;
+    public BooleanFilter getBooleanFilter() {
+        return booleanFilter;
     }
     }
 
 
-    public void setParams(List<Param> params) {
-        this.params = params;
+    public void setBooleanFilter(BooleanFilter booleanFilter) {
+        this.booleanFilter = booleanFilter;
     }
     }
 
 
-    public BooleanQuery getBooleanQuery() {
-        return booleanQuery;
+    public boolean isQueryTotal() {
+        return queryTotal;
     }
     }
 
 
-    public void setBooleanQuery(BooleanQuery booleanQuery) {
-        this.booleanQuery = booleanQuery;
+    public void setQueryTotal(boolean queryTotal) {
+        this.queryTotal = queryTotal;
     }
     }
 
 
     public int getPageNum() {
     public int getPageNum() {
@@ -107,15 +106,12 @@ public class Query {
         this.pageSize = pageSize;
         this.pageSize = pageSize;
     }
     }
 
 
-    public boolean isEnableHighLightSearch() {
-        return enableHighLightSearch;
+    public Map<String, IndexFieldResolverEnum> getIndexFieldResolverMap() {
+        return indexFieldResolverMap;
     }
     }
 
 
-    public boolean isQueryTotal() {
-        return queryTotal;
+    public void setIndexFieldResolverMap(Map<String, IndexFieldResolverEnum> indexFieldResolverMap) {
+        this.indexFieldResolverMap = indexFieldResolverMap;
     }
     }
 
 
-    public void setQueryTotal(boolean queryTotal) {
-        this.queryTotal = queryTotal;
-    }
 }
 }

+ 28 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/filter/IntFilter.java

@@ -0,0 +1,28 @@
+package org.dbsyncer.storage.query.filter;
+
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.search.Query;
+import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.storage.query.AbstractFilter;
+
+public class IntFilter extends AbstractFilter {
+
+    public IntFilter(String name, int value) {
+        super(name, FilterEnum.EQUAL, value);
+    }
+
+    public IntFilter(String name, FilterEnum filterEnum, int value) {
+        super(name, filterEnum, value);
+    }
+
+    @Override
+    public Query newEqual() {
+        return IntPoint.newSetQuery(getName(), NumberUtil.toInt(getValue()));
+    }
+
+    @Override
+    public Query newLessThan() {
+        return IntPoint.newRangeQuery(getName(), Integer.MIN_VALUE, NumberUtil.toInt(getValue()));
+    }
+}

+ 24 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/filter/LongFilter.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.storage.query.filter;
+
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.search.Query;
+import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.storage.query.AbstractFilter;
+
+public class LongFilter extends AbstractFilter {
+
+    public LongFilter(String name, FilterEnum filterEnum, long value) {
+        super(name, filterEnum, value);
+    }
+
+    @Override
+    public Query newEqual() {
+        return LongPoint.newSetQuery(getName(), NumberUtil.toLong(getValue()));
+    }
+
+    @Override
+    public Query newLessThan() {
+        return LongPoint.newRangeQuery(getName(), Long.MIN_VALUE, NumberUtil.toLong(getValue()));
+    }
+}

+ 25 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/filter/StringFilter.java

@@ -0,0 +1,25 @@
+package org.dbsyncer.storage.query.filter;
+
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.storage.StorageException;
+import org.dbsyncer.storage.query.AbstractFilter;
+
+public class StringFilter extends AbstractFilter {
+
+    public StringFilter(String name, String value, boolean enableHighLightSearch) {
+        super(name, FilterEnum.EQUAL, value, enableHighLightSearch);
+    }
+
+    @Override
+    public Query newEqual() {
+        return new TermQuery(new Term(getName(), getValue()));
+    }
+
+    @Override
+    public Query newLessThan() {
+        throw new StorageException("Unsupported method newLessThan.");
+    }
+}

+ 78 - 21
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -1,28 +1,30 @@
 package org.dbsyncer.storage.support;
 package org.dbsyncer.storage.support;
 
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.IntPoint;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.*;
 import org.apache.lucene.search.*;
+import org.apache.lucene.search.highlight.Highlighter;
+import org.apache.lucene.search.highlight.QueryScorer;
+import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.storage.AbstractStorageService;
 import org.dbsyncer.storage.AbstractStorageService;
 import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.lucene.Shard;
 import org.dbsyncer.storage.lucene.Shard;
-import org.dbsyncer.storage.query.Option;
-import org.dbsyncer.storage.query.Param;
+import org.dbsyncer.storage.query.AbstractFilter;
+import org.dbsyncer.storage.query.BooleanFilter;
+import org.dbsyncer.storage.lucene.Option;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.util.DocumentUtil;
 import org.dbsyncer.storage.util.DocumentUtil;
 
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
 import java.io.File;
 import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
 /**
@@ -56,26 +58,40 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
             Shard shard = getShard(sharding);
             Shard shard = getShard(sharding);
             int pageNum = query.getPageNum() <= 0 ? 1 : query.getPageNum();
             int pageNum = query.getPageNum() <= 0 ? 1 : query.getPageNum();
             int pageSize = query.getPageSize() <= 0 ? 20 : query.getPageSize();
             int pageSize = query.getPageSize() <= 0 ? 20 : query.getPageSize();
-            boolean queryTotal = query.isQueryTotal();
             // 根据修改时间 > 创建时间排序
             // 根据修改时间 > 创建时间排序
             Sort sort = new Sort(new SortField(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, SortField.Type.LONG, true),
             Sort sort = new Sort(new SortField(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, SortField.Type.LONG, true),
                     new SortField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, SortField.Type.LONG, true));
                     new SortField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, SortField.Type.LONG, true));
+            Option option = new Option();
+            option.setQueryTotal(query.isQueryTotal());
+            option.setIndexFieldResolverMap(query.getIndexFieldResolverMap());
             // 设置参数
             // 设置参数
-            List<Param> params = query.getParams();
-            if (!CollectionUtils.isEmpty(params)) {
-                BooleanQuery.Builder builder = new BooleanQuery.Builder();
-                params.forEach(p -> {
-                    if (p.isNumber()) {
-                        builder.add(IntPoint.newSetQuery(p.getKey(), NumberUtil.toInt(p.getValue())), BooleanClause.Occur.MUST);
-                    } else {
-                        builder.add(new TermQuery(new Term(p.getKey(), p.getValue())), BooleanClause.Occur.MUST);
-                    }
-                });
-                BooleanQuery q = builder.build();
-                return shard.query(new Option(q, queryTotal, params), pageNum, pageSize, sort);
+            BooleanFilter baseQuery = query.getBooleanFilter();
+            List<AbstractFilter> filters = baseQuery.getFilters();
+            List<BooleanFilter> clauses = baseQuery.getClauses();
+            if (CollectionUtils.isEmpty(clauses) && CollectionUtils.isEmpty(filters)) {
+                option.setQuery(new MatchAllDocsQuery());
+                return shard.query(option, pageNum, pageSize, sort);
             }
             }
 
 
-            return shard.query(new Option(new MatchAllDocsQuery(), queryTotal, null), pageNum, pageSize, sort);
+            Set<String> highLightKeys = new HashSet<>();
+            BooleanQuery build = null;
+            if (!CollectionUtils.isEmpty(filters)) {
+                build = buildQueryWithFilters(filters, highLightKeys);
+            } else {
+                build = buildQueryWithBooleanFilters(clauses, highLightKeys);
+            }
+
+            option.setQuery(build);
+
+            // 高亮查询
+            if (!CollectionUtils.isEmpty(highLightKeys)) {
+                option.setHighLightKeys(highLightKeys);
+                option.setEnableHighLightSearch(true);
+                SimpleHTMLFormatter formatter = new SimpleHTMLFormatter("<span style='color:red'>", "</span>");
+                option.setHighlighter(new Highlighter(formatter, new QueryScorer(build)));
+            }
+
+            return shard.query(option, pageNum, pageSize, sort);
         } catch (IOException e) {
         } catch (IOException e) {
             throw new StorageException(e);
             throw new StorageException(e);
         }
         }
@@ -127,6 +143,43 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
         shards.clear();
         shards.clear();
     }
     }
 
 
+    private BooleanQuery buildQueryWithFilters(List<AbstractFilter> filters, Set<String> highLightKeys) {
+        BooleanQuery.Builder builder = new BooleanQuery.Builder();
+        filters.forEach(p -> {
+            FilterEnum filterEnum = FilterEnum.getFilterEnum(p.getFilter());
+            BooleanClause.Occur occur = getOccur(p.getOperation());
+            switch (filterEnum) {
+                case EQUAL:
+                case LIKE:
+                    builder.add(p.newEqual(), occur);
+                    break;
+                case LT:
+                    builder.add(p.newLessThan(), occur);
+                    break;
+            }
+
+            if (p.isEnableHighLightSearch()) {
+                highLightKeys.add(p.getName());
+            }
+        });
+        return builder.build();
+    }
+
+    private BooleanQuery buildQueryWithBooleanFilters(List<BooleanFilter> clauses, Set<String> highLightKeys) {
+        BooleanQuery.Builder builder = new BooleanQuery.Builder();
+        clauses.forEach(c -> {
+            if (!CollectionUtils.isEmpty(c.getFilters())) {
+                BooleanQuery cBuild = buildQueryWithFilters(c.getFilters(), highLightKeys);
+                builder.add(cBuild, getOccur(c.getOperationEnum().getName()));
+            }
+        });
+        return builder.build();
+    }
+
+    private BooleanClause.Occur getOccur(String operation) {
+        return OperationEnum.isAnd(operation) ? BooleanClause.Occur.MUST : BooleanClause.Occur.SHOULD;
+    }
+
     private Term getPrimaryKeyTerm(Document doc) {
     private Term getPrimaryKeyTerm(Document doc) {
         return new Term(ConfigConstant.CONFIG_MODEL_ID, doc.getField(ConfigConstant.CONFIG_MODEL_ID).stringValue());
         return new Term(ConfigConstant.CONFIG_MODEL_ID, doc.getField(ConfigConstant.CONFIG_MODEL_ID).stringValue());
     }
     }
@@ -183,4 +236,8 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
     interface ExecuteMapper {
     interface ExecuteMapper {
         void apply(Shard shard, List<Document> docs) throws IOException;
         void apply(Shard shard, List<Document> docs) throws IOException;
     }
     }
+
+    interface BooleanClauseBuilder {
+        BooleanClause toBooleanClause();
+    }
 }
 }

+ 96 - 29
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -11,13 +11,15 @@ import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.Database;
 import org.dbsyncer.connector.database.Database;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.dbsyncer.storage.AbstractStorageService;
 import org.dbsyncer.storage.AbstractStorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
-import org.dbsyncer.storage.query.Param;
+import org.dbsyncer.storage.query.AbstractFilter;
+import org.dbsyncer.storage.query.BooleanFilter;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.util.UnderlineToCamelUtils;
 import org.dbsyncer.storage.util.UnderlineToCamelUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
@@ -41,7 +43,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.Stream;
 
 
@@ -111,10 +112,11 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             return paging;
             return paging;
         }
         }
 
 
+        List<AbstractFilter> highLightKeys = new ArrayList<>();
         List<Object> queryArgs = new ArrayList<>();
         List<Object> queryArgs = new ArrayList<>();
-        String querySql = buildQuerySql(query, executor, queryArgs);
+        String querySql = buildQuerySql(query, executor, queryArgs, highLightKeys);
         List<Map<String, Object>> data = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, queryArgs.toArray()));
         List<Map<String, Object>> data = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, queryArgs.toArray()));
-        replaceHighLight(query, data);
+        replaceHighLight(highLightKeys, data);
         paging.setData(data);
         paging.setData(data);
         return paging;
         return paging;
     }
     }
@@ -217,9 +219,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         return args.toArray();
         return args.toArray();
     }
     }
 
 
-    private String buildQuerySql(Query query, Executor executor, List<Object> args) {
+    private String buildQuerySql(Query query, Executor executor, List<Object> args, List<AbstractFilter> highLightKeys) {
         StringBuilder sql = new StringBuilder(executor.getQuery());
         StringBuilder sql = new StringBuilder(executor.getQuery());
-        buildQuerySqlWithParams(query, args, sql);
+        buildQuerySqlWithParams(query, args, sql, highLightKeys);
         // order by updateTime,createTime desc
         // order by updateTime,createTime desc
         sql.append(" order by ");
         sql.append(" order by ");
         if (executor.isOrderByUpdateTime()) {
         if (executor.isOrderByUpdateTime()) {
@@ -236,26 +238,85 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         StringBuilder queryCount = new StringBuilder();
         StringBuilder queryCount = new StringBuilder();
         queryCount.append("SELECT COUNT(1) FROM (");
         queryCount.append("SELECT COUNT(1) FROM (");
         StringBuilder sql = new StringBuilder("SELECT 1 FROM `").append(executor.getTable()).append("`");
         StringBuilder sql = new StringBuilder("SELECT 1 FROM `").append(executor.getTable()).append("`");
-        buildQuerySqlWithParams(query, args, sql);
+        buildQuerySqlWithParams(query, args, sql, null);
         queryCount.append(sql);
         queryCount.append(sql);
         queryCount.append(" GROUP BY `ID`) DBSYNCER_T");
         queryCount.append(" GROUP BY `ID`) DBSYNCER_T");
         return queryCount.toString();
         return queryCount.toString();
     }
     }
 
 
-    private void buildQuerySqlWithParams(Query query, List<Object> args, StringBuilder sql) {
-        List<Param> params = query.getParams();
-        if (!CollectionUtils.isEmpty(params)) {
-            sql.append(" WHERE ");
-            AtomicBoolean flag = new AtomicBoolean();
-            params.forEach(p -> {
-                if (flag.get()) {
-                    sql.append(" AND ");
-                }
-                // name=?
-                sql.append(p.getKey()).append(p.isHighlighter() ? " LIKE ?" : "=?");
-                args.add(p.isHighlighter() ? new StringBuilder("%").append(p.getValue()).append("%") : p.getValue());
-                flag.set(true);
-            });
+    private void buildQuerySqlWithParams(Query query, List<Object> args, StringBuilder sql, List<AbstractFilter> highLightKeys) {
+        BooleanFilter baseQuery = query.getBooleanFilter();
+        List<BooleanFilter> clauses = baseQuery.getClauses();
+        List<AbstractFilter> filters = baseQuery.getFilters();
+        if (CollectionUtils.isEmpty(clauses) && CollectionUtils.isEmpty(filters)) {
+            return;
+        }
+
+        sql.append(" WHERE ");
+        if (!CollectionUtils.isEmpty(filters)) {
+            buildQuerySqlWithFilters(filters, args, sql, highLightKeys);
+            return;
+        }
+
+        buildQuerySqlWithBooleanFilters(clauses, args, sql, highLightKeys);
+    }
+
+    private void buildQuerySqlWithFilters(List<AbstractFilter> filters, List<Object> args, StringBuilder sql, List<AbstractFilter> highLightKeys) {
+        // 过滤值
+        int size = filters.size();
+        for (int i = 0; i < size; i++) {
+            AbstractFilter p = filters.get(i);
+
+            if (i > 0) {
+                sql.append(" ").append(p.getOperation().toUpperCase()).append(" ");
+            }
+
+            FilterEnum filterEnum = FilterEnum.getFilterEnum(p.getFilter());
+            String name = UnderlineToCamelUtils.camelToUnderline(p.getName());
+            switch (filterEnum) {
+                case EQUAL:
+                    sql.append(name).append(" = ?");
+                    args.add(p.getValue());
+                    break;
+                case LIKE:
+                    sql.append(name).append(" LIKE ?");
+                    args.add(new StringBuilder("%").append(p.getValue()).append("%"));
+                    break;
+                case LT:
+                    sql.append(name).append(" < ?");
+                    args.add(p.getValue());
+                    break;
+            }
+            if (null != highLightKeys && p.isEnableHighLightSearch()) {
+                highLightKeys.add(p);
+            }
+        }
+    }
+
+    private void buildQuerySqlWithBooleanFilters(List<BooleanFilter> clauses, List<Object> args, StringBuilder sql, List<AbstractFilter> highLightKeys) {
+        // 解析查询
+        int size = clauses.size();
+        for (int i = 0; i < size; i++) {
+            BooleanFilter booleanFilter = clauses.get(i);
+            List<AbstractFilter> filters = booleanFilter.getFilters();
+            if (CollectionUtils.isEmpty(filters)) {
+                continue;
+            }
+
+            // 组合条件
+            if (i > 0) {
+                sql.append(" ").append(booleanFilter.getOperationEnum().name().toUpperCase()).append(" ");
+            }
+
+            if (size > 0) {
+                sql.append("(");
+            }
+
+            buildQuerySqlWithFilters(filters, args, sql, highLightKeys);
+
+            if (size > 0) {
+                sql.append(")");
+            }
         }
         }
     }
     }
 
 
@@ -305,12 +366,17 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
         builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
         List<Field> logFields = builder.getFields();
         List<Field> logFields = builder.getFields();
 
 
+        // 缓存任务
+        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.BINLOG_STATUS, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.BINLOG_DATA);
+        List<Field> binlogFields = builder.getFields();
+
         // 数据
         // 数据
         builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_TABLE_GROUP_ID, ConfigConstant.DATA_TARGET_TABLE_NAME, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
         builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_TABLE_GROUP_ID, ConfigConstant.DATA_TARGET_TABLE_NAME, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
         List<Field> dataFields = builder.getFields();
         List<Field> dataFields = builder.getFields();
 
 
         tables.computeIfAbsent(StorageEnum.CONFIG.getType(), k -> new Executor(k, configFields, true, true));
         tables.computeIfAbsent(StorageEnum.CONFIG.getType(), k -> new Executor(k, configFields, true, true));
         tables.computeIfAbsent(StorageEnum.LOG.getType(), k -> new Executor(k, logFields, true, false));
         tables.computeIfAbsent(StorageEnum.LOG.getType(), k -> new Executor(k, logFields, true, false));
+        tables.computeIfAbsent(StorageEnum.BINLOG.getType(), k -> new Executor(k, binlogFields, true, false));
         tables.computeIfAbsent(StorageEnum.DATA.getType(), k -> new Executor(k, dataFields, false, false));
         tables.computeIfAbsent(StorageEnum.DATA.getType(), k -> new Executor(k, dataFields, false, false));
         // 创建表
         // 创建表
         tables.forEach((tableName, e) -> {
         tables.forEach((tableName, e) -> {
@@ -385,15 +451,14 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         });
         });
     }
     }
 
 
-    private void replaceHighLight(Query query, List<Map<String, Object>> list) {
+    private void replaceHighLight(List<AbstractFilter> highLightKeys, List<Map<String, Object>> list) {
         // 开启高亮
         // 开启高亮
-        if (!CollectionUtils.isEmpty(list) && query.isEnableHighLightSearch()) {
-            List<Param> highLight = query.getParams().stream().filter(p -> p.isHighlighter()).collect(Collectors.toList());
+        if (!CollectionUtils.isEmpty(list) && !CollectionUtils.isEmpty(highLightKeys)) {
             list.forEach(row ->
             list.forEach(row ->
-                    highLight.forEach(p -> {
-                        String text = String.valueOf(row.get(p.getKey()));
-                        String replacement = new StringBuilder("<span style='color:red'>").append(p.getValue()).append("</span>").toString();
-                        row.put(p.getKey(), StringUtil.replace(text, p.getValue(), replacement));
+                    highLightKeys.forEach(paramFilter -> {
+                        String text = String.valueOf(row.get(paramFilter.getName()));
+                        String replacement = new StringBuilder("<span style='color:red'>").append(paramFilter.getValue()).append("</span>").toString();
+                        row.put(paramFilter.getName(), StringUtil.replace(text, paramFilter.getValue(), replacement));
                     })
                     })
             );
             );
         }
         }
@@ -419,7 +484,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
                     new Field(ConfigConstant.DATA_TABLE_GROUP_ID, "VARCHAR", Types.VARCHAR),
                     new Field(ConfigConstant.DATA_TABLE_GROUP_ID, "VARCHAR", Types.VARCHAR),
                     new Field(ConfigConstant.DATA_TARGET_TABLE_NAME, "VARCHAR", Types.VARCHAR),
                     new Field(ConfigConstant.DATA_TARGET_TABLE_NAME, "VARCHAR", Types.VARCHAR),
                     new Field(ConfigConstant.DATA_EVENT, "VARCHAR", Types.VARCHAR),
                     new Field(ConfigConstant.DATA_EVENT, "VARCHAR", Types.VARCHAR),
-                    new Field(ConfigConstant.DATA_ERROR, "LONGVARCHAR", Types.LONGVARCHAR)
+                    new Field(ConfigConstant.DATA_ERROR, "LONGVARCHAR", Types.LONGVARCHAR),
+                    new Field(ConfigConstant.BINLOG_STATUS, "INTEGER", Types.INTEGER),
+                    new Field(ConfigConstant.BINLOG_DATA, "VARBINARY", Types.VARBINARY)
             ).map(field -> {
             ).map(field -> {
                 field.setLabelName(field.getName());
                 field.setLabelName(field.getName());
                 // 转换列下划线
                 // 转换列下划线

+ 5 - 23
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/DocumentUtil.java

@@ -2,7 +2,6 @@ package org.dbsyncer.storage.util;
 
 
 import org.apache.lucene.document.*;
 import org.apache.lucene.document.*;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRef;
-import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
 
 
@@ -16,7 +15,7 @@ import java.util.Map;
  * <p/> new NumericDocValuesField(name, value); 要排序,必须添加一个同名的SortedNumericDocValuesField
  * <p/> new NumericDocValuesField(name, value); 要排序,必须添加一个同名的SortedNumericDocValuesField
  * <p/> 其他FloatPoint、LongPoint、DoublePoint同上
  * <p/> 其他FloatPoint、LongPoint、DoublePoint同上
  * <p/> id使用字符串,防止更新失败
  * <p/> id使用字符串,防止更新失败
- *
+ * <p>
  * <p/>2、Field:
  * <p/>2、Field:
  * <p/>IntPoint
  * <p/>IntPoint
  * <p/>FloatPoint
  * <p/>FloatPoint
@@ -31,7 +30,7 @@ import java.util.Map;
  * <p/>FloatDocValuesField 存储Float类型索引并排序
  * <p/>FloatDocValuesField 存储Float类型索引并排序
  * <p/>DoubleDocValuesField 存储Double类型索引并排序
  * <p/>DoubleDocValuesField 存储Double类型索引并排序
  * <p/>BinaryDocValuesField 只存储不共享,例如标题类字段,如果需要共享并排序,推荐使用SortedDocValuesField
  * <p/>BinaryDocValuesField 只存储不共享,例如标题类字段,如果需要共享并排序,推荐使用SortedDocValuesField
- *
+ * <p>
  * <p/>3、Lucene 6.0版本后:
  * <p/>3、Lucene 6.0版本后:
  * <p>IntField 替换为 IntPoint</p>
  * <p>IntField 替换为 IntPoint</p>
  * <p>FloatField 替换为 FloatPoint</p>
  * <p>FloatField 替换为 FloatPoint</p>
@@ -116,23 +115,6 @@ public abstract class DocumentUtil {
         return doc;
         return doc;
     }
     }
 
 
-    @Deprecated
-    public static Document convertBinlog2Doc(String messageId, int status, BytesRef bytes, long updateTime) {
-        Document doc = new Document();
-        doc.add(new StringField(BinlogConstant.BINLOG_ID, messageId, Field.Store.YES));
-
-        doc.add(new IntPoint(BinlogConstant.BINLOG_STATUS, status));
-        doc.add(new StoredField(BinlogConstant.BINLOG_STATUS, status));
-
-        doc.add(new BinaryDocValuesField(BinlogConstant.BINLOG_CONTENT, bytes));
-        doc.add(new StoredField(BinlogConstant.BINLOG_CONTENT, bytes));
-
-        doc.add(new LongPoint(BinlogConstant.BINLOG_TIME, updateTime));
-        doc.add(new StoredField(BinlogConstant.BINLOG_TIME, updateTime));
-        doc.add(new NumericDocValuesField(BinlogConstant.BINLOG_TIME, updateTime));
-        return doc;
-    }
-
     public static Document convertBinlog2Doc(Map params) {
     public static Document convertBinlog2Doc(Map params) {
         Document doc = new Document();
         Document doc = new Document();
         String id = (String) params.get(ConfigConstant.CONFIG_MODEL_ID);
         String id = (String) params.get(ConfigConstant.CONFIG_MODEL_ID);
@@ -142,9 +124,9 @@ public abstract class DocumentUtil {
         doc.add(new IntPoint(ConfigConstant.BINLOG_STATUS, status));
         doc.add(new IntPoint(ConfigConstant.BINLOG_STATUS, status));
         doc.add(new StoredField(ConfigConstant.BINLOG_STATUS, status));
         doc.add(new StoredField(ConfigConstant.BINLOG_STATUS, status));
 
 
-        byte[] bytes = (byte[]) params.get(ConfigConstant.CONFIG_MODEL_JSON);
-        doc.add(new BinaryDocValuesField(ConfigConstant.CONFIG_MODEL_JSON, new BytesRef(bytes)));
-        doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_JSON, bytes));
+        byte[] bytes = (byte[]) params.get(ConfigConstant.BINLOG_DATA);
+        doc.add(new BinaryDocValuesField(ConfigConstant.BINLOG_DATA, new BytesRef(bytes)));
+        doc.add(new StoredField(ConfigConstant.BINLOG_DATA, bytes));
 
 
         Long createTime = (Long) params.get(ConfigConstant.CONFIG_MODEL_CREATE_TIME);
         Long createTime = (Long) params.get(ConfigConstant.CONFIG_MODEL_CREATE_TIME);
         doc.add(new LongPoint(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
         doc.add(new LongPoint(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));

+ 8 - 0
dbsyncer-storage/src/main/resources/dbsyncer_binlog.sql

@@ -0,0 +1,8 @@
+CREATE TABLE `dbsyncer_binlog`  (
+  `ID` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '唯一ID',
+  `STATUS` int(1) NOT NULL COMMENT '状态,0-待同步;1-同步中',
+  `CREATE_TIME` bigint(0) NOT NULL COMMENT '创建时间',
+  `DATA` blob NOT NULL COMMENT '同步数据',
+  PRIMARY KEY (`ID`) USING BTREE,
+  INDEX `IDX_TYPE_CREATE_TIME`(`STATUS`, `CREATE_TIME`) USING BTREE
+) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin COMMENT = '缓存队列任务表' ROW_FORMAT = Dynamic;