Ver Fonte

!10 维护索引
Merge pull request !10 from AE86/V_1.0.0

AE86 há 5 anos atrás
pai
commit
9cb21f8c3d
41 ficheiros alterados com 1139 adições e 421 exclusões
  1. 0 8
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/MappingService.java
  2. 47 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/MonitorService.java
  3. 41 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/BaseServiceImpl.java
  4. 7 1
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConnectorServiceImpl.java
  5. 11 49
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MappingServiceImpl.java
  6. 76 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java
  7. 5 4
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/TableGroupServiceImpl.java
  8. 59 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/DataVo.java
  9. 41 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/LogVo.java
  10. 12 1
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/MetaVo.java
  11. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  12. 0 1
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/extractor/MysqlExtractor.java
  13. 10 0
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java
  14. 26 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java
  15. 3 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/OperationCallBack.java
  16. 8 7
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/PreloadConfig.java
  17. 38 0
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/DataTemplate.java
  18. 6 4
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/OperationTemplate.java
  19. 7 6
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/PreloadTemplate.java
  20. 25 11
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  21. 29 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java
  22. 61 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushServiceImpl.java
  23. 76 11
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/AbstractStorageService.java
  24. 35 8
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/StorageService.java
  25. 7 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java
  26. 0 25
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/StorageConstant.java
  27. 0 25
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/StrategyConstant.java
  28. 32 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/StorageEnum.java
  29. 123 147
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/LuceneFactoryTest.java
  30. 80 35
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java
  31. 4 4
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Param.java
  32. 28 4
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Query.java
  33. 3 5
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/ConfigStrategy.java
  34. 4 4
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/DataStrategy.java
  35. 3 5
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/LogStrategy.java
  36. 74 30
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java
  37. 17 7
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java
  38. 77 2
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/ParamsUtil.java
  39. 4 5
      dbsyncer-web/src/main/java/org/dbsyncer/web/controller/index/MetaController.java
  40. 52 0
      dbsyncer-web/src/main/java/org/dbsyncer/web/controller/monitor/MonitorController.java
  41. 7 7
      dbsyncer-web/src/main/resources/templates/index/index.html

+ 0 - 8
dbsyncer-biz/src/main/java/org/dbsyncer/biz/MappingService.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.biz;
 
 import org.dbsyncer.biz.vo.MappingVo;
-import org.dbsyncer.biz.vo.MetaVo;
 
 import java.util.List;
 import java.util.Map;
@@ -63,11 +62,4 @@ public interface MappingService {
      */
     String stop(String id);
 
-    /**
-     * 获取运行的驱动列表
-     *
-     * @return
-     */
-    List<MetaVo> getMetaAll();
-
 }

+ 47 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/MonitorService.java

@@ -1,5 +1,10 @@
 package org.dbsyncer.biz;
 
+import org.dbsyncer.biz.vo.DataVo;
+import org.dbsyncer.biz.vo.LogVo;
+import org.dbsyncer.biz.vo.MetaVo;
+
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -16,4 +21,46 @@ public interface MonitorService {
      */
     Map getThreadInfo();
 
+    /**
+     * 获取运行的驱动列表
+     *
+     * @return
+     */
+    List<MetaVo> getMetaAll();
+
+    /**
+     * 查询驱动同步数据
+     *
+     * @return
+     * @param id
+     * @param pageNum
+     * @param pageSize
+     */
+    List<DataVo> queryData(String id, int pageNum, int pageSize);
+
+    /**
+     * 清空驱动同步数据
+     *
+     * @param id
+     * @return
+     */
+    String clearData(String id);
+
+    /**
+     * 查询操作日志
+     *
+     * @param type
+     * @param pageNum
+     * @param pageSize
+     * @return
+     */
+    List<LogVo> queryLog(String type, int pageNum, int pageSize);
+
+    /**
+     * 清空操作日志
+     *
+     * @param id
+     * @return
+     */
+    String clearLog(String type);
 }

+ 41 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/BaseServiceImpl.java

@@ -0,0 +1,41 @@
+package org.dbsyncer.biz.impl;
+
+import org.dbsyncer.manager.Manager;
+import org.dbsyncer.parser.enums.MetaEnum;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.model.TableGroup;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.util.Assert;
+
+public class BaseServiceImpl {
+
+    @Autowired
+    protected Manager manager;
+
+    /**
+     * 驱动启停锁
+     */
+    protected final static Object LOCK = new Object();
+
+    protected boolean isRunning(String metaId) {
+        Meta meta = manager.getMeta(metaId);
+        if (null != meta) {
+            int state = meta.getState();
+            return MetaEnum.isRunning(state);
+        }
+        return false;
+    }
+
+    protected void assertRunning(String metaId) {
+        Assert.isTrue(!isRunning(metaId), "驱动正在运行, 请先停止.");
+    }
+
+    protected void assertRunning(TableGroup model) {
+        synchronized (LOCK) {
+            Mapping mapping = manager.getMapping(model.getMappingId());
+            assertRunning(mapping.getMetaId());
+        }
+    }
+
+}

+ 7 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConnectorServiceImpl.java

@@ -12,8 +12,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * @author AE86
@@ -57,7 +59,11 @@ public class ConnectorServiceImpl implements ConnectorService {
 
     @Override
     public List<Connector> getConnectorAll() {
-        return manager.getConnectorAll();
+        List<Connector> list = manager.getConnectorAll()
+                .stream()
+                .sorted(Comparator.comparing(Connector::getUpdateTime).reversed())
+                .collect(Collectors.toList());
+        return list;
     }
 
     @Override

+ 11 - 49
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MappingServiceImpl.java

@@ -7,9 +7,7 @@ import org.dbsyncer.biz.vo.ConnectorVo;
 import org.dbsyncer.biz.vo.MappingVo;
 import org.dbsyncer.biz.vo.MetaVo;
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.manager.Manager;
 import org.dbsyncer.monitor.Monitor;
-import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.parser.enums.ModelEnum;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.storage.constant.ConfigConstant;
@@ -31,22 +29,16 @@ import java.util.stream.Collectors;
  * @date 2019/10/17 23:20
  */
 @Service
-public class MappingServiceImpl implements MappingService {
+public class MappingServiceImpl extends BaseServiceImpl implements MappingService {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    @Autowired
-    private Manager manager;
-
     @Autowired
     private Monitor monitor;
 
     @Autowired
     private Checker mappingChecker;
 
-    // 驱动启停锁
-    private final static Object LOCK = new Object();
-
     @Override
     public String add(Map<String, String> params) {
         ConfigModel model = mappingChecker.checkAddConfigModel(params);
@@ -102,8 +94,16 @@ public class MappingServiceImpl implements MappingService {
     @Override
     public String start(String id) {
         Mapping mapping = assertMappingExist(id);
+        final String metaId = mapping.getMetaId();
         synchronized (LOCK) {
-            assertRunning(mapping.getMetaId());
+            assertRunning(metaId);
+
+            // 清空同步记录
+            Meta meta = manager.getMeta(metaId);
+            meta.getFail().set(0);
+            meta.getSuccess().set(0);
+            manager.editMeta(meta);
+
             manager.start(mapping);
         }
         return "驱动启动成功";
@@ -121,16 +121,6 @@ public class MappingServiceImpl implements MappingService {
         return "驱动停止成功";
     }
 
-    @Override
-    public List<MetaVo> getMetaAll() {
-        List<MetaVo> list = manager.getMetaAll()
-                .stream()
-                .map(m -> convertMeta2Vo(m))
-                .sorted(Comparator.comparing(MetaVo::getUpdateTime).reversed())
-                .collect(Collectors.toList());
-        return list;
-    }
-
     private MappingVo convertMapping2Vo(Mapping mapping) {
         String model = mapping.getModel();
         Assert.notNull(mapping, "Mapping can not be null.");
@@ -144,7 +134,7 @@ public class MappingServiceImpl implements MappingService {
         // 元信息
         Meta meta = manager.getMeta(mapping.getMetaId());
         Assert.notNull(meta, "Meta can not be null.");
-        MetaVo metaVo = new MetaVo(ModelEnum.getModelEnum(model).getName());
+        MetaVo metaVo = new MetaVo(ModelEnum.getModelEnum(model).getName(), mapping.getName());
         BeanUtils.copyProperties(meta, metaVo);
 
         MappingVo vo = new MappingVo(sConn, tConn, metaVo);
@@ -152,15 +142,6 @@ public class MappingServiceImpl implements MappingService {
         return vo;
     }
 
-    private MetaVo convertMeta2Vo(Meta meta) {
-        Mapping mapping = manager.getMapping(meta.getMappingId());
-        Assert.notNull(mapping, "驱动不存在.");
-        ModelEnum modelEnum = ModelEnum.getModelEnum(mapping.getModel());
-        MetaVo metaVo = new MetaVo(modelEnum.getName());
-        BeanUtils.copyProperties(meta, metaVo);
-        return metaVo;
-    }
-
     /**
      * 检查是否存在驱动
      *
@@ -173,23 +154,4 @@ public class MappingServiceImpl implements MappingService {
         return mapping;
     }
 
-    /**
-     * 检查是否运行中,运行中抛出异常提示
-     *
-     * @param metaId
-     * @return
-     */
-    private void assertRunning(String metaId) {
-        Assert.isTrue(!isRunning(metaId), "驱动正在运行中, 请先停止.");
-    }
-
-    private boolean isRunning(String metaId) {
-        Meta meta = manager.getMeta(metaId);
-        if (null != meta) {
-            int state = meta.getState();
-            return MetaEnum.isRunning(state);
-        }
-        return false;
-    }
-
 }

+ 76 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

@@ -1,11 +1,26 @@
 package org.dbsyncer.biz.impl;
 
 import org.dbsyncer.biz.MonitorService;
+import org.dbsyncer.biz.vo.DataVo;
+import org.dbsyncer.biz.vo.LogVo;
+import org.dbsyncer.biz.vo.MetaVo;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.manager.Manager;
 import org.dbsyncer.monitor.Monitor;
+import org.dbsyncer.parser.enums.ModelEnum;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.util.Assert;
 
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * @author AE86
@@ -15,11 +30,72 @@ import java.util.Map;
 @Service
 public class MonitorServiceImpl implements MonitorService {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
     @Autowired
     private Monitor monitor;
 
+    @Autowired
+    private Manager manager;
+
     @Override
     public Map getThreadInfo() {
         return monitor.getThreadInfo();
     }
+
+    @Override
+    public List<MetaVo> getMetaAll() {
+        List<MetaVo> list = manager.getMetaAll()
+                .stream()
+                .map(m -> convertMeta2Vo(m))
+                .sorted(Comparator.comparing(MetaVo::getUpdateTime).reversed())
+                .collect(Collectors.toList());
+        return list;
+    }
+
+    @Override
+    public List<DataVo> queryData(String id, int pageNum, int pageSize) {
+        Assert.hasText(id, "id不能为空.");
+        List<DataVo> list = manager.queryData(id, pageNum, pageSize)
+                .stream()
+                .map(m -> convert2Vo(m, DataVo.class))
+                .collect(Collectors.toList());
+        return list;
+    }
+
+    @Override
+    public String clearData(String id) {
+        manager.clearData(id);
+        return "清空同步数据成功";
+    }
+
+    @Override
+    public List<LogVo> queryLog(String type, int pageNum, int pageSize) {
+        List<LogVo> list = manager.queryLog(type, pageNum, pageSize)
+                .stream()
+                .map(m -> convert2Vo(m, LogVo.class))
+                .collect(Collectors.toList());
+        return list;
+    }
+
+    @Override
+    public String clearLog(String type) {
+        manager.clearLog(type);
+        return "清空日志成功";
+    }
+
+    private MetaVo convertMeta2Vo(Meta meta) {
+        Mapping mapping = manager.getMapping(meta.getMappingId());
+        Assert.notNull(mapping, "驱动不存在.");
+        ModelEnum modelEnum = ModelEnum.getModelEnum(mapping.getModel());
+        MetaVo metaVo = new MetaVo(modelEnum.getName(), mapping.getName());
+        metaVo.setMappingName(mapping.getName());
+        BeanUtils.copyProperties(meta, metaVo);
+        return metaVo;
+    }
+
+    private <T> T convert2Vo(Map map, Class<T> clazz) {
+        String json = JsonUtil.objToJson(map);
+        return (T) JsonUtil.jsonToObj(json, clazz);
+    }
+
 }

+ 5 - 4
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/TableGroupServiceImpl.java

@@ -5,8 +5,6 @@ import org.dbsyncer.biz.checker.Checker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.manager.Manager;
-import org.dbsyncer.parser.model.ConfigModel;
-import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
 import org.slf4j.Logger;
@@ -23,7 +21,7 @@ import java.util.*;
  * @date 2019/11/27 23:14
  */
 @Service
-public class TableGroupServiceImpl implements TableGroupService {
+public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroupService {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -36,6 +34,7 @@ public class TableGroupServiceImpl implements TableGroupService {
     @Override
     public String add(Map<String, String> params) {
         TableGroup model = (TableGroup) tableGroupChecker.checkAddConfigModel(params);
+        assertRunning(model);
         String id = manager.addTableGroup(model);
 
         // 合并驱动公共字段
@@ -45,7 +44,8 @@ public class TableGroupServiceImpl implements TableGroupService {
 
     @Override
     public String edit(Map<String, String> params) {
-        ConfigModel model = tableGroupChecker.checkEditConfigModel(params);
+        TableGroup model = (TableGroup) tableGroupChecker.checkEditConfigModel(params);
+        assertRunning(model);
         return manager.editTableGroup(model);
     }
 
@@ -53,6 +53,7 @@ public class TableGroupServiceImpl implements TableGroupService {
     public boolean remove(String id) {
         TableGroup tableGroup = manager.getTableGroup(id);
         Assert.notNull(tableGroup, "tableGroup can not be null.");
+        assertRunning(tableGroup);
 
         manager.removeTableGroup(id);
         mergeMappingColumn(tableGroup.getMappingId());

+ 59 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/DataVo.java

@@ -0,0 +1,59 @@
+package org.dbsyncer.biz.vo;
+
+public class DataVo {
+
+    private String id;
+    private boolean success;
+    private String event;
+    private String error;
+    private String json;
+    private long createTime;
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public void setEvent(String event) {
+        this.event = event;
+    }
+
+    public String getError() {
+        return error;
+    }
+
+    public void setError(String error) {
+        this.error = error;
+    }
+
+    public String getJson() {
+        return json;
+    }
+
+    public void setJson(String json) {
+        this.json = json;
+    }
+
+    public long getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(long createTime) {
+        this.createTime = createTime;
+    }
+}

+ 41 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/LogVo.java

@@ -0,0 +1,41 @@
+package org.dbsyncer.biz.vo;
+
+public class LogVo {
+
+    private String id;
+    private String type;
+    private String json;
+    private long createTime;
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getJson() {
+        return json;
+    }
+
+    public void setJson(String json) {
+        this.json = json;
+    }
+
+    public long getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(long createTime) {
+        this.createTime = createTime;
+    }
+}

+ 12 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/vo/MetaVo.java

@@ -11,9 +11,12 @@ public class MetaVo extends Meta {
 
     // 同步方式
     private String model;
+    // 驱动名称
+    private String mappingName;
 
-    public MetaVo(String model) {
+    public MetaVo(String model, String mappingName) {
         this.model = model;
+        this.mappingName = mappingName;
     }
 
     public String getModel() {
@@ -23,4 +26,12 @@ public class MetaVo extends Meta {
     public void setModel(String model) {
         this.model = model;
     }
+
+    public String getMappingName() {
+        return mappingName;
+    }
+
+    public void setMappingName(String mappingName) {
+        this.mappingName = mappingName;
+    }
 }

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -256,7 +256,7 @@ public abstract class AbstractDatabaseConnector implements Database {
                 }
             });
             if (0 == update) {
-                throw new ConnectorException("写入失败");
+                throw new ConnectorException(String.format("执行%s操作失败:%s", event, data));
             }
         } catch (Exception e) {
             // 记录错误数据

+ 0 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/extractor/MysqlExtractor.java

@@ -118,7 +118,6 @@ public class MysqlExtractor extends DefaultExtractor {
         client.setBinlogPosition(nextPosition);
 
         // nextPosition
-        logger.info("{}:{}", client.getBinlogFileName(), client.getBinlogPosition());
         map.put(BINLOG_FILENAME, client.getBinlogFileName());
         map.put(BINLOG_POSITION, String.valueOf(client.getBinlogPosition()));
         flushEvent();

+ 10 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -75,6 +75,16 @@ public interface Manager extends Executor {
 
     List<Meta> getMetaAll();
 
+    // Data
+    List<Map> queryData(String id, int pageNum, int pageSize);
+
+    void clearData(String id);
+
+    // Log
+    List<Map> queryLog(String type, int pageNum, int pageSize);
+
+    void clearLog(String type);
+
     // ConnectorEnum
     List<ConnectorEnum> getConnectorEnumAll();
 

+ 26 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -12,6 +12,7 @@ import org.dbsyncer.manager.enums.GroupStrategyEnum;
 import org.dbsyncer.manager.enums.HandlerEnum;
 import org.dbsyncer.manager.puller.Puller;
 import org.dbsyncer.manager.template.impl.OperationTemplate;
+import org.dbsyncer.manager.template.impl.DataTemplate;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.enums.MetaEnum;
@@ -19,6 +20,7 @@ import org.dbsyncer.parser.model.*;
 import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.plugin.config.Plugin;
 import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeansException;
@@ -51,6 +53,9 @@ public class ManagerFactory implements Manager, ApplicationContextAware, Applica
     @Autowired
     private OperationTemplate operationTemplate;
 
+    @Autowired
+    private DataTemplate dataTemplate;
+
     private Map<String, Puller> map;
 
     @Override
@@ -196,8 +201,27 @@ public class ManagerFactory implements Manager, ApplicationContextAware, Applica
         Meta meta = new Meta();
         meta.setType(ConfigConstant.META);
         QueryConfig<Meta> queryConfig = new QueryConfig<>(meta);
-        List<Meta> metas = operationTemplate.queryAll(queryConfig);
-        return metas;
+        return operationTemplate.queryAll(queryConfig);
+    }
+
+    @Override
+    public List<Map> queryData(String id, int pageNum, int pageSize) {
+        return dataTemplate.query(StorageEnum.DATA, id, pageNum, pageSize);
+    }
+
+    @Override
+    public void clearData(String id) {
+        dataTemplate.clear(StorageEnum.DATA, id);
+    }
+
+    @Override
+    public List<Map> queryLog(String type, int pageNum, int pageSize) {
+        return dataTemplate.query(StorageEnum.LOG, type, pageNum, pageSize);
+    }
+
+    @Override
+    public void clearLog(String type) {
+        dataTemplate.clear(StorageEnum.LOG, type);
     }
 
     @Override

+ 3 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/OperationCallBack.java

@@ -2,6 +2,7 @@ package org.dbsyncer.manager.config;
 
 import org.dbsyncer.manager.template.Callback;
 import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.enums.StorageEnum;
 
 import java.util.Map;
 
@@ -9,11 +10,11 @@ public class OperationCallBack implements Callback {
 
     private StorageService storageService;
 
-    private String type;
+    private StorageEnum type;
 
     private Map params;
 
-    public OperationCallBack(StorageService storageService, String type, Map params) {
+    public OperationCallBack(StorageService storageService, StorageEnum type, Map params) {
         this.storageService = storageService;
         this.type = type;
         this.params = params;

+ 8 - 7
dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/PreloadConfig.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.manager.config;
 
 import org.dbsyncer.manager.enums.GroupStrategyEnum;
+import org.dbsyncer.manager.enums.HandlerEnum;
 import org.dbsyncer.manager.template.Handler;
 
 public class PreloadConfig {
@@ -9,17 +10,17 @@ public class PreloadConfig {
 
     private GroupStrategyEnum groupStrategyEnum;
 
-    private Handler handler;
+    private HandlerEnum handlerEnum;
 
-    public PreloadConfig(String filterType, Handler handler) {
+    public PreloadConfig(String filterType, HandlerEnum handlerEnum) {
         this.filterType = filterType;
-        this.handler = handler;
+        this.handlerEnum = handlerEnum;
     }
 
-    public PreloadConfig(String filterType, GroupStrategyEnum groupStrategyEnum, Handler handler) {
+    public PreloadConfig(String filterType, GroupStrategyEnum groupStrategyEnum, HandlerEnum handlerEnum) {
         this.filterType = filterType;
         this.groupStrategyEnum = groupStrategyEnum;
-        this.handler = handler;
+        this.handlerEnum = handlerEnum;
     }
 
     public String getFilterType() {
@@ -30,7 +31,7 @@ public class PreloadConfig {
         return groupStrategyEnum;
     }
 
-    public Handler getHandler() {
-        return handler;
+    public HandlerEnum getHandlerEnum() {
+        return handlerEnum;
     }
 }

+ 38 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/DataTemplate.java

@@ -0,0 +1,38 @@
+package org.dbsyncer.manager.template.impl;
+
+import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
+import org.dbsyncer.storage.query.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 同步数据和日志模板
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/5/20 18:59
+ */
+@Component
+public final class DataTemplate {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private StorageService storageService;
+
+    public List<Map> query(StorageEnum type, String collectionId, int pageNum, int pageSize) {
+        Query query = new Query(pageNum, pageSize);
+        return storageService.query(type, query, collectionId);
+    }
+
+    public void clear(StorageEnum type, String collectionId) {
+        storageService.clear(type, collectionId);
+    }
+}

+ 6 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/OperationTemplate.java

@@ -8,11 +8,13 @@ import org.dbsyncer.manager.config.OperationCallBack;
 import org.dbsyncer.manager.config.OperationConfig;
 import org.dbsyncer.manager.config.QueryConfig;
 import org.dbsyncer.manager.enums.GroupStrategyEnum;
-import org.dbsyncer.manager.template.*;
+import org.dbsyncer.manager.template.AbstractTemplate;
+import org.dbsyncer.manager.template.GroupStrategy;
+import org.dbsyncer.manager.template.Handler;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.util.ConfigModelUtil;
 import org.dbsyncer.storage.StorageService;
-import org.dbsyncer.storage.constant.StorageConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
@@ -79,7 +81,7 @@ public final class OperationTemplate extends AbstractTemplate {
         logger.debug("params:{}", params);
         Handler handler = config.getHandler();
         Assert.notNull(handler, "Handler can not be null.");
-        handler.execute(new OperationCallBack(storageService, StorageConstant.CONFIG, params));
+        handler.execute(new OperationCallBack(storageService, StorageEnum.CONFIG, params));
 
         // 3、缓存
         GroupStrategyEnum strategy = getDefaultStrategy(config);
@@ -115,7 +117,7 @@ public final class OperationTemplate extends AbstractTemplate {
             }
         }
         cacheService.remove(id);
-        storageService.remove(StorageConstant.CONFIG, id);
+        storageService.remove(StorageEnum.CONFIG, id);
     }
 
     private String getGroupId(ConfigModel model, GroupStrategyEnum strategy) {

+ 7 - 6
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/impl/PreloadTemplate.java

@@ -16,6 +16,7 @@ import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,11 +56,11 @@ public final class PreloadTemplate extends AbstractTemplate implements Applicati
         Query query = new Query();
         String filterType = config.getFilterType();
         query.put(ConfigConstant.CONFIG_MODEL_TYPE, filterType);
-        List<Map> list = storageService.queryConfig(query);
+        List<Map> list = storageService.query(StorageEnum.CONFIG, query);
         boolean empty = CollectionUtils.isEmpty(list);
         logger.info("PreLoad {}:{}", filterType, empty ? 0 : list.size());
         if (!empty) {
-            Handler handler = config.getHandler();
+            Handler handler = config.getHandlerEnum().getHandler();
             GroupStrategyEnum strategy = getDefaultStrategy(config);
             list.forEach(map -> {
                 String json = (String) map.get(ConfigConstant.CONFIG_MODEL_JSON);
@@ -74,13 +75,13 @@ public final class PreloadTemplate extends AbstractTemplate implements Applicati
     @Override
     public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
         // Load connectors
-        execute(new PreloadConfig(ConfigConstant.CONNECTOR, HandlerEnum.PRELOAD_CONNECTOR.getHandler()));
+        execute(new PreloadConfig(ConfigConstant.CONNECTOR, HandlerEnum.PRELOAD_CONNECTOR));
         // Load mappings
-        execute(new PreloadConfig(ConfigConstant.MAPPING, HandlerEnum.PRELOAD_MAPPING.getHandler()));
+        execute(new PreloadConfig(ConfigConstant.MAPPING, HandlerEnum.PRELOAD_MAPPING));
         // Load tableGroups
-        execute(new PreloadConfig(ConfigConstant.TABLE_GROUP, GroupStrategyEnum.TABLE, HandlerEnum.PRELOAD_TABLE_GROUP.getHandler()));
+        execute(new PreloadConfig(ConfigConstant.TABLE_GROUP, GroupStrategyEnum.TABLE, HandlerEnum.PRELOAD_TABLE_GROUP));
         // Load metas
-        execute(new PreloadConfig(ConfigConstant.META, HandlerEnum.PRELOAD_META.getHandler()));
+        execute(new PreloadConfig(ConfigConstant.META, HandlerEnum.PRELOAD_META));
 
         // 启动驱动
         Meta meta = new Meta();

+ 25 - 11
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -8,11 +8,13 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.enums.ParserEnum;
+import org.dbsyncer.parser.flush.FlushService;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
@@ -53,6 +55,9 @@ public class ParserFactory implements Parser {
     @Autowired
     private CacheService cacheService;
 
+    @Autowired
+    private FlushService flushService;
+
     @Autowired
     private ApplicationContext applicationContext;
 
@@ -216,7 +221,7 @@ public class ParserFactory implements Parser {
             Result writer = writeBatch(tConfig, command, picker.getTargetFields(), target, threadSize, batchSize);
 
             // 6、更新结果
-            flush(task, writer, target.size());
+            flush(task, writer, target);
 
             // 7、更新分页数
             params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
@@ -225,7 +230,7 @@ public class ParserFactory implements Parser {
 
     @Override
     public void execute(Mapping mapping, TableGroup tableGroup, DataEvent dataEvent) {
-        logger.info("同步数据=> dataEvent:{}", dataEvent);
+        logger.info("{}", dataEvent);
         final String metaId = mapping.getMetaId();
 
         ConnectorConfig tConfig = getConnectorConfig(mapping.getTargetConnectorId());
@@ -255,7 +260,9 @@ public class ParserFactory implements Parser {
         Result writer = connectorFactory.writer(tConfig, picker.getTargetFields(), command, event, target);
 
         // 5、更新结果
-        flush(metaId, writer, 1);
+        List<Map<String, Object>> list = new ArrayList<>(1);
+        list.add(target);
+        flush(metaId, writer, event, list);
     }
 
     /**
@@ -263,26 +270,33 @@ public class ParserFactory implements Parser {
      *
      * @param task
      * @param writer
-     * @param total
+     * @param data
      */
-    private void flush(Task task, Result writer, long total) {
-        flush(task.getId(), writer, total);
+    private void flush(Task task, Result writer, List<Map<String, Object>> data) {
+        flush(task.getId(), writer, ConnectorConstant.OPERTION_DELETE, data);
 
         // 发布刷新事件给FullExtractor
         task.setEndTime(System.currentTimeMillis());
         applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
     }
 
-    private void flush(String metaId, Result writer, long total) {
+    private void flush(String metaId, Result writer, String event, List<Map<String, Object>> data) {
         // 引用传递
+        long total = data.size();
         long fail = writer.getFail().get();
         Meta meta = getMeta(metaId);
         meta.getFail().getAndAdd(fail);
         meta.getSuccess().getAndAdd(total - fail);
-        // print process
-        logger.info("任务:{}, 成功:{}, 失败:{}", metaId, meta.getSuccess(), meta.getFail());
 
-        // TODO 记录错误日志
+        // 记录错误数据
+        Queue<Map<String, Object>> failData = writer.getFailData();
+        boolean success = CollectionUtils.isEmpty(failData);
+        if (!success) {
+            data.clear();
+            data.addAll(failData);
+        }
+        String error = writer.getError().toString();
+        flushService.asyncWrite(metaId, event, success, data, error);
     }
 
     /**
@@ -325,7 +339,7 @@ public class ParserFactory implements Parser {
      * @return
      */
     private Result writeBatch(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> target,
-                                int threadSize, int batchSize) {
+                              int threadSize, int batchSize) {
         // 总数
         int total = target.size();
         // 单次任务

+ 29 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -0,0 +1,29 @@
+package org.dbsyncer.parser.flush;
+
+import org.springframework.scheduling.annotation.Async;
+
+import java.util.List;
+import java.util.Map;
+
+public interface FlushService {
+
+    /**
+     * 记录错误日志
+     *
+     * @param metaId
+     * @param error
+     */
+    @Async("taskExecutor")
+    void asyncWrite(String type, String error);
+
+    /**
+     * 记录数据
+     *
+     * @param metaId
+     * @param event
+     * @param success
+     * @param data
+     */
+    @Async("taskExecutor")
+    void asyncWrite(String metaId, String event, boolean success, List<Map<String, Object>> data, String error);
+}

+ 61 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushServiceImpl.java

@@ -0,0 +1,61 @@
+package org.dbsyncer.parser.flush;
+
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.storage.SnowflakeIdWorker;
+import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * 持久化
+ * <p>全量或增量数据</p>
+ * <p>系统日志</p>
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/05/19 18:38
+ */
+@Component
+public class FlushServiceImpl implements FlushService {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private StorageService storageService;
+
+    @Autowired
+    private SnowflakeIdWorker snowflakeIdWorker;
+
+    @Override
+    public void asyncWrite(String type, String error) {
+        Map<String, Object> params = new HashMap();
+        params.put(ConfigConstant.CONFIG_MODEL_ID, snowflakeIdWorker.nextId());
+        params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
+        params.put(ConfigConstant.CONFIG_MODEL_JSON, error);
+        params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, System.currentTimeMillis());
+        storageService.addLog(StorageEnum.LOG, params);
+    }
+
+    @Override
+    public void asyncWrite(String metaId, String event, boolean success, List<Map<String, Object>> data, String error) {
+        long now = System.currentTimeMillis();
+        List<Map> list = data.parallelStream().map(r -> {
+            Map<String, Object> params = new HashMap();
+            params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
+            params.put(ConfigConstant.DATA_SUCCESS, success);
+            params.put(ConfigConstant.DATA_EVENT, event);
+            params.put(ConfigConstant.DATA_ERROR, error);
+            params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(r));
+            params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
+            return params;
+        }).collect(Collectors.toList());
+        storageService.addData(StorageEnum.DATA, metaId, list);
+    }
+}

+ 76 - 11
dbsyncer-storage/src/main/java/org/dbsyncer/storage/AbstractStorageService.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.storage;
 
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
 import org.dbsyncer.storage.strategy.Strategy;
 import org.slf4j.Logger;
@@ -24,7 +25,7 @@ public abstract class AbstractStorageService implements StorageService, Applicat
 
     private Map<String, Strategy> map;
 
-    public abstract List<Map> query(String collectionId, Query query);
+    public abstract List<Map> select(String collectionId, Query query) throws IOException;
 
     public abstract void insert(String collectionId, Map params) throws IOException;
 
@@ -32,19 +33,52 @@ public abstract class AbstractStorageService implements StorageService, Applicat
 
     public abstract void delete(String collectionId, String id) throws IOException;
 
+    public abstract void deleteAll(String collectionId) throws IOException;
+
+    /**
+     * 记录日志
+     *
+     * @param collectionId
+     * @param params
+     */
+    public abstract void insertLog(String collectionId, Map<String, Object> params) throws IOException;
+
+    /**
+     * 记录错误数据
+     *
+     * @param collectionId
+     * @param list
+     */
+    public abstract void insertData(String collectionId, List<Map> list) throws IOException;
+
     @Override
     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
         map = applicationContext.getBeansOfType(Strategy.class);
     }
 
     @Override
-    public void add(String type, Map params) {
+    public List<Map> query(StorageEnum type, Query query) {
+        return query(type, query, null);
+    }
+
+    @Override
+    public List<Map> query(StorageEnum type, Query query, String collectionId) {
+        try {
+            collectionId = getCollectionId(type, collectionId);
+            return select(collectionId, query);
+        } catch (IOException e) {
+            logger.error("query collectionId:{}, query:{}, failed:{}", collectionId, query, e.getMessage());
+            throw new StorageException(e);
+        }
+    }
+
+    @Override
+    public void add(StorageEnum type, Map params) {
         add(type, params, null);
     }
 
     @Override
-    public void add(String type, Map params, String collectionId) {
-        Assert.hasText(type, "Type can not be empty.");
+    public void add(StorageEnum type, Map params, String collectionId) {
         Assert.notNull(params, "Params can not be null.");
         logger.debug("collectionId:{}, params:{}", collectionId, params);
         try {
@@ -56,12 +90,12 @@ public abstract class AbstractStorageService implements StorageService, Applicat
     }
 
     @Override
-    public void edit(String type, Map params) {
+    public void edit(StorageEnum type, Map params) {
         edit(type, params, null);
     }
 
     @Override
-    public void edit(String type, Map params, String collectionId) {
+    public void edit(StorageEnum type, Map params, String collectionId) {
         Assert.notNull(params, "Params can not be null.");
         logger.debug("collectionId:{}, params:{}", collectionId, params);
         try {
@@ -73,12 +107,12 @@ public abstract class AbstractStorageService implements StorageService, Applicat
     }
 
     @Override
-    public void remove(String type, String id) {
+    public void remove(StorageEnum type, String id) {
         remove(type, id, null);
     }
 
     @Override
-    public void remove(String type, String id, String collectionId) {
+    public void remove(StorageEnum type, String id, String collectionId) {
         Assert.hasText(id, "ID can not be null.");
         logger.debug("collectionId:{}, id:{}", collectionId, id);
         try {
@@ -89,9 +123,40 @@ public abstract class AbstractStorageService implements StorageService, Applicat
         }
     }
 
-    private String getCollectionId(String type, String collectionId) {
-        Strategy strategy = map.get(type);
-        Assert.notNull(strategy, "Type does not exist.");
+    @Override
+    public void addLog(StorageEnum type, Map<String, Object> params) {
+        try {
+            insertLog(getCollectionId(type, null), params);
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+            throw new StorageException(e);
+        }
+    }
+
+    @Override
+    public void addData(StorageEnum type, String collectionId, List<Map> list) {
+        try {
+            insertData(getCollectionId(type, collectionId), list);
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+            throw new StorageException(e);
+        }
+    }
+
+    @Override
+    public void clear(StorageEnum type, String collectionId) {
+        try {
+            deleteAll(getCollectionId(type, collectionId));
+        } catch (IOException e) {
+            logger.error("clear collectionId:{}, failed:{}", collectionId, e.getMessage());
+            throw new StorageException(e);
+        }
+    }
+
+    private String getCollectionId(StorageEnum type, String collectionId) {
+        Assert.notNull(type, "StorageEnum can not be null.");
+        Strategy strategy = map.get(type.getType().concat("Strategy"));
+        Assert.notNull(strategy, "Strategy does not exist.");
         return strategy.createCollectionId(collectionId);
     }
 

+ 35 - 8
dbsyncer-storage/src/main/java/org/dbsyncer/storage/StorageService.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.storage;
 
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
 
 import java.util.List;
@@ -12,18 +13,44 @@ import java.util.Map;
  */
 public interface StorageService {
 
-    List<Map> queryConfig(Query query);
+    List<Map> query(StorageEnum type, Query query);
 
-    void add(String type, Map params);
+    List<Map> query(StorageEnum type, Query query, String collectionId);
 
-    void add(String type, Map params, String collectionId);
+    void add(StorageEnum type, Map params);
 
-    void edit(String type, Map params);
+    void add(StorageEnum type, Map params, String collectionId);
 
-    void edit(String type, Map params, String collectionId);
+    void edit(StorageEnum type, Map params);
 
-    void remove(String type, String id);
+    void edit(StorageEnum type, Map params, String collectionId);
 
-    void remove(String type, String id, String collectionId);
+    void remove(StorageEnum type, String id);
 
-}
+    void remove(StorageEnum type, String id, String collectionId);
+
+    /**
+     * 记录日志
+     *
+     * @param log
+     * @param params
+     */
+    void addLog(StorageEnum log, Map<String,Object> params);
+
+    /**
+     * 记录数据
+     *
+     * @param data
+     * @param collectionId
+     * @param list
+     */
+    void addData(StorageEnum data, String collectionId, List<Map> list);
+
+    /**
+     * 清空数据/日志
+     *
+     * @param type
+     * @param collectionId
+     */
+    void clear(StorageEnum type, String collectionId);
+}

+ 7 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java

@@ -25,4 +25,11 @@ public class ConfigConstant {
     public static final String TABLE_GROUP = "tableGroup";
     public static final String META = "meta";
 
+    /**
+     * 数据
+     */
+    public static final String DATA_SUCCESS = "success";
+    public static final String DATA_EVENT = "event";
+    public static final String DATA_ERROR = "error";
+
 }

+ 0 - 25
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/StorageConstant.java

@@ -1,25 +0,0 @@
-package org.dbsyncer.storage.constant;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2019/11/16 20:31
- */
-public class StorageConstant {
-
-    /**
-     * 配置:连接器、驱动、运行状态
-     */
-    public static final String CONFIG = "configStrategy";
-
-    /**
-     * 日志:连接器、驱动、系统
-     */
-    public static final String LOG = "logStrategy";
-
-    /**
-     * 数据:驱动实时同步数据
-     */
-    public static final String DATA = "dataStrategy";
-
-}

+ 0 - 25
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/StrategyConstant.java

@@ -1,25 +0,0 @@
-package org.dbsyncer.storage.constant;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2019/11/16 20:31
- */
-public class StrategyConstant {
-
-    /**
-     * 配置:连接器、驱动、运行状态
-     */
-    public static final String CONFIG = "config";
-
-    /**
-     * 日志:连接器、驱动、系统
-     */
-    public static final String LOG = "log";
-
-    /**
-     * 数据:驱动实时同步数据
-     */
-    public static final String DATA = "data";
-
-}

+ 32 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/StorageEnum.java

@@ -0,0 +1,32 @@
+package org.dbsyncer.storage.enums;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/11/16 20:31
+ */
+public enum StorageEnum {
+
+    /**
+     * 配置:连接器、驱动、映射关系、同步信息、系统配置
+     */
+    CONFIG("config"),
+    /**
+     * 日志:连接器、驱动、映射关系、同步信息、系统日志
+     */
+    LOG("log"),
+    /**
+     * 数据:全量或增量数据
+     */
+    DATA("data");
+
+    private String type;
+
+    StorageEnum(String type) {
+        this.type = type;
+    }
+
+    public String getType() {
+        return type;
+    }
+}

+ 123 - 147
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/LuceneFactoryTest.java

@@ -1,11 +1,13 @@
 package org.dbsyncer.storage.lucene;
 
+import org.apache.commons.lang.math.RandomUtils;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.analysis.cn.smart.SmartChineseAnalyzer;
 import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
 import org.apache.lucene.document.*;
-import org.apache.lucene.index.*;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.Term;
 import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.lucene.queryparser.classic.QueryParser;
 import org.apache.lucene.search.*;
@@ -13,158 +15,114 @@ import org.apache.lucene.search.highlight.Highlighter;
 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
 import org.apache.lucene.search.highlight.QueryScorer;
 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.io.StringReader;
-import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Map;
 
 public class LuceneFactoryTest {
 
-    private Directory directory;
-
-    private Analyzer analyzer;
-
-    private IndexWriter indexWriter;
-
-    private IndexReader indexReader;
-
-    private IndexSearcher indexSearcher;
+    private Shard shard;
 
     @Before
     public void setUp() throws IOException {
-        //索引存放的位置,设置在当前目录中
-        directory = FSDirectory.open(Paths.get("target/indexDir/"));
-
-        analyzer = new SmartChineseAnalyzer();
-
-        //创建索引写入配置
-        IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
-
-        //创建索引写入对象
-        indexWriter = new IndexWriter(directory, indexWriterConfig);
-
-        //创建索引的读取器
-        indexReader = DirectoryReader.open(indexWriter);
-
-        //创建一个索引的查找器,来检索索引库
-        indexSearcher = new IndexSearcher(indexReader);
-
+        shard = new Shard("target/indexDir/");
     }
 
     @After
-    public void tearDown() throws Exception {
-        //关闭流
-        indexWriter.close();
-        indexReader.close();
+    public void tearDown() {
+        shard.close();
     }
 
-    /**
-     * 执行查询,并打印查询到的记录数
-     * @param query
-     * @throws IOException
-     */
-    protected void executeQuery(Query query) throws IOException {
-
-        TopDocs topDocs = indexSearcher.search(query, 100);
-
-        //打印查询到的记录数
-        System.out.println("总共查询到" + topDocs.totalHits + "个文档");
-        for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
-
-            //取得对应的文档对象
-            Document document = indexSearcher.doc(scoreDoc.doc);
-            System.out.println("id:" + document.get("id"));
-            System.out.println("title:" + document.get("title"));
-            System.out.println("content:" + document.get("content"));
-        }
-    }
-
-    /**
-     * 分词打印
-     *
-     * @param analyzer
-     * @param text
-     * @throws IOException
-     */
-    protected void printAnalyzerDoc(Analyzer analyzer, String text) throws IOException {
-
-        TokenStream tokenStream = analyzer.tokenStream("content", new StringReader(text));
-        CharTermAttribute charTermAttribute = tokenStream.addAttribute(CharTermAttribute.class);
-        try {
-            tokenStream.reset();
-            while (tokenStream.incrementToken()) {
-                System.out.println(charTermAttribute.toString());
-            }
-            tokenStream.end();
-        } finally {
-            tokenStream.close();
-            analyzer.close();
+    @Test
+    public void testQuery() throws IOException {
+        int size = 3;
+        for (int i = size; i > 0; i--) {
+            Document doc = new Document();
+            doc.add(new StringField("id", String.valueOf(i), Field.Store.YES));
+            doc.add(new StringField("name", "中文" + i, Field.Store.YES));
+            doc.add(new TextField("content", "这是一串很长长长长长长长的文本", Field.Store.YES));
+
+            // 创建索引
+            int age = RandomUtils.nextInt(50);
+            doc.add(new IntPoint("age", age));
+            // 需要存储内容
+            doc.add(new StoredField("age", age));
+            // 需要排序
+            doc.add(new NumericDocValuesField("age", age));
+            System.out.println(String.format("id=%s,age:=%s", String.valueOf(i), age));
+
+            // 2020-05-23 12:00:00
+            long createTime = 1590206400000L + i;
+            doc.add(new LongPoint("createTime", createTime));
+            doc.add(new StoredField("createTime", createTime));
+            doc.add(new NumericDocValuesField("createTime", createTime));
+
+            shard.insert(doc);
         }
+        // 范围查询 IntPoint.newRangeQuery("id", 1, 100)
+        // 集合查询 IntPoint.newSetQuery("id", 2, 3)
+        // 单个查询 IntPoint.newExactQuery("id", 3)
+        BooleanQuery query = new BooleanQuery.Builder()
+                .add(IntPoint.newRangeQuery("age", 1, 100), BooleanClause.Occur.MUST)
+                .build();
+        List<Map> maps = shard.query(query, new Sort(new SortField("createTime", SortField.Type.LONG, true)));
+        maps.forEach(m -> System.out.println(m));
+
+        // 清空
+        shard.deleteAll();
     }
 
     @Test
-    public void indexWriterTest() throws IOException {
-        long start = System.currentTimeMillis();
+    public void testCURD() throws IOException {
+        System.out.println("测试前:");
+        List<Map> maps = shard.query(new MatchAllDocsQuery());
+        maps.forEach(m -> System.out.println(m));
+        check();
 
-        //创建Document对象,存储索引
+        // 新增
         Document doc = new Document();
-
-        int id = 1;
-
-        //将字段加入到doc中
-        doc.add(new IntPoint("id", id));
-        doc.add(new StringField("title", "Spark", Field.Store.YES));
-        doc.add(new TextField("content", "Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎", Field.Store.YES));
-        doc.add(new StoredField("id", id));
-
-        //将doc对象保存到索引库中
-        indexWriter.addDocument(doc);
-
-        indexWriter.commit();
-
-        long end = System.currentTimeMillis();
-        System.out.println("索引花费了" + (end - start) + " 毫秒");
+        String id = "100";
+        doc.add(new StringField("id", id, Field.Store.YES));
+        doc.add(new StringField("name", "中文", Field.Store.YES));
+        shard.insert(doc);
+        System.out.println("新增后:");
+        maps = shard.query(new MatchAllDocsQuery());
+        maps.forEach(m -> System.out.println(m));
+        check();
+
+        // 修改
+        doc.add(new StringField("name", "中文[已修改]", Field.Store.YES));
+        shard.update(new Term("id", id), doc);
+        System.out.println("修改后:");
+        maps = shard.query(new MatchAllDocsQuery());
+        maps.forEach(m -> System.out.println(m));
+        check();
+
+        // 删除
+        shard.delete(new Term("id", id));
+        System.out.println("删除后:");
+        maps = shard.query(new MatchAllDocsQuery());
+        maps.forEach(m -> System.out.println(m));
+        check();
+
+        // 清空
+        shard.deleteAll();
     }
 
     @Test
-    public void updateDocumentTest() throws IOException {
-        Document doc = new Document();
-
-        int id = 1;
-
-        doc.add(new IntPoint("id", id));
-        doc.add(new StringField("title", "Spark", Field.Store.YES));
-        doc.add(new TextField("content", "Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎", Field.Store.YES));
-        doc.add(new StoredField("id", id));
-
-        long count = indexWriter.updateDocument(new Term("id", "1"), doc);
-        System.out.println("更新文档:" + count);
-        indexWriter.commit();
-    }
-
-    @Test
-    public void deleteDocumentsTest() throws IOException {
-
-        // 删除title中含有关键词“Spark”的文档
-        long count = indexWriter.deleteDocuments(new Term("title", "Spark"));
-
-        //  除此之外IndexWriter还提供了以下方法:
-        // DeleteDocuments(Query query):根据Query条件来删除单个或多个Document
-        // DeleteDocuments(Query[] queries):根据Query条件来删除单个或多个Document
-        // DeleteDocuments(Term term):根据Term来删除单个或多个Document
-        // DeleteDocuments(Term[] terms):根据Term来删除单个或多个Document
-        // DeleteAll():删除所有的Document
-
-        //使用IndexWriter进行Document删除操作时,文档并不会立即被删除,而是把这个删除动作缓存起来,当IndexWriter.Commit()或IndexWriter.Close()时,删除操作才会被真正执行。
-
-        indexWriter.commit();
-
-        System.out.println("删除完成:" + count);
+    public void fmtDate() {
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+        LocalDateTime localDateTime = LocalDateTime.parse("2020-05-23 12:00:00", formatter);
+        long timeStamp = localDateTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
+        System.out.println(timeStamp);
     }
 
     /**
@@ -179,13 +137,12 @@ public class LuceneFactoryTest {
      */
     @Test
     public void termQueryTest() throws IOException {
-
         String searchField = "title";
         //这是一个条件查询的api,用于添加条件
         TermQuery query = new TermQuery(new Term(searchField, "Spark"));
 
         //执行查询,并打印查询到的记录数
-        executeQuery(query);
+        shard.query(query);
     }
 
     /**
@@ -227,7 +184,7 @@ public class LuceneFactoryTest {
         BooleanQuery query = builder.build();
 
         //执行查询,并打印查询到的记录数
-        executeQuery(query);
+        shard.query(query);
     }
 
     /**
@@ -245,7 +202,7 @@ public class LuceneFactoryTest {
         Query query = new PrefixQuery(term);
 
         //执行查询,并打印查询到的记录数
-        executeQuery(query);
+        shard.query(query);
     }
 
     /**
@@ -273,7 +230,7 @@ public class LuceneFactoryTest {
         PhraseQuery phraseQuery = builder.build();
 
         //执行查询,并打印查询到的记录数
-        executeQuery(phraseQuery);
+        shard.query(phraseQuery);
     }
 
     /**
@@ -291,7 +248,7 @@ public class LuceneFactoryTest {
         Query query = new FuzzyQuery(t);
 
         //执行查询,并打印查询到的记录数
-        executeQuery(query);
+        shard.query(query);
     }
 
     /**
@@ -309,7 +266,7 @@ public class LuceneFactoryTest {
         Query query = new WildcardQuery(term);
 
         //执行查询,并打印查询到的记录数
-        executeQuery(query);
+        shard.query(query);
     }
 
     /**
@@ -320,7 +277,7 @@ public class LuceneFactoryTest {
      */
     @Test
     public void queryParserTest() throws IOException, ParseException {
-
+        final Analyzer analyzer = shard.getAnalyzer();
         String searchField = "content";
 
         //指定搜索字段和分析器
@@ -331,7 +288,7 @@ public class LuceneFactoryTest {
         Query query = parser.parse("Spark");
 
         //执行查询,并打印查询到的记录数
-        executeQuery(query);
+        shard.query(query);
     }
 
     /**
@@ -341,6 +298,8 @@ public class LuceneFactoryTest {
      */
     @Test
     public void HighlighterTest() throws IOException, ParseException, InvalidTokenOffsetsException {
+        final Analyzer analyzer = shard.getAnalyzer();
+        final IndexSearcher searcher = shard.getSearcher();
 
         String searchField = "content";
         String text = "Apache Spark 大规模数据处理";
@@ -351,7 +310,7 @@ public class LuceneFactoryTest {
         //用户输入内容
         Query query = parser.parse(text);
 
-        TopDocs topDocs = indexSearcher.search(query, 100);
+        TopDocs topDocs = searcher.search(query, 100);
 
         // 关键字高亮显示的html标签,需要导入lucene-highlighter-xxx.jar
         SimpleHTMLFormatter simpleHTMLFormatter = new SimpleHTMLFormatter("<span style='color:red'>", "</span>");
@@ -360,7 +319,7 @@ public class LuceneFactoryTest {
         for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
 
             //取得对应的文档对象
-            Document document = indexSearcher.doc(scoreDoc.doc);
+            Document document = searcher.doc(scoreDoc.doc);
 
             // 内容增加高亮显示
             TokenStream tokenStream = analyzer.tokenStream("content", new StringReader(document.get("content")));
@@ -371,18 +330,35 @@ public class LuceneFactoryTest {
 
     }
 
-    /**
-     * IKAnalyzer  中文分词器
-     * SmartChineseAnalyzer  smartcn分词器 需要lucene依赖 且和lucene版本同步
-     *
-     * @throws IOException
-     */
     @Test
-    public void AnalyzerTest() throws IOException {
+    public void testAnalyzerDoc() throws IOException {
+        // SmartChineseAnalyzer  smartcn分词器 需要lucene依赖 且和lucene版本同步
+        Analyzer analyzer = new SmartChineseAnalyzer();
 
         String text = "Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎";
-        analyzer = new SmartChineseAnalyzer();
-        printAnalyzerDoc(analyzer, text);
+        TokenStream tokenStream = analyzer.tokenStream("content", new StringReader(text));
+        CharTermAttribute charTermAttribute = tokenStream.addAttribute(CharTermAttribute.class);
+        try {
+            tokenStream.reset();
+            while (tokenStream.incrementToken()) {
+                System.out.println(charTermAttribute.toString());
+            }
+            tokenStream.end();
+        } finally {
+            tokenStream.close();
+            analyzer.close();
+        }
     }
 
+    private void check() throws IOException {
+        final IndexSearcher searcher = shard.getSearcher();
+        IndexReader reader = searcher.getIndexReader();
+        // 通过reader可以有效的获取到文档的数量
+        // 有效的索引文档
+        System.out.println("有效的索引文档:" + reader.numDocs());
+        // 总共的索引文档
+        System.out.println("总共的索引文档:" + reader.maxDoc());
+        // 删掉的索引文档,其实不恰当,应该是在回收站里的索引文档
+        System.out.println("删掉的索引文档:" + reader.numDeletedDocs());
+    }
 }

+ 80 - 35
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -4,10 +4,7 @@ import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.cn.smart.SmartChineseAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.*;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.*;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.dbsyncer.storage.StorageException;
@@ -31,43 +28,25 @@ public class Shard {
 
     private IndexReader indexReader;
 
-    private IndexSearcher indexSearcher;
+    private IndexWriterConfig config;
 
-    private String path;
+    private final Object lock = new Object();
 
-    public Shard(String path) throws IOException {
-        this.path = path;
-        init();
-    }
+    private static final int MAX_SIZE = 10000;
 
-    private void init() throws IOException {
+    public Shard(String path) throws IOException {
         // 索引存放的位置,设置在当前目录中
         directory = FSDirectory.open(Paths.get(path));
         // 分词器
         analyzer = new SmartChineseAnalyzer();
         // 创建索引写入配置
-        IndexWriterConfig config = new IndexWriterConfig(analyzer);
+        config = new IndexWriterConfig(analyzer);
         // 默认32M, 减少合并次数
         config.setRAMBufferSizeMB(32);
         // 创建索引写入对象
         indexWriter = new IndexWriter(directory, config);
         // 创建索引的读取器
         indexReader = DirectoryReader.open(indexWriter);
-        // 创建一个索引的查找器,来检索索引库
-        indexSearcher = new IndexSearcher(indexReader);
-    }
-
-    public void close() {
-        try {
-            indexWriter.close();
-            indexReader.close();
-        } catch (IOException e) {
-            throw new StorageException(e);
-        }
-    }
-
-    public List<Map> prefixQuery(Query query) throws IOException {
-        return executeQuery(query);
     }
 
     public void insert(Document doc) throws IOException {
@@ -77,6 +56,13 @@ public class Shard {
         }
     }
 
+    public void insertBatch(List<Document> docs) throws IOException {
+        if (null != docs) {
+            indexWriter.addDocuments(docs);
+            indexWriter.commit();
+        }
+    }
+
     public void update(Term term, Document doc) throws IOException {
         if (null != term && null != doc) {
             indexWriter.updateDocument(term, doc);
@@ -96,24 +82,83 @@ public class Shard {
         indexWriter.commit();
     }
 
+    public void close() {
+        try {
+            indexWriter.close();
+            indexReader.close();
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
+    }
+
+    public IndexSearcher getSearcher() throws IOException {
+        // 复用索引读取器
+        IndexReader changeReader = DirectoryReader.openIfChanged((DirectoryReader) indexReader, indexWriter, true);
+        if (null != changeReader) {
+            indexReader.close();
+            indexReader = null;
+            synchronized (lock) {
+                if (null == indexReader) {
+                    indexReader = changeReader;
+                }
+            }
+        }
+        return new IndexSearcher(indexReader);
+    }
+
+    public Analyzer getAnalyzer() {
+        return analyzer;
+    }
+
+    public List<Map> query(Query query) throws IOException {
+        return query(query, 1, 20);
+    }
+
+    public List<Map> query(Query query, Sort sort) throws IOException {
+        return query(query, 1, 20, sort);
+    }
+
+    public List<Map> query(Query query, int pageNum, int pageSize) throws IOException {
+        final IndexSearcher searcher = getSearcher();
+        final TopDocs topDocs = searcher.search(query, MAX_SIZE);
+        return search(searcher, topDocs, pageNum, pageSize);
+    }
+
+    public List<Map> query(Query query, int pageNum, int pageSize, Sort sort) throws IOException {
+        final IndexSearcher searcher = getSearcher();
+        final TopDocs topDocs = searcher.search(query, MAX_SIZE, sort);
+        return search(searcher, topDocs, pageNum, pageSize);
+    }
+
     /**
-     * 执行查询,并打印查询到的记录数
+     * 执行查询
      *
-     * @param query
+     * @param searcher
+     * @param topDocs
+     * @param pageNum
+     * @param pageSize
      * @throws IOException
      */
-    private List<Map> executeQuery(Query query) throws IOException {
+    private List<Map> search(final IndexSearcher searcher, final TopDocs topDocs, int pageNum, int pageSize) throws IOException {
+        ScoreDoc[] docs = topDocs.scoreDocs;
+        int total = docs.length;
+        int begin = (pageNum - 1) * pageSize;
+        int end = pageNum * pageSize;
 
-        TopDocs topDocs = indexSearcher.search(query, 10000);
+        // 判断边界
+        begin = begin > total ? total : begin;
+        end = end > total ? total : end;
 
         List<Map> list = new ArrayList<>();
+        Document doc = null;
         Map r = null;
         IndexableField f = null;
-        for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
-            //取得对应的文档对象
-            Document doc = indexSearcher.doc(scoreDoc.doc);
+        Iterator<IndexableField> iterator = null;
+        while (begin < end) {
+            // 取得对应的文档对象
+            doc = searcher.doc(docs[begin++].doc);
+            iterator = doc.iterator();
             r = new LinkedHashMap<>();
-            Iterator<IndexableField> iterator = doc.iterator();
             while (iterator.hasNext()) {
                 f = iterator.next();
                 r.put(f.name(), f.stringValue());

+ 4 - 4
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Param.java

@@ -7,9 +7,9 @@ package org.dbsyncer.storage.query;
  */
 public class Param {
     private String key;
-    private Object value;
+    private String value;
 
-    public Param(String key, Object value) {
+    public Param(String key, String value) {
         this.key = key;
         this.value = value;
     }
@@ -18,7 +18,7 @@ public class Param {
         return key;
     }
 
-    public Object getValue() {
+    public String getValue() {
         return value;
     }
-}
+}

+ 28 - 4
dbsyncer-storage/src/main/java/org/dbsyncer/storage/query/Query.java

@@ -12,8 +12,22 @@ public class Query {
 
     private List<Param> params;
 
+    private int pageNum = 1;
+
+    private int pageSize = 20;
+
     public Query() {
-        params = new ArrayList<>();
+        this.params = new ArrayList<>();
+    }
+
+    public Query(int pageNum, int pageSize) {
+        this.pageNum = pageNum;
+        this.pageSize = pageSize;
+        this.params = new ArrayList<>();
+    }
+
+    public void put(String key, String value) {
+        params.add(new Param(key, value));
     }
 
     public List<Param> getParams() {
@@ -24,9 +38,19 @@ public class Query {
         this.params = params;
     }
 
-    public void put(String key, String value) {
-        params.add(new Param(key, value));
+    public int getPageNum() {
+        return pageNum;
     }
-}
 
+    public void setPageNum(int pageNum) {
+        this.pageNum = pageNum;
+    }
 
+    public int getPageSize() {
+        return pageSize;
+    }
+
+    public void setPageSize(int pageSize) {
+        this.pageSize = pageSize;
+    }
+}

+ 3 - 5
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/ConfigStrategy.java

@@ -1,11 +1,11 @@
 package org.dbsyncer.storage.strategy.impl;
 
-import org.dbsyncer.storage.constant.StrategyConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.strategy.Strategy;
 import org.springframework.stereotype.Component;
 
 /**
- * 配置:连接器、驱动、运行状态
+ * 配置:Connector、Mapping、TableGroup、Meta、SysConfig
  *
  * @author AE86
  * @version 1.0.0
@@ -14,10 +14,8 @@ import org.springframework.stereotype.Component;
 @Component
 public class ConfigStrategy implements Strategy {
 
-    private static final String COLLECTION_ID = StrategyConstant.CONFIG;
-
     @Override
     public String createCollectionId(String id) {
-        return COLLECTION_ID;
+        return StorageEnum.CONFIG.getType();
     }
 }

+ 4 - 4
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/DataStrategy.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.storage.strategy.impl;
 
-import org.dbsyncer.storage.constant.StrategyConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.strategy.Strategy;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
@@ -8,7 +8,7 @@ import org.springframework.util.Assert;
 import java.io.File;
 
 /**
- * 数据:驱动实时同步数据
+ * 数据:全量或增量数据
  *
  * @author AE86
  * @version 1.0.0
@@ -17,12 +17,12 @@ import java.io.File;
 @Component
 public class DataStrategy implements Strategy {
 
-    private static final String COLLECTION_ID = StrategyConstant.DATA + File.separator;
+    private static final String COLLECTION_ID = StorageEnum.DATA.getType() + File.separator;
 
     @Override
     public String createCollectionId(String id) {
         Assert.hasText(id, "Id can not be empty.");
-        // 同步数据较多,根据不同的驱动生成集合ID: /data/123
+        // 同步数据较多,根据不同的驱动生成集合ID: data/123
         return COLLECTION_ID + id;
     }
 }

+ 3 - 5
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/LogStrategy.java

@@ -1,11 +1,11 @@
 package org.dbsyncer.storage.strategy.impl;
 
-import org.dbsyncer.storage.constant.StrategyConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.strategy.Strategy;
 import org.springframework.stereotype.Component;
 
 /**
- * 日志:连接器、驱动、系统
+ * 日志:Connector、Mapping、TableGroup、Meta、系统日志
  *
  * @author AE86
  * @version 1.0.0
@@ -14,10 +14,8 @@ import org.springframework.stereotype.Component;
 @Component
 public class LogStrategy implements Strategy {
 
-    private static final String COLLECTION_ID = StrategyConstant.LOG;
-
     @Override
     public String createCollectionId(String id) {
-        return COLLECTION_ID;
+        return StorageEnum.LOG.getType();
     }
 }

+ 74 - 30
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -3,12 +3,12 @@ package org.dbsyncer.storage.support;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.search.PrefixQuery;
+import org.apache.lucene.search.*;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.storage.AbstractStorageService;
 import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.constant.ConfigConstant;
-import org.dbsyncer.storage.constant.StrategyConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.lucene.Shard;
 import org.dbsyncer.storage.query.Param;
 import org.dbsyncer.storage.query.Query;
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /**
  * @author AE86
@@ -39,44 +40,50 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
 
     private Map<String, Shard> map = new ConcurrentHashMap();
 
-    // 获取相对路径:./data
-    private static final String PATH = "data";
+    // 相对路径:./data/
+    private static final String PATH = "data" + File.separator;
 
     @PostConstruct
     private void init() {
         try {
-            String currentPath = PATH + File.separator;
-            // 存放配置:连接器、驱动、运行状态
-            map.putIfAbsent(StrategyConstant.CONFIG, new Shard(currentPath + StrategyConstant.CONFIG));
-            // 存放日志:连接器、驱动、系统
-            map.putIfAbsent(StrategyConstant.LOG, new Shard(currentPath + StrategyConstant.LOG));
+            // 创建配置和日志索引shard
+            String config = StorageEnum.CONFIG.getType();
+            map.putIfAbsent(config, new Shard(PATH + config));
+
+            String log = StorageEnum.LOG.getType();
+            map.putIfAbsent(log, new Shard(PATH + log));
         } catch (IOException e) {
             throw new StorageException(e);
         }
     }
 
     @Override
-    public List<Map> queryConfig(Query query) {
-        Shard shard = map.get(StrategyConstant.CONFIG);
-        try {
+    public List<Map> select(String collectionId, Query query) throws IOException {
+        Shard shard = map.get(collectionId);
+
+        // 检查是否存在历史
+        if (null == shard) {
+            shard = cacheShardIfExist(collectionId);
+        }
+
+        if (null != shard) {
+            int pageNum = query.getPageNum() <= 0 ? 1 : query.getPageNum();
+            int pageSize = query.getPageSize() <= 0 ? 20 : query.getPageSize();
+            // 根据修改时间 > 创建时间排序
+            Sort sort = new Sort(new SortField(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, SortField.Type.LONG, true),
+                    new SortField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, SortField.Type.LONG, true));
+            // 设置参数
             List<Param> params = query.getParams();
             if (!CollectionUtils.isEmpty(params)) {
-                Param p = params.get(0);
-                Term term = new Term(p.getKey(), (String) p.getValue());
-                PrefixQuery q = new PrefixQuery(term);
-                return shard.prefixQuery(q);
+                BooleanQuery.Builder builder = new BooleanQuery.Builder();
+                params.forEach(p ->
+                        builder.add(new TermQuery(new Term(p.getKey(), p.getValue())), BooleanClause.Occur.MUST)
+                );
+                BooleanQuery q = builder.build();
+                return shard.query(q, pageNum, pageSize, sort);
             }
-        } catch (IOException e) {
-            logger.error(e.getMessage());
-        }
-        return Collections.emptyList();
-    }
 
-    @Override
-    public List<Map> query(String collectionId, Query query) {
-        Shard shard = map.get(collectionId);
-        if (null != shard) {
-            return Collections.emptyList();
+            return shard.query(new MatchAllDocsQuery(), pageNum, pageSize, sort);
         }
         return Collections.emptyList();
     }
@@ -84,14 +91,14 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
     @Override
     public void insert(String collectionId, Map params) throws IOException {
         createShardIfNotExist(collectionId);
-        Document doc = ParamsUtil.convertParamsToDocument(params);
+        Document doc = ParamsUtil.convertParams2Doc(params);
         map.get(collectionId).insert(doc);
     }
 
     @Override
     public void update(String collectionId, Map params) throws IOException {
         createShardIfNotExist(collectionId);
-        Document doc = ParamsUtil.convertParamsToDocument(params);
+        Document doc = ParamsUtil.convertParams2Doc(params);
         IndexableField field = doc.getField(ConfigConstant.CONFIG_MODEL_ID);
         map.get(collectionId).update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), doc);
     }
@@ -102,17 +109,54 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
         map.get(collectionId).delete(new Term(ConfigConstant.CONFIG_MODEL_ID, id));
     }
 
+    @Override
+    public void deleteAll(String collectionId) throws IOException {
+        Shard shard = map.get(collectionId);
+        if (null != shard) {
+            shard.deleteAll();
+            map.remove(collectionId);
+        }
+    }
+
+    @Override
+    public void insertLog(String collectionId, Map<String, Object> params) throws IOException {
+        createShardIfNotExist(collectionId);
+        Document doc = ParamsUtil.convertLog2Doc(params);
+        map.get(collectionId).insert(doc);
+    }
+
+    @Override
+    public void insertData(String collectionId, List<Map> list) throws IOException {
+        createShardIfNotExist(collectionId);
+        List<Document> docs = list.parallelStream().map(r -> ParamsUtil.convertData2Doc(r)).collect(Collectors.toList());
+        map.get(collectionId).insertBatch(docs);
+    }
+
     /**
      * 如果不存在分片则创建(线程安全)
+     * <p>/data/config</p>
+     * <p>/data/log</p>
+     * <p>/data/data/123</p>
      *
-     * @param collectionId /data/123
+     * @param collectionId
      * @throws IOException
      */
     private void createShardIfNotExist(String collectionId) throws IOException {
         if (null == map.get(collectionId)) {
-            // 存放数据:驱动实时同步数据, /data/${collectionId}
             map.putIfAbsent(collectionId, new Shard(PATH + collectionId));
         }
     }
 
+    private Shard cacheShardIfExist(String collectionId) {
+        String path = PATH + collectionId;
+        if (new File(path).exists()) {
+            try {
+                map.putIfAbsent(collectionId, new Shard(path));
+            } catch (IOException e) {
+                logger.error(e.getMessage());
+            }
+        }
+        return map.get(collectionId);
+    }
+
 }

+ 17 - 7
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -19,12 +19,7 @@ import java.util.Map;
 public class MysqlStorageServiceImpl extends AbstractStorageService {
 
     @Override
-    public List<Map> queryConfig(Query query) {
-        return null;
-    }
-
-    @Override
-    public List<Map> query(String collectionId, Query query) {
+    public List<Map> select(String collectionId, Query query) {
         return null;
     }
 
@@ -42,4 +37,19 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     public void delete(String collectionId, String id) throws IOException {
 
     }
-}
+
+    @Override
+    public void deleteAll(String collectionId) throws IOException {
+
+    }
+
+    @Override
+    public void insertLog(String collectionId, Map<String, Object> params) throws IOException {
+
+    }
+
+    @Override
+    public void insertData(String collectionId, List<Map> list) throws IOException {
+
+    }
+}

+ 77 - 2
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/ParamsUtil.java

@@ -7,6 +7,35 @@ import org.springframework.util.Assert;
 import java.util.Map;
 
 /**
+ * <b>索引维护工具类</b>
+ * <p/>1、使用方法:
+ * <p/> new IntPoint(name, value); 存放int类型
+ * <p/> new StoredField(name, value); 要存储值,必须添加一个同名的StoredField
+ * <p/> new NumericDocValuesField(name, value); 要排序,必须添加一个同名的SortedNumericDocValuesField
+ * <p/> 其他FloatPoint、LongPoint、DoublePoint同上
+ * <p/> id使用字符串,防止更新失败
+ *
+ * <p/>2、Field:
+ * <p/>IntPoint
+ * <p/>FloatPoint
+ * <p/>LongPoint
+ * <p/>DoublePoint
+ * <p/>BinaryPoint
+ * <p/>StringField 索引不分词,所有的字符串会作为一个整体进行索引,例如通常用于country或id等
+ * <p/>TextField 索引并分词,不包括term vectors,例如通常用于一个body Field
+ * <p/>StoredField 存储Field的值,可以用 IndexSearcher.doc和IndexReader.document来获取存储的Field和存储的值
+ * <p/>SortedDocValuesField 存储String、Text类型排序
+ * <p/>NumericDocValuesField 存储Int、Long类型索引并排序,用于评分、排序和值检索
+ * <p/>FloatDocValuesField 存储Float类型索引并排序
+ * <p/>DoubleDocValuesField 存储Double类型索引并排序
+ * <p/>BinaryDocValuesField 只存储不共享,例如标题类字段,如果需要共享并排序,推荐使用SortedDocValuesField
+ *
+ * <p/>3、Lucene 6.0版本后:
+ * <p>IntField 替换为 IntPoint</p>
+ * <p>FloatField 替换为 FloatPoint</p>
+ * <p>LongField 替换为 LongPoint</p>
+ * <p>DoubleField 替换为 DoublePoint</p>
+ *
  * @author AE86
  * @version 1.0.0
  * @date 2019/11/19 22:07
@@ -14,22 +43,68 @@ import java.util.Map;
 public abstract class ParamsUtil {
     private ParamsUtil(){}
 
-    public static Document convertParamsToDocument(Map params) {
+    public static Document convertParams2Doc(Map params) {
         Assert.notNull(params, "Params can not be null.");
         Document doc = new Document();
         String id = (String) params.get(ConfigConstant.CONFIG_MODEL_ID);
         String type = (String) params.get(ConfigConstant.CONFIG_MODEL_TYPE);
         String name = (String) params.get(ConfigConstant.CONFIG_MODEL_NAME);
+        String json = (String) params.get(ConfigConstant.CONFIG_MODEL_JSON);
         Long createTime = (Long) params.get(ConfigConstant.CONFIG_MODEL_CREATE_TIME);
         Long updateTime = (Long) params.get(ConfigConstant.CONFIG_MODEL_UPDATE_TIME);
-        String json = (String) params.get(ConfigConstant.CONFIG_MODEL_JSON);
 
         doc.add(new StringField(ConfigConstant.CONFIG_MODEL_ID, id, Field.Store.YES));
         doc.add(new StringField(ConfigConstant.CONFIG_MODEL_TYPE, type, Field.Store.YES));
         doc.add(new TextField(ConfigConstant.CONFIG_MODEL_NAME, name, Field.Store.YES));
+        doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_JSON, json));
+        // 创建时间(不需要存储)
         doc.add(new LongPoint(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        doc.add(new NumericDocValuesField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        // 修改时间(不需要存储)
         doc.add(new LongPoint(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, updateTime));
+        doc.add(new NumericDocValuesField(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, updateTime));
+        return doc;
+    }
+
+    public static Document convertLog2Doc(Map params) {
+        Assert.notNull(params, "Params can not be null.");
+        Document doc = new Document();
+        String id = (String) params.get(ConfigConstant.CONFIG_MODEL_ID);
+        String type = (String) params.get(ConfigConstant.CONFIG_MODEL_TYPE);
+        String json = (String) params.get(ConfigConstant.CONFIG_MODEL_JSON);
+        Long createTime = (Long) params.get(ConfigConstant.CONFIG_MODEL_CREATE_TIME);
+
+        doc.add(new StringField(ConfigConstant.CONFIG_MODEL_ID, id, Field.Store.YES));
+        doc.add(new StringField(ConfigConstant.CONFIG_MODEL_TYPE, type, Field.Store.YES));
+        // 日志信息
+        doc.add(new TextField(ConfigConstant.CONFIG_MODEL_JSON, json, Field.Store.YES));
+        // 创建时间
+        doc.add(new LongPoint(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        doc.add(new NumericDocValuesField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        return doc;
+    }
+
+    public static Document convertData2Doc(Map params) {
+        Assert.notNull(params, "Params can not be null.");
+        Document doc = new Document();
+        String id = (String) params.get(ConfigConstant.CONFIG_MODEL_ID);
+        Boolean success = (Boolean) params.get(ConfigConstant.DATA_SUCCESS);
+        String event = (String) params.get(ConfigConstant.DATA_EVENT);
+        String error = (String) params.get(ConfigConstant.DATA_ERROR);
+        String json = (String) params.get(ConfigConstant.CONFIG_MODEL_JSON);
+        Long createTime = (Long) params.get(ConfigConstant.CONFIG_MODEL_CREATE_TIME);
+
+        doc.add(new StringField(ConfigConstant.CONFIG_MODEL_ID, id, Field.Store.YES));
+        doc.add(new StringField(ConfigConstant.DATA_SUCCESS, String.valueOf(success), Field.Store.YES));
+        doc.add(new StringField(ConfigConstant.DATA_EVENT, event, Field.Store.YES));
+        doc.add(new TextField(ConfigConstant.DATA_ERROR, error, Field.Store.YES));
         doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_JSON, json));
+        // 创建时间
+        doc.add(new LongPoint(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        doc.add(new NumericDocValuesField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
         return doc;
     }
+
 }

+ 4 - 5
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/index/MetaController.java

@@ -1,8 +1,7 @@
 package org.dbsyncer.web.controller.index;
 
-import org.dbsyncer.biz.MappingService;
+import org.dbsyncer.biz.MonitorService;
 import org.dbsyncer.biz.vo.RestResult;
-import org.dbsyncer.web.controller.BaseController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -13,18 +12,18 @@ import org.springframework.web.bind.annotation.ResponseBody;
 
 @Controller
 @RequestMapping("/meta")
-public class MetaController extends BaseController {
+public class MetaController {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Autowired
-    private MappingService mappingService;
+    private MonitorService monitorService;
 
     @GetMapping("/getAll")
     @ResponseBody
     public RestResult getAll() {
         try {
-            return RestResult.restSuccess(mappingService.getMetaAll());
+            return RestResult.restSuccess(monitorService.getMetaAll());
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e.getClass());
             return RestResult.restFail(e.getMessage());

+ 52 - 0
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/monitor/MonitorController.java

@@ -1,10 +1,16 @@
 package org.dbsyncer.web.controller.monitor;
 
 import org.dbsyncer.biz.MonitorService;
+import org.dbsyncer.biz.vo.RestResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
 import org.springframework.ui.ModelMap;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
 
 import javax.servlet.http.HttpServletRequest;
 
@@ -12,6 +18,8 @@ import javax.servlet.http.HttpServletRequest;
 @RequestMapping("/monitor")
 public class MonitorController {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
     @Autowired
     private MonitorService monitorService;
 
@@ -21,4 +29,48 @@ public class MonitorController {
         return "monitor/monitor.html";
     }
 
+    @GetMapping("/queryData")
+    @ResponseBody
+    public RestResult queryData(@RequestParam(value = "id") String id, @RequestParam(value = "pageNum") int pageNum, @RequestParam(value = "pageSize") int pageSize) {
+        try {
+            return RestResult.restSuccess(monitorService.queryData(id, pageNum, pageSize));
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e.getClass());
+            return RestResult.restFail(e.getMessage());
+        }
+    }
+
+    @GetMapping("/queryLog")
+    @ResponseBody
+    public RestResult queryLog(@RequestParam(value = "type") String type, @RequestParam(value = "pageNum") int pageNum, @RequestParam(value = "pageSize") int pageSize) {
+        try {
+            return RestResult.restSuccess(monitorService.queryLog(type, pageNum, pageSize));
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e.getClass());
+            return RestResult.restFail(e.getMessage());
+        }
+    }
+
+    @GetMapping("/clearData")
+    @ResponseBody
+    public RestResult clearData(@RequestParam(value = "id") String id) {
+        try {
+            return RestResult.restSuccess(monitorService.clearData(id));
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e.getClass());
+            return RestResult.restFail(e.getMessage());
+        }
+    }
+
+    @GetMapping("/clearLog")
+    @ResponseBody
+    public RestResult clearLog(@RequestParam(value = "type") String type) {
+        try {
+            return RestResult.restSuccess(monitorService.clearLog(type));
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e.getClass());
+            return RestResult.restFail(e.getMessage());
+        }
+    }
+
 }

+ 7 - 7
dbsyncer-web/src/main/resources/templates/index/index.html

@@ -124,24 +124,24 @@
                                                 <table class="table table-hover">
                                                     <tbody>
                                                     <tr>
-                                                        <td>同步方式: <span th:text="${m?.meta?.model}"></span></td>
+                                                        <td>同步方式> <span th:text="${m?.meta?.model}"></span></td>
                                                     </tr>
                                                     <tr>
                                                         <td class="text-left">
-                                                            同步结果:
+                                                            同步结果>
                                                             总数:[[${m?.meta?.total}]]
                                                             <span th:if="${m?.meta?.model eq '全量' and (m?.meta?.success + m?.meta?.fail) gt 0}">
-                                                                        ,进度:[[${#numbers.formatDecimal(((m?.meta?.success + m?.meta?.fail) / m?.meta?.total * 100.00),0 ,2)}]]%
-                                                                        ,耗时:[[${(m?.meta?.endTime - m?.meta?.beginTime) / 1000}]]秒
-                                                                    </span>
+                                                            ,进度:[[${#numbers.formatDecimal(((m?.meta?.success + m?.meta?.fail) / m?.meta?.total * 100.00),0 ,2)}]]%
+                                                            ,耗时:[[${(m?.meta?.endTime - m?.meta?.beginTime) / 1000}]]秒
+                                                            </span>
                                                             <span th:if="${m?.meta?.success gt 0}">,成功:[[${m?.meta?.success}]]</span>
                                                             <span th:if="${m?.meta?.fail gt 0}">,失败:[[${m?.meta?.fail}]] <a href="javascript:;" class="label label-danger">日志</a></span>
                                                         </td>
                                                     </tr>
                                                     <tr>
                                                         <td class="text-left">
-                                                            启动时间:
-                                                            <span th:if="${m?.meta?.state != 0 and m?.meta?.beginTime gt 0}">[[${#dates.format(m?.meta?.beginTime, 'yyyy-MM-dd HH:mm:ss')}]]</span>
+                                                            启动时间>
+                                                            <span th:if="${m?.meta?.beginTime gt 0}">[[${#dates.format(m?.meta?.beginTime, 'yyyy-MM-dd HH:mm:ss')}]]</span>
                                                         </td>
                                                     </tr>
                                                     </tbody>