Browse Source

!110 fixbug
Merge pull request !110 from AE86/V_1.0.0_RC

AE86 2 years ago
parent
commit
284fb54607
52 changed files with 1368 additions and 697 deletions
  1. 26 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/MonitorService.java
  2. 204 5
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java
  3. 35 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/BinlogColumnVo.java
  4. 46 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/MessageVo.java
  5. 18 2
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/DateFormatUtil.java
  6. 11 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java
  7. 16 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/FilterEnum.java
  8. 0 9
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Filter.java
  9. 3 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/LongVarcharValueMapper.java
  10. 4 1
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/Command.java
  11. 2 0
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java
  12. 5 0
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java
  13. 14 0
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/command/Persistence.java
  14. 2 0
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/command/PersistenceCommand.java
  15. 35 0
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/command/Preload.java
  16. 20 8
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/command/PreloadCommand.java
  17. 34 17
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/enums/CommandEnum.java
  18. 10 0
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/Monitor.java
  19. 44 4
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java
  20. 3 15
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/AbstractWriterBinlog.java
  21. 13 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  22. 5 6
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  23. 8 1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/AbstractStorageService.java
  24. 0 241
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java
  25. 43 29
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogService.java
  26. 18 23
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/BinlogConstant.java
  27. 1 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java
  28. 1 1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/IndexFieldResolverEnum.java
  29. 98 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Option.java
  30. 2 3
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java
  31. 33 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/AbstractFilter.java
  32. 13 14
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/BooleanFilter.java
  33. 0 93
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Option.java
  34. 0 36
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Param.java
  35. 30 34
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Query.java
  36. 28 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/filter/IntFilter.java
  37. 24 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/filter/LongFilter.java
  38. 25 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/filter/StringFilter.java
  39. 75 21
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java
  40. 102 36
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java
  41. 22 3
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/BinlogMessageUtil.java
  42. 11 25
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/DocumentUtil.java
  43. 8 0
      dbsyncer-storage/src/main/resources/dbsyncer_binlog.sql
  44. 1 1
      dbsyncer-storage/src/main/resources/dbsyncer_data.sql
  45. 2 1
      dbsyncer-storage/src/main/resources/dbsyncer_upgrade.sql
  46. 20 1
      dbsyncer-web/src/main/java/org/dbsyncer/web/controller/monitor/MonitorController.java
  47. 1 1
      dbsyncer-web/src/main/resources/public/connector/addDqlMysql.html
  48. 1 1
      dbsyncer-web/src/main/resources/public/connector/addMysql.html
  49. 5 1
      dbsyncer-web/src/main/resources/public/monitor/monitor.html
  50. 63 0
      dbsyncer-web/src/main/resources/public/monitor/retry.html
  51. 117 61
      dbsyncer-web/src/main/resources/static/js/monitor/index.js
  52. 66 0
      dbsyncer-web/src/main/resources/static/js/monitor/retry.js

+ 26 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/MonitorService.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.biz;
 
 import org.dbsyncer.biz.vo.AppReportMetricVo;
+import org.dbsyncer.biz.vo.MessageVo;
 import org.dbsyncer.biz.vo.MetaVo;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.monitor.enums.MetricEnum;
@@ -24,6 +25,14 @@ public interface MonitorService {
      */
     List<MetaVo> getMetaAll();
 
+    /**
+     * 获取驱动元信息
+     *
+     * @param metaId
+     * @return
+     */
+    MetaVo getMetaVo(String metaId);
+
     /**
      * 获取驱动默认元信息id
      * @param params
@@ -39,6 +48,22 @@ public interface MonitorService {
      */
     Paging queryData(Map<String, String> params);
 
+    /**
+     * 获取驱动同步数据
+     * @param metaId
+     * @param messageId
+     * @return
+     */
+    MessageVo getMessageVo(String metaId, String messageId);
+
+    /**
+     * 手动同步单条数据
+     *
+     * @param params
+     * @return
+     */
+    String sync(Map<String, String> params);
+
     /**
      * 清空驱动同步数据
      *
@@ -82,4 +107,5 @@ public interface MonitorService {
      * @return
      */
     AppReportMetricVo queryAppReportMetric(List<MetricResponse> metrics);
+
 }

+ 204 - 5
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

@@ -1,14 +1,29 @@
 package org.dbsyncer.biz.impl;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.dbsyncer.biz.MonitorService;
 import org.dbsyncer.biz.metric.MetricDetailFormatter;
-import org.dbsyncer.biz.metric.impl.*;
-import org.dbsyncer.biz.vo.*;
+import org.dbsyncer.biz.metric.impl.CpuMetricDetailFormatter;
+import org.dbsyncer.biz.metric.impl.DiskMetricDetailFormatter;
+import org.dbsyncer.biz.metric.impl.DoubleRoundMetricDetailFormatter;
+import org.dbsyncer.biz.metric.impl.GCMetricDetailFormatter;
+import org.dbsyncer.biz.metric.impl.MemoryMetricDetailFormatter;
+import org.dbsyncer.biz.metric.impl.ValueMetricDetailFormatter;
+import org.dbsyncer.biz.vo.AppReportMetricVo;
+import org.dbsyncer.biz.vo.BinlogColumnVo;
+import org.dbsyncer.biz.vo.DataVo;
+import org.dbsyncer.biz.vo.LogVo;
+import org.dbsyncer.biz.vo.MessageVo;
+import org.dbsyncer.biz.vo.MetaVo;
+import org.dbsyncer.biz.vo.MetricResponseVo;
+import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.monitor.Monitor;
 import org.dbsyncer.monitor.enums.DiskMetricEnum;
 import org.dbsyncer.monitor.enums.MetricEnum;
@@ -17,17 +32,29 @@ import org.dbsyncer.monitor.enums.ThreadPoolMetricEnum;
 import org.dbsyncer.monitor.model.AppReportMetric;
 import org.dbsyncer.monitor.model.MetricResponse;
 import org.dbsyncer.parser.enums.ModelEnum;
+import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.model.Picker;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.WriterRequest;
+import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
+import org.dbsyncer.storage.util.BinlogMessageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -41,9 +68,17 @@ import java.util.stream.Collectors;
 @Service
 public class MonitorServiceImpl implements MonitorService {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
     @Autowired
     private Monitor monitor;
 
+    @Autowired
+    private CacheService cacheService;
+
+    @Autowired
+    private BufferActuator writerBufferActuator;
+
     private Map<String, MetricDetailFormatter> metricDetailFormatterMap = new LinkedHashMap<>();
 
     @PostConstruct
@@ -78,6 +113,14 @@ public class MonitorServiceImpl implements MonitorService {
         return list;
     }
 
+    @Override
+    public MetaVo getMetaVo(String metaId) {
+        Meta meta = monitor.getMeta(metaId);
+        Assert.notNull(meta, "The meta is null.");
+
+        return convertMeta2Vo(meta);
+    }
+
     @Override
     public String getDefaultMetaId(Map<String, String> params) {
         String id = params.get(ConfigConstant.CONFIG_MODEL_ID);
@@ -94,12 +137,77 @@ public class MonitorServiceImpl implements MonitorService {
 
         Paging paging = monitor.queryData(getDefaultMetaId(id), pageNum, pageSize, error, success);
         List<Map> data = (List<Map>) paging.getData();
-        paging.setData(data.stream()
-                .map(m -> convert2Vo(m, DataVo.class))
-                .collect(Collectors.toList()));
+        List<DataVo> list = new ArrayList<>();
+        for (Map row : data) {
+            try {
+                DataVo dataVo = convert2Vo(row, DataVo.class);
+                Map binlogData = getBinlogData(row, true);
+                dataVo.setJson(JsonUtil.objToJson(binlogData));
+                list.add(dataVo);
+            } catch (Exception e) {
+                logger.error(e.getLocalizedMessage());
+            }
+        }
+        paging.setData(list);
         return paging;
     }
 
+    @Override
+    public MessageVo getMessageVo(String metaId, String messageId) {
+        Assert.hasText(metaId, "The metaId is null.");
+        Assert.hasText(messageId, "The messageId is null.");
+
+        MessageVo messageVo = new MessageVo();
+        try {
+            Map row = monitor.getData(metaId, messageId);
+            Map binlogData = getBinlogData(row, true);
+            String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
+            TableGroup tableGroup = monitor.getTableGroup(tableGroupId);
+            messageVo.setSourceTableName(tableGroup.getSourceTable().getName());
+            messageVo.setTargetTableName(tableGroup.getTargetTable().getName());
+            messageVo.setId(messageId);
+
+            if (!CollectionUtils.isEmpty(binlogData)) {
+                Map<String, String> columnMap = tableGroup.getTargetTable().getColumn().stream().collect(Collectors.toMap(Field::getName, Field::getTypeName));
+                List<BinlogColumnVo> columns = new ArrayList<>();
+                binlogData.forEach((k, v) -> columns.add(new BinlogColumnVo((String) k, v, columnMap.get(k))));
+                messageVo.setColumns(columns);
+            }
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage());
+        }
+        return messageVo;
+    }
+
+    @Override
+    public String sync(Map<String, String> params) {
+        String metaId = params.get("metaId");
+        String messageId = params.get("messageId");
+        Assert.hasText(metaId, "The metaId is null.");
+        Assert.hasText(messageId, "The messageId is null.");
+
+        try {
+            Map row = monitor.getData(metaId, messageId);
+            Map binlogData = getBinlogData(row, false);
+            // 历史数据不支持手动同步
+            if (CollectionUtils.isEmpty(binlogData)) {
+                return messageId;
+            }
+            String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
+            String event = (String) row.get(ConfigConstant.DATA_EVENT);
+            // 有修改同步值
+            String retryDataParams = params.get("retryDataParams");
+            if (StringUtil.isNotBlank(retryDataParams)) {
+                JsonUtil.parseObject(retryDataParams).getInnerMap().forEach((k, v) -> binlogData.put(k, convertValue(binlogData.get(k), (String) v)));
+            }
+            writerBufferActuator.offer(new WriterRequest(tableGroupId, event, binlogData));
+            monitor.removeData(metaId, messageId);
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage());
+        }
+        return messageId;
+    }
+
     @Override
     public String clearData(String id) {
         Assert.hasText(id, "驱动不存在.");
@@ -145,6 +253,97 @@ public class MonitorServiceImpl implements MonitorService {
         return vo;
     }
 
+    private Map getBinlogData(Map row, boolean prettyBytes) throws InvalidProtocolBufferException {
+        String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
+        byte[] bytes = (byte[]) row.get(ConfigConstant.BINLOG_DATA);
+        if (null == bytes) {
+            if (prettyBytes) {
+                String json = (String) row.get(ConfigConstant.CONFIG_MODEL_JSON);
+                return JsonUtil.parseObject(json).toJavaObject(Map.class);
+            }
+            return Collections.EMPTY_MAP;
+        }
+        BinlogMap message = BinlogMap.parseFrom(bytes);
+
+        // 1、获取配置信息
+        final TableGroup tableGroup = cacheService.get(tableGroupId, TableGroup.class);
+
+        // 2、反序列数据
+        Map<String, Object> map = new HashMap<>();
+        final Picker picker = new Picker(tableGroup.getFieldMapping());
+        final Map<String, Field> fieldMap = picker.getSourceFieldMap();
+        message.getRowMap().forEach((k, v) -> {
+            if (fieldMap.containsKey(k)) {
+                Object val = BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v);
+                // 处理二进制对象显示
+                if (prettyBytes) {
+                    if (null != val && val instanceof byte[]) {
+                        byte[] b = (byte[]) val;
+                        if (b.length > 128) {
+                            map.put(k, String.format("byte[%d]", b.length));
+                            return;
+                        }
+                        map.put(k, Arrays.toString(b));
+                        return;
+                    }
+                }
+                map.put(k, val);
+            }
+        });
+        return map;
+    }
+
+    private Object convertValue(Object oldValue, String newValue) {
+        if (oldValue == null) {
+            return newValue;
+        }
+
+        Object newVal;
+        String type = oldValue.getClass().getName();
+        switch (type) {
+            case "java.sql.Date":
+                newVal = DateFormatUtil.stringToDate(newValue);
+                break;
+            case "java.sql.Timestamp":
+                newVal = DateFormatUtil.stringToTimestamp(newValue);
+                break;
+            case "java.lang.Integer":
+            case "java.lang.Short":
+                newVal = NumberUtil.toInt(newValue);
+                break;
+            case "java.lang.Long":
+                newVal = NumberUtil.toLong(newValue);
+                break;
+            case "java.lang.Float":
+                newVal = Float.valueOf(newValue);
+                break;
+            case "java.lang.Double":
+                newVal = Double.valueOf(newValue);
+                break;
+            case "[B":
+                newVal = stringToBytes(newValue);
+                break;
+            default:
+                newVal = newValue;
+        }
+
+        return newVal;
+    }
+
+    private byte[] stringToBytes(String s) {
+        byte[] b = null;
+        if (s.startsWith("[") && s.endsWith("]")) {
+            s = StringUtil.substring(s, 1, s.length() - 1);
+            String[] split = StringUtil.split(s, ",");
+            int length = split.length;
+            b = new byte[length];
+            for (int i = 0; i < length; i++) {
+                b[i] = Byte.valueOf(split[i].trim());
+            }
+        }
+        return b;
+    }
+
     private MetaVo convertMeta2Vo(Meta meta) {
         Mapping mapping = monitor.getMapping(meta.getMappingId());
         Assert.notNull(mapping, "驱动不存在.");

+ 35 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/BinlogColumnVo.java

@@ -0,0 +1,35 @@
+package org.dbsyncer.biz.vo;
+
+public class BinlogColumnVo {
+
+    private String key;
+
+    private Object value;
+
+    private String keyType;
+
+    private String valueType;
+
+    public BinlogColumnVo(String key, Object value, String keyType) {
+        this.key = key;
+        this.value = value;
+        this.keyType = keyType;
+        this.valueType = value == null ? "" : value.getClass().getName();
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public Object getValue() {
+        return value;
+    }
+
+    public String getKeyType() {
+        return keyType;
+    }
+
+    public String getValueType() {
+        return valueType;
+    }
+}

+ 46 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/MessageVo.java

@@ -0,0 +1,46 @@
+package org.dbsyncer.biz.vo;
+
+import java.util.List;
+
+public class MessageVo {
+
+    private String id;
+
+    private String sourceTableName;
+
+    private String targetTableName;
+
+    private List<BinlogColumnVo> columns;
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getSourceTableName() {
+        return sourceTableName;
+    }
+
+    public void setSourceTableName(String sourceTableName) {
+        this.sourceTableName = sourceTableName;
+    }
+
+    public String getTargetTableName() {
+        return targetTableName;
+    }
+
+    public void setTargetTableName(String targetTableName) {
+        this.targetTableName = targetTableName;
+    }
+
+    public List<BinlogColumnVo> getColumns() {
+        return columns;
+    }
+
+    public void setColumns(List<BinlogColumnVo> columns) {
+        this.columns = columns;
+    }
+}

+ 18 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/util/DateFormatUtil.java

@@ -8,8 +8,18 @@ import java.sql.Timestamp;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.time.*;
-import java.time.format.*;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.OffsetTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.DateTimeParseException;
+import java.time.format.SignStyle;
+import java.time.format.TextStyle;
 import java.time.temporal.ChronoField;
 import java.time.temporal.TemporalAccessor;
 
@@ -106,6 +116,12 @@ public abstract class DateFormatUtil {
                 return Timestamp.valueOf(LocalDateTime.from(CHINESE_STANDARD_TIME_FORMATTER.parse(s)));
             }
 
+            // 2020-07-12 00:00:00.0
+            if (s.length() == 21) {
+                s = s.substring(0, s.lastIndexOf("."));
+                return Timestamp.valueOf(LocalDateTime.from(CHINESE_STANDARD_TIME_FORMATTER.parse(s)));
+            }
+
             // 2022-07-21T05:35:34.000+0800
             if (s.length() == 28) {
                 return stringToTimestamp(s, GMT_FORMATTER);

+ 11 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java

@@ -7,6 +7,8 @@ import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.schema.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.Types;
 import java.util.LinkedHashMap;
@@ -15,6 +17,8 @@ import java.util.Map;
 
 public abstract class AbstractConnector {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
     protected static final Map<Integer, ValueMapper> valueMappers = new LinkedHashMap<>();
 
     static {
@@ -57,7 +61,7 @@ public abstract class AbstractConnector {
      * @param connectorMapper
      * @param config
      */
-    protected void convertProcessBeforeWriter(ConnectorMapper connectorMapper, WriterBatchConfig config) throws Exception {
+    protected void convertProcessBeforeWriter(ConnectorMapper connectorMapper, WriterBatchConfig config) {
         if (CollectionUtils.isEmpty(config.getFields()) || CollectionUtils.isEmpty(config.getData())) {
             return;
         }
@@ -73,7 +77,12 @@ public abstract class AbstractConnector {
                 final ValueMapper valueMapper = valueMappers.get(f.getType());
                 if (null != valueMapper) {
                     // 当数据类型不同时,转换值类型
-                    row.put(f.getName(), valueMapper.convertValue(connectorMapper, row.get(f.getName())));
+                    try {
+                        row.put(f.getName(), valueMapper.convertValue(connectorMapper, row.get(f.getName())));
+                    } catch (Exception e) {
+                        logger.error("convert value error: ({}, {})", f.getName(), row.get(f.getName()));
+                        throw new ConnectorException(e);
+                    }
                 }
             }
         }

+ 16 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/FilterEnum.java

@@ -71,6 +71,22 @@ public enum FilterEnum {
         this.compareFilter = compareFilter;
     }
 
+    /**
+     * 获取表达式
+     *
+     * @param name
+     * @return
+     * @throws ConnectorException
+     */
+    public static FilterEnum getFilterEnum(String name) throws ConnectorException {
+        for (FilterEnum e : FilterEnum.values()) {
+            if (StringUtil.equals(name, e.getName())) {
+                return e;
+            }
+        }
+        throw new ConnectorException(String.format("FilterEnum name \"%s\" does not exist.", name));
+    }
+
     /**
      * 获取比较器
      *

+ 0 - 9
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Filter.java

@@ -31,15 +31,6 @@ public class Filter {
      */
     private String value;
 
-    public Filter() {
-    }
-
-    public Filter(String name, FilterEnum filterEnum, Object value) {
-        this.name = name;
-        this.filter = filterEnum.getName();
-        this.value = String.valueOf(value);
-    }
-
     public String getName() {
         return name;
     }

+ 3 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/schema/LongVarcharValueMapper.java

@@ -21,6 +21,9 @@ public class LongVarcharValueMapper extends AbstractValueMapper<String> {
         if (val instanceof Date) {
             return String.valueOf(val);
         }
+        if (val instanceof Integer) {
+            return String.valueOf(val);
+        }
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }
 }

+ 4 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Command.java

@@ -1,4 +1,7 @@
 package org.dbsyncer.manager;
 
-public interface Command {
+import org.dbsyncer.manager.command.Persistence;
+import org.dbsyncer.manager.command.Preload;
+
+public interface Command extends Persistence, Preload {
 }

+ 2 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -109,6 +109,8 @@ public interface Manager extends Executor {
     // Data
     Paging queryData(Query query);
 
+    void removeData(String metaId, String messageId);
+
     void clearData(String metaId);
 
     // Log

+ 5 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -225,6 +225,11 @@ public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent>
         return storageService.query(query);
     }
 
+    @Override
+    public void removeData(String metaId, String messageId) {
+        storageService.remove(StorageEnum.DATA, metaId, messageId);
+    }
+
     @Override
     public void clearData(String metaId) {
         Meta meta = getMeta(metaId);

+ 14 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/command/Persistence.java

@@ -0,0 +1,14 @@
+package org.dbsyncer.manager.command;
+
+import org.dbsyncer.manager.ManagerException;
+
+public interface Persistence {
+
+    default boolean addConfig() {
+        throw new ManagerException("Unsupported method addConfig");
+    }
+
+    default boolean editConfig() {
+        throw new ManagerException("Unsupported method editConfig");
+    }
+}

+ 2 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/command/PersistenceCommand.java

@@ -17,11 +17,13 @@ public class PersistenceCommand implements Command {
         this.params = params;
     }
 
+    @Override
     public boolean addConfig() {
         storageService.add(StorageEnum.CONFIG, params);
         return true;
     }
 
+    @Override
     public boolean editConfig() {
         storageService.edit(StorageEnum.CONFIG, params);
         return true;

+ 35 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/command/Preload.java

@@ -0,0 +1,35 @@
+package org.dbsyncer.manager.command;
+
+import org.dbsyncer.manager.ManagerException;
+import org.dbsyncer.parser.model.*;
+
+public interface Preload {
+
+    default SystemConfig parseSystemConfig(){
+        throw new ManagerException("Unsupported method parseSystemConfig");
+    }
+
+    default UserConfig parseUserConfig(){
+        throw new ManagerException("Unsupported method parseUserConfig");
+    }
+
+    default Connector parseConnector(){
+        throw new ManagerException("Unsupported method parseConnector");
+    }
+
+    default Mapping parseMapping(){
+        throw new ManagerException("Unsupported method parseMapping");
+    }
+
+    default TableGroup parseTableGroup(){
+        throw new ManagerException("Unsupported method parseTableGroup");
+    }
+
+    default Meta parseMeta(){
+        throw new ManagerException("Unsupported method parseMeta");
+    }
+
+    default ProjectGroup parseProjectGroup(){
+        throw new ManagerException("Unsupported method parseProjectGroup");
+    }
+}

+ 20 - 8
dbsyncer-manager/src/main/java/org/dbsyncer/manager/command/PreloadCommand.java

@@ -2,8 +2,13 @@ package org.dbsyncer.manager.command;
 
 import org.dbsyncer.manager.Command;
 import org.dbsyncer.parser.Parser;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.model.ProjectGroup;
 import org.dbsyncer.parser.model.SystemConfig;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.UserConfig;
 
 public class PreloadCommand implements Command {
 
@@ -16,31 +21,38 @@ public class PreloadCommand implements Command {
         this.json = json;
     }
 
-    public Object parseSystemConfig() {
+    @Override
+    public SystemConfig parseSystemConfig() {
         return parser.parseObject(json, SystemConfig.class);
     }
 
-    public Object parseUserConfig() {
+    @Override
+    public UserConfig parseUserConfig() {
         return parser.parseObject(json, UserConfig.class);
     }
 
-    public Object parseConnector() {
+    @Override
+    public Connector parseConnector() {
         return parser.parseConnector(json);
     }
 
-    public Object parseMapping() {
+    @Override
+    public Mapping parseMapping() {
         return parser.parseObject(json, Mapping.class);
     }
 
-    public Object parseTableGroup() {
+    @Override
+    public TableGroup parseTableGroup() {
         return parser.parseObject(json, TableGroup.class);
     }
 
-    public Object parseMeta() {
+    @Override
+    public Meta parseMeta() {
         return parser.parseObject(json, Meta.class);
     }
 
-    public Object parseProjectGroup() {
+    @Override
+    public ProjectGroup parseProjectGroup() {
         return parser.parseObject(json, ProjectGroup.class);
     }
 

+ 34 - 17
dbsyncer-manager/src/main/java/org/dbsyncer/manager/enums/CommandEnum.java

@@ -1,11 +1,13 @@
 package org.dbsyncer.manager.enums;
 
 import org.dbsyncer.manager.CommandExecutor;
-import org.dbsyncer.manager.command.PersistenceCommand;
-import org.dbsyncer.manager.command.PreloadCommand;
+import org.dbsyncer.manager.command.Persistence;
+import org.dbsyncer.manager.command.Preload;
 import org.dbsyncer.storage.constant.ConfigConstant;
 
 /**
+ * 枚举命令模式: 持久化和预加载
+ *
  * @author AE86
  * @version 1.0.0
  * @date 2020/04/24 14:19
@@ -15,66 +17,81 @@ public enum CommandEnum {
     /**
      * 添加
      */
-    OPR_ADD("add", (cmd) -> ((PersistenceCommand) cmd).addConfig()),
+    OPR_ADD("add", Persistence::addConfig),
 
     /**
      * 修改
      */
-    OPR_EDIT("edit", (cmd) -> ((PersistenceCommand) cmd).editConfig()),
+    OPR_EDIT("edit", Persistence::editConfig),
 
     /**
      * 预加载SystemConfig
      */
-    PRELOAD_SYSTEM(ConfigConstant.SYSTEM, true, (cmd) -> ((PreloadCommand) cmd).parseSystemConfig()),
+    PRELOAD_SYSTEM(ConfigConstant.SYSTEM, Preload::parseSystemConfig, true),
 
     /**
      * 预加载UserConfig
      */
-    PRELOAD_USER(ConfigConstant.USER, true, (cmd) -> ((PreloadCommand) cmd).parseUserConfig()),
+    PRELOAD_USER(ConfigConstant.USER, Preload::parseUserConfig, true),
 
     /**
      * 预加载Connector
      */
-    PRELOAD_CONNECTOR(ConfigConstant.CONNECTOR, true, (cmd) -> ((PreloadCommand) cmd).parseConnector()),
+    PRELOAD_CONNECTOR(ConfigConstant.CONNECTOR, Preload::parseConnector, true),
 
     /**
      * 预加载Mapping
      */
-    PRELOAD_MAPPING(ConfigConstant.MAPPING, true, (cmd) -> ((PreloadCommand) cmd).parseMapping()),
+    PRELOAD_MAPPING(ConfigConstant.MAPPING, Preload::parseMapping, true),
 
     /**
      * 预加载TableGroup
      */
-    PRELOAD_TABLE_GROUP(ConfigConstant.TABLE_GROUP, true, GroupStrategyEnum.TABLE, (cmd) -> ((PreloadCommand) cmd).parseTableGroup()),
+    PRELOAD_TABLE_GROUP(ConfigConstant.TABLE_GROUP, Preload::parseTableGroup, true, GroupStrategyEnum.TABLE),
 
     /**
      * 预加载Meta
      */
-    PRELOAD_META(ConfigConstant.META, true, (cmd) -> ((PreloadCommand) cmd).parseMeta()),
+    PRELOAD_META(ConfigConstant.META, Preload::parseMeta, true),
 
     /**
      * 预加载ProjectGroup
      */
-    PRELOAD_PROJECT_GROUP(ConfigConstant.PROJECT_GROUP, true, (cmd) -> ((PreloadCommand) cmd).parseProjectGroup());
+    PRELOAD_PROJECT_GROUP(ConfigConstant.PROJECT_GROUP, Preload::parseProjectGroup, true);
 
+    /**
+     * 命令类型
+     */
     private String modelType;
-    private boolean preload;
+
+    /**
+     * 执行器
+     */
     private CommandExecutor commandExecutor;
+
+    /**
+     * 是否预加载
+     */
+    private boolean preload;
+
+    /**
+     * 分组持久化策略
+     */
     private GroupStrategyEnum groupStrategyEnum;
 
     CommandEnum(String modelType, CommandExecutor commandExecutor) {
-        this(modelType, false, commandExecutor);
+        this(modelType, commandExecutor, false);
     }
 
-    CommandEnum(String modelType, boolean preload, CommandExecutor commandExecutor) {
-        this(modelType, preload, GroupStrategyEnum.DEFAULT, commandExecutor);
+    CommandEnum(String modelType, CommandExecutor commandExecutor, boolean preload) {
+        this(modelType, commandExecutor, preload, GroupStrategyEnum.DEFAULT);
     }
 
-    CommandEnum(String modelType, boolean preload, GroupStrategyEnum groupStrategyEnum, CommandExecutor commandExecutor) {
+    CommandEnum(String modelType, CommandExecutor commandExecutor, boolean preload, GroupStrategyEnum groupStrategyEnum) {
         this.modelType = modelType;
+        this.commandExecutor = commandExecutor;
         this.preload = preload;
         this.groupStrategyEnum = groupStrategyEnum;
-        this.commandExecutor = commandExecutor;
     }
 
     public String getModelType() {

+ 10 - 0
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/Monitor.java

@@ -6,9 +6,11 @@ import org.dbsyncer.monitor.model.AppReportMetric;
 import org.dbsyncer.monitor.model.MetricResponse;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * @author AE86
@@ -19,10 +21,18 @@ public interface Monitor {
 
     Mapping getMapping(String mappingId);
 
+    TableGroup getTableGroup(String tableGroupId);
+
     List<Meta> getMetaAll();
 
+    Meta getMeta(String metaId);
+
     Paging queryData(String metaId, int pageNum, int pageSize, String error, String success);
 
+    Map getData(String metaId, String messageId);
+
+    void removeData(String metaId, String messageId);
+
     void clearData(String metaId);
 
     Paging queryLog(int pageNum, int pageSize, String json);

+ 44 - 4
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -4,6 +4,7 @@ import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.manager.Manager;
@@ -18,7 +19,9 @@ import org.dbsyncer.monitor.model.Sample;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
@@ -33,7 +36,10 @@ import javax.annotation.PostConstruct;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
@@ -80,29 +86,63 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
         return manager.getMapping(mappingId);
     }
 
+    @Override
+    public TableGroup getTableGroup(String tableGroupId) {
+        return manager.getTableGroup(tableGroupId);
+    }
+
     @Override
     public List<Meta> getMetaAll() {
         return manager.getMetaAll();
     }
 
+    @Override
+    public Meta getMeta(String metaId) {
+        return manager.getMeta(metaId);
+    }
+
     @Override
     public Paging queryData(String metaId, int pageNum, int pageSize, String error, String success) {
         // 没有驱动
         if (StringUtil.isBlank(metaId)) {
             return new Paging(pageNum, pageSize);
         }
+        Query query = new Query(pageNum, pageSize);
+        Map<String, IndexFieldResolverEnum> fieldResolvers = new LinkedHashMap<>();
+        fieldResolvers.put(ConfigConstant.BINLOG_DATA, IndexFieldResolverEnum.BINARY);
+        query.setIndexFieldResolverMap(fieldResolvers);
 
         // 查询异常信息
-        Query query = new Query(pageNum, pageSize);
         if (StringUtil.isNotBlank(error)) {
             query.addFilter(ConfigConstant.DATA_ERROR, error, true);
         }
         // 查询是否成功, 默认查询失败
-        query.addFilter(ConfigConstant.DATA_SUCCESS, StringUtil.isNotBlank(success) ? success : StorageDataStatusEnum.FAIL.getCode(), false, true);
+        query.addFilter(ConfigConstant.DATA_SUCCESS, StringUtil.isNotBlank(success) ? NumberUtil.toInt(success) : StorageDataStatusEnum.FAIL.getValue());
         query.setMetaId(metaId);
         return manager.queryData(query);
     }
 
+    @Override
+    public Map getData(String metaId, String messageId) {
+        Query query = new Query(1, 1);
+        Map<String, IndexFieldResolverEnum> fieldResolvers = new LinkedHashMap<>();
+        fieldResolvers.put(ConfigConstant.BINLOG_DATA, IndexFieldResolverEnum.BINARY);
+        query.setIndexFieldResolverMap(fieldResolvers);
+        query.addFilter(ConfigConstant.CONFIG_MODEL_ID, messageId);
+        query.setMetaId(metaId);
+        Paging paging = manager.queryData(query);
+        if (!CollectionUtils.isEmpty(paging.getData())) {
+            List<Map> data = (List<Map>) paging.getData();
+            return data.get(0);
+        }
+        return Collections.EMPTY_MAP;
+    }
+
+    @Override
+    public void removeData(String metaId, String messageId) {
+        manager.removeData(metaId, messageId);
+    }
+
     @Override
     public void clearData(String metaId) {
         manager.clearData(metaId);
@@ -196,7 +236,7 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
      * @return
      */
     private long getMappingSuccess(List<Meta> metaAll) {
-        return queryMappingMetricCount(metaAll, (query) -> query.addFilter(ConfigConstant.DATA_SUCCESS, StorageDataStatusEnum.SUCCESS.getCode(), false, true));
+        return queryMappingMetricCount(metaAll, (query) -> query.addFilter(ConfigConstant.DATA_SUCCESS, StorageDataStatusEnum.SUCCESS.getValue()));
     }
 
     /**
@@ -206,7 +246,7 @@ public class MonitorFactory implements Monitor, ScheduledTaskJob {
      * @return
      */
     private long getMappingFail(List<Meta> metaAll) {
-        return queryMappingMetricCount(metaAll, (query) -> query.addFilter(ConfigConstant.DATA_SUCCESS, StorageDataStatusEnum.FAIL.getCode(), false, true));
+        return queryMappingMetricCount(metaAll, (query) -> query.addFilter(ConfigConstant.DATA_SUCCESS, StorageDataStatusEnum.FAIL.getValue()));
     }
 
     /**

+ 3 - 15
dbsyncer-parser/src/main/java/org/dbsyncer/parser/AbstractWriterBinlog.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.parser;
 
-import com.google.protobuf.ByteString;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.model.Field;
@@ -9,8 +8,7 @@ import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.WriterRequest;
-import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
-import org.dbsyncer.storage.binlog.proto.BinlogMap;
+import org.dbsyncer.storage.binlog.AbstractBinlogService;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.binlog.proto.EventEnum;
 import org.dbsyncer.storage.util.BinlogMessageUtil;
@@ -22,7 +20,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 
-public abstract class AbstractWriterBinlog extends AbstractBinlogRecorder<WriterRequest> {
+public abstract class AbstractWriterBinlog extends AbstractBinlogService<WriterRequest> {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -34,20 +32,10 @@ public abstract class AbstractWriterBinlog extends AbstractBinlogRecorder<Writer
 
     protected void flush(String tableGroupId, String event, Map<String, Object> data) {
         try {
-            BinlogMap.Builder dataBuilder = BinlogMap.newBuilder();
-            data.forEach((k, v) -> {
-                if (null != v) {
-                    ByteString bytes = BinlogMessageUtil.serializeValue(v);
-                    if (null != bytes) {
-                        dataBuilder.putRow(k, bytes);
-                    }
-                }
-            });
-
             BinlogMessage builder = BinlogMessage.newBuilder()
                     .setTableGroupId(tableGroupId)
                     .setEvent(EventEnum.valueOf(event))
-                    .setData(dataBuilder.build())
+                    .setData(BinlogMessageUtil.toBinlogMap(data))
                     .build();
             super.flush(builder);
         } catch (Exception e) {

+ 13 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -10,6 +10,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import javax.annotation.PostConstruct;
 import java.lang.reflect.ParameterizedType;
 import java.time.Instant;
+import java.time.LocalDateTime;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Queue;
@@ -38,6 +39,8 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     @Autowired
     private BufferActuatorConfig bufferActuatorConfig;
 
+    private LocalDateTime lastBufferWarningTime;
+
     private BlockingQueue<Request> buffer;
 
     private final Lock lock = new ReentrantLock(true);
@@ -100,7 +103,16 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     public boolean offer(BufferRequest request) {
         boolean offer = buffer.offer((Request) request);
         if (!offer) {
-            logger.warn("[{}]缓存队列容量已达上限,建议修改参数[dbsyncer.parser.flush.buffer.actuator.queue-capacity={}], ", this.getClass().getSimpleName(), getQueueCapacity());
+            LocalDateTime now = LocalDateTime.now();
+            if (null == lastBufferWarningTime) {
+                lastBufferWarningTime = now;
+            }
+
+            // 3s前有警告时间
+            if (now.minusSeconds(3).isAfter(lastBufferWarningTime)) {
+                logger.warn("[{}]缓存队列容量已达上限,建议修改参数[dbsyncer.parser.flush.buffer.actuator.queue-capacity={}], ", this.getClass().getSimpleName(), getQueueCapacity());
+                lastBufferWarningTime = now;
+            }
         }
         return offer;
     }

+ 5 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -1,9 +1,7 @@
 package org.dbsyncer.parser.flush.impl;
 
-import com.alibaba.fastjson.JSONException;
 import org.dbsyncer.common.config.IncrementDataConfig;
 import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
-import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.FlushService;
@@ -12,6 +10,7 @@ import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
+import org.dbsyncer.storage.util.BinlogMessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -69,13 +68,13 @@ public class FlushServiceImpl implements FlushService {
             row.put(ConfigConstant.DATA_TARGET_TABLE_NAME, targetTableGroupName);
             row.put(ConfigConstant.DATA_EVENT, event);
             row.put(ConfigConstant.DATA_ERROR, substring(error));
+            row.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
             try {
-                row.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(r));
-            } catch (JSONException e) {
+                byte[] bytes = BinlogMessageUtil.toBinlogMap(r).toByteArray();
+                row.put(ConfigConstant.BINLOG_DATA, bytes);
+            } catch (Exception e) {
                 logger.warn("可能存在Blob或inputStream大文件类型, 无法序列化:{}", r);
-                row.put(ConfigConstant.CONFIG_MODEL_JSON, r.toString());
             }
-            row.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
 
             // 缓存队列满时,打印日志
             if (!storageBufferActuator.offer(new StorageRequest(metaId, row))) {

+ 8 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/AbstractStorageService.java

@@ -5,6 +5,8 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.strategy.Strategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.Assert;
@@ -13,6 +15,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -23,6 +26,8 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 public abstract class AbstractStorageService implements StorageService, DisposableBean {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
     @Autowired
     private Map<String, Strategy> map;
 
@@ -59,11 +64,13 @@ public abstract class AbstractStorageService implements StorageService, Disposab
 
         boolean locked = false;
         try {
-            locked = lock.tryLock();
+            locked = lock.tryLock(3, TimeUnit.SECONDS);
             if (locked) {
                 String sharding = getSharding(query.getType(), query.getMetaId());
                 return select(sharding, query);
             }
+        } catch (InterruptedException e) {
+            logger.warn("tryLock error", e.getLocalizedMessage());
         } finally {
             if (locked) {
                 lock.unlock();

+ 0 - 241
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -1,241 +0,0 @@
-package org.dbsyncer.storage.binlog;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.IntPoint;
-import org.apache.lucene.document.LongPoint;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.BooleanClause;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
-import org.apache.lucene.util.BytesRef;
-import org.dbsyncer.common.config.BinlogRecorderConfig;
-import org.dbsyncer.common.model.Paging;
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
-import org.dbsyncer.common.scheduled.ScheduledTaskService;
-import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.storage.binlog.proto.BinlogMessage;
-import org.dbsyncer.storage.constant.BinlogConstant;
-import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
-import org.dbsyncer.storage.lucene.Shard;
-import org.dbsyncer.storage.query.Option;
-import org.dbsyncer.storage.util.DocumentUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import javax.annotation.PostConstruct;
-import java.io.File;
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/6/8 0:53
- */
-public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder, DisposableBean {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    private static final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar).append("data").append(File.separatorChar).append("data").append(File.separatorChar).toString();
-
-    @Autowired
-    private ScheduledTaskService scheduledTaskService;
-
-    @Autowired
-    private SnowflakeIdWorker snowflakeIdWorker;
-
-    @Autowired
-    private BinlogRecorderConfig binlogRecorderConfig;
-
-    private static Queue<BinlogMessage> queue;
-
-    private static Shard shard;
-
-    private WriterTask writerTask = new WriterTask();
-
-    private ReaderTask readerTask = new ReaderTask();
-
-    @PostConstruct
-    private void init() throws IOException {
-        queue = new LinkedBlockingQueue(binlogRecorderConfig.getQueueCapacity());
-        shard = new Shard(PATH + getTaskName());
-        scheduledTaskService.start(binlogRecorderConfig.getWriterPeriodMillisecond(), writerTask);
-        scheduledTaskService.start(binlogRecorderConfig.getReaderPeriodMillisecond(), readerTask);
-    }
-
-    /**
-     * 反序列化消息
-     *
-     * @param message
-     * @return
-     */
-    protected abstract Message deserialize(String messageId, BinlogMessage message);
-
-    @Override
-    public void flush(BinlogMessage message) {
-        queue.offer(message);
-    }
-
-    @Override
-    public void destroy() throws IOException {
-        shard.close();
-    }
-
-    @Override
-    public void complete(List<String> messageIds) {
-        if (!CollectionUtils.isEmpty(messageIds)) {
-            try {
-                int size = messageIds.size();
-                Term[] terms = new Term[size];
-                for (int i = 0; i < size; i++) {
-                    terms[i] = new Term(BinlogConstant.BINLOG_ID, messageIds.get(i));
-                }
-                shard.deleteBatch(terms);
-            } catch (IOException e) {
-                logger.error(e.getMessage());
-            }
-        }
-    }
-
-    /**
-     * 合并缓存队列任务到磁盘
-     */
-    final class WriterTask implements ScheduledTaskJob {
-
-        @Override
-        public void run() {
-            if (queue.isEmpty()) {
-                return;
-            }
-
-            List<Document> tasks = new ArrayList<>();
-            int count = 0;
-            long now = Instant.now().toEpochMilli();
-            while (!queue.isEmpty() && count < binlogRecorderConfig.getBatchCount()) {
-                BinlogMessage message = queue.poll();
-                if (null != message) {
-                    tasks.add(DocumentUtil.convertBinlog2Doc(String.valueOf(snowflakeIdWorker.nextId()), BinlogConstant.READY, new BytesRef(message.toByteArray()), now));
-                }
-                count++;
-            }
-
-            if (!CollectionUtils.isEmpty(tasks)) {
-                try {
-                    shard.insertBatch(tasks);
-                } catch (IOException e) {
-                    logger.error(e.getMessage());
-                }
-            }
-        }
-    }
-
-    /**
-     * 从磁盘读取日志到任务队列
-     */
-    final class ReaderTask implements ScheduledTaskJob {
-
-        private final Lock lock = new ReentrantLock(true);
-
-        private volatile boolean running;
-
-        @Override
-        public void run() {
-            // 读取任务数 >= 1/2缓存同步队列容量则继续等待
-            if (running || binlogRecorderConfig.getBatchCount() + getQueue().size() >= getQueueCapacity() / 2) {
-                return;
-            }
-
-            final Lock binlogLock = lock;
-            boolean locked = false;
-            try {
-                locked = binlogLock.tryLock();
-                if (locked) {
-                    running = true;
-                    doParse();
-                }
-            } catch (Exception e) {
-                logger.error(e.getMessage());
-            } finally {
-                if (locked) {
-                    running = false;
-                    binlogLock.unlock();
-                }
-            }
-        }
-
-        private void doParse() throws IOException {
-            //  查询[待处理] 或 [处理中 & 处理超时]
-            long maxProcessingSeconds = Timestamp.valueOf(LocalDateTime.now().minusSeconds(binlogRecorderConfig.getMaxProcessingSeconds())).getTime();
-            BooleanQuery query = new BooleanQuery.Builder()
-                    .add(new BooleanQuery.Builder()
-                            .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST)
-                            .build(), BooleanClause.Occur.SHOULD)
-                    .add(new BooleanQuery.Builder()
-                            .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.PROCESSING), BooleanClause.Occur.MUST)
-                            .add(LongPoint.newRangeQuery(BinlogConstant.BINLOG_TIME, Long.MIN_VALUE, maxProcessingSeconds), BooleanClause.Occur.MUST)
-                            .build(), BooleanClause.Occur.SHOULD)
-                    .build();
-            Option option = new Option(query);
-            option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_STATUS, IndexFieldResolverEnum.INT);
-            option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
-            option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_TIME, IndexFieldResolverEnum.LONG);
-
-            // 优先处理最早记录
-            Sort sort = new Sort(new SortField(BinlogConstant.BINLOG_TIME, SortField.Type.LONG));
-            Paging paging = shard.query(option, 1, binlogRecorderConfig.getBatchCount(), sort);
-            if (CollectionUtils.isEmpty(paging.getData())) {
-                return;
-            }
-
-            List<Map> list = (List<Map>) paging.getData();
-            final int size = list.size();
-            final List<Message> messages = new ArrayList<>(size);
-            final List<Document> updateDocs = new ArrayList<>(size);
-            final Term[] deleteIds = new Term[size];
-            boolean existProcessing = false;
-            for (int i = 0; i < size; i++) {
-                Map row = list.get(i);
-                String id = (String) row.get(BinlogConstant.BINLOG_ID);
-                Integer status = (Integer) row.get(BinlogConstant.BINLOG_STATUS);
-                BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
-                if (BinlogConstant.PROCESSING == status) {
-                    existProcessing = true;
-                }
-                deleteIds[i] = new Term(BinlogConstant.BINLOG_ID, id);
-                String newId = String.valueOf(snowflakeIdWorker.nextId());
-                try {
-                    Message message = deserialize(newId, BinlogMessage.parseFrom(ref.bytes));
-                    if (null != message) {
-                        messages.add(message);
-                        updateDocs.add(DocumentUtil.convertBinlog2Doc(newId, BinlogConstant.PROCESSING, ref, Instant.now().toEpochMilli()));
-                    }
-                } catch (InvalidProtocolBufferException e) {
-                    logger.error(e.getMessage());
-                }
-            }
-            if (existProcessing) {
-                logger.warn("存在超时未处理数据,正在重试,建议优化配置参数[max-processing-seconds={}].", binlogRecorderConfig.getMaxProcessingSeconds());
-            }
-
-            // 如果在更新消息状态的过程中服务被中止,为保证数据的安全性,重启后消息可能会同步2次)
-            shard.insertBatch(updateDocs);
-            shard.deleteBatch(deleteIds);
-            getQueue().addAll(messages);
-        }
-    }
-
-}

+ 43 - 29
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogService.java

@@ -9,14 +9,16 @@ import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
-import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
-import org.dbsyncer.storage.query.BooleanQuery;
+import org.dbsyncer.storage.query.BooleanFilter;
 import org.dbsyncer.storage.query.Query;
+import org.dbsyncer.storage.query.filter.IntFilter;
+import org.dbsyncer.storage.query.filter.LongFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -95,27 +97,31 @@ public abstract class AbstractBinlogService<Message> implements BinlogRecorder {
                 return;
             }
 
-            List<Map> tasks = new ArrayList<>();
-            int count = 0;
-            long now = Instant.now().toEpochMilli();
-            Map task = null;
-            while (!queue.isEmpty() && count < binlogRecorderConfig.getBatchCount()) {
-                BinlogMessage message = queue.poll();
-                if (null != message) {
-                    task = new HashMap();
-                    task.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
-                    task.put(ConfigConstant.BINLOG_STATUS, BinlogConstant.READY);
-                    task.put(ConfigConstant.CONFIG_MODEL_JSON, message.toByteArray());
-                    task.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
-                    tasks.add(task);
+            try {
+                List<Map> tasks = new ArrayList<>();
+                int count = 0;
+                long now = Instant.now().toEpochMilli();
+                Map task = null;
+                while (!queue.isEmpty() && count < binlogRecorderConfig.getBatchCount()) {
+                    BinlogMessage message = queue.poll();
+                    if (null != message) {
+                        task = new HashMap();
+                        task.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
+                        task.put(ConfigConstant.BINLOG_STATUS, BinlogConstant.READY);
+                        task.put(ConfigConstant.BINLOG_DATA, message.toByteArray());
+                        task.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
+                        tasks.add(task);
+                    }
+                    count++;
                 }
-                count++;
-            }
 
-            if (!CollectionUtils.isEmpty(tasks)) {
-                storageService.addBatch(StorageEnum.BINLOG, tasks);
+                if (!CollectionUtils.isEmpty(tasks)) {
+                    storageService.addBatch(StorageEnum.BINLOG, tasks);
+                }
+                tasks = null;
+            } catch (Exception e) {
+                logger.error(e.getMessage());
             }
-            tasks = null;
         }
     }
 
@@ -154,17 +160,25 @@ public abstract class AbstractBinlogService<Message> implements BinlogRecorder {
         }
 
         private void doParse() {
-            //  TODO 查询[待处理] 或 [处理中 & 处理超时]
+            // 查询[待处理] 或 [处理中 & 处理超时] // TODO 待优化
             Query query = new Query();
             query.setType(StorageEnum.BINLOG);
-            Filter ready = new Filter(ConfigConstant.BINLOG_STATUS, FilterEnum.EQUAL, BinlogConstant.READY);
-            Filter processing = new Filter(ConfigConstant.BINLOG_STATUS, FilterEnum.EQUAL, BinlogConstant.PROCESSING);
+
+            IntFilter ready = new IntFilter(ConfigConstant.BINLOG_STATUS, FilterEnum.EQUAL, BinlogConstant.READY);
+            IntFilter processing = new IntFilter(ConfigConstant.BINLOG_STATUS, FilterEnum.EQUAL, BinlogConstant.PROCESSING);
             long maxProcessingSeconds = Timestamp.valueOf(LocalDateTime.now().minusSeconds(binlogRecorderConfig.getMaxProcessingSeconds())).getTime();
-            Filter processingTimeout = new Filter(ConfigConstant.CONFIG_MODEL_CREATE_TIME, FilterEnum.LT, maxProcessingSeconds);
-            BooleanQuery booleanQuery = new BooleanQuery()
-                    .add(new BooleanQuery().add(ready), OperationEnum.OR)
-                    .add(new BooleanQuery().add(processing).add(processingTimeout));
-            query.setBooleanQuery(booleanQuery);
+            LongFilter processingTimeout = new LongFilter(ConfigConstant.CONFIG_MODEL_CREATE_TIME, FilterEnum.LT, maxProcessingSeconds);
+            BooleanFilter booleanFilter = new BooleanFilter()
+                    .add(new BooleanFilter().add(ready), OperationEnum.OR)
+                    .add(new BooleanFilter().add(processing).add(processingTimeout), OperationEnum.OR);
+            query.setBooleanFilter(booleanFilter);
+
+            // 指定返回值类型
+            Map<String, IndexFieldResolverEnum> fieldResolvers = new LinkedHashMap<>();
+            fieldResolvers.put(ConfigConstant.BINLOG_STATUS, IndexFieldResolverEnum.INT);
+            fieldResolvers.put(ConfigConstant.BINLOG_DATA, IndexFieldResolverEnum.BINARY);
+            fieldResolvers.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, IndexFieldResolverEnum.LONG);
+            query.setIndexFieldResolverMap(fieldResolvers);
             query.setPageNum(1);
             query.setPageSize(binlogRecorderConfig.getBatchCount());
             Paging paging = storageService.query(query);
@@ -181,7 +195,7 @@ public abstract class AbstractBinlogService<Message> implements BinlogRecorder {
                 Map row = list.get(i);
                 String id = (String) row.get(ConfigConstant.CONFIG_MODEL_ID);
                 Integer status = (Integer) row.get(ConfigConstant.BINLOG_STATUS);
-                byte[] bytes = (byte[]) row.get(ConfigConstant.CONFIG_MODEL_JSON);
+                byte[] bytes = (byte[]) row.get(ConfigConstant.BINLOG_DATA);
                 if (BinlogConstant.PROCESSING == status) {
                     existProcessing = true;
                 }

+ 18 - 23
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/BinlogConstant.java

@@ -1,24 +1,19 @@
-package org.dbsyncer.storage.constant;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/7/13 22:14
- */
-public class BinlogConstant {
-
-    /**
-     * 属性
-     */
-    public static final String BINLOG_ID = "id";
-    public static final String BINLOG_STATUS = "s";
-    public static final String BINLOG_CONTENT = "c";
-    public static final String BINLOG_TIME = "t";
-
-    /**
-     * 状态类型
-     */
-    public static final int READY = 0;
-    public static final int PROCESSING = 1;
-
+package org.dbsyncer.storage.constant;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/7/13 22:14
+ */
+public class BinlogConstant {
+
+    /**
+     * 待同步
+     */
+    public static final int READY = 0;
+    /**
+     * 同步中
+     */
+    public static final int PROCESSING = 1;
+
 }

+ 1 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java

@@ -41,5 +41,6 @@ public class ConfigConstant {
      * Binlog
      */
     public static final String BINLOG_STATUS = "status";
+    public static final String BINLOG_DATA = "data";
 
 }

+ 1 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/IndexFieldResolverEnum.java

@@ -10,7 +10,7 @@ public enum IndexFieldResolverEnum {
 
     STRING((f) -> f.stringValue()),
 
-    BINARY((f) -> f.binaryValue());
+    BINARY((f) -> f.binaryValue().bytes);
 
     private IndexFieldResolver indexFieldResolver;
 

+ 98 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Option.java

@@ -0,0 +1,98 @@
+package org.dbsyncer.storage.lucene;
+
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.highlight.Highlighter;
+import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
+import org.dbsyncer.storage.lucene.IndexFieldResolver;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-06-01 22:57
+ */
+public class Option {
+
+    private Query query;
+
+    private Set<String> highLightKeys;
+
+    private boolean enableHighLightSearch;
+
+    private Highlighter highlighter = null;
+
+    /**
+     * 只查总数
+     */
+    private boolean queryTotal;
+
+    /**
+     * 返回值转换器
+     */
+    private Map<String, IndexFieldResolverEnum> indexFieldResolverMap = new ConcurrentHashMap<>();
+
+    /**
+     * 指定返回的值类型
+     *
+     * @param name
+     * @return
+     */
+    public IndexFieldResolver getIndexFieldResolver(String name) {
+        IndexFieldResolverEnum indexFieldResolverEnum = indexFieldResolverMap.get(name);
+        if (null != indexFieldResolverEnum) {
+            return indexFieldResolverEnum.getIndexFieldResolver();
+        }
+        return IndexFieldResolverEnum.STRING.getIndexFieldResolver();
+    }
+
+    public Query getQuery() {
+        return query;
+    }
+
+    public void setQuery(Query query) {
+        this.query = query;
+    }
+
+    public Set<String> getHighLightKeys() {
+        return highLightKeys;
+    }
+
+    public void setHighLightKeys(Set<String> highLightKeys) {
+        this.highLightKeys = highLightKeys;
+    }
+
+    public boolean isEnableHighLightSearch() {
+        return enableHighLightSearch;
+    }
+
+    public void setEnableHighLightSearch(boolean enableHighLightSearch) {
+        this.enableHighLightSearch = enableHighLightSearch;
+    }
+
+    public Highlighter getHighlighter() {
+        return highlighter;
+    }
+
+    public void setHighlighter(Highlighter highlighter) {
+        this.highlighter = highlighter;
+    }
+
+    public boolean isQueryTotal() {
+        return queryTotal;
+    }
+
+    public void setQueryTotal(boolean queryTotal) {
+        this.queryTotal = queryTotal;
+    }
+
+    public Map<String, IndexFieldResolverEnum> getIndexFieldResolverMap() {
+        return indexFieldResolverMap;
+    }
+
+    public void setIndexFieldResolverMap(Map<String, IndexFieldResolverEnum> indexFieldResolverMap) {
+        this.indexFieldResolverMap = indexFieldResolverMap;
+    }
+}

+ 2 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -12,7 +12,6 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.storage.StorageException;
-import org.dbsyncer.storage.query.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,7 +130,7 @@ public class Shard {
         final TopDocs topDocs = getTopDocs(searcher, option.getQuery(), MAX_SIZE, sort);
         Paging paging = new Paging(pageNum, pageSize);
         paging.setTotal(topDocs.totalHits);
-        if(option.isQueryTotal()){
+        if (option.isQueryTotal()) {
             return paging;
         }
 
@@ -197,7 +196,7 @@ public class Shard {
                 }
 
                 // 解析value类型
-                r.put(f.name(), option.getFieldResolver(f.name()).getValue(f));
+                r.put(f.name(), option.getIndexFieldResolver(f.name()).getValue(f));
             }
             list.add(r);
         }

+ 33 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/AbstractFilter.java

@@ -0,0 +1,33 @@
+package org.dbsyncer.storage.query;
+
+import org.apache.lucene.search.Query;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.connector.model.Filter;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/11/17 23:56
+ */
+public abstract class AbstractFilter extends Filter {
+    private boolean enableHighLightSearch;
+
+    public AbstractFilter(String name, FilterEnum filterEnum, Object value) {
+        this(name, filterEnum, value, false);
+    }
+
+    public AbstractFilter(String name, FilterEnum filterEnum, Object value, boolean enableHighLightSearch) {
+        setName(name);
+        setFilter(filterEnum.getName());
+        setValue(String.valueOf(value));
+        this.enableHighLightSearch = enableHighLightSearch;
+    }
+
+    public abstract Query newEqual();
+
+    public abstract Query newLessThan();
+
+    public boolean isEnableHighLightSearch() {
+        return enableHighLightSearch;
+    }
+}

+ 13 - 14
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/BooleanQuery.java → dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/BooleanFilter.java

@@ -2,45 +2,44 @@ package org.dbsyncer.storage.query;
 
 
 import org.dbsyncer.connector.enums.OperationEnum;
-import org.dbsyncer.connector.model.Filter;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class BooleanQuery {
+public class BooleanFilter {
 
-    private final List<BooleanQuery> clauses = new ArrayList<>();
+    private final List<BooleanFilter> clauses = new ArrayList<>();
 
-    private final List<Filter> filters = new ArrayList<>();
+    private final List<AbstractFilter> filters = new ArrayList<>();
 
     private OperationEnum operationEnum;
 
-    public BooleanQuery add(BooleanQuery booleanQuery) {
-        return add(booleanQuery, OperationEnum.AND);
-    }
-
-    public BooleanQuery add(BooleanQuery booleanQuery, OperationEnum operationEnum) {
-        clauses.add(booleanQuery);
-        booleanQuery.setOperationEnum(operationEnum);
+    public BooleanFilter add(BooleanFilter booleanFilter, OperationEnum operationEnum) {
+        clauses.add(booleanFilter);
+        booleanFilter.setOperationEnum(operationEnum);
         return this;
     }
 
-    public BooleanQuery add(Filter filter) {
+    public BooleanFilter add(AbstractFilter filter) {
         filter.setOperation(OperationEnum.AND.getName());
         filters.add(filter);
         return this;
     }
 
-    public BooleanQuery or(Filter filter) {
+    public BooleanFilter or(AbstractFilter filter) {
         filter.setOperation(OperationEnum.OR.getName());
         filters.add(filter);
         return this;
     }
 
-    public List<Filter> getFilters() {
+    public List<AbstractFilter> getFilters() {
         return filters;
     }
 
+    public List<BooleanFilter> getClauses() {
+        return clauses;
+    }
+
     public OperationEnum getOperationEnum() {
         return operationEnum;
     }

+ 0 - 93
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Option.java

@@ -1,93 +0,0 @@
-package org.dbsyncer.storage.query;
-
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.highlight.Highlighter;
-import org.apache.lucene.search.highlight.QueryScorer;
-import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
-import org.dbsyncer.storage.lucene.IndexFieldResolver;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * @version 1.0.0
- * @Author AE86
- * @Date 2020-06-01 22:57
- */
-public class Option {
-
-    private Query query;
-
-    private Set<String> highLightKeys;
-
-    private boolean enableHighLightSearch;
-
-    private Highlighter highlighter = null;
-
-    /**
-     * 只查总数
-     */
-    private boolean queryTotal;
-
-    private final Map<String, IndexFieldResolverEnum> fieldResolvers = new LinkedHashMap<>();
-
-    public Option(Query query) {
-        this.query = query;
-    }
-
-    public Option(Query query, boolean queryTotal, List<Param> params) {
-        this.query = query;
-        this.queryTotal = queryTotal;
-        if (!CollectionUtils.isEmpty(params)) {
-            this.highLightKeys = params.stream()
-                    .filter(p -> p.isHighlighter())
-                    .map(p -> p.getKey())
-                    .collect(Collectors.toSet());
-        }
-        if (!CollectionUtils.isEmpty(highLightKeys)) {
-            this.enableHighLightSearch = true;
-            SimpleHTMLFormatter formatter = new SimpleHTMLFormatter("<span style='color:red'>", "</span>");
-            highlighter = new Highlighter(formatter, new QueryScorer(query));
-        }
-    }
-
-    public IndexFieldResolver getFieldResolver(String name){
-        if(fieldResolvers.containsKey(name)){
-            return fieldResolvers.get(name).getIndexFieldResolver();
-        }
-        return IndexFieldResolverEnum.STRING.getIndexFieldResolver();
-    }
-
-    public void addIndexFieldResolverEnum(String name, IndexFieldResolverEnum fieldResolver){
-        fieldResolvers.putIfAbsent(name, fieldResolver);
-    }
-
-    public Query getQuery() {
-        return query;
-    }
-
-    public boolean isQueryTotal() {
-        return queryTotal;
-    }
-
-    public void setQueryTotal(boolean queryTotal) {
-        this.queryTotal = queryTotal;
-    }
-
-    public Set<String> getHighLightKeys() {
-        return highLightKeys;
-    }
-
-    public boolean isEnableHighLightSearch() {
-        return enableHighLightSearch;
-    }
-
-    public Highlighter getHighlighter() {
-        return highlighter;
-    }
-}

+ 0 - 36
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Param.java

@@ -1,36 +0,0 @@
-package org.dbsyncer.storage.query;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2019/11/17 23:56
- */
-public class Param {
-    private String key;
-    private String value;
-    private boolean highlighter;
-    private boolean number;
-
-    public Param(String key, String value, boolean highlighter, boolean number) {
-        this.key = key;
-        this.value = value;
-        this.highlighter = highlighter;
-        this.number = number;
-    }
-
-    public String getKey() {
-        return key;
-    }
-
-    public String getValue() {
-        return value;
-    }
-
-    public boolean isHighlighter() {
-        return highlighter;
-    }
-
-    public boolean isNumber() {
-        return number;
-    }
-}

+ 30 - 34
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Query.java

@@ -1,9 +1,12 @@
 package org.dbsyncer.storage.query;
 
+import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
+import org.dbsyncer.storage.query.filter.IntFilter;
+import org.dbsyncer.storage.query.filter.StringFilter;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author AE86
@@ -19,44 +22,40 @@ public class Query {
 
     private String metaId;
 
-    private List<Param> params;
+    private BooleanFilter booleanFilter = new BooleanFilter();
 
-    private BooleanQuery booleanQuery;
+    /**
+     * 查询应用性能,不用排序查询,只用查询总量即可
+     */
+    private boolean queryTotal;
 
     private int pageNum = 1;
 
     private int pageSize = 20;
 
-    private boolean enableHighLightSearch;
-
     /**
-     * 查询应用性能,不用排序查询,只用查询总量即可
+     * 返回值转换器,限Disk使用
      */
-    private boolean queryTotal;
+    private Map<String, IndexFieldResolverEnum> indexFieldResolverMap = new ConcurrentHashMap<>();
 
     public Query() {
-        this.params = new ArrayList<>();
     }
 
     public Query(int pageNum, int pageSize) {
         this.pageNum = pageNum;
         this.pageSize = pageSize;
-        this.params = new ArrayList<>();
     }
 
-    public void addFilter(String key, String value) {
-        addFilter(key, value, false, false);
+    public void addFilter(String name, String value) {
+        booleanFilter.add(new StringFilter(name, value, false));
     }
 
-    public void addFilter(String key, String value, boolean highlighter) {
-        addFilter(key, value, highlighter, false);
+    public void addFilter(String name, String value, boolean enableHighLightSearch) {
+        booleanFilter.add(new StringFilter(name, value, enableHighLightSearch));
     }
 
-    public void addFilter(String key, String value, boolean highlighter, boolean number) {
-        params.add(new Param(key, value, highlighter, number));
-        if (highlighter) {
-            enableHighLightSearch = highlighter;
-        }
+    public void addFilter(String name, int value) {
+        booleanFilter.add(new IntFilter(name, value));
     }
 
     public StorageEnum getType() {
@@ -75,20 +74,20 @@ public class Query {
         this.metaId = metaId;
     }
 
-    public List<Param> getParams() {
-        return params;
+    public BooleanFilter getBooleanFilter() {
+        return booleanFilter;
     }
 
-    public void setParams(List<Param> params) {
-        this.params = params;
+    public void setBooleanFilter(BooleanFilter booleanFilter) {
+        this.booleanFilter = booleanFilter;
     }
 
-    public BooleanQuery getBooleanQuery() {
-        return booleanQuery;
+    public boolean isQueryTotal() {
+        return queryTotal;
     }
 
-    public void setBooleanQuery(BooleanQuery booleanQuery) {
-        this.booleanQuery = booleanQuery;
+    public void setQueryTotal(boolean queryTotal) {
+        this.queryTotal = queryTotal;
     }
 
     public int getPageNum() {
@@ -107,15 +106,12 @@ public class Query {
         this.pageSize = pageSize;
     }
 
-    public boolean isEnableHighLightSearch() {
-        return enableHighLightSearch;
+    public Map<String, IndexFieldResolverEnum> getIndexFieldResolverMap() {
+        return indexFieldResolverMap;
     }
 
-    public boolean isQueryTotal() {
-        return queryTotal;
+    public void setIndexFieldResolverMap(Map<String, IndexFieldResolverEnum> indexFieldResolverMap) {
+        this.indexFieldResolverMap = indexFieldResolverMap;
     }
 
-    public void setQueryTotal(boolean queryTotal) {
-        this.queryTotal = queryTotal;
-    }
 }

+ 28 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/filter/IntFilter.java

@@ -0,0 +1,28 @@
+package org.dbsyncer.storage.query.filter;
+
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.search.Query;
+import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.storage.query.AbstractFilter;
+
+public class IntFilter extends AbstractFilter {
+
+    public IntFilter(String name, int value) {
+        super(name, FilterEnum.EQUAL, value);
+    }
+
+    public IntFilter(String name, FilterEnum filterEnum, int value) {
+        super(name, filterEnum, value);
+    }
+
+    @Override
+    public Query newEqual() {
+        return IntPoint.newSetQuery(getName(), NumberUtil.toInt(getValue()));
+    }
+
+    @Override
+    public Query newLessThan() {
+        return IntPoint.newRangeQuery(getName(), Integer.MIN_VALUE, NumberUtil.toInt(getValue()));
+    }
+}

+ 24 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/filter/LongFilter.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.storage.query.filter;
+
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.search.Query;
+import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.storage.query.AbstractFilter;
+
+public class LongFilter extends AbstractFilter {
+
+    public LongFilter(String name, FilterEnum filterEnum, long value) {
+        super(name, filterEnum, value);
+    }
+
+    @Override
+    public Query newEqual() {
+        return LongPoint.newSetQuery(getName(), NumberUtil.toLong(getValue()));
+    }
+
+    @Override
+    public Query newLessThan() {
+        return LongPoint.newRangeQuery(getName(), Long.MIN_VALUE, NumberUtil.toLong(getValue()));
+    }
+}

+ 25 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/filter/StringFilter.java

@@ -0,0 +1,25 @@
+package org.dbsyncer.storage.query.filter;
+
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.storage.StorageException;
+import org.dbsyncer.storage.query.AbstractFilter;
+
+public class StringFilter extends AbstractFilter {
+
+    public StringFilter(String name, String value, boolean enableHighLightSearch) {
+        super(name, FilterEnum.EQUAL, value, enableHighLightSearch);
+    }
+
+    @Override
+    public Query newEqual() {
+        return new TermQuery(new Term(getName(), getValue()));
+    }
+
+    @Override
+    public Query newLessThan() {
+        throw new StorageException("Unsupported method newLessThan.");
+    }
+}

+ 75 - 21
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -1,28 +1,30 @@
 package org.dbsyncer.storage.support;
 
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.IntPoint;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.*;
+import org.apache.lucene.search.highlight.Highlighter;
+import org.apache.lucene.search.highlight.QueryScorer;
+import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.storage.AbstractStorageService;
 import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageEnum;
+import org.dbsyncer.storage.lucene.Option;
 import org.dbsyncer.storage.lucene.Shard;
-import org.dbsyncer.storage.query.Option;
-import org.dbsyncer.storage.query.Param;
+import org.dbsyncer.storage.query.AbstractFilter;
+import org.dbsyncer.storage.query.BooleanFilter;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.util.DocumentUtil;
 
 import javax.annotation.PostConstruct;
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -56,26 +58,40 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
             Shard shard = getShard(sharding);
             int pageNum = query.getPageNum() <= 0 ? 1 : query.getPageNum();
             int pageSize = query.getPageSize() <= 0 ? 20 : query.getPageSize();
-            boolean queryTotal = query.isQueryTotal();
             // 根据修改时间 > 创建时间排序
             Sort sort = new Sort(new SortField(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, SortField.Type.LONG, true),
                     new SortField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, SortField.Type.LONG, true));
+            Option option = new Option();
+            option.setQueryTotal(query.isQueryTotal());
+            option.setIndexFieldResolverMap(query.getIndexFieldResolverMap());
             // 设置参数
-            List<Param> params = query.getParams();
-            if (!CollectionUtils.isEmpty(params)) {
-                BooleanQuery.Builder builder = new BooleanQuery.Builder();
-                params.forEach(p -> {
-                    if (p.isNumber()) {
-                        builder.add(IntPoint.newSetQuery(p.getKey(), NumberUtil.toInt(p.getValue())), BooleanClause.Occur.MUST);
-                    } else {
-                        builder.add(new TermQuery(new Term(p.getKey(), p.getValue())), BooleanClause.Occur.MUST);
-                    }
-                });
-                BooleanQuery q = builder.build();
-                return shard.query(new Option(q, queryTotal, params), pageNum, pageSize, sort);
+            BooleanFilter baseQuery = query.getBooleanFilter();
+            List<AbstractFilter> filters = baseQuery.getFilters();
+            List<BooleanFilter> clauses = baseQuery.getClauses();
+            if (CollectionUtils.isEmpty(clauses) && CollectionUtils.isEmpty(filters)) {
+                option.setQuery(new MatchAllDocsQuery());
+                return shard.query(option, pageNum, pageSize, sort);
             }
 
-            return shard.query(new Option(new MatchAllDocsQuery(), queryTotal, null), pageNum, pageSize, sort);
+            Set<String> highLightKeys = new HashSet<>();
+            BooleanQuery build = null;
+            if (!CollectionUtils.isEmpty(filters)) {
+                build = buildQueryWithFilters(filters, highLightKeys);
+            } else {
+                build = buildQueryWithBooleanFilters(clauses, highLightKeys);
+            }
+
+            option.setQuery(build);
+
+            // 高亮查询
+            if (!CollectionUtils.isEmpty(highLightKeys)) {
+                option.setHighLightKeys(highLightKeys);
+                option.setEnableHighLightSearch(true);
+                SimpleHTMLFormatter formatter = new SimpleHTMLFormatter("<span style='color:red'>", "</span>");
+                option.setHighlighter(new Highlighter(formatter, new QueryScorer(build)));
+            }
+
+            return shard.query(option, pageNum, pageSize, sort);
         } catch (IOException e) {
             throw new StorageException(e);
         }
@@ -127,6 +143,43 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
         shards.clear();
     }
 
+    private BooleanQuery buildQueryWithFilters(List<AbstractFilter> filters, Set<String> highLightKeys) {
+        BooleanQuery.Builder builder = new BooleanQuery.Builder();
+        filters.forEach(p -> {
+            FilterEnum filterEnum = FilterEnum.getFilterEnum(p.getFilter());
+            BooleanClause.Occur occur = getOccur(p.getOperation());
+            switch (filterEnum) {
+                case EQUAL:
+                case LIKE:
+                    builder.add(p.newEqual(), occur);
+                    break;
+                case LT:
+                    builder.add(p.newLessThan(), occur);
+                    break;
+            }
+
+            if (p.isEnableHighLightSearch()) {
+                highLightKeys.add(p.getName());
+            }
+        });
+        return builder.build();
+    }
+
+    private BooleanQuery buildQueryWithBooleanFilters(List<BooleanFilter> clauses, Set<String> highLightKeys) {
+        BooleanQuery.Builder builder = new BooleanQuery.Builder();
+        clauses.forEach(c -> {
+            if (!CollectionUtils.isEmpty(c.getFilters())) {
+                BooleanQuery cBuild = buildQueryWithFilters(c.getFilters(), highLightKeys);
+                builder.add(cBuild, getOccur(c.getOperationEnum().getName()));
+            }
+        });
+        return builder.build();
+    }
+
+    private BooleanClause.Occur getOccur(String operation) {
+        return OperationEnum.isAnd(operation) ? BooleanClause.Occur.MUST : BooleanClause.Occur.SHOULD;
+    }
+
     private Term getPrimaryKeyTerm(Document doc) {
         return new Term(ConfigConstant.CONFIG_MODEL_ID, doc.getField(ConfigConstant.CONFIG_MODEL_ID).stringValue());
     }
@@ -183,4 +236,5 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
     interface ExecuteMapper {
         void apply(Shard shard, List<Document> docs) throws IOException;
     }
+
 }

+ 102 - 36
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -11,13 +11,15 @@ import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.Database;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.dbsyncer.storage.AbstractStorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageEnum;
-import org.dbsyncer.storage.query.Param;
+import org.dbsyncer.storage.query.AbstractFilter;
+import org.dbsyncer.storage.query.BooleanFilter;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.util.UnderlineToCamelUtils;
 import org.slf4j.Logger;
@@ -41,7 +43,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -111,10 +112,11 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             return paging;
         }
 
+        List<AbstractFilter> highLightKeys = new ArrayList<>();
         List<Object> queryArgs = new ArrayList<>();
-        String querySql = buildQuerySql(query, executor, queryArgs);
+        String querySql = buildQuerySql(query, executor, queryArgs, highLightKeys);
         List<Map<String, Object>> data = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, queryArgs.toArray()));
-        replaceHighLight(query, data);
+        replaceHighLight(highLightKeys, data);
         paging.setData(data);
         return paging;
     }
@@ -217,9 +219,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         return args.toArray();
     }
 
-    private String buildQuerySql(Query query, Executor executor, List<Object> args) {
+    private String buildQuerySql(Query query, Executor executor, List<Object> args, List<AbstractFilter> highLightKeys) {
         StringBuilder sql = new StringBuilder(executor.getQuery());
-        buildQuerySqlWithParams(query, args, sql);
+        buildQuerySqlWithParams(query, args, sql, highLightKeys);
         // order by updateTime,createTime desc
         sql.append(" order by ");
         if (executor.isOrderByUpdateTime()) {
@@ -236,26 +238,85 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         StringBuilder queryCount = new StringBuilder();
         queryCount.append("SELECT COUNT(1) FROM (");
         StringBuilder sql = new StringBuilder("SELECT 1 FROM `").append(executor.getTable()).append("`");
-        buildQuerySqlWithParams(query, args, sql);
+        buildQuerySqlWithParams(query, args, sql, null);
         queryCount.append(sql);
         queryCount.append(" GROUP BY `ID`) DBSYNCER_T");
         return queryCount.toString();
     }
 
-    private void buildQuerySqlWithParams(Query query, List<Object> args, StringBuilder sql) {
-        List<Param> params = query.getParams();
-        if (!CollectionUtils.isEmpty(params)) {
-            sql.append(" WHERE ");
-            AtomicBoolean flag = new AtomicBoolean();
-            params.forEach(p -> {
-                if (flag.get()) {
-                    sql.append(" AND ");
-                }
-                // name=?
-                sql.append(p.getKey()).append(p.isHighlighter() ? " LIKE ?" : "=?");
-                args.add(p.isHighlighter() ? new StringBuilder("%").append(p.getValue()).append("%") : p.getValue());
-                flag.set(true);
-            });
+    private void buildQuerySqlWithParams(Query query, List<Object> args, StringBuilder sql, List<AbstractFilter> highLightKeys) {
+        BooleanFilter baseQuery = query.getBooleanFilter();
+        List<BooleanFilter> clauses = baseQuery.getClauses();
+        List<AbstractFilter> filters = baseQuery.getFilters();
+        if (CollectionUtils.isEmpty(clauses) && CollectionUtils.isEmpty(filters)) {
+            return;
+        }
+
+        sql.append(" WHERE ");
+        if (!CollectionUtils.isEmpty(filters)) {
+            buildQuerySqlWithFilters(filters, args, sql, highLightKeys);
+            return;
+        }
+
+        buildQuerySqlWithBooleanFilters(clauses, args, sql, highLightKeys);
+    }
+
+    private void buildQuerySqlWithFilters(List<AbstractFilter> filters, List<Object> args, StringBuilder sql, List<AbstractFilter> highLightKeys) {
+        // 过滤值
+        int size = filters.size();
+        for (int i = 0; i < size; i++) {
+            AbstractFilter p = filters.get(i);
+
+            if (i > 0) {
+                sql.append(" ").append(p.getOperation().toUpperCase()).append(" ");
+            }
+
+            FilterEnum filterEnum = FilterEnum.getFilterEnum(p.getFilter());
+            String name = UnderlineToCamelUtils.camelToUnderline(p.getName());
+            switch (filterEnum) {
+                case EQUAL:
+                    sql.append(name).append(" = ?");
+                    args.add(p.getValue());
+                    break;
+                case LIKE:
+                    sql.append(name).append(" LIKE ?");
+                    args.add(new StringBuilder("%").append(p.getValue()).append("%"));
+                    break;
+                case LT:
+                    sql.append(name).append(" < ?");
+                    args.add(p.getValue());
+                    break;
+            }
+            if (null != highLightKeys && p.isEnableHighLightSearch()) {
+                highLightKeys.add(p);
+            }
+        }
+    }
+
+    private void buildQuerySqlWithBooleanFilters(List<BooleanFilter> clauses, List<Object> args, StringBuilder sql, List<AbstractFilter> highLightKeys) {
+        // 解析查询
+        int size = clauses.size();
+        for (int i = 0; i < size; i++) {
+            BooleanFilter booleanFilter = clauses.get(i);
+            List<AbstractFilter> filters = booleanFilter.getFilters();
+            if (CollectionUtils.isEmpty(filters)) {
+                continue;
+            }
+
+            // 组合条件
+            if (i > 0) {
+                sql.append(" ").append(booleanFilter.getOperationEnum().name().toUpperCase()).append(" ");
+            }
+
+            if (size > 0) {
+                sql.append("(");
+            }
+
+            buildQuerySqlWithFilters(filters, args, sql, highLightKeys);
+
+            if (size > 0) {
+                sql.append(")");
+            }
         }
     }
 
@@ -271,16 +332,15 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         if (CollectionUtils.isEmpty(tables)) {
             return;
         }
-        final String queryColumnCount = "SELECT count(*) FROM information_schema.columns WHERE table_name = '%s' and column_name = 'TABLE_GROUP_ID'";
+        final String queryColumnCount = "SELECT count(*) FROM information_schema.columns WHERE table_name = '%s' and column_name = 'DATA'";
         tables.forEach(table -> {
             try {
                 String query = String.format(queryColumnCount, table);
                 // 是否已升级
                 int count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(query, Integer.class));
                 if (count == 0) {
-                    String ddl = readSql(UPGRADE_SQL, true, table);
-                    executeSql(ddl);
-                    logger.info(ddl);
+                    String ddlSql = readSql(UPGRADE_SQL, false, table);
+                    Stream.of(StringUtil.split(ddlSql, ";")).forEach(ddl -> executeSql(ddl));
                 }
             } catch (Exception e) {
                 if (e.getCause() instanceof SQLSyntaxErrorException) {
@@ -305,12 +365,17 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
         List<Field> logFields = builder.getFields();
 
+        // 缓存任务
+        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.BINLOG_STATUS, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.BINLOG_DATA);
+        List<Field> binlogFields = builder.getFields();
+
         // 数据
-        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_TABLE_GROUP_ID, ConfigConstant.DATA_TARGET_TABLE_NAME, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
+        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_TABLE_GROUP_ID, ConfigConstant.DATA_TARGET_TABLE_NAME, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.BINLOG_DATA);
         List<Field> dataFields = builder.getFields();
 
         tables.computeIfAbsent(StorageEnum.CONFIG.getType(), k -> new Executor(k, configFields, true, true));
         tables.computeIfAbsent(StorageEnum.LOG.getType(), k -> new Executor(k, logFields, true, false));
+        tables.computeIfAbsent(StorageEnum.BINLOG.getType(), k -> new Executor(k, binlogFields, true, false));
         tables.computeIfAbsent(StorageEnum.DATA.getType(), k -> new Executor(k, dataFields, false, false));
         // 创建表
         tables.forEach((tableName, e) -> {
@@ -332,7 +397,6 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         } catch (EmptyResultDataAccessException e) {
             // 不存在表
             String ddl = readSql(executor.getType(), executor.isSystemTable(), table);
-            logger.info(ddl);
             executeSql(ddl);
         }
 
@@ -373,7 +437,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
 
         // 动态替换表名
         if (!systemTable) {
-            return StringUtil.replaceOnce(res.toString(), template, table);
+            return StringUtil.replace(res.toString(), template, table);
         }
         return res.toString();
     }
@@ -381,19 +445,19 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     private void executeSql(String ddl) {
         connectorMapper.execute(databaseTemplate -> {
             databaseTemplate.execute(ddl);
+            logger.info(ddl);
             return true;
         });
     }
 
-    private void replaceHighLight(Query query, List<Map<String, Object>> list) {
+    private void replaceHighLight(List<AbstractFilter> highLightKeys, List<Map<String, Object>> list) {
         // 开启高亮
-        if (!CollectionUtils.isEmpty(list) && query.isEnableHighLightSearch()) {
-            List<Param> highLight = query.getParams().stream().filter(p -> p.isHighlighter()).collect(Collectors.toList());
+        if (!CollectionUtils.isEmpty(list) && !CollectionUtils.isEmpty(highLightKeys)) {
             list.forEach(row ->
-                    highLight.forEach(p -> {
-                        String text = String.valueOf(row.get(p.getKey()));
-                        String replacement = new StringBuilder("<span style='color:red'>").append(p.getValue()).append("</span>").toString();
-                        row.put(p.getKey(), StringUtil.replace(text, p.getValue(), replacement));
+                    highLightKeys.forEach(paramFilter -> {
+                        String text = String.valueOf(row.get(paramFilter.getName()));
+                        String replacement = new StringBuilder("<span style='color:red'>").append(paramFilter.getValue()).append("</span>").toString();
+                        row.put(paramFilter.getName(), StringUtil.replace(text, paramFilter.getValue(), replacement));
                     })
             );
         }
@@ -419,7 +483,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
                     new Field(ConfigConstant.DATA_TABLE_GROUP_ID, "VARCHAR", Types.VARCHAR),
                     new Field(ConfigConstant.DATA_TARGET_TABLE_NAME, "VARCHAR", Types.VARCHAR),
                     new Field(ConfigConstant.DATA_EVENT, "VARCHAR", Types.VARCHAR),
-                    new Field(ConfigConstant.DATA_ERROR, "LONGVARCHAR", Types.LONGVARCHAR)
+                    new Field(ConfigConstant.DATA_ERROR, "LONGVARCHAR", Types.LONGVARCHAR),
+                    new Field(ConfigConstant.BINLOG_STATUS, "INTEGER", Types.INTEGER),
+                    new Field(ConfigConstant.BINLOG_DATA, "VARBINARY", Types.BLOB)
             ).map(field -> {
                 field.setLabelName(field.getName());
                 // 转换列下划线

+ 22 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/BinlogMessageUtil.java

@@ -6,6 +6,7 @@ import oracle.sql.CLOB;
 import oracle.sql.TIMESTAMP;
 import org.apache.commons.io.IOUtils;
 import org.dbsyncer.storage.binlog.BinlogColumnValue;
+import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -14,9 +15,14 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
-import java.sql.*;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
 import java.time.LocalDateTime;
 import java.util.BitSet;
+import java.util.Map;
 
 /**
  * Java语言提供了八种基本类型,六种数字类型(四个整数型,两个浮点型),一种字符类型,一种布尔型。
@@ -52,6 +58,19 @@ public abstract class BinlogMessageUtil {
 
     private static final BinlogColumnValue value = new BinlogColumnValue();
 
+    public static BinlogMap toBinlogMap(Map<String, Object> data) {
+        BinlogMap.Builder dataBuilder = BinlogMap.newBuilder();
+        data.forEach((k, v) -> {
+            if (null != v) {
+                ByteString bytes = serializeValue(v);
+                if (null != bytes) {
+                    dataBuilder.putRow(k, bytes);
+                }
+            }
+        });
+        return dataBuilder.build();
+    }
+
     public static ByteString serializeValue(Object v) {
         String type = v.getClass().getName();
         switch (type) {
@@ -202,7 +221,7 @@ public abstract class BinlogMessageUtil {
             case Types.BINARY:
             case Types.VARBINARY:
             case Types.LONGVARBINARY:
-            // 二进制对象
+                // 二进制对象
             case Types.NCLOB:
             case Types.CLOB:
             case Types.BLOB:
@@ -224,7 +243,7 @@ public abstract class BinlogMessageUtil {
             is = blob.getBinaryStream();
             b = new byte[(int) blob.length()];
             int read = is.read(b);
-            if(-1 == read){
+            if (-1 == read) {
                 return b;
             }
         } catch (Exception e) {

+ 11 - 25
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/DocumentUtil.java

@@ -2,7 +2,6 @@ package org.dbsyncer.storage.util;
 
 import org.apache.lucene.document.*;
 import org.apache.lucene.util.BytesRef;
-import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.springframework.util.Assert;
 
@@ -16,7 +15,7 @@ import java.util.Map;
  * <p/> new NumericDocValuesField(name, value); 要排序,必须添加一个同名的SortedNumericDocValuesField
  * <p/> 其他FloatPoint、LongPoint、DoublePoint同上
  * <p/> id使用字符串,防止更新失败
- *
+ * <p>
  * <p/>2、Field:
  * <p/>IntPoint
  * <p/>FloatPoint
@@ -31,7 +30,7 @@ import java.util.Map;
  * <p/>FloatDocValuesField 存储Float类型索引并排序
  * <p/>DoubleDocValuesField 存储Double类型索引并排序
  * <p/>BinaryDocValuesField 只存储不共享,例如标题类字段,如果需要共享并排序,推荐使用SortedDocValuesField
- *
+ * <p>
  * <p/>3、Lucene 6.0版本后:
  * <p>IntField 替换为 IntPoint</p>
  * <p>FloatField 替换为 FloatPoint</p>
@@ -98,7 +97,6 @@ public abstract class DocumentUtil {
         String targetTableName = (String) params.get(ConfigConstant.DATA_TARGET_TABLE_NAME);
         String event = (String) params.get(ConfigConstant.DATA_EVENT);
         String error = (String) params.get(ConfigConstant.DATA_ERROR);
-        String json = (String) params.get(ConfigConstant.CONFIG_MODEL_JSON);
         Long createTime = (Long) params.get(ConfigConstant.CONFIG_MODEL_CREATE_TIME);
 
         doc.add(new StringField(ConfigConstant.CONFIG_MODEL_ID, id, Field.Store.YES));
@@ -108,7 +106,12 @@ public abstract class DocumentUtil {
         doc.add(new StringField(ConfigConstant.DATA_TARGET_TABLE_NAME, targetTableName, Field.Store.YES));
         doc.add(new StringField(ConfigConstant.DATA_EVENT, event, Field.Store.YES));
         doc.add(new TextField(ConfigConstant.DATA_ERROR, error, Field.Store.YES));
-        doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_JSON, json));
+
+        // 同步数据
+        byte[] bytes = (byte[]) params.get(ConfigConstant.BINLOG_DATA);
+        doc.add(new BinaryDocValuesField(ConfigConstant.BINLOG_DATA, new BytesRef(bytes)));
+        doc.add(new StoredField(ConfigConstant.BINLOG_DATA, bytes));
+
         // 创建时间
         doc.add(new LongPoint(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
         doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
@@ -116,23 +119,6 @@ public abstract class DocumentUtil {
         return doc;
     }
 
-    @Deprecated
-    public static Document convertBinlog2Doc(String messageId, int status, BytesRef bytes, long updateTime) {
-        Document doc = new Document();
-        doc.add(new StringField(BinlogConstant.BINLOG_ID, messageId, Field.Store.YES));
-
-        doc.add(new IntPoint(BinlogConstant.BINLOG_STATUS, status));
-        doc.add(new StoredField(BinlogConstant.BINLOG_STATUS, status));
-
-        doc.add(new BinaryDocValuesField(BinlogConstant.BINLOG_CONTENT, bytes));
-        doc.add(new StoredField(BinlogConstant.BINLOG_CONTENT, bytes));
-
-        doc.add(new LongPoint(BinlogConstant.BINLOG_TIME, updateTime));
-        doc.add(new StoredField(BinlogConstant.BINLOG_TIME, updateTime));
-        doc.add(new NumericDocValuesField(BinlogConstant.BINLOG_TIME, updateTime));
-        return doc;
-    }
-
     public static Document convertBinlog2Doc(Map params) {
         Document doc = new Document();
         String id = (String) params.get(ConfigConstant.CONFIG_MODEL_ID);
@@ -142,9 +128,9 @@ public abstract class DocumentUtil {
         doc.add(new IntPoint(ConfigConstant.BINLOG_STATUS, status));
         doc.add(new StoredField(ConfigConstant.BINLOG_STATUS, status));
 
-        byte[] bytes = (byte[]) params.get(ConfigConstant.CONFIG_MODEL_JSON);
-        doc.add(new BinaryDocValuesField(ConfigConstant.CONFIG_MODEL_JSON, new BytesRef(bytes)));
-        doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_JSON, bytes));
+        byte[] bytes = (byte[]) params.get(ConfigConstant.BINLOG_DATA);
+        doc.add(new BinaryDocValuesField(ConfigConstant.BINLOG_DATA, new BytesRef(bytes)));
+        doc.add(new StoredField(ConfigConstant.BINLOG_DATA, bytes));
 
         Long createTime = (Long) params.get(ConfigConstant.CONFIG_MODEL_CREATE_TIME);
         doc.add(new LongPoint(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));

+ 8 - 0
dbsyncer-storage/src/main/resources/dbsyncer_binlog.sql

@@ -0,0 +1,8 @@
+CREATE TABLE `dbsyncer_binlog`  (
+  `ID` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '唯一ID',
+  `STATUS` int(1) NOT NULL COMMENT '状态,0-待同步;1-同步中',
+  `CREATE_TIME` bigint(0) NOT NULL COMMENT '创建时间',
+  `DATA` blob NOT NULL COMMENT '同步数据',
+  PRIMARY KEY (`ID`) USING BTREE,
+  INDEX `IDX_TYPE_CREATE_TIME`(`STATUS`, `CREATE_TIME`) USING BTREE
+) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin COMMENT = '缓存队列任务表' ROW_FORMAT = Dynamic;

+ 1 - 1
dbsyncer-storage/src/main/resources/dbsyncer_data.sql

@@ -6,7 +6,7 @@ CREATE TABLE `dbsyncer_data` (
   `EVENT` varchar(10) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '事件',
   `ERROR` mediumtext CHARACTER SET utf8 COLLATE utf8_bin NULL COMMENT '异常信息',
   `CREATE_TIME` bigint(0) NOT NULL COMMENT '创建时间',
-  `JSON` mediumtext CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '同步数据',
+  `DATA` blob NOT NULL COMMENT '同步数据',
   PRIMARY KEY (`ID`) USING BTREE,
   INDEX `IDX_SUCCESS`(`SUCCESS`) USING BTREE,
   INDEX `IDX_EVENT`(`EVENT`) USING BTREE,

+ 2 - 1
dbsyncer-storage/src/main/resources/dbsyncer_upgrade.sql

@@ -1 +1,2 @@
-ALTER TABLE dbsyncer_upgrade ADD COLUMN TABLE_GROUP_ID VARCHAR(64) DEFAULT '' COMMENT '驱动表映射关系ID' AFTER SUCCESS, ADD COLUMN TARGET_TABLE_NAME VARCHAR(100) DEFAULT '' COMMENT '目标表名称' AFTER TABLE_GROUP_ID;
+ALTER TABLE `dbsyncer_upgrade` DROP COLUMN `JSON`;
+ALTER TABLE `dbsyncer_upgrade` ADD COLUMN `DATA` blob NOT NULL COMMENT '同步数据';

+ 20 - 1
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/monitor/MonitorController.java

@@ -1,8 +1,8 @@
 package org.dbsyncer.web.controller.monitor;
 
-import org.dbsyncer.biz.SystemConfigService;
 import org.dbsyncer.biz.ConnectorService;
 import org.dbsyncer.biz.MonitorService;
+import org.dbsyncer.biz.SystemConfigService;
 import org.dbsyncer.biz.vo.AppReportMetricVo;
 import org.dbsyncer.biz.vo.HistoryStackVo;
 import org.dbsyncer.biz.vo.RestResult;
@@ -79,6 +79,13 @@ public class MonitorController extends BaseController {
         return "monitor/monitor.html";
     }
 
+    @GetMapping("/page/retry")
+    public String page(ModelMap model, String metaId, String messageId) {
+        model.put("meta", monitorService.getMetaVo(metaId));
+        model.put("message", monitorService.getMessageVo(metaId, messageId));
+        return "monitor/retry.html";
+    }
+
     @Scheduled(fixedRate = 5000)
     public void recordHistoryStackMetric() {
         recordHistoryStackMetric(MetricEnum.CPU_USAGE, cpu, cpuHistoryStackValueFormatterImpl);
@@ -114,6 +121,18 @@ public class MonitorController extends BaseController {
         }
     }
 
+    @PostMapping("/sync")
+    @ResponseBody
+    public RestResult sync(HttpServletRequest request) {
+        try {
+            Map<String, String> params = getParams(request);
+            return RestResult.restSuccess(monitorService.sync(params));
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e.getClass());
+            return RestResult.restFail(e.getMessage());
+        }
+    }
+
     @PostMapping("/clearData")
     @ResponseBody
     public RestResult clearData(String id) {

+ 1 - 1
dbsyncer-web/src/main/resources/public/connector/addDqlMysql.html

@@ -45,7 +45,7 @@
         <div class="col-sm-10">
             <textarea name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="1024"
                       dbsyncer-valid="require" rows="5"
-                      th:text="${connector?.config?.url}?:'jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&useUnicode=true&amp;characterEncoding=UTF8&amp;serverTimezone=Asia/Shanghai&amp;useSSL=false&amp;verifyServerCertificate=false&amp;autoReconnect=true&amp;failOverReadOnly=false'"></textarea>
+                      th:text="${connector?.config?.url}?:'jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&amp;useUnicode=true&amp;characterEncoding=UTF8&amp;serverTimezone=Asia/Shanghai&amp;useSSL=false&amp;verifyServerCertificate=false&amp;autoReconnect=true&amp;failOverReadOnly=false'"></textarea>
         </div>
     </div>
 

+ 1 - 1
dbsyncer-web/src/main/resources/public/connector/addMysql.html

@@ -19,7 +19,7 @@
         <div class="col-sm-10">
             <textarea name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="1024"
                       dbsyncer-valid="require" rows="5"
-                      th:text="${connector?.config?.url} ?: 'jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&useUnicode=true&amp;characterEncoding=UTF8&amp;serverTimezone=Asia/Shanghai&amp;useSSL=false&amp;verifyServerCertificate=false&amp;autoReconnect=true&amp;failOverReadOnly=false'"></textarea>
+                      th:text="${connector?.config?.url} ?: 'jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&amp;useUnicode=true&amp;characterEncoding=UTF8&amp;serverTimezone=Asia/Shanghai&amp;useSSL=false&amp;verifyServerCertificate=false&amp;autoReconnect=true&amp;failOverReadOnly=false'"></textarea>
         </div>
     </div>
 

+ 5 - 1
dbsyncer-web/src/main/resources/public/monitor/monitor.html

@@ -140,7 +140,11 @@
                                     </td>
                                     <td style="max-width:100px;" class="dbsyncer_over_hidden"><a href="javascript:;" class="dbsyncer_pointer queryError">[[${d?.error}]]</a></td>
                                     <td th:text="${#dates.format(d?.createTime, 'yyyy-MM-dd HH:mm:ss')}" />
-                                    <td><a th:json="${d?.json}" href="javascript:;" class="label label-info queryData">查看数据</a><div class="hidden" th:text="${d?.json}"></div></td>
+                                    <td>
+                                        <a th:json="${d?.json}" href="javascript:;" class="label label-info queryData">查看数据</a>
+                                        <a th:if="${d?.success == 0}" th:id="${d?.id}" href="javascript:;" class="label label-warning retryData">重试</a>
+                                        <div class="hidden" th:text="${d?.json}"></div>
+                                    </td>
                                 </tr>
                                 </tbody>
                             </table>

+ 63 - 0
dbsyncer-web/src/main/resources/public/monitor/retry.html

@@ -0,0 +1,63 @@
+<!DOCTYPE html>
+<html xmlns="http://www.w3.org/1999/xhtml"
+xmlns:th="http://www.thymeleaf.org" lang="zh-CN">
+
+<div class="container-fluid">
+  <div class="container">
+    <form id="retryDataForm" class="form-horizontal" role="form" method="post">
+      <!-- 标题 -->
+      <div class="row text-center">
+        <div class="page-header">
+          <h3>重试[[${meta?.mappingName}]]</h3>
+          <input type="hidden" name="metaId" id="metaId" th:value="${meta?.id}"/>
+          <input type="hidden" name="messageId" th:value="${message?.id}"/>
+          <input type="hidden" id="retryDataParams" name="retryDataParams"/>
+        </div>
+      </div>
+
+      <!-- 操作 -->
+      <div class="form-group">
+        <div class="col-md-10"><h4>[[${message?.sourceTableName}]] <span class="fa fa-angle-double-right"></span> [[${message?.targetTableName}]]</h4></div>
+        <div class="col-md-2 text-right">
+          <button id="retryDataSubmitBtn" type="button" class="btn btn-primary">
+            <span class="fa fa-refresh"></span>执行
+          </button>
+          <button id="retryDataBackBtn" type="button" class="btn btn-default">
+            <span class="fa fa-reply"></span>返回
+          </button>
+        </div>
+      </div>
+
+      <!-- 配置 -->
+      <div class="row">
+        <div class="col-md-12">
+          <table class="table table-hover">
+            <caption>执行前,请先检查数据。字段类型是否支持当前值</caption>
+            <thead>
+            <tr>
+              <th></th>
+              <th>目标表字段</th>
+              <th>值类型</th>
+              <th>值</th>
+              <th>操作</th>
+            </tr>
+            </thead>
+            <tbody>
+            <tr th:each="c,state : ${message?.columns}">
+              <td th:text="${state.index}+1"></td>
+              <td th:text="${c?.key + '(' + c?.keyType + ')'}"/>
+              <td th:text="${c?.valueType}"/>
+              <td th:text="${c?.value}"/>
+              <td><i th:type="${c?.key}" th:title="修改" class="fa fa-edit well-sign-green dbsyncer_pointer retryDataModify"></i></td>
+            </tr>
+            </tbody>
+          </table>
+        </div>
+      </div>
+
+    </form>
+  </div>
+</div>
+
+<script th:src="@{/js/monitor/retry.js}"></script>
+</html>

+ 117 - 61
dbsyncer-web/src/main/resources/static/js/monitor/index.js

@@ -6,21 +6,39 @@ function formatDate(time) {
     var hh = (date.getHours() < 10 ? '0' + date.getHours() : date.getHours()) + ':';
     var mm = (date.getMinutes() < 10 ? '0' + date.getMinutes() : date.getMinutes()) + ':';
     var ss = (date.getSeconds() < 10 ? '0' + date.getSeconds() : date.getSeconds());
-    return YY + MM + DD +" "+hh + mm + ss;
+    return YY + MM + DD + " " + hh + mm + ss;
 }
 
 // 查看详细数据
 function bindQueryDataDetailEvent() {
-    var $queryData = $(".queryData");
+    let $queryData = $(".queryData");
     $queryData.unbind("click");
     $queryData.click(function () {
-        var json = $(this).parent().find("div").text();
-        var html = '<div class="row driver_break_word">' + json + '</div>';
+        let json = $(this).parent().find("div").text();
         BootstrapDialog.show({
+            size: BootstrapDialog.SIZE_WIDE,
             title: "注意信息安全",
             type: BootstrapDialog.TYPE_INFO,
-            message: html,
-            size: BootstrapDialog.SIZE_NORMAL,
+            message: function (dialog) {
+                let $content = '<table class="table table-hover">';
+                $content += '<thead><tr><th></th><th>字段</th><th>值</th></tr></thead>';
+                $content += '<tbody id="dataDetail" tableGroupId="">';
+
+                let jsonObj = $.parseJSON(json);
+                let index = 1;
+                $.each(jsonObj, function(name, value) {
+                    $content += '<tr>';
+                    $content += '<td>' + index + '</td>';
+                    $content += '<td>' + name + '</td>';
+                    $content += '<td class="driver_break_word">' + value + '</td>';
+                    $content += '</tr>';
+                    index++;
+                });
+
+                $content += '</tbody>';
+                $content += '</table>';
+                return $content;
+            },
             buttons: [{
                 label: "关闭",
                 action: function (dialog) {
@@ -31,6 +49,17 @@ function bindQueryDataDetailEvent() {
     });
 }
 
+// 重试同步
+function bindQueryDataRetryEvent(){
+    let $retry = $(".retryData");
+    $retry.unbind("click");
+    $retry.click(function () {
+        let id = $("#searchMetaData").selectpicker("val");
+        let messageId = $(this).attr("id");
+        doLoader('/monitor/page/retry?metaId=' + id + '&messageId=' + messageId);
+    });
+}
+
 // 查看详细数据日志
 function bindQueryErrorDetailEvent() {
     var $queryData = $(".queryError");
@@ -54,7 +83,7 @@ function bindQueryErrorDetailEvent() {
 }
 
 // 清空数据
-function bindClearEvent($btn, $title, $msg, $url, $callback){
+function bindClearEvent($btn, $title, $msg, $url, $callback) {
     $btn.click(function () {
         var $id = null != $callback ? $callback() : $(this).attr("id");
         var data = {"id": $id};
@@ -88,12 +117,12 @@ function bindClearEvent($btn, $title, $msg, $url, $callback){
 }
 
 // 显示更多
-function showMore($this, $url, $params, $call){
+function showMore($this, $url, $params, $call) {
     $params.pageNum = parseInt($this.attr("num")) + 1;
     $params.pageSize = 10;
     doGetter($url, $params, function (data) {
         if (data.success == true) {
-            if(data.resultValue.data.length > 0){
+            if (data.resultValue.data.length > 0) {
                 $this.attr("num", $params.pageNum);
             }
             $call(data.resultValue);
@@ -109,7 +138,13 @@ function bindQueryDataEvent() {
         var keyword = $("#searchDataKeyword").val();
         var id = $("#searchMetaData").selectpicker("val");
         var success = $("#searchDataSuccess").selectpicker("val");
-        doGetter('/monitor/queryData', {"error": keyword, "success": success, "id" : id, "pageNum" : 1, "pageSize" : 10}, function (data) {
+        doGetter('/monitor/queryData', {
+            "error": keyword,
+            "success": success,
+            "id": id,
+            "pageNum": 1,
+            "pageSize": 10
+        }, function (data) {
             if (data.success == true) {
                 refreshDataList(data.resultValue);
             } else {
@@ -118,36 +153,44 @@ function bindQueryDataEvent() {
         });
     });
 }
+
 function bindQueryDataMoreEvent() {
     $("#queryDataMore").click(function () {
         var keyword = $("#searchDataKeyword").val();
         var id = $("#searchMetaData").selectpicker("val");
         var success = $("#searchDataSuccess").selectpicker("val");
-        showMore($(this), '/monitor/queryData', {"error": keyword, "success": success, "id" : id}, function(resultValue){
+        showMore($(this), '/monitor/queryData', {
+            "error": keyword,
+            "success": success,
+            "id": id
+        }, function (resultValue) {
             refreshDataList(resultValue, true)
         });
     });
 }
-function refreshDataList(resultValue, append){
+
+function refreshDataList(resultValue, append) {
     var $dataList = $("#dataList");
     var $dataTotal = $("#dataTotal");
     var html = showData($dataList, resultValue.data, append);
-    if(append){
+    if (append) {
         $dataList.append(html);
-    }else{
+    } else {
         $dataList.html(html);
         $("#queryDataMore").attr("num", 1);
     }
     $dataTotal.html(resultValue.total);
     bindQueryDataDetailEvent();
+    bindQueryDataRetryEvent();
     bindQueryErrorDetailEvent();
 }
-function showData($dataList, arr, append){
+
+function showData($dataList, arr, append) {
     var html = '';
     var size = arr.length;
-    if(size > 0){
+    if (size > 0) {
         var start = append ? $dataList.find("tr").size() : 0;
-        for(i = 0; i < size; i++) {
+        for (i = 0; i < size; i++) {
             html += '<tr>';
             html += '<td>' + (start + i + 1) + '</td>';
             html += '<td>' + arr[i].targetTableName + '</td>';
@@ -155,7 +198,9 @@ function showData($dataList, arr, append){
             html += '<td>' + (arr[i].success ? '<span class="label label-success">成功</span>' : '<span class="label label-warning">失败</span>') + '</td>';
             html += '<td style="max-width:100px;" class="dbsyncer_over_hidden"><a href="javascript:;" class="dbsyncer_pointer queryError">' + arr[i].error + '</a></td>';
             html += '<td>' + formatDate(arr[i].createTime) + '</td>';
-            html += '<td><a href="javascript:;" class="label label-info queryData">查看数据</a><div class="hidden">' + arr[i].json + '</div></td>';
+            html += '<td><div class="hidden">' + arr[i].json + '</div><a href="javascript:;" class="label label-info queryData">查看数据</a>&nbsp;';
+            html += (arr[i].success ? '' : '<a id="' + arr[i].id + '" href="javascript:;" class="label label-warning retryData">重试</a>');
+            html += '</td>';
             html += '</tr>';
         }
     }
@@ -166,7 +211,7 @@ function showData($dataList, arr, append){
 function bindQueryLogEvent() {
     $("#queryLogBtn").click(function () {
         var keyword = $("#searchLogKeyword").val();
-        doGetter('/monitor/queryLog', {"json": keyword, "pageNum" : 1, "pageSize" : 10}, function (data) {
+        doGetter('/monitor/queryLog', {"json": keyword, "pageNum": 1, "pageSize": 10}, function (data) {
             if (data.success == true) {
                 refreshLogList(data.resultValue);
             } else {
@@ -175,32 +220,35 @@ function bindQueryLogEvent() {
         });
     });
 }
+
 function bindQueryLogMoreEvent() {
     $("#queryLogMore").click(function () {
         var keyword = $("#searchLogKeyword").val();
-        showMore($(this), '/monitor/queryLog', {"json": keyword}, function(resultValue){
+        showMore($(this), '/monitor/queryLog', {"json": keyword}, function (resultValue) {
             refreshLogList(resultValue, true)
         });
     });
 }
-function refreshLogList(resultValue, append){
+
+function refreshLogList(resultValue, append) {
     var $logList = $("#logList");
     var $logTotal = $("#logTotal");
     var html = showLog($logList, resultValue.data, append);
-    if(append){
+    if (append) {
         $logList.append(html);
-    }else{
+    } else {
         $logList.html(html);
         $("#queryLogMore").attr("num", 1);
     }
     $logTotal.html(resultValue.total);
 }
-function showLog($logList, arr, append){
+
+function showLog($logList, arr, append) {
     var size = arr.length;
     var html = '';
-    if(size > 0){
+    if (size > 0) {
         var start = append ? $logList.find("tr").size() : 0;
-        for(i = 0; i < size; i++) {
+        for (i = 0; i < size; i++) {
             html += '<tr>';
             html += '<td>' + (start + i + 1) + '</td>';
             html += '<td>' + arr[i].json + '</td>';
@@ -212,14 +260,14 @@ function showLog($logList, arr, append){
 }
 
 // 堆积数据
-function showQueueChart(queueUp, queueCapacity){
-    var option={
-        title:{
-            text:"堆积数据",
-            x:'center',
+function showQueueChart(queueUp, queueCapacity) {
+    var option = {
+        title: {
+            text: "堆积数据",
+            x: 'center',
             y: 'top'
         },
-        tooltip : {
+        tooltip: {
             formatter: "{a}: {c}",
             position: 'top'
         },
@@ -233,7 +281,7 @@ function showQueueChart(queueUp, queueCapacity){
                 splitNumber: 2,
                 axisLine: {            // 坐标轴线
                     lineStyle: {       // 属性lineStyle控制线条样式
-                        color: [[0.3, '#67e0e3'], [0.7, '#37a2da'],[1, '#fd666d']],
+                        color: [[0.3, '#67e0e3'], [0.7, '#37a2da'], [1, '#fd666d']],
                         width: 5
                     }
                 },
@@ -249,15 +297,16 @@ function showQueueChart(queueUp, queueCapacity){
                         color: 'auto'
                     }
                 },
-                detail: {fontSize:12, offsetCenter:[0,'65%']},
+                detail: {fontSize: 12, offsetCenter: [0, '65%']},
                 data: [{value: queueUp, name: ''}]
             }
         ]
     };
     echarts.init(document.getElementById('queueChart')).setOption(option);
 }
+
 // 事件分类
-function showEventChart(ins, upd, del){
+function showEventChart(ins, upd, del) {
     var option = {
         title: {
             text: '事件分类',
@@ -276,9 +325,9 @@ function showEventChart(ins, upd, del){
                 type: 'pie',
                 radius: '50%',
                 data: [
-                    {value:upd, name:'更新'},
-                    {value:ins, name:'插入'},
-                    {value:del, name:'删除'}
+                    {value: upd, name: '更新'},
+                    {value: ins, name: '插入'},
+                    {value: del, name: '删除'}
                 ],
                 emphasis: {
                     itemStyle: {
@@ -296,8 +345,9 @@ function showEventChart(ins, upd, del){
     $("#updateSpan").html(upd);
     $("#deleteSpan").html(del);
 }
+
 // 统计成功失败
-function showTotalChart(success, fail){
+function showTotalChart(success, fail) {
     var option = {
         title: {
             text: '已完成数据',
@@ -316,8 +366,8 @@ function showTotalChart(success, fail){
                 type: 'pie',
                 radius: '50%',
                 data: [
-                    {value:success, name:'成功'},
-                    {value:fail, name:'失败'}
+                    {value: success, name: '成功'},
+                    {value: fail, name: '失败'}
                 ],
                 emphasis: {
                     itemStyle: {
@@ -335,16 +385,17 @@ function showTotalChart(success, fail){
     $("#successSpan").html(success);
     $("#failSpan").html(fail);
 }
+
 // CPU历史
-function showCpuChart(cpu){
+function showCpuChart(cpu) {
     var option = {
-        title : {
-            show:true,
+        title: {
+            show: true,
             text: 'CPU(%)',
-            x:'center',
+            x: 'center',
             y: 'bottom'
         },
-        tooltip : {
+        tooltip: {
             trigger: 'item',
             formatter: "{b} : {c}%"
         },
@@ -362,16 +413,17 @@ function showCpuChart(cpu){
     };
     echarts.init(document.getElementById('cpuChart')).setOption(option);
 }
+
 // 内存历史
-function showMemoryChart(memory){
+function showMemoryChart(memory) {
     var option = {
-        title : {
-            show:true,
+        title: {
+            show: true,
             text: '内存(MB)',
-            x:'center',
+            x: 'center',
             y: 'bottom'
         },
-        tooltip : {
+        tooltip: {
             trigger: 'item',
             formatter: "{b} : {c}MB"
         },
@@ -391,21 +443,23 @@ function showMemoryChart(memory){
     };
     echarts.init(document.getElementById('memoryChart')).setOption(option);
 }
+
 // 指标列表
-function showMetricTable(metrics){
+function showMetricTable(metrics) {
     var html = '';
-    $.each(metrics, function(i) {
+    $.each(metrics, function (i) {
         html += '<tr>';
-        html += '   <td style="width:5%;">'+ (i + 1) +'</td>';
-        html += '   <td>'+ '['+ metrics[i].group + ']' + metrics[i].metricName +'</td>';
-        html += '   <td>'+ metrics[i].detail +'</td>';
+        html += '   <td style="width:5%;">' + (i + 1) + '</td>';
+        html += '   <td>' + '[' + metrics[i].group + ']' + metrics[i].metricName + '</td>';
+        html += '   <td>' + metrics[i].detail + '</td>';
         html += '</tr>';
     });
     $("#metricTable").html(html);
 }
+
 // 显示图表信息
-function showChartTable(){
-    doGetWithoutLoading("/monitor/queryAppReportMetric",{}, function (data) {
+function showChartTable() {
+    doGetWithoutLoading("/monitor/queryAppReportMetric", {}, function (data) {
         if (data.success == true) {
             var report = data.resultValue;
             showTotalChart(report.success, report.fail);
@@ -419,12 +473,13 @@ function showChartTable(){
         }
     });
 }
+
 // 创建定时器
-function createTimer(){
+function createTimer() {
     showChartTable();
-    doGetWithoutLoading("/monitor/getRefreshInterval",{}, function (data) {
+    doGetWithoutLoading("/monitor/getRefreshInterval", {}, function (data) {
         if (data.success == true) {
-            timer = setInterval(function(){
+            timer = setInterval(function () {
                 showChartTable();
             }, data.resultValue * 1000);
         } else {
@@ -451,6 +506,7 @@ $(function () {
     bindQueryDataEvent();
     bindQueryDataMoreEvent();
     bindQueryDataDetailEvent();
+    bindQueryDataRetryEvent();
     bindQueryErrorDetailEvent();
     bindClearEvent($(".clearDataBtn"), "确认清空数据?", "清空数据成功!", "/monitor/clearData", function () {
         return $("#searchMetaData").selectpicker("val");

+ 66 - 0
dbsyncer-web/src/main/resources/static/js/monitor/retry.js

@@ -0,0 +1,66 @@
+function submit(data) {
+    doPoster("/monitor/sync", data, function (data) {
+        if (data.success == true) {
+            bootGrowl("正在同步中.", "success");
+            backMonitorPage();
+        } else {
+            bootGrowl(data.resultValue, "danger");
+        }
+    });
+}
+
+function backMonitorPage() {
+    doLoader('/monitor?id=' + $("#metaId").val());
+}
+
+// 修改增量数据
+function bindRetryDataModifyClick() {
+    $(".retryDataModify").click(function () {
+        let $key = $(this).attr("type");
+        let $tr = $(this).parent().parent();
+        let $value = $tr.find("td:eq(3)");
+        let tmp = $value.text();
+        $value.text("");
+        $value.append("<input type='text'/>");
+        let $input = $value.find("input");
+        $input.focus().val(tmp);
+        $input.blur(function () {
+            $value.text($(this).val());
+            if (tmp != $(this).val()) {
+                createRetryDataParams($key, $(this).val());
+                $tr.addClass("success");
+                $tr.attr("title", "已修改");
+            }
+            $input.unbind();
+        });
+    })
+}
+
+// 生成修改参数
+function createRetryDataParams($key, $value) {
+    let $params = $("#retryDataParams");
+    let val = $params.val();
+    let jsonObj = {};
+    if(!isBlank(val)){
+        jsonObj = $.parseJSON(val);
+    }
+
+    jsonObj[$key] = $value;
+    $params.val(JSON.stringify(jsonObj));
+}
+
+$(function () {
+    //保存
+    $("#retryDataSubmitBtn").click(function () {
+        var $form = $("#retryDataForm");
+        var data = $form.serializeJson();
+        submit(data);
+    });
+
+    //返回
+    $("#retryDataBackBtn").click(function () {
+        backMonitorPage();
+    });
+
+    bindRetryDataModifyClick();
+})