Răsfoiți Sursa

!11 merge
Merge pull request !11 from AE86/V_1.0.0

AE86 5 ani în urmă
părinte
comite
fca8ed9739
50 a modificat fișierele cu 1586 adăugiri și 826 ștergeri
  1. 1 1
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/ConnectorService.java
  2. 14 11
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/MonitorService.java
  3. 8 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/ConnectorChecker.java
  4. 51 3
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java
  5. 36 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/BaseServiceImpl.java
  6. 11 8
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConnectorServiceImpl.java
  7. 22 2
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MappingServiceImpl.java
  8. 43 8
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java
  9. 9 4
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/TableGroupServiceImpl.java
  10. 23 3
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java
  11. 7 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/CompareFilter.java
  12. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  13. 35 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/FilterEnum.java
  14. 15 24
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java
  15. 0 6
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/Action.java
  16. 3 8
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java
  17. 2 17
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/Listener.java
  18. 4 15
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java
  19. 16 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/config/ListenerConfig.java
  20. 7 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java
  21. 11 19
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerTypeEnum.java
  22. 201 203
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java
  23. 130 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java
  24. 10 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskJob.java
  25. 22 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskService.java
  26. 53 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskServiceImpl.java
  27. 0 352
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/TaskQuartzHandle.java
  28. 1 1
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java
  29. 2 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java
  30. 51 0
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/ExtractorConfig.java
  31. 65 7
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/FieldPicker.java
  32. 0 4
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/AbstractPuller.java
  33. 200 43
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java
  34. 7 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  35. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushServiceImpl.java
  36. 15 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/logger/LogService.java
  37. 32 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/logger/LogServiceImpl.java
  38. 206 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/logger/LogType.java
  39. 14 9
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java
  40. 6 4
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java
  41. 2 3
      dbsyncer-storage/src/main/test/LuceneFactoryTest.java
  42. 14 0
      dbsyncer-web/src/main/java/org/dbsyncer/web/config/TaskPoolConfig.java
  43. 0 6
      dbsyncer-web/src/main/java/org/dbsyncer/web/controller/index/MappingController.java
  44. 20 14
      dbsyncer-web/src/main/java/org/dbsyncer/web/controller/monitor/MonitorController.java
  45. 7 0
      dbsyncer-web/src/main/resources/static/js/common.js
  46. 11 0
      dbsyncer-web/src/main/resources/static/js/index/index.js
  47. 2 0
      dbsyncer-web/src/main/resources/static/js/mapping/editTableGroup.js
  48. 72 0
      dbsyncer-web/src/main/resources/static/js/monitor/index.js
  49. 19 15
      dbsyncer-web/src/main/resources/templates/index/index.html
  50. 104 19
      dbsyncer-web/src/main/resources/templates/monitor/monitor.html

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/ConnectorService.java

@@ -31,7 +31,7 @@ public interface ConnectorService {
      *
      *
      * @param id
      * @param id
      */
      */
-    boolean remove(String id);
+    String remove(String id);
 
 
     /**
     /**
      * 获取连接器
      * 获取连接器

+ 14 - 11
dbsyncer-biz/src/main/java/org/dbsyncer/biz/MonitorService.java

@@ -22,21 +22,26 @@ public interface MonitorService {
     Map getThreadInfo();
     Map getThreadInfo();
 
 
     /**
     /**
-     * 获取运行的驱动列表
+     * 获取驱动元信息列表
      *
      *
      * @return
      * @return
      */
      */
     List<MetaVo> getMetaAll();
     List<MetaVo> getMetaAll();
 
 
+    /**
+     * 获取驱动默认元信息id
+     * @param params
+     * @return
+     */
+    String getDefaultMetaId(Map<String, String> params);
+
     /**
     /**
      * 查询驱动同步数据
      * 查询驱动同步数据
      *
      *
      * @return
      * @return
-     * @param id
-     * @param pageNum
-     * @param pageSize
+     * @param params
      */
      */
-    List<DataVo> queryData(String id, int pageNum, int pageSize);
+    List<DataVo> queryData(Map<String, String> params);
 
 
     /**
     /**
      * 清空驱动同步数据
      * 清空驱动同步数据
@@ -49,18 +54,16 @@ public interface MonitorService {
     /**
     /**
      * 查询操作日志
      * 查询操作日志
      *
      *
-     * @param type
-     * @param pageNum
-     * @param pageSize
+     * @param params
      * @return
      * @return
      */
      */
-    List<LogVo> queryLog(String type, int pageNum, int pageSize);
+    List<LogVo> queryLog(Map<String, String> params);
 
 
     /**
     /**
      * 清空操作日志
      * 清空操作日志
      *
      *
-     * @param id
      * @return
      * @return
      */
      */
-    String clearLog(String type);
+    String clearLog();
+
 }
 }

+ 8 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/ConnectorChecker.java

@@ -7,6 +7,8 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.parser.logger.LogService;
+import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
@@ -35,6 +37,9 @@ public class ConnectorChecker extends AbstractChecker implements ApplicationCont
     @Autowired
     @Autowired
     private Manager manager;
     private Manager manager;
 
 
+    @Autowired
+    private LogService logService;
+
     private Map<String, ConnectorConfigChecker> map;
     private Map<String, ConnectorConfigChecker> map;
 
 
     @Override
     @Override
@@ -110,6 +115,9 @@ public class ConnectorChecker extends AbstractChecker implements ApplicationCont
     private void setTable(Connector connector) {
     private void setTable(Connector connector) {
         // 获取表信息
         // 获取表信息
         boolean alive = manager.alive(connector.getConfig());
         boolean alive = manager.alive(connector.getConfig());
+        if(!alive){
+            logService.log(LogType.ConnectorLog.FAILED);
+        }
         Assert.isTrue(alive, "无法连接.");
         Assert.isTrue(alive, "无法连接.");
         List<String> table = manager.getTable(connector.getConfig());
         List<String> table = manager.getTable(connector.getConfig());
         connector.setTable(table);
         connector.setTable(table);

+ 51 - 3
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -5,6 +5,7 @@ import org.dbsyncer.biz.BizException;
 import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.config.Filter;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.Manager;
@@ -101,12 +102,15 @@ public class TableGroupChecker extends AbstractChecker {
 
 
     public void setCommand(Mapping mapping, TableGroup tableGroup) {
     public void setCommand(Mapping mapping, TableGroup tableGroup) {
         TableGroup group = new TableGroup();
         TableGroup group = new TableGroup();
-        group.setFieldMapping(tableGroup.getFieldMapping());
+        group.setFieldMapping(new ArrayList<>(tableGroup.getFieldMapping()));
         group.setSourceTable(tableGroup.getSourceTable());
         group.setSourceTable(tableGroup.getSourceTable());
         group.setTargetTable(tableGroup.getTargetTable());
         group.setTargetTable(tableGroup.getTargetTable());
         // 默认使用全局的过滤条件
         // 默认使用全局的过滤条件
         group.setFilter(CollectionUtils.isEmpty(tableGroup.getFilter()) ? mapping.getFilter() : tableGroup.getFilter());
         group.setFilter(CollectionUtils.isEmpty(tableGroup.getFilter()) ? mapping.getFilter() : tableGroup.getFilter());
 
 
+        // 添加增量配置事件和过滤条件字段
+        appendEventAndFilter(mapping, group);
+
         Map<String, String> command = manager.getCommand(mapping, group);
         Map<String, String> command = manager.getCommand(mapping, group);
         tableGroup.setCommand(command);
         tableGroup.setCommand(command);
 
 
@@ -136,6 +140,44 @@ public class TableGroupChecker extends AbstractChecker {
         }
         }
     }
     }
 
 
+    private void appendEventAndFilter(Mapping mapping, TableGroup group) {
+        final List<FieldMapping> fieldMapping = group.getFieldMapping();
+
+        // 检查增量字段是否在映射关系中
+        String eventFieldName = mapping.getListener().getEventFieldName();
+        if (StringUtils.isNotBlank(eventFieldName)) {
+            Map<String, Field> fields = convert2Map(group.getSourceTable().getColumn());
+            addFieldMapping(fieldMapping, eventFieldName, fields);
+        }
+
+        // 检查过滤条件是否在映射关系中
+        List<Filter> filter = group.getFilter();
+        if (!CollectionUtils.isEmpty(filter)) {
+            Map<String, Field> fields = convert2Map(group.getSourceTable().getColumn());
+            filter.forEach(f -> addFieldMapping(fieldMapping, f.getName(), fields));
+        }
+
+    }
+
+    private void addFieldMapping(List<FieldMapping> fieldMapping, String name, Map<String, Field> fields) {
+        if (StringUtils.isNotBlank(name)) {
+            boolean exist = false;
+            for (FieldMapping m : fieldMapping) {
+                Field source = m.getSource();
+                if (null == source) {
+                    continue;
+                }
+                if (StringUtils.equals(source.getName(), name)) {
+                    exist = true;
+                    break;
+                }
+            }
+            if (!exist && null != fields.get(name)) {
+                fieldMapping.add(new FieldMapping(fields.get(name), null));
+            }
+        }
+    }
+
     private void mergeFieldMapping(TableGroup tableGroup) {
     private void mergeFieldMapping(TableGroup tableGroup) {
         List<Field> sCol = tableGroup.getSourceTable().getColumn();
         List<Field> sCol = tableGroup.getSourceTable().getColumn();
         List<Field> tCol = tableGroup.getTargetTable().getColumn();
         List<Field> tCol = tableGroup.getTargetTable().getColumn();
@@ -173,7 +215,7 @@ public class TableGroupChecker extends AbstractChecker {
      * 解析映射关系
      * 解析映射关系
      *
      *
      * @param tableGroup
      * @param tableGroup
-     * @param json [{"source":"id","target":"id"}]
+     * @param json       [{"source":"id","target":"id"}]
      * @return
      * @return
      */
      */
     private void setFieldMapping(TableGroup tableGroup, String json) {
     private void setFieldMapping(TableGroup tableGroup, String json) {
@@ -194,7 +236,13 @@ public class TableGroupChecker extends AbstractChecker {
                 row = mapping.getJSONObject(i);
                 row = mapping.getJSONObject(i);
                 s = sMap.get(row.getString("source"));
                 s = sMap.get(row.getString("source"));
                 t = tMap.get(row.getString("target"));
                 t = tMap.get(row.getString("target"));
-                t.setPk(row.getBoolean("pk"));
+                if (null == s && null == t) {
+                    continue;
+                }
+
+                if (null != t) {
+                    t.setPk(row.getBoolean("pk"));
+                }
                 list.add(new FieldMapping(s, t));
                 list.add(new FieldMapping(s, t));
             }
             }
             tableGroup.setFieldMapping(list);
             tableGroup.setFieldMapping(list);

+ 36 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/BaseServiceImpl.java

@@ -2,6 +2,10 @@ package org.dbsyncer.biz.impl;
 
 
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.parser.enums.MetaEnum;
+import org.dbsyncer.parser.enums.ModelEnum;
+import org.dbsyncer.parser.logger.LogService;
+import org.dbsyncer.parser.logger.LogType;
+import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.TableGroup;
@@ -13,6 +17,9 @@ public class BaseServiceImpl {
     @Autowired
     @Autowired
     protected Manager manager;
     protected Manager manager;
 
 
+    @Autowired
+    private LogService logService;
+
     /**
     /**
      * 驱动启停锁
      * 驱动启停锁
      */
      */
@@ -38,4 +45,33 @@ public class BaseServiceImpl {
         }
         }
     }
     }
 
 
+    protected void log(LogType log, ConfigModel model) {
+        if (null != model) {
+            // 新增连接器:知识库
+            logService.log(log, "%s%s:%s", log.getMessage(), log.getName(), model.getName());
+        }
+    }
+
+    protected void log(LogType log, Mapping mapping) {
+        if (null != mapping) {
+            // 新增驱动:知识库(全量)
+            String model = ModelEnum.getModelEnum(mapping.getModel()).getName();
+            logService.log(log, "%s%s:%s(%s)", log.getMessage(), log.getName(), mapping.getName(), model);
+        }
+    }
+
+    protected void log(LogType log, TableGroup tableGroup) {
+        if (null != tableGroup) {
+            Mapping mapping = manager.getMapping(tableGroup.getMappingId());
+            if (null != mapping) {
+                // 新增驱动知识库(全量)映射关系:[My_User] >> [My_User_Target]
+                String name = mapping.getName();
+                String model = ModelEnum.getModelEnum(mapping.getModel()).getName();
+                String s = tableGroup.getSourceTable().getName();
+                String t = tableGroup.getTargetTable().getName();
+                logService.log(log, "%s驱动%s(%s)%s:[%s] >> [%s]", log.getMessage(), name, model, log.getName(), s, t);
+            }
+        }
+    }
+
 }
 }

+ 11 - 8
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConnectorServiceImpl.java

@@ -4,10 +4,9 @@ import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.biz.ConnectorService;
 import org.dbsyncer.biz.ConnectorService;
 import org.dbsyncer.biz.checker.Checker;
 import org.dbsyncer.biz.checker.Checker;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Connector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 
 
@@ -23,9 +22,7 @@ import java.util.stream.Collectors;
  * @date 2019/10/17 23:20
  * @date 2019/10/17 23:20
  */
  */
 @Service
 @Service
-public class ConnectorServiceImpl implements ConnectorService {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
+public class ConnectorServiceImpl extends BaseServiceImpl implements ConnectorService {
 
 
     @Autowired
     @Autowired
     private Manager manager;
     private Manager manager;
@@ -36,20 +33,26 @@ public class ConnectorServiceImpl implements ConnectorService {
     @Override
     @Override
     public String add(Map<String, String> params) {
     public String add(Map<String, String> params) {
         ConfigModel model = connectorChecker.checkAddConfigModel(params);
         ConfigModel model = connectorChecker.checkAddConfigModel(params);
+        log(LogType.ConnectorLog.INSERT, model);
+
         return manager.addConnector(model);
         return manager.addConnector(model);
     }
     }
 
 
     @Override
     @Override
     public String edit(Map<String, String> params) {
     public String edit(Map<String, String> params) {
         ConfigModel model = connectorChecker.checkEditConfigModel(params);
         ConfigModel model = connectorChecker.checkEditConfigModel(params);
+        log(LogType.ConnectorLog.UPDATE, model);
+
         return manager.editConnector(model);
         return manager.editConnector(model);
     }
     }
 
 
     @Override
     @Override
-    public boolean remove(String id) {
-        logger.info("remove:{}", id);
+    public String remove(String id) {
+        Connector connector = manager.getConnector(id);
+        log(LogType.ConnectorLog.DELETE, connector);
+
         manager.removeConnector(id);
         manager.removeConnector(id);
-        return true;
+        return "删除连接器成功!";
     }
     }
 
 
     @Override
     @Override

+ 22 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MappingServiceImpl.java

@@ -9,6 +9,7 @@ import org.dbsyncer.biz.vo.MetaVo;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.monitor.Monitor;
 import org.dbsyncer.monitor.Monitor;
 import org.dbsyncer.parser.enums.ModelEnum;
 import org.dbsyncer.parser.enums.ModelEnum;
+import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
@@ -42,6 +43,8 @@ public class MappingServiceImpl extends BaseServiceImpl implements MappingServic
     @Override
     @Override
     public String add(Map<String, String> params) {
     public String add(Map<String, String> params) {
         ConfigModel model = mappingChecker.checkAddConfigModel(params);
         ConfigModel model = mappingChecker.checkAddConfigModel(params);
+        log(LogType.MappingLog.INSERT, (Mapping) model);
+
         return manager.addMapping(model);
         return manager.addMapping(model);
     }
     }
 
 
@@ -52,6 +55,8 @@ public class MappingServiceImpl extends BaseServiceImpl implements MappingServic
         synchronized (LOCK) {
         synchronized (LOCK) {
             assertRunning(mapping.getMetaId());
             assertRunning(mapping.getMetaId());
             ConfigModel model = mappingChecker.checkEditConfigModel(params);
             ConfigModel model = mappingChecker.checkEditConfigModel(params);
+            log(LogType.MappingLog.UPDATE, (Mapping) model);
+
             return manager.editMapping(model);
             return manager.editMapping(model);
         }
         }
     }
     }
@@ -59,18 +64,28 @@ public class MappingServiceImpl extends BaseServiceImpl implements MappingServic
     @Override
     @Override
     public String remove(String id) {
     public String remove(String id) {
         Mapping mapping = assertMappingExist(id);
         Mapping mapping = assertMappingExist(id);
+        String metaId = mapping.getMetaId();
+        Meta meta = manager.getMeta(metaId);
         synchronized (LOCK) {
         synchronized (LOCK) {
-            assertRunning(mapping.getMetaId());
+            assertRunning(metaId);
+
+            // 删除数据
+            manager.clearData(metaId);
+            log(LogType.MetaLog.CLEAR, meta);
 
 
             // 删除meta
             // 删除meta
-            manager.removeMeta(mapping.getMetaId());
+            manager.removeMeta(metaId);
+            log(LogType.MetaLog.DELETE, meta);
 
 
             // 删除tableGroup
             // 删除tableGroup
             List<TableGroup> groupList = manager.getTableGroupAll(id);
             List<TableGroup> groupList = manager.getTableGroupAll(id);
             if (!CollectionUtils.isEmpty(groupList)) {
             if (!CollectionUtils.isEmpty(groupList)) {
                 groupList.forEach(t -> manager.removeTableGroup(t.getId()));
                 groupList.forEach(t -> manager.removeTableGroup(t.getId()));
             }
             }
+
+            // 删除驱动
             manager.removeMapping(id);
             manager.removeMapping(id);
+            log(LogType.MappingLog.DELETE, mapping);
         }
         }
         return "驱动删除成功";
         return "驱动删除成功";
     }
     }
@@ -104,7 +119,10 @@ public class MappingServiceImpl extends BaseServiceImpl implements MappingServic
             meta.getSuccess().set(0);
             meta.getSuccess().set(0);
             manager.editMeta(meta);
             manager.editMeta(meta);
 
 
+            // 启动
             manager.start(mapping);
             manager.start(mapping);
+
+            log(LogType.MappingLog.RUNNING, mapping);
         }
         }
         return "驱动启动成功";
         return "驱动启动成功";
     }
     }
@@ -117,6 +135,8 @@ public class MappingServiceImpl extends BaseServiceImpl implements MappingServic
                 throw new BizException("驱动已停止.");
                 throw new BizException("驱动已停止.");
             }
             }
             manager.close(mapping);
             manager.close(mapping);
+
+            log(LogType.MappingLog.STOP, mapping);
         }
         }
         return "驱动停止成功";
         return "驱动停止成功";
     }
     }

+ 43 - 8
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

@@ -1,15 +1,19 @@
 package org.dbsyncer.biz.impl;
 package org.dbsyncer.biz.impl;
 
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
 import org.dbsyncer.biz.MonitorService;
 import org.dbsyncer.biz.MonitorService;
 import org.dbsyncer.biz.vo.DataVo;
 import org.dbsyncer.biz.vo.DataVo;
 import org.dbsyncer.biz.vo.LogVo;
 import org.dbsyncer.biz.vo.LogVo;
 import org.dbsyncer.biz.vo.MetaVo;
 import org.dbsyncer.biz.vo.MetaVo;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.monitor.Monitor;
 import org.dbsyncer.monitor.Monitor;
 import org.dbsyncer.parser.enums.ModelEnum;
 import org.dbsyncer.parser.enums.ModelEnum;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.storage.constant.ConfigConstant;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.BeanUtils;
@@ -17,9 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
 
 
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 /**
 /**
@@ -31,6 +33,7 @@ import java.util.stream.Collectors;
 public class MonitorServiceImpl implements MonitorService {
 public class MonitorServiceImpl implements MonitorService {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
+
     @Autowired
     @Autowired
     private Monitor monitor;
     private Monitor monitor;
 
 
@@ -53,8 +56,29 @@ public class MonitorServiceImpl implements MonitorService {
     }
     }
 
 
     @Override
     @Override
-    public List<DataVo> queryData(String id, int pageNum, int pageSize) {
-        Assert.hasText(id, "id不能为空.");
+    public String getDefaultMetaId(Map<String, String> params) {
+        String id = params.get(ConfigConstant.CONFIG_MODEL_ID);
+        if (StringUtils.isNotBlank(id)) {
+            return id;
+        }
+        return getDefaultMetaId();
+    }
+
+    @Override
+    public List<DataVo> queryData(Map<String, String> params) {
+        String id = params.get(ConfigConstant.CONFIG_MODEL_ID);
+        // 获取默认驱动元信息
+        if (StringUtils.isBlank(id)) {
+            id = getDefaultMetaId();
+        }
+
+        // 没有驱动
+        if (StringUtils.isBlank(id)) {
+            return Collections.EMPTY_LIST;
+        }
+
+        int pageNum = NumberUtils.toInt(params.get("pageNum"), 1);
+        int pageSize = NumberUtils.toInt(params.get("pageSize"), 20);
         List<DataVo> list = manager.queryData(id, pageNum, pageSize)
         List<DataVo> list = manager.queryData(id, pageNum, pageSize)
                 .stream()
                 .stream()
                 .map(m -> convert2Vo(m, DataVo.class))
                 .map(m -> convert2Vo(m, DataVo.class))
@@ -64,12 +88,16 @@ public class MonitorServiceImpl implements MonitorService {
 
 
     @Override
     @Override
     public String clearData(String id) {
     public String clearData(String id) {
+        Assert.hasText(id, "驱动不存在.");
         manager.clearData(id);
         manager.clearData(id);
         return "清空同步数据成功";
         return "清空同步数据成功";
     }
     }
 
 
     @Override
     @Override
-    public List<LogVo> queryLog(String type, int pageNum, int pageSize) {
+    public List<LogVo> queryLog(Map<String, String> params) {
+        String type = params.get(ConfigConstant.CONFIG_MODEL_TYPE);
+        int pageNum = NumberUtils.toInt(params.get("pageNum"), 1);
+        int pageSize = NumberUtils.toInt(params.get("pageSize"), 20);
         List<LogVo> list = manager.queryLog(type, pageNum, pageSize)
         List<LogVo> list = manager.queryLog(type, pageNum, pageSize)
                 .stream()
                 .stream()
                 .map(m -> convert2Vo(m, LogVo.class))
                 .map(m -> convert2Vo(m, LogVo.class))
@@ -78,8 +106,8 @@ public class MonitorServiceImpl implements MonitorService {
     }
     }
 
 
     @Override
     @Override
-    public String clearLog(String type) {
-        manager.clearLog(type);
+    public String clearLog() {
+        manager.clearLog();
         return "清空日志成功";
         return "清空日志成功";
     }
     }
 
 
@@ -98,4 +126,11 @@ public class MonitorServiceImpl implements MonitorService {
         return (T) JsonUtil.jsonToObj(json, clazz);
         return (T) JsonUtil.jsonToObj(json, clazz);
     }
     }
 
 
+    private String getDefaultMetaId() {
+        List<MetaVo> list = getMetaAll();
+        if (!CollectionUtils.isEmpty(list)) {
+            return list.get(0).getId();
+        }
+        return "";
+    }
 }
 }

+ 9 - 4
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/TableGroupServiceImpl.java

@@ -4,7 +4,7 @@ import org.dbsyncer.biz.TableGroupService;
 import org.dbsyncer.biz.checker.Checker;
 import org.dbsyncer.biz.checker.Checker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.manager.Manager;
+import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.TableGroup;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
@@ -25,9 +25,6 @@ public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroup
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
-    @Autowired
-    private Manager manager;
-
     @Autowired
     @Autowired
     private Checker tableGroupChecker;
     private Checker tableGroupChecker;
 
 
@@ -35,10 +32,13 @@ public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroup
     public String add(Map<String, String> params) {
     public String add(Map<String, String> params) {
         TableGroup model = (TableGroup) tableGroupChecker.checkAddConfigModel(params);
         TableGroup model = (TableGroup) tableGroupChecker.checkAddConfigModel(params);
         assertRunning(model);
         assertRunning(model);
+        log(LogType.TableGroupLog.INSERT, model);
+
         String id = manager.addTableGroup(model);
         String id = manager.addTableGroup(model);
 
 
         // 合并驱动公共字段
         // 合并驱动公共字段
         mergeMappingColumn(model.getMappingId());
         mergeMappingColumn(model.getMappingId());
+
         return id;
         return id;
     }
     }
 
 
@@ -46,6 +46,8 @@ public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroup
     public String edit(Map<String, String> params) {
     public String edit(Map<String, String> params) {
         TableGroup model = (TableGroup) tableGroupChecker.checkEditConfigModel(params);
         TableGroup model = (TableGroup) tableGroupChecker.checkEditConfigModel(params);
         assertRunning(model);
         assertRunning(model);
+        log(LogType.TableGroupLog.UPDATE, model);
+
         return manager.editTableGroup(model);
         return manager.editTableGroup(model);
     }
     }
 
 
@@ -54,9 +56,12 @@ public class TableGroupServiceImpl extends BaseServiceImpl implements TableGroup
         TableGroup tableGroup = manager.getTableGroup(id);
         TableGroup tableGroup = manager.getTableGroup(id);
         Assert.notNull(tableGroup, "tableGroup can not be null.");
         Assert.notNull(tableGroup, "tableGroup can not be null.");
         assertRunning(tableGroup);
         assertRunning(tableGroup);
+        log(LogType.TableGroupLog.DELETE, tableGroup);
 
 
         manager.removeTableGroup(id);
         manager.removeTableGroup(id);
+        // 合并驱动公共字段
         mergeMappingColumn(tableGroup.getMappingId());
         mergeMappingColumn(tableGroup.getMappingId());
+
         return true;
         return true;
     }
     }
 
 

+ 23 - 3
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.common.event;
 package org.dbsyncer.common.event;
 
 
 import java.util.List;
 import java.util.List;
+import java.util.Map;
 
 
 /**
 /**
  * @version 1.0.0
  * @version 1.0.0
@@ -10,18 +11,37 @@ import java.util.List;
 public interface Event {
 public interface Event {
 
 
     /**
     /**
-     * 数据变更事件
+     * 日志数据变更事件
      *
      *
      * @param tableName 表名
      * @param tableName 表名
      * @param event     事件
      * @param event     事件
      * @param before    变化前
      * @param before    变化前
      * @param after     变化后
      * @param after     变化后
      */
      */
-    void changedEvent(String tableName, String event, List<Object> before, List<Object> after);
+    void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after);
+
+    /**
+     * 定时数据变更事件
+     *
+     * @param tableGroupIndex
+     * @param event
+     * @param before
+     * @param after
+     */
+    void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after);
 
 
     /**
     /**
      * 写入增量点事件
      * 写入增量点事件
+     *
+     * @param map
+     */
+    void flushEvent(Map<String, String> map);
+
+    /**
+     * 异常事件
+     *
+     * @param e
      */
      */
-    void flushEvent();
+    void errorEvent(Exception e);
 
 
 }
 }

+ 7 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/CompareFilter.java

@@ -0,0 +1,7 @@
+package org.dbsyncer.connector;
+
+public interface CompareFilter {
+
+    boolean compare(String value, String filterValue);
+
+}

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -256,7 +256,7 @@ public abstract class AbstractDatabaseConnector implements Database {
                 }
                 }
             });
             });
             if (0 == update) {
             if (0 == update) {
-                throw new ConnectorException(String.format("执行%s操作失败:%s", event, data));
+                throw new ConnectorException(String.format("执行%s操作失败, 数据不存在", event));
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             // 记录错误数据
             // 记录错误数据

+ 35 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/FilterEnum.java

@@ -1,5 +1,10 @@
 package org.dbsyncer.connector.enums;
 package org.dbsyncer.connector.enums;
 
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.dbsyncer.connector.CompareFilter;
+import org.dbsyncer.connector.ConnectorException;
+
 /**
 /**
  * 运算符表达式类型
  * 运算符表达式类型
  *
  *
@@ -12,37 +17,60 @@ public enum FilterEnum {
     /**
     /**
      * 等于
      * 等于
      */
      */
-    EQUAL("="),
+    EQUAL("=", (value, filterValue) -> StringUtils.equals(value, filterValue)),
     /**
     /**
      * 不等于
      * 不等于
      */
      */
-    NOT_EQUAL("!="),
+    NOT_EQUAL("!=", (value, filterValue) -> !StringUtils.equals(value, filterValue)),
     /**
     /**
      * 大于
      * 大于
      */
      */
-    GT(">"),
+    GT(">", (value, filterValue) -> NumberUtils.toInt(value) > NumberUtils.toInt(filterValue)),
     /**
     /**
      * 小于
      * 小于
      */
      */
-    LT("<"),
+    LT("<", (value, filterValue) -> NumberUtils.toInt(value) < NumberUtils.toInt(filterValue)),
     /**
     /**
      * 大于等于
      * 大于等于
      */
      */
-    GT_AND_EQUAL(">="),
+    GT_AND_EQUAL(">=", (value, filterValue) -> NumberUtils.toInt(value) >= NumberUtils.toInt(filterValue)),
     /**
     /**
      * 小于等于
      * 小于等于
      */
      */
-    LT_AND_EQUAL("<=");
+    LT_AND_EQUAL("<=", (value, filterValue) -> NumberUtils.toInt(value) <= NumberUtils.toInt(filterValue));
 
 
     // 运算符名称
     // 运算符名称
     private String name;
     private String name;
+    // 比较器
+    private CompareFilter compareFilter;
 
 
-    FilterEnum(String name) {
+    FilterEnum(String name, CompareFilter compareFilter) {
         this.name = name;
         this.name = name;
+        this.compareFilter = compareFilter;
+    }
+
+    /**
+     * 获取比较器
+     *
+     * @param filterName
+     * @return
+     * @throws ConnectorException
+     */
+    public static CompareFilter getCompareFilter(String filterName) throws ConnectorException {
+        for (FilterEnum e : FilterEnum.values()) {
+            if (StringUtils.equals(filterName, e.getName())) {
+                return e.getCompareFilter();
+            }
+        }
+        throw new ConnectorException(String.format("FilterEnum name \"%s\" does not exist.", filterName));
     }
     }
 
 
     public String getName() {
     public String getName() {
         return name;
         return name;
     }
     }
 
 
+    public CompareFilter getCompareFilter() {
+        return compareFilter;
+    }
+
 }
 }

+ 15 - 24
dbsyncer-listener/src/main/java/org/dbsyncer/listener/DefaultExtractor.java → dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -12,20 +12,14 @@ import java.util.concurrent.CopyOnWriteArrayList;
 /**
 /**
  * @version 1.0.0
  * @version 1.0.0
  * @Author AE86
  * @Author AE86
- * @Date 2020-05-12 20:35
+ * @Date 2020-05-25 22:35
  */
  */
-public abstract class DefaultExtractor implements Extractor {
+public abstract class AbstractExtractor implements Extractor {
 
 
     protected ConnectorConfig connectorConfig;
     protected ConnectorConfig connectorConfig;
     protected ListenerConfig listenerConfig;
     protected ListenerConfig listenerConfig;
     protected Map<String, String> map;
     protected Map<String, String> map;
-
     private List<Event> watcher;
     private List<Event> watcher;
-    private Action action;
-
-    public void run() {
-        action.execute(this);
-    }
 
 
     public void addListener(Event event) {
     public void addListener(Event event) {
         if (null != event) {
         if (null != event) {
@@ -43,43 +37,40 @@ public abstract class DefaultExtractor implements Extractor {
         }
         }
     }
     }
 
 
-    public void changedEvent(String tableName, String event, List<Object> before, List<Object> after) {
+    public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
+        if (!CollectionUtils.isEmpty(watcher)) {
+            watcher.forEach(w -> w.changedLogEvent(tableName, event, before, after));
+        }
+    }
+
+    public void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
         if (!CollectionUtils.isEmpty(watcher)) {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.changedEvent(tableName, event, before, after));
+            watcher.forEach(w -> w.changedQuartzEvent(tableGroupIndex, event, before, after));
         }
         }
     }
     }
 
 
     public void flushEvent() {
     public void flushEvent() {
         if (!CollectionUtils.isEmpty(watcher)) {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.flushEvent());
+            watcher.forEach(w -> w.flushEvent(map));
         }
         }
     }
     }
 
 
-    public ConnectorConfig getConnectorConfig() {
-        return connectorConfig;
+    public void errorEvent(Exception e) {
+        if (!CollectionUtils.isEmpty(watcher)) {
+            watcher.forEach(w -> w.errorEvent(e));
+        }
     }
     }
 
 
     public void setConnectorConfig(ConnectorConfig connectorConfig) {
     public void setConnectorConfig(ConnectorConfig connectorConfig) {
         this.connectorConfig = connectorConfig;
         this.connectorConfig = connectorConfig;
     }
     }
 
 
-    public ListenerConfig getListenerConfig() {
-        return listenerConfig;
-    }
-
     public void setListenerConfig(ListenerConfig listenerConfig) {
     public void setListenerConfig(ListenerConfig listenerConfig) {
         this.listenerConfig = listenerConfig;
         this.listenerConfig = listenerConfig;
     }
     }
 
 
-    public Map<String, String> getMap() {
-        return map;
-    }
-
     public void setMap(Map<String, String> map) {
     public void setMap(Map<String, String> map) {
         this.map = map;
         this.map = map;
     }
     }
 
 
-    public void setAction(Action action) {
-        this.action = action;
-    }
 }
 }

+ 0 - 6
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Action.java

@@ -1,6 +0,0 @@
-package org.dbsyncer.listener;
-
-public interface Action {
-
-    void execute(Extractor extractor);
-}

+ 3 - 8
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -3,18 +3,13 @@ package org.dbsyncer.listener;
 public interface Extractor {
 public interface Extractor {
 
 
     /**
     /**
-     * 根据日志/事件等抽取
+     * 启动定时/日志等方式抽取增量数据
      */
      */
-    void extract();
-
-    /**
-     * 定时抽取
-     */
-    void extractTiming();
+    void start();
 
 
     /**
     /**
      * 关闭任务
      * 关闭任务
      */
      */
     void close();
     void close();
 
 
-}
+}

+ 2 - 17
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Listener.java

@@ -1,22 +1,7 @@
 package org.dbsyncer.listener;
 package org.dbsyncer.listener;
 
 
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.listener.config.ListenerConfig;
-
-import java.util.Map;
-
 public interface Listener {
 public interface Listener {
 
 
-    /**
-     * 创建抽取器
-     *
-     * @param connectorConfig 连接器配置
-     * @param listenerConfig  监听器配置
-     * @param map             增量参数
-     * @return
-     * @throws IllegalAccessException
-     * @throws InstantiationException
-     */
-    DefaultExtractor createExtractor(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map)
-            throws IllegalAccessException, InstantiationException;
+    <T> T getExtractor(String type, Class<T> valueType) throws IllegalAccessException, InstantiationException;
+
 }
 }

+ 4 - 15
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -1,26 +1,15 @@
 package org.dbsyncer.listener;
 package org.dbsyncer.listener;
 
 
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerEnum;
 import org.dbsyncer.listener.enums.ListenerEnum;
-import org.dbsyncer.listener.enums.ListenerTypeEnum;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
-import java.util.Map;
-
 @Component
 @Component
 public class ListenerFactory implements Listener {
 public class ListenerFactory implements Listener {
 
 
     @Override
     @Override
-    public DefaultExtractor createExtractor(ConnectorConfig config, ListenerConfig listenerConfig, Map<String, String> map)
-            throws IllegalAccessException, InstantiationException {
-        Class<DefaultExtractor> clazz = (Class<DefaultExtractor>) ListenerEnum.getExtractor(config.getConnectorType());
-        DefaultExtractor extractor = clazz.newInstance();
-        // log/timing
-        extractor.setAction(ListenerTypeEnum.getAction(listenerConfig.getListenerType()));
-        extractor.setConnectorConfig(config);
-        extractor.setListenerConfig(listenerConfig);
-        extractor.setMap(map);
-        return extractor;
+    public <T> T getExtractor(String type, Class<T> valueType) throws IllegalAccessException, InstantiationException {
+        Class<T> clazz = (Class<T>) ListenerEnum.getExtractor(type);
+        return clazz.newInstance();
     }
     }
+
 }
 }

+ 16 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/config/ListenerConfig.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.listener.config;
 package org.dbsyncer.listener.config;
 
 
+import org.dbsyncer.listener.enums.ListenerTypeEnum;
+
 /**
 /**
  * @author AE86
  * @author AE86
  * @version 1.0.0
  * @version 1.0.0
@@ -9,9 +11,15 @@ public class ListenerConfig {
 
 
     /**
     /**
      * 监听器类型
      * 监听器类型
+     * {@link ListenerTypeEnum}
      */
      */
     private String listenerType;
     private String listenerType;
 
 
+    /**
+     * 每次读取数
+     */
+    private int readNum = 200;
+
     // 定时表达式, 格式: [秒] [分] [小时] [日] [月] [周]
     // 定时表达式, 格式: [秒] [分] [小时] [日] [月] [周]
     private String cronExpression = "*/30 * * * * ?";
     private String cronExpression = "*/30 * * * * ?";
 
 
@@ -45,6 +53,14 @@ public class ListenerConfig {
         this.listenerType = listenerType;
         this.listenerType = listenerType;
     }
     }
 
 
+    public int getReadNum() {
+        return readNum;
+    }
+
+    public void setReadNum(int readNum) {
+        this.readNum = readNum;
+    }
+
     public String getCronExpression() {
     public String getCronExpression() {
         return cronExpression;
         return cronExpression;
     }
     }

+ 7 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java

@@ -2,8 +2,9 @@ package org.dbsyncer.listener.enums;
 
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.listener.quartz.QuartzExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.ListenerException;
-import org.dbsyncer.listener.extractor.MysqlExtractor;
+import org.dbsyncer.listener.mysql.MysqlExtractor;
 
 
 /**
 /**
  * @author AE86
  * @author AE86
@@ -12,11 +13,14 @@ import org.dbsyncer.listener.extractor.MysqlExtractor;
  */
  */
 public enum ListenerEnum {
 public enum ListenerEnum {
 
 
+    /**
+     * 定时
+     */
+    DEFAULT(ListenerTypeEnum.TIMING.getType(), QuartzExtractor.class),
     /**
     /**
      * Mysql
      * Mysql
      */
      */
-    MYSQL(ConnectorEnum.MYSQL.getType(), MysqlExtractor.class),
-    ;
+    MYSQL(ConnectorEnum.MYSQL.getType(), MysqlExtractor.class);
 
 
     private String type;
     private String type;
     private Class<?> clazz;
     private Class<?> clazz;

+ 11 - 19
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerTypeEnum.java

@@ -1,8 +1,6 @@
 package org.dbsyncer.listener.enums;
 package org.dbsyncer.listener.enums;
 
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
-import org.dbsyncer.listener.Action;
-import org.dbsyncer.listener.ListenerException;
 
 
 /**
 /**
  * @author AE86
  * @author AE86
@@ -12,36 +10,30 @@ import org.dbsyncer.listener.ListenerException;
 public enum ListenerTypeEnum {
 public enum ListenerTypeEnum {
 
 
     /**
     /**
-     * 日志
+     * 定时
      */
      */
-    LOG("log", extractor -> extractor.extract()),
+    TIMING("timing"),
     /**
     /**
-     * 定时
+     * 日志
      */
      */
-    TIMING("timing", extractor -> extractor.extractTiming());
+    LOG("log");
 
 
     private String type;
     private String type;
-    private Action action;
 
 
-    ListenerTypeEnum(String type, Action action) {
+    ListenerTypeEnum(String type) {
         this.type = type;
         this.type = type;
-        this.action = action;
     }
     }
 
 
-    public static Action getAction(String type) throws ListenerException {
-        for (ListenerTypeEnum e : ListenerTypeEnum.values()) {
-            if (StringUtils.equals(type, e.getType())) {
-                return e.getAction();
-            }
-        }
-        throw new ListenerException(String.format("Action type \"%s\" does not exist.", type));
+    public static boolean isTiming(String type) {
+        return StringUtils.equals(TIMING.getType(), type);
+    }
+
+    public static boolean isLog(String type) {
+        return StringUtils.equals(LOG.getType(), type);
     }
     }
 
 
     public String getType() {
     public String getType() {
         return type;
         return type;
     }
     }
 
 
-    public Action getAction() {
-        return action;
-    }
 }
 }

+ 201 - 203
dbsyncer-listener/src/main/java/org/dbsyncer/listener/extractor/MysqlExtractor.java → dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -1,204 +1,202 @@
-package org.dbsyncer.listener.extractor;
-
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.commons.lang.StringUtils;
-import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.constant.ConnectorConstant;
-import org.dbsyncer.listener.DefaultExtractor;
-import org.dbsyncer.listener.ListenerException;
-import org.dbsyncer.listener.config.Host;
-import org.dbsyncer.listener.mysql.binlog.*;
-import org.dbsyncer.listener.mysql.binlog.impl.event.*;
-import org.dbsyncer.listener.mysql.common.glossary.Column;
-import org.dbsyncer.listener.mysql.common.glossary.Pair;
-import org.dbsyncer.listener.mysql.common.glossary.Row;
-import org.dbsyncer.listener.mysql.common.glossary.column.StringColumn;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-
-import java.util.*;
-import java.util.regex.Matcher;
-
-import static java.util.regex.Pattern.compile;
-
-/**
- * @version 1.0.0
- * @Author AE86
- * @Date 2020-05-12 21:14
- */
-public class MysqlExtractor extends DefaultExtractor {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    private static final String BINLOG_FILENAME = "fileName";
-    private static final String BINLOG_POSITION = "position";
-    private BinlogRemoteClient client;
-    private List<Host> cluster;
-    private int master = 0;
-
-    @Override
-    public void extract() {
-        try {
-            final DatabaseConfig config = (DatabaseConfig) connectorConfig;
-            cluster = readNodes(config.getUrl());
-            Assert.notEmpty(cluster, "Mysql连接地址有误.");
-
-            final Host host = cluster.get(master);
-            final String username = config.getUsername();
-            final String password = config.getPassword();
-            // mysql-binlog-127.0.0.1:3306-654321
-            final String threadSuffixName = new StringBuilder("mysql-binlog-")
-                    .append(host.getIp()).append(":").append(host.getPort()).append("-")
-                    .append(RandomStringUtils.randomNumeric(6))
-                    .toString();
-
-            client = new BinlogRemoteClient(host.getIp(), host.getPort(), username, password, threadSuffixName);
-            client.setBinlogFileName(map.get(BINLOG_FILENAME));
-            String pos = map.get(BINLOG_POSITION);
-            client.setBinlogPosition(StringUtils.isBlank(pos) ? 0 : Long.parseLong(pos));
-            client.setBinlogEventListener(new MysqlEventListener());
-            client.start();
-        } catch (Exception e) {
-            logger.error("启动失败:{}", e.getMessage());
-            throw new ListenerException(e);
-        }
-    }
-
-    @Override
-    public void extractTiming() {
-
-    }
-
-    @Override
-    public void close() {
-        try {
-            client.stopQuietly();
-        } catch (Exception e) {
-            logger.error("关闭失败:{}", e.getMessage());
-        }
-    }
-
-    private List<Host> readNodes(String url) {
-        if (StringUtils.isBlank(url)) {
-            return Collections.EMPTY_LIST;
-        }
-        Matcher matcher = compile("(//)(?!(/)).+?(/)").matcher(url);
-        while (matcher.find()) {
-            url = matcher.group(0);
-            break;
-        }
-        url = StringUtils.replace(url, "/", "");
-
-        List<Host> cluster = new ArrayList<>();
-        String[] arr = StringUtils.split(url, ",");
-        int size = arr.length;
-        for (int i = 0; i < size; i++) {
-            String[] host = StringUtils.split(arr[i], ":");
-            if (2 == host.length) {
-                cluster.add(new Host(host[0], Integer.parseInt(host[1])));
-            }
-        }
-        return cluster;
-    }
-
-    /**
-     * 有变化触发刷新binlog增量事件
-     *
-     * @param event
-     */
-    private void refresh(AbstractBinlogEventV4 event) {
-        String binlogFilename = event.getBinlogFilename();
-        long nextPosition = event.getHeader().getNextPosition();
-
-        // binlogFileName
-        if (StringUtils.isNotBlank(binlogFilename) && !StringUtils.equals(binlogFilename, client.getBinlogFileName())) {
-            client.setBinlogFileName(binlogFilename);
-        }
-        client.setBinlogPosition(nextPosition);
-
-        // nextPosition
-        map.put(BINLOG_FILENAME, client.getBinlogFileName());
-        map.put(BINLOG_POSITION, String.valueOf(client.getBinlogPosition()));
-        flushEvent();
-    }
-
-    final class MysqlEventListener implements BinlogEventListener {
-
-        private Map<Long, String> table = new HashMap<>();
-
-        @Override
-        public void onEvents(BinlogEventV4 event) {
-            if (event == null) {
-                logger.error("binlog event is null");
-                return;
-            }
-
-            if (event instanceof TableMapEvent) {
-                TableMapEvent tableEvent = (TableMapEvent) event;
-                table.putIfAbsent(tableEvent.getTableId(), tableEvent.getTableName().toString());
-                return;
-            }
-
-            if (event instanceof UpdateRowsEventV2) {
-                UpdateRowsEventV2 e = (UpdateRowsEventV2) event;
-                final String tableName = table.get(e.getTableId());
-                List<Pair<Row>> rows = e.getRows();
-                for (Pair<Row> p : rows) {
-                    List<Object> before = new ArrayList<>();
-                    List<Object> after = new ArrayList<>();
-                    addAll(before, p.getBefore().getColumns());
-                    addAll(after, p.getAfter().getColumns());
-                    changedEvent(tableName, ConnectorConstant.OPERTION_UPDATE, before, after);
-                    break;
-                }
-                return;
-            }
-
-            if (event instanceof WriteRowsEventV2) {
-                WriteRowsEventV2 e = (WriteRowsEventV2) event;
-                final String tableName = table.get(e.getTableId());
-                List<Row> rows = e.getRows();
-                for (Row row : rows) {
-                    List<Object> after = new ArrayList<>();
-                    addAll(after, row.getColumns());
-                    changedEvent(tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after);
-                    break;
-                }
-                return;
-            }
-
-            if (event instanceof DeleteRowsEventV2) {
-                DeleteRowsEventV2 e = (DeleteRowsEventV2) event;
-                final String tableName = table.get(e.getTableId());
-                List<Row> rows = e.getRows();
-                for (Row row : rows) {
-                    List<Object> before = new ArrayList<>();
-                    addAll(before, row.getColumns());
-                    changedEvent(tableName, ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST);
-                    break;
-                }
-                return;
-            }
-
-            // 处理事件优先级:RotateEvent > FormatDescriptionEvent > TableMapEvent > RowsEvent > XidEvent
-            if (event instanceof XidEvent) {
-                refresh((XidEvent) event);
-                return;
-            }
-
-            // 切换binlog
-            if (event instanceof RotateEvent) {
-                refresh((RotateEvent) event);
-                return;
-            }
-
-        }
-
-        private void addAll(List<Object> before, List<Column> columns) {
-            columns.forEach(c -> before.add((c instanceof StringColumn) ? c.toString() : c.getValue()));
-        }
-
-    }
-
+package org.dbsyncer.listener.mysql;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.listener.config.Host;
+import org.dbsyncer.listener.mysql.binlog.BinlogEventListener;
+import org.dbsyncer.listener.mysql.binlog.BinlogEventV4;
+import org.dbsyncer.listener.mysql.binlog.BinlogRemoteClient;
+import org.dbsyncer.listener.mysql.binlog.impl.event.*;
+import org.dbsyncer.listener.mysql.common.glossary.Column;
+import org.dbsyncer.listener.mysql.common.glossary.Pair;
+import org.dbsyncer.listener.mysql.common.glossary.Row;
+import org.dbsyncer.listener.mysql.common.glossary.column.StringColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import java.util.*;
+import java.util.regex.Matcher;
+
+import static java.util.regex.Pattern.compile;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-12 21:14
+ */
+public class MysqlExtractor extends AbstractExtractor {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private static final String BINLOG_FILENAME = "fileName";
+    private static final String BINLOG_POSITION = "position";
+    private BinlogRemoteClient client;
+    private List<Host> cluster;
+    private int master = 0;
+
+    @Override
+    public void start() {
+        try {
+            final DatabaseConfig config = (DatabaseConfig) connectorConfig;
+            cluster = readNodes(config.getUrl());
+            Assert.notEmpty(cluster, "Mysql连接地址有误.");
+
+            final Host host = cluster.get(master);
+            final String username = config.getUsername();
+            final String password = config.getPassword();
+            // mysql-binlog-127.0.0.1:3306-654321
+            final String threadSuffixName = new StringBuilder("mysql-binlog-")
+                    .append(host.getIp()).append(":").append(host.getPort()).append("-")
+                    .append(RandomStringUtils.randomNumeric(6))
+                    .toString();
+
+            client = new BinlogRemoteClient(host.getIp(), host.getPort(), username, password, threadSuffixName);
+            client.setBinlogFileName(map.get(BINLOG_FILENAME));
+            String pos = map.get(BINLOG_POSITION);
+            client.setBinlogPosition(StringUtils.isBlank(pos) ? 0 : Long.parseLong(pos));
+            client.setBinlogEventListener(new MysqlEventListener());
+            client.start();
+        } catch (Exception e) {
+            logger.error("启动失败:{}", e.getMessage());
+            throw new ListenerException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            if(null != client){
+                client.stopQuietly();
+            }
+        } catch (Exception e) {
+            logger.error("关闭失败:{}", e.getMessage());
+        }
+    }
+
+    private List<Host> readNodes(String url) {
+        if (StringUtils.isBlank(url)) {
+            return Collections.EMPTY_LIST;
+        }
+        Matcher matcher = compile("(//)(?!(/)).+?(/)").matcher(url);
+        while (matcher.find()) {
+            url = matcher.group(0);
+            break;
+        }
+        url = StringUtils.replace(url, "/", "");
+
+        List<Host> cluster = new ArrayList<>();
+        String[] arr = StringUtils.split(url, ",");
+        int size = arr.length;
+        for (int i = 0; i < size; i++) {
+            String[] host = StringUtils.split(arr[i], ":");
+            if (2 == host.length) {
+                cluster.add(new Host(host[0], Integer.parseInt(host[1])));
+            }
+        }
+        return cluster;
+    }
+
+    /**
+     * 有变化触发刷新binlog增量事件
+     *
+     * @param event
+     */
+    private void refresh(AbstractBinlogEventV4 event) {
+        String binlogFilename = event.getBinlogFilename();
+        long nextPosition = event.getHeader().getNextPosition();
+
+        // binlogFileName
+        if (StringUtils.isNotBlank(binlogFilename) && !StringUtils.equals(binlogFilename, client.getBinlogFileName())) {
+            client.setBinlogFileName(binlogFilename);
+        }
+        client.setBinlogPosition(nextPosition);
+
+        // nextPosition
+        map.put(BINLOG_FILENAME, client.getBinlogFileName());
+        map.put(BINLOG_POSITION, String.valueOf(client.getBinlogPosition()));
+    }
+
+    final class MysqlEventListener implements BinlogEventListener {
+
+        private Map<Long, String> table = new HashMap<>();
+
+        @Override
+        public void onEvents(BinlogEventV4 event) {
+            if (event == null) {
+                logger.error("binlog event is null");
+                return;
+            }
+
+            if (event instanceof TableMapEvent) {
+                TableMapEvent tableEvent = (TableMapEvent) event;
+                table.putIfAbsent(tableEvent.getTableId(), tableEvent.getTableName().toString());
+                return;
+            }
+
+            if (event instanceof UpdateRowsEventV2) {
+                UpdateRowsEventV2 e = (UpdateRowsEventV2) event;
+                final String tableName = table.get(e.getTableId());
+                List<Pair<Row>> rows = e.getRows();
+                for (Pair<Row> p : rows) {
+                    List<Object> before = new ArrayList<>();
+                    List<Object> after = new ArrayList<>();
+                    addAll(before, p.getBefore().getColumns());
+                    addAll(after, p.getAfter().getColumns());
+                    changedLogEvent(tableName, ConnectorConstant.OPERTION_UPDATE, before, after);
+                    break;
+                }
+                return;
+            }
+
+            if (event instanceof WriteRowsEventV2) {
+                WriteRowsEventV2 e = (WriteRowsEventV2) event;
+                final String tableName = table.get(e.getTableId());
+                List<Row> rows = e.getRows();
+                for (Row row : rows) {
+                    List<Object> after = new ArrayList<>();
+                    addAll(after, row.getColumns());
+                    changedLogEvent(tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after);
+                    break;
+                }
+                return;
+            }
+
+            if (event instanceof DeleteRowsEventV2) {
+                DeleteRowsEventV2 e = (DeleteRowsEventV2) event;
+                final String tableName = table.get(e.getTableId());
+                List<Row> rows = e.getRows();
+                for (Row row : rows) {
+                    List<Object> before = new ArrayList<>();
+                    addAll(before, row.getColumns());
+                    changedLogEvent(tableName, ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST);
+                    break;
+                }
+                return;
+            }
+
+            // 处理事件优先级:RotateEvent > FormatDescriptionEvent > TableMapEvent > RowsEvent > XidEvent
+            if (event instanceof XidEvent) {
+                refresh((XidEvent) event);
+                return;
+            }
+
+            // 切换binlog
+            if (event instanceof RotateEvent) {
+                refresh((RotateEvent) event);
+                return;
+            }
+
+        }
+
+        private void addAll(List<Object> before, List<Column> columns) {
+            columns.forEach(c -> before.add((c instanceof StringColumn) ? c.toString() : c.getValue()));
+        }
+
+    }
+
 }
 }

+ 130 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java

@@ -0,0 +1,130 @@
+package org.dbsyncer.listener.quartz;
+
+import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.UUIDUtil;
+import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.listener.AbstractExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * 默认定时抽取
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-12 20:35
+ */
+public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJob {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private ConnectorFactory connectorFactory;
+    private ScheduledTaskService scheduledTaskService;
+    private List<Map<String, String>> commands;
+    private int commandSize;
+
+    private int readNum;
+    private String eventFieldName;
+    private Set<String> update;
+    private Set<String> insert;
+    private Set<String> delete;
+    private String key;
+    private String cron;
+    private AtomicBoolean running;
+
+    @Override
+    public void start() {
+        init();
+        scheduledTaskService.start(key, cron, this);
+        logger.info("启动定时任务:{} >> {}", key, cron);
+    }
+
+    @Override
+    public void run() {
+        try {
+            logger.info("执行定时任务:{} >> {}", key, cron);
+            if (running.compareAndSet(false, true)) {
+                // 依次执行同步映射关系
+                for (int i = 0; i < commandSize; i++) {
+                    execute(commands.get(i), i);
+                }
+            }
+            running.compareAndSet(true, false);
+        } catch (Exception e) {
+            running.compareAndSet(true, false);
+            errorEvent(e);
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Override
+    public void close() {
+        scheduledTaskService.stop(key);
+    }
+
+    private void execute(Map<String, String> command, int index) {
+        int pageIndex = 1;
+        for (; ; ) {
+            Result reader = connectorFactory.reader(connectorConfig, command, pageIndex++, readNum);
+            List<Map<String, Object>> data = reader.getData();
+            if (CollectionUtils.isEmpty(data)) {
+                break;
+            }
+
+            Object event = null;
+            for (Map<String, Object> row : data) {
+                event = row.get(eventFieldName);
+                if (update.contains(event)) {
+                    changedQuartzEvent(index, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row);
+                    continue;
+                }
+                if (insert.contains(event)) {
+                    changedQuartzEvent(index, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_MAP, row);
+                    continue;
+                }
+                if (delete.contains(event)) {
+                    changedQuartzEvent(index, ConnectorConstant.OPERTION_DELETE, row, Collections.EMPTY_MAP);
+                    continue;
+                }
+
+            }
+        }
+
+    }
+
+    private void init() {
+        commandSize = commands.size();
+
+        readNum = listenerConfig.getReadNum();
+        eventFieldName = listenerConfig.getEventFieldName();
+        update = Stream.of(listenerConfig.getUpdate().split(",")).collect(Collectors.toSet());
+        insert = Stream.of(listenerConfig.getInsert().split(",")).collect(Collectors.toSet());
+        delete = Stream.of(listenerConfig.getDelete().split(",")).collect(Collectors.toSet());
+
+        key = UUIDUtil.getUUID();
+        cron = listenerConfig.getCronExpression();
+        running = new AtomicBoolean();
+    }
+
+    public void setConnectorFactory(ConnectorFactory connectorFactory) {
+        this.connectorFactory = connectorFactory;
+    }
+
+    public void setScheduledTaskService(ScheduledTaskService scheduledTaskService) {
+        this.scheduledTaskService = scheduledTaskService;
+    }
+
+    public void setCommands(List<Map<String, String>> commands) {
+        this.commands = commands;
+    }
+}

+ 10 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskJob.java

@@ -0,0 +1,10 @@
+package org.dbsyncer.listener.quartz;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-24 22:18
+ */
+public interface ScheduledTaskJob extends Runnable {
+
+}

+ 22 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskService.java

@@ -0,0 +1,22 @@
+package org.dbsyncer.listener.quartz;
+
+public interface ScheduledTaskService {
+
+    /**
+     * 第一位,表示秒,取值0-59
+     * 第二位,表示分,取值0-59
+     * 第三位,表示小时,取值0-23
+     * 第四位,日期天/日,取值1-31
+     * 第五位,日期月份,取值1-12
+     * 第六位,星期,取值1-7
+     * [秒 分 时 日 月 星期]
+     *
+     * @param key 任务唯一key
+     * @param cron 任务表达式
+     * @param job 任务实现
+     */
+    void start(String key, String cron, ScheduledTaskJob job);
+
+    void stop(String key);
+
+}

+ 53 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskServiceImpl.java

@@ -0,0 +1,53 @@
+package org.dbsyncer.listener.quartz;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.scheduling.support.CronTrigger;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-24 22:06
+ */
+@Component
+public class ScheduledTaskServiceImpl implements ScheduledTaskService {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * 定时任务线程池
+     */
+    @Autowired
+    private ThreadPoolTaskScheduler taskScheduler;
+
+    private Map<String, ScheduledFuture> map = new ConcurrentHashMap<>();
+
+    @Override
+    public void start(String key, String cron, ScheduledTaskJob job) {
+        //校验任务key是否已经启动
+        final ScheduledFuture scheduledFuture = map.get(key);
+        if (null != scheduledFuture && !scheduledFuture.isCancelled()) {
+            logger.warn(">>>>>> 当前任务已经启动,无需重复启动!");
+            return;
+        }
+        //获取需要定时调度的接口
+        map.putIfAbsent(key, taskScheduler.schedule(job, (trigger) -> new CronTrigger(cron).nextExecutionTime(trigger) ));
+    }
+
+    @Override
+    public void stop(String key) {
+        ScheduledFuture job = map.get(key);
+        if (null != job) {
+            logger.info(">>>>>> 进入停止任务 {}  >>>>>>", key);
+            job.cancel(true);
+        }
+    }
+
+}

+ 0 - 352
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/TaskQuartzHandle.java

@@ -1,352 +0,0 @@
-package org.dbsyncer.listener.quartz;/*
-package org.dbsyncer.connector.quartz;
-
-import org.apache.commons.lang.StringUtils;
-import org.dbsyncer.common.constant.ConnectorConstant;
-import org.dbsyncer.common.constant.MappingConstant;
-import org.dbsyncer.common.entity.DatabaseConfig;
-import org.dbsyncer.common.entity.Mapping;
-import org.dbsyncer.common.entity.MappingTask;
-import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.connector.database.Database;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import java.sql.Timestamp;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-public final class TaskQuartzHandle {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    // 正在运行的job
-    private static Map<String, Boolean> running = new ConcurrentHashMap<String, Boolean>();
-
-    */
-/**
- * <ul>
- * <dt>即将面临的问题如下:</dt>
- * <dl>
- * <li>1.如果上一次任务未处理完成,而当前又产生了新的任务,此时多个任务可能会在同一时间执行相同工作,会造成重复数据.</li>
- * <li>2.如果上一次任务未处理完成,而当前又产生了新的任务,长时间下去,可能会造成任务堆积,影响系统性能.</li>
- * </dl>
- * </ul>
- * <ul>
- * <dt>相应的解决方法:</dt>
- * <dl>
- * <li>1.创建集合,存放执行的任务</li>
- * <li>2.从集合里检查上一次任务存在,继续执行.</li>
- * <li>3.从集合里检查上一次任务不存在,拒绝当前任务.</li>
- * </dl>
- * </ul>
- *
- * @param taskId
- * <p>
- * 组装数据格式
- * @param msg
- * @param eventType
- * @param beforeArr
- * @param afterArr
- * @throws JSONException
- * <p>
- * 解析操作事件,用","分割成map集合
- * @param event
- * @param filter
- * <p>
- * 根据增量策略过滤数据
- * @param inc  增量策略
- * @param rows 数据
- * @return 过滤数据
- * <p>
- * 刷新最后记录点
- * @param task
- * @param inc
- * @param scnPos
- * <p>
- * 组装数据格式
- * @param msg
- * @param eventType
- * @param beforeArr
- * @param afterArr
- * @throws JSONException
- * <p>
- * 解析操作事件,用","分割成map集合
- * @param event
- * @param filter
- * <p>
- * 根据增量策略过滤数据
- * @param inc  增量策略
- * @param rows 数据
- * @return 过滤数据
- * <p>
- * 刷新最后记录点
- * @param task
- * @param inc
- * @param scnPos
- *//*
-
-    public void handle(String taskId) {
-        // 1.检查是否存在
-        Boolean flg = running.get(taskId);
-        if (null != flg && flg) {
-            logger.error("the task ID \"" + taskId + "\" is running.");
-            return;
-        }
-        // 2.添加驱动job
-        running.put(taskId, true);
-        try {
-            // 3.获取驱动配置
-//            MappingTask task = data.getMapping(taskId);
-//            // 4.校验是否驱动配置为空
-//            if (null == task) {
-//                running.remove(taskId);
-//                logger.error("the task ID \"" + taskId + "\" can not be null.");
-//                return;
-//            }
-//            // 5.提交任务
-//            this.submit(task);
-        } catch (Exception e) {
-            logger.error(e.getClass() + " >> " + e.getLocalizedMessage());
-        } finally {
-            // 6.执行结束,移除正在执行的job
-            running.remove(taskId);
-        }
-    }
-
-    private void submit(MappingTask task) {
-        Mapping mapping = task.getSourceMapping();
-        Database connector = ConnectorFactory.getInstance().getConnector(mapping.getConnector(), Database.class);
-
-        // 1.建立连接
-        JdbcTemplate jdbcTemplate = null;
-        try {
-            DatabaseConfig config = (DatabaseConfig) task.getSourceMapping().getConfig();
-            jdbcTemplate = connector.getJdbcTemplate(config);
-            // 2.解析最新的数据
-            JSONArray msg = this.pull(jdbcTemplate, task);
-
-            // 3.将增量消息发送给manager处理
-            if (null != msg && 0 < msg.length()) {
-                JSONObject t = new JSONObject();
-                t.put("taskId", task.getId());
-                t.put("msg", msg);
-//                manager.handle(t);
-            }
-        } catch (Exception e) {
-            logger.error(e.getClass() + " >> " + e.getLocalizedMessage());
-        } finally {
-            if (null != connector) {
-                // 断开连接
-                connector.close(jdbcTemplate);
-            }
-        }
-    }
-
-    private JSONArray pull(JdbcTemplate jdbcTemplate, MappingTask task) {
-        // 1. 获取执行命令
-        Map<String, String> executeCommond = task.getSourceMapping().getExecuteCommond();
-
-        // 2. 获取最近记录点
-        // 获取最近时间 SELECT MAX(LASTDATE) FROM ASD_TEST;
-        String queryMax = executeCommond.get(ConnectorConstant.OPERTION_QUERY_QUARTZ_MAX);
-        Timestamp scnPos = this.getScnPos(jdbcTemplate, queryMax);
-        // 没有扫描到数据直接返回
-        if (null == scnPos) {
-            return null;
-        }
-
-        // 3.读取最近记录点
-        Map<String, Map<String, String>> po = task.getPolicy();
-        Map<String, String> inc = po.get(MappingConstant.POLICY_INCREMENT);
-        String lastScnPosStr = inc.get("scnPos");
-        String sql = executeCommond.get(ConnectorConstant.OPERTION_QUERY_QUARTZ);
-        Object[] args = null;
-        // 4.没有记录,证明是首次读取数据,查询>=lastScnPosStr的所有数据
-        if (StringUtils.isBlank(lastScnPosStr) || StringUtils.equals("null", StringUtils.trim(lastScnPosStr))) {
-            sql += executeCommond.get(ConnectorConstant.OPERTION_QUERY_QUARTZ_ALL);
-            args = new Object[]{scnPos};
-        } else {
-            // 5. 防止时间戳格式不正确
-            Timestamp lastScnPos = null;
-            try {
-                lastScnPos = Timestamp.valueOf(lastScnPosStr);
-            } catch (Exception e1) {
-                // 如果时间戳格式不正确,则设置为空,下一次读取重新读取最新的记录点
-                this.refreshScnPos(task, inc, null);
-                return null;
-            }
-
-            // 6. 如果最近记录时间<=上次记录点,证明没有新增数据,直接结束.
-            if (null == lastScnPos || scnPos.getTime() <= lastScnPos.getTime()) {
-                return null;
-            }
-            sql += executeCommond.get(ConnectorConstant.OPERTION_QUERY_QUARTZ_RANGE);
-            args = new Object[]{lastScnPos, scnPos};
-        }
-
-        // 7.刷新最后记录点
-        this.refreshScnPos(task, inc, scnPos.toString());
-
-        // 8.获取增量语句  SELECT ASD_TEST.* FROM ASD_TEST LASTDATE > ? AND LASTDATE <= ?
-        List<Map<String, Object>> rows = jdbcTemplate.queryForList(sql, args);
-        if (null == rows || rows.isEmpty()) {
-            return null;
-        }
-
-        // 9. 解析增量数据格式
-        return this.filterRowsByPolicyIncrement(inc, rows);
-    }
-
-    */
-/**
- * 组装数据格式
- *
- * @param msg
- * @param eventType
- * @param beforeArr
- * @param afterArr
- * @throws JSONException
- *//*
-
-    private void putRow(JSONArray msg, String eventType, JSONArray beforeArr, JSONArray afterArr) throws JSONException {
-        JSONObject row = new JSONObject();
-        row.put("eventType", eventType);
-        row.put("before", beforeArr);
-        row.put("after", afterArr);
-        msg.put(row);
-    }
-
-    */
-/**
- * 解析操作事件,用","分割成map集合
- *
- * @param event
- * @param filter
- *//*
-
-    private void splitEvent(String event, Map<String, Boolean> filter) {
-        if (StringUtils.isBlank(event)) {
-            return;
-        }
-        String[] arr = event.split(",");
-        if (null == arr) {
-            return;
-        }
-        int len = arr.length;
-        String e;
-        for (int i = 0; i < len; i++) {
-            e = arr[i];
-            filter.put(e, true);
-        }
-    }
-
-    */
-/**
- * 根据增量策略过滤数据
- *
- * @param inc  增量策略
- * @param rows 数据
- * @return 过滤数据
- *//*
-
-    private JSONArray filterRowsByPolicyIncrement(Map<String, String> inc, List<Map<String, Object>> rows) {
-        // 用于区分事件的字段名称
-        String eventFiled = inc.get("quartzEvent");
-        // 事件:新增、修改和删除
-        String quartzEventInsert = inc.get("quartzEventInsert");
-        String quartzEventUpdate = inc.get("quartzEventUpdate");
-        String quartzEventDelete = inc.get("quartzEventDelete");
-
-        // 由于新增、修改和删除过滤事件可能有多个组合
-        Map<String, Boolean> iFilter = new HashMap<String, Boolean>();
-        Map<String, Boolean> uFilter = new HashMap<String, Boolean>();
-        Map<String, Boolean> dFilter = new HashMap<String, Boolean>();
-        this.splitEvent(quartzEventInsert, iFilter);
-        this.splitEvent(quartzEventUpdate, uFilter);
-        this.splitEvent(quartzEventDelete, dFilter);
-
-        JSONArray msg = null;
-        JSONArray beforeArr = null;
-        JSONArray afterArr = null;
-        for (Map<String, Object> col : rows) {
-            // 创建返回过滤数据对象
-            if (null == msg) {
-                msg = new JSONArray();
-            }
-            try {
-                // 1.解析增量数据事件
-                String event = String.valueOf(col.get(eventFiled));
-                if (null != uFilter.get(event)) {
-                    // 1.1修改数据
-                    afterArr = this.parseColumn(col);
-                    // 组装数据格式
-                    this.putRow(msg, ConnectorConstant.OPERTION_UPDATE, new JSONArray(), afterArr);
-                } else if (null != iFilter.get(event)) {
-                    // 1.2新增数据
-                    afterArr = this.parseColumn(col);
-                    // 组装数据格式
-                    this.putRow(msg, ConnectorConstant.OPERTION_INSERT, new JSONArray(), afterArr);
-                } else if (null != dFilter.get(event)) {
-                    // 1.3删除数据
-                    beforeArr = this.parseColumn(col);
-                    // 组装数据格式
-                    this.putRow(msg, ConnectorConstant.OPERTION_DELETE, beforeArr, new JSONArray());
-                }
-            } catch (JSONException e) {
-                logger.error(e.getClass() + " >> " + e.getLocalizedMessage());
-            }
-        }
-
-        iFilter = null;
-        uFilter = null;
-        dFilter = null;
-        return msg;
-    }
-
-    */
-/**
- * 刷新最后记录点
- *
- * @param task
- * @param inc
- * @param scnPos
- *//*
-
-    private synchronized void refreshScnPos(MappingTask task, Map<String, String> inc, String scnPos) {
-        // 刷新最后记录点
-        inc.put("scnPos", scnPos);
-//        data.saveMapping(task.getId(), task);
-    }
-
-    // 转换行数据格式为JSONArray
-    private JSONArray parseColumn(Map<String, Object> col) throws JSONException {
-        JSONArray row = new JSONArray();
-        JSONObject attr = null;
-        Object value = null;
-        for (Entry<String, Object> obj : col.entrySet()) {
-            attr = new JSONObject();
-            attr.put("name", obj.getKey());
-            // 防止为null
-            value = obj.getValue();
-            value = null == value ? "" : value;
-            attr.put("value", value);
-            row.put(attr);
-        }
-        return row;
-    }
-
-    // 获取最近记录点
-    private Timestamp getScnPos(JdbcTemplate jdbcTemplate, String queryMax) {
-        return jdbcTemplate.queryForObject(queryMax, Timestamp.class);
-    }
-
-}
-*/

+ 1 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -83,7 +83,7 @@ public interface Manager extends Executor {
     // Log
     // Log
     List<Map> queryLog(String type, int pageNum, int pageSize);
     List<Map> queryLog(String type, int pageNum, int pageSize);
 
 
-    void clearLog(String type);
+    void clearLog();
 
 
     // ConnectorEnum
     // ConnectorEnum
     List<ConnectorEnum> getConnectorEnumAll();
     List<ConnectorEnum> getConnectorEnumAll();

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -220,8 +220,8 @@ public class ManagerFactory implements Manager, ApplicationContextAware, Applica
     }
     }
 
 
     @Override
     @Override
-    public void clearLog(String type) {
-        dataTemplate.clear(StorageEnum.LOG, type);
+    public void clearLog() {
+        dataTemplate.clear(StorageEnum.LOG, null);
     }
     }
 
 
     @Override
     @Override

+ 51 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/ExtractorConfig.java

@@ -0,0 +1,51 @@
+package org.dbsyncer.manager.config;
+
+import org.dbsyncer.common.event.Event;
+import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.config.ListenerConfig;
+
+import java.util.Map;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-25 23:52
+ */
+public class ExtractorConfig {
+
+    private ConnectorConfig connectorConfig;
+    private ListenerConfig listenerConfig;
+    private Map<String, String> map;
+    private Event event;
+
+    /**
+     * 抽取器配置
+     *
+     * @param connectorConfig 连接器配置
+     * @param listenerConfig 监听配置
+     * @param map 增量元信息
+     * @param event 监听器
+     */
+    public ExtractorConfig(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map, Event event) {
+        this.connectorConfig = connectorConfig;
+        this.listenerConfig = listenerConfig;
+        this.map = map;
+        this.event = event;
+    }
+
+    public ConnectorConfig getConnectorConfig() {
+        return connectorConfig;
+    }
+
+    public ListenerConfig getListenerConfig() {
+        return listenerConfig;
+    }
+
+    public Map<String, String> getMap() {
+        return map;
+    }
+
+    public Event getEvent() {
+        return event;
+    }
+}

+ 65 - 7
dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/FieldPicker.java

@@ -1,7 +1,13 @@
 package org.dbsyncer.manager.config;
 package org.dbsyncer.manager.config;
 
 
+import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.CompareFilter;
 import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.config.Filter;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.connector.enums.OperationEnum;
+import org.dbsyncer.parser.model.DataEvent;
 import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.TableGroup;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
@@ -12,16 +18,19 @@ import java.util.stream.Collectors;
 public class FieldPicker {
 public class FieldPicker {
 
 
     private TableGroup tableGroup;
     private TableGroup tableGroup;
-    private List<Field> column;
-    private List<FieldMapping> fieldMapping;
     private List<Node> index;
     private List<Node> index;
     private int indexSize;
     private int indexSize;
+    private boolean filterSwitch;
+    private List<Filter> add;
+    private List<Filter> or;
 
 
-    public FieldPicker(TableGroup tableGroup, List<Field> column, List<FieldMapping> fieldMapping) {
+    public FieldPicker(TableGroup tableGroup) {
         this.tableGroup = tableGroup;
         this.tableGroup = tableGroup;
-        this.column = column;
-        this.fieldMapping = fieldMapping;
-        init();
+    }
+
+    public FieldPicker(TableGroup tableGroup, List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
+        this.tableGroup = tableGroup;
+        init(filter, column, fieldMapping);
     }
     }
 
 
     public Map<String, Object> getColumns(List<Object> list) {
     public Map<String, Object> getColumns(List<Object> list) {
@@ -42,7 +51,56 @@ public class FieldPicker {
         return tableGroup;
         return tableGroup;
     }
     }
 
 
-    private void init() {
+    /**
+     * 根据过滤条件过滤
+     *
+     * @param data
+     * @return
+     */
+    public boolean filter(DataEvent data) {
+        if (!filterSwitch) {
+            return true;
+        }
+        final Map<String, Object> row = data.getData();
+        // where (id > 1 and id < 100) or (id = 100 or id =101)
+        // 或 关系(成立任意条件)
+        CompareFilter filter = null;
+        Object value = null;
+        for (Filter f: or) {
+            value = row.get(f.getName());
+            if(null == value){
+                continue;
+            }
+            filter = FilterEnum.getCompareFilter(f.getFilter());
+            if(filter.compare(String.valueOf(value), f.getValue())){
+                return true;
+            }
+        }
+
+        boolean pass = false;
+        // 并 关系(成立所有条件)
+        for (Filter f: add) {
+            value = row.get(f.getName());
+            if(null == value){
+                continue;
+            }
+            filter = FilterEnum.getCompareFilter(f.getFilter());
+            if(!filter.compare(String.valueOf(value), f.getValue())){
+                return false;
+            }
+            pass = true;
+        }
+
+        return pass;
+    }
+
+    private void init(List<Filter> filter, List<Field> column, List<FieldMapping> fieldMapping) {
+        // 解析过滤条件
+        if ((filterSwitch = !CollectionUtils.isEmpty(filter))) {
+            add = filter.stream().filter(f -> StringUtils.equals(f.getOperation(), OperationEnum.AND.getName())).collect(Collectors.toList());
+            or = filter.stream().filter(f -> StringUtils.equals(f.getOperation(), OperationEnum.OR.getName())).collect(Collectors.toList());
+        }
+
         // column  => [1, 86, 0, 中文, 2020-05-15T12:17:22.000+0800, 备注信息]
         // column  => [1, 86, 0, 中文, 2020-05-15T12:17:22.000+0800, 备注信息]
         Assert.notEmpty(column, "读取字段不能为空.");
         Assert.notEmpty(column, "读取字段不能为空.");
         Assert.notEmpty(fieldMapping, "映射关系不能为空.");
         Assert.notEmpty(fieldMapping, "映射关系不能为空.");

+ 0 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/AbstractPuller.java

@@ -1,15 +1,11 @@
 package org.dbsyncer.manager.puller;
 package org.dbsyncer.manager.puller;
 
 
 import org.dbsyncer.common.event.ClosedEvent;
 import org.dbsyncer.common.event.ClosedEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContext;
 
 
 public abstract class AbstractPuller implements Puller {
 public abstract class AbstractPuller implements Puller {
 
 
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
     @Autowired
     @Autowired
     private ApplicationContext applicationContext;
     private ApplicationContext applicationContext;
 
 

+ 200 - 43
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -2,25 +2,37 @@ package org.dbsyncer.manager.puller.impl;
 
 
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.UUIDUtil;
+import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.config.Table;
-import org.dbsyncer.listener.DefaultExtractor;
+import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.Listener;
 import org.dbsyncer.listener.Listener;
+import org.dbsyncer.listener.config.ListenerConfig;
+import org.dbsyncer.listener.enums.ListenerTypeEnum;
+import org.dbsyncer.listener.quartz.QuartzExtractor;
+import org.dbsyncer.listener.quartz.ScheduledTaskJob;
+import org.dbsyncer.listener.quartz.ScheduledTaskService;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.manager.config.ExtractorConfig;
 import org.dbsyncer.manager.config.FieldPicker;
 import org.dbsyncer.manager.config.FieldPicker;
 import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.Parser;
+import org.dbsyncer.parser.logger.LogService;
+import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.model.*;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
 
 
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 
 /**
 /**
  * 增量同步
  * 增量同步
@@ -30,7 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @date 2020/04/26 15:28
  * @date 2020/04/26 15:28
  */
  */
 @Component
 @Component
-public class IncrementPuller extends AbstractPuller {
+public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob, InitializingBean, DisposableBean {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
@@ -43,7 +55,18 @@ public class IncrementPuller extends AbstractPuller {
     @Autowired
     @Autowired
     private Manager manager;
     private Manager manager;
 
 
-    private Map<String, DefaultExtractor> map = new ConcurrentHashMap<>();
+    @Autowired
+    private LogService logService;
+
+    @Autowired
+    private ScheduledTaskService scheduledTaskService;
+
+    @Autowired
+    private ConnectorFactory connectorFactory;
+
+    private String key;
+
+    private Map<String, AbstractExtractor> map = new ConcurrentHashMap<>();
 
 
     @Override
     @Override
     public void asyncStart(Mapping mapping) {
     public void asyncStart(Mapping mapping) {
@@ -56,89 +79,223 @@ public class IncrementPuller extends AbstractPuller {
             Assert.notEmpty(list, "映射关系不能为空.");
             Assert.notEmpty(list, "映射关系不能为空.");
             Meta meta = manager.getMeta(metaId);
             Meta meta = manager.getMeta(metaId);
             Assert.notNull(meta, "Meta不能为空.");
             Assert.notNull(meta, "Meta不能为空.");
-            DefaultExtractor extractor = listener.createExtractor(connector.getConfig(), mapping.getListener(), meta.getMap());
+            AbstractExtractor extractor = getExtractor(mapping, connector, list, meta);
             Assert.notNull(extractor, "未知的监听配置.");
             Assert.notNull(extractor, "未知的监听配置.");
+
             long now = System.currentTimeMillis();
             long now = System.currentTimeMillis();
             meta.setBeginTime(now);
             meta.setBeginTime(now);
             meta.setEndTime(now);
             meta.setEndTime(now);
             manager.editMeta(meta);
             manager.editMeta(meta);
-
-            // 监听数据变更事件
-            extractor.addListener(new DefaultListener(mapping, list));
             map.putIfAbsent(metaId, extractor);
             map.putIfAbsent(metaId, extractor);
 
 
             // 执行任务
             // 执行任务
             logger.info("启动成功:{}", metaId);
             logger.info("启动成功:{}", metaId);
-            map.get(metaId).run();
+            map.get(metaId).start();
         } catch (Exception e) {
         } catch (Exception e) {
-            finished(metaId);
+            close(metaId);
             logger.error("运行异常,结束任务{}:{}", metaId, e.getMessage());
             logger.error("运行异常,结束任务{}:{}", metaId, e.getMessage());
         }
         }
     }
     }
 
 
     @Override
     @Override
     public void close(String metaId) {
     public void close(String metaId) {
-        DefaultExtractor extractor = map.get(metaId);
+        AbstractExtractor extractor = map.get(metaId);
         if (null != extractor) {
         if (null != extractor) {
             extractor.clearAllListener();
             extractor.clearAllListener();
             extractor.close();
             extractor.close();
-            finished(metaId);
+            map.remove(metaId);
+            publishClosedEvent(metaId);
             logger.info("关闭成功:{}", metaId);
             logger.info("关闭成功:{}", metaId);
         }
         }
     }
     }
 
 
-    private void finished(String metaId) {
-        map.remove(metaId);
-        publishClosedEvent(metaId);
+    @Override
+    public void run() {
+        // 定时同步增量信息
+        map.forEach((k, v) -> v.flushEvent());
+    }
+
+    @Override
+    public void afterPropertiesSet() {
+        key = UUIDUtil.getUUID();
+        scheduledTaskService.start(key, "*/10 * * * * ?", this);
+    }
+
+    @Override
+    public void destroy() {
+        scheduledTaskService.stop(key);
+    }
+
+    private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta)
+            throws InstantiationException, IllegalAccessException {
+        ConnectorConfig connectorConfig = connector.getConfig();
+        ListenerConfig listenerConfig = mapping.getListener();
+
+        // timing/log
+        final String listenerType = listenerConfig.getListenerType();
+
+        // 默认定时抽取
+        if (ListenerTypeEnum.isTiming(listenerType)) {
+            QuartzExtractor extractor = listener.getExtractor(listenerType, QuartzExtractor.class);
+            List<Map<String, String>> commands = list.stream().map(t -> t.getCommand()).collect(Collectors.toList());
+
+            ExtractorConfig config = new ExtractorConfig(connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list));
+            setExtractorConfig(extractor, config);
+            extractor.setConnectorFactory(connectorFactory);
+            extractor.setScheduledTaskService(scheduledTaskService);
+            extractor.setCommands(commands);
+            return extractor;
+        }
+
+        // 基于日志抽取
+        if (ListenerTypeEnum.isLog(listenerType)) {
+            final String connectorType = connectorConfig.getConnectorType();
+            AbstractExtractor extractor = listener.getExtractor(connectorType, AbstractExtractor.class);
+
+            ExtractorConfig config = new ExtractorConfig(connectorConfig, listenerConfig, meta.getMap(), new LogListener(mapping, list));
+            setExtractorConfig(extractor, config);
+            return extractor;
+        }
+        return null;
+    }
+
+    private void setExtractorConfig(AbstractExtractor extractor, ExtractorConfig config) {
+        extractor.setConnectorConfig(config.getConnectorConfig());
+        extractor.setListenerConfig(config.getListenerConfig());
+        extractor.setMap(config.getMap());
+        extractor.addListener(config.getEvent());
+    }
+
+    abstract class AbstractListener implements Event {
+        protected Mapping mapping;
+        protected String metaId;
+        protected AtomicBoolean changed = new AtomicBoolean();
+
+        @Override
+        public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
+            // nothing to do
+        }
+
+        @Override
+        public void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
+            // nothing to do
+        }
+
+        @Override
+        public void flushEvent(Map<String, String> map) {
+            // 如果有变更,执行更新
+            if (changed.compareAndSet(true, false)) {
+                Meta meta = manager.getMeta(metaId);
+                if (null != meta) {
+                    logger.info("同步增量信息:{}>>{}", metaId, map);
+                    meta.setMap(map);
+                    manager.editMeta(meta);
+                }
+            }
+        }
+
+        @Override
+        public void errorEvent(Exception e) {
+            logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
+        }
+
+    }
+
+    /**
+     * </p>定时模式
+     * <ol>
+     * <li>根据过滤条件筛选</li>
+     * </ol>
+     * </p>同步关系:
+     * </p>数据源表 >> 目标源表
+     * <ul>
+     * <li>A >> B</li>
+     * <li>A >> C</li>
+     * <li>E >> F</li>
+     * </ul>
+     * </p>PS:
+     * <ol>
+     * <li>依次执行同步关系A >> B 然后 A >> C ...</li>
+     * </ol>
+     */
+    final class QuartzListener extends AbstractListener {
+
+        private List<FieldPicker> tablePicker;
+
+        public QuartzListener(Mapping mapping, List<TableGroup> list) {
+            this.mapping = mapping;
+            this.metaId = mapping.getMetaId();
+            this.tablePicker = new LinkedList<>();
+            list.forEach(t -> tablePicker.add(new FieldPicker(t)));
+        }
+
+        @Override
+        public void changedQuartzEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
+            final FieldPicker picker = tablePicker.get(tableGroupIndex);
+            logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", picker.getTableGroup().getSourceTable().getName(), event, before, after);
+
+            // 处理过程有异常向上抛
+            DataEvent data = new DataEvent(event, before, after);
+            parser.execute(mapping, picker.getTableGroup(), data);
+
+            // 标记有变更记录
+            changed.compareAndSet(false, true);
+        }
     }
     }
 
 
-    final class DefaultListener implements Event {
+    /**
+     * </p>日志模式
+     * <ol>
+     * <li>监听表增量数据</li>
+     * <li>根据过滤条件筛选</li>
+     * </ol>
+     * </p>同步关系:
+     * </p>数据源表 >> 目标源表
+     * <ul>
+     * <li>A >> B</li>
+     * <li>A >> C</li>
+     * <li>E >> F</li>
+     * </ul>
+     * </p>PS:
+     * <ol>
+     * <li>为减少开销而选择复用监听器实例, 启动时只需创建一个数据源连接器.</li>
+     * <li>关系A >> B和A >> C会复用A监听的数据, A监听到增量数据,会发送给B和C.</li>
+     * <li>该模式下,会监听表所有字段.</li>
+     * </ol>
+     */
+    final class LogListener extends AbstractListener {
 
 
-        private Mapping mapping;
-        private List<TableGroup> list;
-        private String metaId;
         private Map<String, List<FieldPicker>> tablePicker;
         private Map<String, List<FieldPicker>> tablePicker;
 
 
-        public DefaultListener(Mapping mapping, List<TableGroup> list) {
+        public LogListener(Mapping mapping, List<TableGroup> list) {
             this.mapping = mapping;
             this.mapping = mapping;
-            this.list = list;
             this.metaId = mapping.getMetaId();
             this.metaId = mapping.getMetaId();
             this.tablePicker = new LinkedHashMap<>();
             this.tablePicker = new LinkedHashMap<>();
             list.forEach(t -> {
             list.forEach(t -> {
                 final Table table = t.getSourceTable();
                 final Table table = t.getSourceTable();
                 final String tableName = table.getName();
                 final String tableName = table.getName();
                 tablePicker.putIfAbsent(tableName, new ArrayList<>());
                 tablePicker.putIfAbsent(tableName, new ArrayList<>());
-                tablePicker.get(tableName).add(new FieldPicker(t, table.getColumn(), t.getFieldMapping()));
+                tablePicker.get(tableName).add(new FieldPicker(t, t.getFilter(), table.getColumn(), t.getFieldMapping()));
             });
             });
         }
         }
 
 
         @Override
         @Override
-        public void changedEvent(String tableName, String event, List<Object> before, List<Object> after) {
+        public void changedLogEvent(String tableName, String event, List<Object> before, List<Object> after) {
             logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", tableName, event, before, after);
             logger.info("监听数据=> tableName:{}, event:{}, before:{}, after:{}", tableName, event, before, after);
 
 
             // 处理过程有异常向上抛
             // 处理过程有异常向上抛
             List<FieldPicker> pickers = tablePicker.get(tableName);
             List<FieldPicker> pickers = tablePicker.get(tableName);
             if (!CollectionUtils.isEmpty(pickers)) {
             if (!CollectionUtils.isEmpty(pickers)) {
-                pickers.parallelStream().forEach(p -> {
-                    DataEvent data = new DataEvent(event, p.getColumns(before), p.getColumns(after));
-                    parser.execute(mapping, p.getTableGroup(), data);
+                pickers.parallelStream().forEach(picker -> {
+                    DataEvent data = new DataEvent(event, picker.getColumns(before), picker.getColumns(after));
+                    if (picker.filter(data)) {
+                        parser.execute(mapping, picker.getTableGroup(), data);
+                    }
                 });
                 });
             }
             }
 
 
-        }
-
-        @Override
-        public void flushEvent() {
-            // TODO 更新待优化,存在性能问题
-            DefaultExtractor extractor = map.get(metaId);
-            if (null != extractor) {
-                logger.info("flushEvent map:{}", extractor.getMap());
-                Meta meta = manager.getMeta(metaId);
-                if (null != meta) {
-                    meta.setMap(extractor.getMap());
-                    manager.editMeta(meta);
-                }
-            }
+            // 标记有变更记录
+            changed.compareAndSet(false, true);
         }
         }
 
 
     }
     }

+ 7 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -92,8 +92,12 @@ public class ParserFactory implements Parser {
         Table sTable = new Table().setName(sTableName).setColumn(new ArrayList<>());
         Table sTable = new Table().setName(sTableName).setColumn(new ArrayList<>());
         Table tTable = new Table().setName(tTableName).setColumn(new ArrayList<>());
         Table tTable = new Table().setName(tTableName).setColumn(new ArrayList<>());
         fieldMapping.forEach(m -> {
         fieldMapping.forEach(m -> {
-            sTable.getColumn().add(m.getSource());
-            tTable.getColumn().add(m.getTarget());
+            if(null != m.getSource()){
+                sTable.getColumn().add(m.getSource());
+            }
+            if(null != m.getTarget()){
+                tTable.getColumn().add(m.getTarget());
+            }
         });
         });
         final CommandConfig sourceConfig = new CommandConfig(sType, sTable, tableGroup.getFilter());
         final CommandConfig sourceConfig = new CommandConfig(sType, sTable, tableGroup.getFilter());
         final CommandConfig targetConfig = new CommandConfig(tType, tTable);
         final CommandConfig targetConfig = new CommandConfig(tType, tTable);
@@ -273,7 +277,7 @@ public class ParserFactory implements Parser {
      * @param data
      * @param data
      */
      */
     private void flush(Task task, Result writer, List<Map<String, Object>> data) {
     private void flush(Task task, Result writer, List<Map<String, Object>> data) {
-        flush(task.getId(), writer, ConnectorConstant.OPERTION_DELETE, data);
+        flush(task.getId(), writer, ConnectorConstant.OPERTION_INSERT, data);
 
 
         // 发布刷新事件给FullExtractor
         // 发布刷新事件给FullExtractor
         task.setEndTime(System.currentTimeMillis());
         task.setEndTime(System.currentTimeMillis());

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushServiceImpl.java

@@ -36,7 +36,7 @@ public class FlushServiceImpl implements FlushService {
     @Override
     @Override
     public void asyncWrite(String type, String error) {
     public void asyncWrite(String type, String error) {
         Map<String, Object> params = new HashMap();
         Map<String, Object> params = new HashMap();
-        params.put(ConfigConstant.CONFIG_MODEL_ID, snowflakeIdWorker.nextId());
+        params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
         params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
         params.put(ConfigConstant.CONFIG_MODEL_TYPE, type);
         params.put(ConfigConstant.CONFIG_MODEL_JSON, error);
         params.put(ConfigConstant.CONFIG_MODEL_JSON, error);
         params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, System.currentTimeMillis());
         params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, System.currentTimeMillis());

+ 15 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/logger/LogService.java

@@ -0,0 +1,15 @@
+package org.dbsyncer.parser.logger;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-21 23:18
+ */
+public interface LogService {
+
+    void log(LogType logType);
+
+    void log(LogType logType, String msg);
+
+    void log(LogType logType, String format, Object... args);
+}

+ 32 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/logger/LogServiceImpl.java

@@ -0,0 +1,32 @@
+package org.dbsyncer.parser.logger;
+
+import org.dbsyncer.parser.flush.FlushService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-21 23:18
+ */
+@Component
+public class LogServiceImpl implements LogService {
+
+    @Autowired
+    private FlushService flushService;
+
+    @Override
+    public void log(LogType logType) {
+        flushService.asyncWrite(logType.getType(), logType.getMessage());
+    }
+
+    @Override
+    public void log(LogType logType, String msg) {
+        flushService.asyncWrite(logType.getType(), msg);
+    }
+
+    @Override
+    public void log(LogType logType, String format, Object... args) {
+        flushService.asyncWrite(logType.getType(), String.format(format, args));
+    }
+}

+ 206 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/logger/LogType.java

@@ -0,0 +1,206 @@
+package org.dbsyncer.parser.logger;
+
+/**
+ * 日志类型枚举
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/21 16:19
+ */
+public interface LogType {
+
+    /**
+     * 分类名称
+     *
+     * @return
+     */
+    String getName();
+
+    /**
+     * 类型
+     *
+     * @return
+     */
+    String getType();
+
+    /**
+     * 内容
+     *
+     * @return
+     */
+    String getMessage();
+
+    /**
+     * 系统日志1
+     */
+    enum SystemLog implements LogType {
+        /**
+         * 正常
+         */
+        INFO("10", "正常"),
+        /**
+         * 警告
+         */
+        WARN("11", "警告"),
+        /**
+         * 错误
+         */
+        ERROR("12", "错误");
+
+        private String type;
+        private String message;
+
+        SystemLog(String type, String message) {
+            this.type = type;
+            this.message = message;
+        }
+
+        @Override
+        public String getName() {
+            return "系统日志";
+        }
+
+        @Override
+        public String getType() {
+            return type;
+        }
+
+        @Override
+        public String getMessage() {
+            return message;
+        }
+    }
+
+    /**
+     * 连接器2
+     */
+    enum ConnectorLog implements LogType {
+        INSERT("20", "新增"),
+        UPDATE("21", "修改"),
+        DELETE("22", "删除"),
+        FAILED("23", "连接失败");
+
+        private String type;
+        private String message;
+
+        ConnectorLog(String type, String message) {
+            this.type = type;
+            this.message = message;
+        }
+
+        @Override
+        public String getName() {
+            return "连接器";
+        }
+
+        @Override
+        public String getType() {
+            return type;
+        }
+
+        @Override
+        public String getMessage() {
+            return message;
+        }
+    }
+
+    /**
+     * 驱动3
+     */
+    enum MappingLog implements LogType {
+        INSERT("30", "新增"),
+        UPDATE("31", "修改"),
+        DELETE("32", "删除"),
+        RUNNING("33", "启动"),
+        STOP("34", "停止");
+
+        private String type;
+        private String message;
+
+        MappingLog(String type, String message) {
+            this.type = type;
+            this.message = message;
+        }
+
+        @Override
+        public String getName() {
+            return "驱动";
+        }
+
+        @Override
+        public String getType() {
+            return type;
+        }
+
+        @Override
+        public String getMessage() {
+            return message;
+        }
+    }
+
+    /**
+     * 映射关系4
+     */
+    enum TableGroupLog implements LogType {
+        INSERT("40", "新增"),
+        UPDATE("41", "修改"),
+        DELETE("42", "删除"),
+        INCREMENT_FAILED("43", "增量同步异常");
+
+        private String type;
+        private String message;
+
+        TableGroupLog(String type, String message) {
+            this.type = type;
+            this.message = message;
+        }
+
+        @Override
+        public String getName() {
+            return "映射关系";
+        }
+
+        @Override
+        public String getType() {
+            return type;
+        }
+
+        @Override
+        public String getMessage() {
+            return message;
+        }
+    }
+
+    /**
+     * 元信息5
+     */
+    enum MetaLog implements LogType {
+        DELETE("50", "删除"),
+        CLEAR("51", "删除数据"),
+        TASK("52", "任务");
+
+        private String type;
+        private String message;
+
+        MetaLog(String type, String message) {
+            this.type = type;
+            this.message = message;
+        }
+
+        @Override
+        public String getName() {
+            return "元信息";
+        }
+
+        @Override
+        public String getType() {
+            return type;
+        }
+
+        @Override
+        public String getMessage() {
+            return message;
+        }
+    }
+
+}

+ 14 - 9
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.storage.lucene;
 package org.dbsyncer.storage.lucene;
 
 
+import org.apache.commons.io.FileUtils;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.cn.smart.SmartChineseAnalyzer;
 import org.apache.lucene.analysis.cn.smart.SmartChineseAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Document;
@@ -7,9 +8,10 @@ import org.apache.lucene.index.*;
 import org.apache.lucene.search.*;
 import org.apache.lucene.search.*;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.FSDirectory;
-import org.dbsyncer.storage.StorageException;
 
 
+import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.*;
 
 
@@ -20,6 +22,8 @@ import java.util.*;
  */
  */
 public class Shard {
 public class Shard {
 
 
+    private File indexPath;
+
     private Directory directory;
     private Directory directory;
 
 
     private Analyzer analyzer;
     private Analyzer analyzer;
@@ -36,7 +40,9 @@ public class Shard {
 
 
     public Shard(String path) throws IOException {
     public Shard(String path) throws IOException {
         // 索引存放的位置,设置在当前目录中
         // 索引存放的位置,设置在当前目录中
-        directory = FSDirectory.open(Paths.get(path));
+        Path dir = Paths.get(path);
+        indexPath = new File(dir.toUri());
+        directory = FSDirectory.open(dir);
         // 分词器
         // 分词器
         analyzer = new SmartChineseAnalyzer();
         analyzer = new SmartChineseAnalyzer();
         // 创建索引写入配置
         // 创建索引写入配置
@@ -80,15 +86,14 @@ public class Shard {
     public void deleteAll() throws IOException {
     public void deleteAll() throws IOException {
         indexWriter.deleteAll();
         indexWriter.deleteAll();
         indexWriter.commit();
         indexWriter.commit();
+        close();
+        directory.close();
+        FileUtils.deleteDirectory(indexPath);
     }
     }
 
 
-    public void close() {
-        try {
-            indexWriter.close();
-            indexReader.close();
-        } catch (IOException e) {
-            throw new StorageException(e);
-        }
+    public void close() throws IOException {
+        indexReader.close();
+        indexWriter.close();
     }
     }
 
 
     public IndexSearcher getSearcher() throws IOException {
     public IndexSearcher getSearcher() throws IOException {

+ 6 - 4
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -111,10 +111,12 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
 
 
     @Override
     @Override
     public void deleteAll(String collectionId) throws IOException {
     public void deleteAll(String collectionId) throws IOException {
-        Shard shard = map.get(collectionId);
-        if (null != shard) {
-            shard.deleteAll();
-            map.remove(collectionId);
+        synchronized (this){
+            Shard shard = map.get(collectionId);
+            if (null != shard) {
+                shard.deleteAll();
+                map.remove(collectionId);
+            }
         }
         }
     }
     }
 
 

+ 2 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/LuceneFactoryTest.java → dbsyncer-storage/src/main/test/LuceneFactoryTest.java

@@ -1,5 +1,3 @@
-package org.dbsyncer.storage.lucene;
-
 import org.apache.commons.lang.math.RandomUtils;
 import org.apache.commons.lang.math.RandomUtils;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.analysis.TokenStream;
@@ -15,6 +13,7 @@ import org.apache.lucene.search.highlight.Highlighter;
 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
 import org.apache.lucene.search.highlight.QueryScorer;
 import org.apache.lucene.search.highlight.QueryScorer;
 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
+import org.dbsyncer.storage.lucene.Shard;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
@@ -37,7 +36,7 @@ public class LuceneFactoryTest {
     }
     }
 
 
     @After
     @After
-    public void tearDown() {
+    public void tearDown() throws IOException {
         shard.close();
         shard.close();
     }
     }
 
 

+ 14 - 0
dbsyncer-web/src/main/java/org/dbsyncer/web/config/TaskPoolConfig.java

@@ -2,7 +2,9 @@ package org.dbsyncer.web.config;
 
 
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -47,4 +49,16 @@ public class TaskPoolConfig {
         return executor;
         return executor;
     }
     }
 
 
+    @Bean
+    public TaskScheduler taskScheduler() {
+        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
+        taskScheduler.setPoolSize(5);
+        taskScheduler.setRemoveOnCancelPolicy(true);
+        taskScheduler.setThreadNamePrefix("taskScheduler");
+        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
+        taskScheduler.setAwaitTerminationSeconds(60);
+        return taskScheduler;
+    }
+
+
 }
 }

+ 0 - 6
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/index/MappingController.java

@@ -36,12 +36,6 @@ public class MappingController extends BaseController {
         return "mapping/add";
         return "mapping/add";
     }
     }
 
 
-    @GetMapping("/pageEdit")
-    public String page(ModelMap model, @RequestParam(value = "id") String id) {
-        model.put("mapping", mappingService.getMapping(id));
-        return "mapping/edit";
-    }
-
     @GetMapping("/page/{page}")
     @GetMapping("/page/{page}")
     public String page(ModelMap model, @PathVariable("page") String page, @RequestParam(value = "id") String id) {
     public String page(ModelMap model, @PathVariable("page") String page, @RequestParam(value = "id") String id) {
         model.put("mapping", mappingService.getMapping(id));
         model.put("mapping", mappingService.getMapping(id));

+ 20 - 14
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/monitor/MonitorController.java

@@ -2,21 +2,20 @@ package org.dbsyncer.web.controller.monitor;
 
 
 import org.dbsyncer.biz.MonitorService;
 import org.dbsyncer.biz.MonitorService;
 import org.dbsyncer.biz.vo.RestResult;
 import org.dbsyncer.biz.vo.RestResult;
+import org.dbsyncer.web.controller.BaseController;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
 import org.springframework.stereotype.Controller;
 import org.springframework.ui.ModelMap;
 import org.springframework.ui.ModelMap;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.*;
 
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletRequest;
+import java.util.Map;
 
 
 @Controller
 @Controller
 @RequestMapping("/monitor")
 @RequestMapping("/monitor")
-public class MonitorController {
+public class MonitorController extends BaseController {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
@@ -25,15 +24,21 @@ public class MonitorController {
 
 
     @RequestMapping("")
     @RequestMapping("")
     public String index(HttpServletRequest request, ModelMap model) {
     public String index(HttpServletRequest request, ModelMap model) {
+        Map<String, String> params = getParams(request);
         model.put("threadInfo", monitorService.getThreadInfo());
         model.put("threadInfo", monitorService.getThreadInfo());
+        model.put("metaId", monitorService.getDefaultMetaId(params));
+        model.put("meta", monitorService.getMetaAll());
+        model.put("data", monitorService.queryData(params));
+        model.put("log", monitorService.queryLog(params));
         return "monitor/monitor.html";
         return "monitor/monitor.html";
     }
     }
 
 
     @GetMapping("/queryData")
     @GetMapping("/queryData")
     @ResponseBody
     @ResponseBody
-    public RestResult queryData(@RequestParam(value = "id") String id, @RequestParam(value = "pageNum") int pageNum, @RequestParam(value = "pageSize") int pageSize) {
+    public RestResult queryData(HttpServletRequest request) {
         try {
         try {
-            return RestResult.restSuccess(monitorService.queryData(id, pageNum, pageSize));
+            Map<String, String> params = getParams(request);
+            return RestResult.restSuccess(monitorService.queryData(params));
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e.getClass());
             logger.error(e.getLocalizedMessage(), e.getClass());
             return RestResult.restFail(e.getMessage());
             return RestResult.restFail(e.getMessage());
@@ -42,18 +47,19 @@ public class MonitorController {
 
 
     @GetMapping("/queryLog")
     @GetMapping("/queryLog")
     @ResponseBody
     @ResponseBody
-    public RestResult queryLog(@RequestParam(value = "type") String type, @RequestParam(value = "pageNum") int pageNum, @RequestParam(value = "pageSize") int pageSize) {
+    public RestResult queryLog(HttpServletRequest request) {
         try {
         try {
-            return RestResult.restSuccess(monitorService.queryLog(type, pageNum, pageSize));
+            Map<String, String> params = getParams(request);
+            return RestResult.restSuccess(monitorService.queryLog(params));
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e.getClass());
             logger.error(e.getLocalizedMessage(), e.getClass());
             return RestResult.restFail(e.getMessage());
             return RestResult.restFail(e.getMessage());
         }
         }
     }
     }
 
 
-    @GetMapping("/clearData")
+    @PostMapping("/clearData")
     @ResponseBody
     @ResponseBody
-    public RestResult clearData(@RequestParam(value = "id") String id) {
+    public RestResult clearData(String id) {
         try {
         try {
             return RestResult.restSuccess(monitorService.clearData(id));
             return RestResult.restSuccess(monitorService.clearData(id));
         } catch (Exception e) {
         } catch (Exception e) {
@@ -62,11 +68,11 @@ public class MonitorController {
         }
         }
     }
     }
 
 
-    @GetMapping("/clearLog")
+    @PostMapping("/clearLog")
     @ResponseBody
     @ResponseBody
-    public RestResult clearLog(@RequestParam(value = "type") String type) {
+    public RestResult clearLog() {
         try {
         try {
-            return RestResult.restSuccess(monitorService.clearLog(type));
+            return RestResult.restSuccess(monitorService.clearLog());
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e.getClass());
             logger.error(e.getLocalizedMessage(), e.getClass());
             return RestResult.restFail(e.getMessage());
             return RestResult.restFail(e.getMessage());

+ 7 - 0
dbsyncer-web/src/main/resources/static/js/common.js

@@ -20,6 +20,13 @@ function backIndexPage() {
     $initContainer.load("/index?refresh=" + new Date().getTime());
     $initContainer.load("/index?refresh=" + new Date().getTime());
 }
 }
 
 
+// 跳转菜单
+function activeMenu($url){
+    var $menu = $('#menu > li');
+    $menu.removeClass('active');
+    $menu.find("a[url='" + $url + "']").parent().addClass('active');
+}
+
 // 美化SQL
 // 美化SQL
 function beautifySql(){
 function beautifySql(){
     var $sql = $("#sql");
     var $sql = $("#sql");

+ 11 - 0
dbsyncer-web/src/main/resources/static/js/index/index.js

@@ -29,6 +29,16 @@ function bindEditMapping() {
     });
     });
 }
 }
 
 
+// 查看驱动日志
+function bindQueryData() {
+    $(".mappingList .queryData").click(function () {
+        // 阻止触发click传递事件
+        event.cancelBubble=true;
+        var $id = $(this).attr("id");
+        activeMenu('/monitor');
+        $initContainer.load('/monitor?id=' + $id);
+    });
+}
 
 
 function bindConnectorDropdownMenu() {
 function bindConnectorDropdownMenu() {
     $(".connectorList .dropdown-menu li").click(function () {
     $(".connectorList .dropdown-menu li").click(function () {
@@ -108,6 +118,7 @@ $(function () {
 
 
     bindAddMapping();
     bindAddMapping();
     bindEditMapping();
     bindEditMapping();
+    bindQueryData();
 
 
     bindConnectorDropdownMenu();
     bindConnectorDropdownMenu();
     bindMappingDropdownMenu();
     bindMappingDropdownMenu();

+ 2 - 0
dbsyncer-web/src/main/resources/static/js/mapping/editTableGroup.js

@@ -62,6 +62,8 @@ function bindFieldMappingAddClick(){
     $btn.bind('click', function(){
     $btn.bind('click', function(){
         var sField = $("#sourceFieldMapping").select2("val");
         var sField = $("#sourceFieldMapping").select2("val");
         var tField = $("#targetFieldMapping").select2("val");
         var tField = $("#targetFieldMapping").select2("val");
+        sField = sField == null ? "" : sField;
+        tField = tField == null ? "" : tField;
         // 非空检查
         // 非空检查
         if(sField == "" && tField == ""){
         if(sField == "" && tField == ""){
             bootGrowl("至少有一个表字段.", "danger");
             bootGrowl("至少有一个表字段.", "danger");

+ 72 - 0
dbsyncer-web/src/main/resources/static/js/monitor/index.js

@@ -0,0 +1,72 @@
+// 查看详细数据
+function bindQueryDataEvent() {
+    $(".metaDataList .queryData").click(function () {
+        var json = $(this).attr("json");
+        var html = '<div class="row driver_break_word">' + json + '</div>';
+        BootstrapDialog.show({
+            title: "注意信息安全",
+            type: BootstrapDialog.TYPE_INFO,
+            message: html,
+            size: BootstrapDialog.SIZE_NORMAL,
+            buttons: [{
+                label: "关闭",
+                action: function (dialog) {
+                    dialog.close();
+                }
+            }]
+        });
+    });
+}
+
+// 清空数据
+function bindClearEvent($btn, $title, $msg, $url){
+    $btn.click(function () {
+        var $id = $(this).attr("id");
+        var data = {"id": $id};
+        BootstrapDialog.show({
+            title: "警告",
+            type: BootstrapDialog.TYPE_DANGER,
+            message: $title,
+            size: BootstrapDialog.SIZE_NORMAL,
+            buttons: [{
+                label: "确定",
+                action: function (dialog) {
+                    doPoster($url, data, function (data) {
+                        if (data.success == true) {
+                            bootGrowl($msg, "success");
+                            $initContainer.load('/monitor?id=' + $id);
+                        } else {
+                            bootGrowl(data.resultValue, "danger");
+                        }
+                    });
+                    dialog.close();
+                }
+            }, {
+                label: "取消",
+                action: function (dialog) {
+                    dialog.close();
+                }
+            }]
+        });
+
+    });
+}
+
+$(function () {
+    // 初始化select2插件
+    $(".select-control").select2({
+        width: "100%",
+        theme: "classic"
+    });
+
+    //连接器类型切换事件
+    $("select[name='metaData']").change(function () {
+        var $id = $(this).val();
+        $initContainer.load('/monitor?id=' + $id);
+    });
+
+    bindQueryDataEvent();
+    bindClearEvent($(".clearDataBtn"), "确认清空数据?", "清空数据成功!", "/monitor/clearData");
+    bindClearEvent($(".clearLogBtn"), "确认清空日志?", "清空日志成功!", "/monitor/clearLog");
+
+});

+ 19 - 15
dbsyncer-web/src/main/resources/templates/index/index.html

@@ -5,10 +5,10 @@
 <!-- Connector Mapping -->
 <!-- Connector Mapping -->
 <div class="container-fluid">
 <div class="container-fluid">
     <div class="row">
     <div class="row">
+        <form class="form-horizontal" role="form" method="post">
 
 
-        <!-- 连接器管理 -->
-        <div class="col-md-12">
-            <form class="form-horizontal" role="form" method="post">
+            <!-- 连接器管理 -->
+            <div class="col-md-12">
                 <!-- 连接器开始位置 -->
                 <!-- 连接器开始位置 -->
                 <div class="form-group">
                 <div class="form-group">
                     <div class="col-md-12">
                     <div class="col-md-12">
@@ -51,12 +51,10 @@
                     </div>
                     </div>
                 </div>
                 </div>
                 <!-- 连接器开结束位置 -->
                 <!-- 连接器开结束位置 -->
-            </form>
-        </div>
+            </div>
 
 
-        <!-- 驱动管理 -->
-        <div class="col-md-12">
-            <form class="form-horizontal" role="form" method="post">
+            <!-- 驱动管理 -->
+            <div class="col-md-12">
                 <!-- 驱动开始位置 -->
                 <!-- 驱动开始位置 -->
                 <div class="form-group" th:if="${connectors?.size() gt 0}">
                 <div class="form-group" th:if="${connectors?.size() gt 0}">
                     <div class="col-md-12">
                     <div class="col-md-12">
@@ -129,13 +127,19 @@
                                                     <tr>
                                                     <tr>
                                                         <td class="text-left">
                                                         <td class="text-left">
                                                             同步结果>
                                                             同步结果>
-                                                            总数:[[${m?.meta?.total}]]
-                                                            <span th:if="${m?.meta?.model eq '全量' and (m?.meta?.success + m?.meta?.fail) gt 0}">
-                                                            ,进度:[[${#numbers.formatDecimal(((m?.meta?.success + m?.meta?.fail) / m?.meta?.total * 100.00),0 ,2)}]]%
-                                                            ,耗时:[[${(m?.meta?.endTime - m?.meta?.beginTime) / 1000}]]秒
+                                                            <span th:if="${m?.meta?.model eq '全量'}">
+                                                                总数:[[${m?.meta?.total}]]
+                                                                <span th:if="${m?.meta?.total gt 0 and (m?.meta?.success + m?.meta?.fail) gt 0}">
+                                                                ,进度:[[${#numbers.formatDecimal(((m?.meta?.success + m?.meta?.fail) / m?.meta?.total * 100.00),0 ,2)}]]%
+                                                                </span>
+                                                                ,耗时:[[${(m?.meta?.endTime - m?.meta?.beginTime) / 1000}]]秒
                                                             </span>
                                                             </span>
+                                                            <span th:if="${m?.meta?.model eq '增量'}">
+                                                                总数:[[${#numbers.formatDecimal((m?.meta?.success + m?.meta?.fail),0 ,0)}]]
+                                                            </span>
+
                                                             <span th:if="${m?.meta?.success gt 0}">,成功:[[${m?.meta?.success}]]</span>
                                                             <span th:if="${m?.meta?.success gt 0}">,成功:[[${m?.meta?.success}]]</span>
-                                                            <span th:if="${m?.meta?.fail gt 0}">,失败:[[${m?.meta?.fail}]] <a href="javascript:;" class="label label-danger">日志</a></span>
+                                                            <span th:if="${m?.meta?.fail gt 0}">,失败:[[${m?.meta?.fail}]] <a th:id="${m?.meta?.id}" href="javascript:;" class="label label-danger queryData">日志</a></span>
                                                         </td>
                                                         </td>
                                                     </tr>
                                                     </tr>
                                                     <tr>
                                                     <tr>
@@ -172,9 +176,9 @@
                     </div>
                     </div>
                 </div>
                 </div>
                 <!-- 驱动结束位置 -->
                 <!-- 驱动结束位置 -->
-            </form>
-        </div>
+            </div>
 
 
+        </form>
     </div>
     </div>
 </div>
 </div>
 
 

+ 104 - 19
dbsyncer-web/src/main/resources/templates/monitor/monitor.html

@@ -4,27 +4,112 @@
 
 
 <!-- Monitor -->
 <!-- Monitor -->
 <div class="container-fluid">
 <div class="container-fluid">
+    <div class="row">
+        <form class="form-horizontal" role="form" method="post">
 
 
-    图表信息
-    <p>CPU 内存 硬盘 堆内存 线程数</p>
-    <table class="table table-hover">
-        <caption>任务线程</caption>
-        <thead>
-        <tr>
-            <th>序号</th>
-            <th>指标</th>
-        </tr>
-        </thead>
-        <tr th:each="item,userStat:${threadInfo}">
-            <td th:text="${userStat.index}+1"></td>
-            <td th:text="${userStat.current.key}"></td><!-- key-->
-            <td th:text="${userStat.current.value}"></td><!-- value-->
-        </tr>
-    </table>
-
-    <p>驱动增量/全量同步数据</p>
-    <p>操作日志</p>
+            <!-- 数据 -->
+            <div class="col-md-12">
+                <div class="form-group">
+                    <div class="col-md-3">
+                        <!-- 驱动下拉 -->
+                        <select id="metaData" name="metaData" class="form-control select-control">
+                            <option th:each="m,s:${meta}" th:value="${m?.id}" th:text="${m?.mappingName} +' (' + ${m?.model} +')'" th:selected="${m?.id eq metaId}"/>
+                        </select>
+                    </div>
+                    <div class="col-sm-4">
+                        <input class="form-control" type="text" maxlength="32" placeholder="请输入内容关键字(最多32个字)." />
+                    </div>
+                    <div class="col-md-1">
+                        <button type="button" class="btn btn-primary">查询数据</button>
+                    </div>
+                    <div class="col-md-4 text-right">
+                        <button th:id="${metaId}" type="button" class="btn btn-default clearDataBtn">清空数据</button>
+                    </div>
+                </div>
 
 
+                <table class="table table-hover metaDataList">
+                    <thead>
+                    <tr>
+                        <th style="width:3%;"></th>
+                        <th style="width:5%;">事件</th>
+                        <th style="width:5%;">结果</th>
+                        <th style="width:60%;">异常</th>
+                        <th style="width:17%;">时间</th>
+                        <th style="width:10%;">详情</th>
+                    </tr>
+                    </thead>
+                    <tr th:each="d,s : ${data}">
+                        <td th:text="${s.index}+1"></td>
+                        <td th:text="${d?.event}"></td>
+                        <td>
+                            <span th:if="${d?.success}" class="label label-success">成功</span>
+                            <span th:if="${not d?.success}" class="label label-warning">失败</span>
+                        </td>
+                        <td th:text="${d?.error}"></td>
+                        <td th:text="${#dates.format(d?.createTime, 'yyyy-MM-dd HH:mm:ss')}"></td>
+                        <td><a th:json="${d?.json}" href="javascript:;" class="label label-info queryData">查看数据</a></td>
+                    </tr>
+                </table>
+            </div>
+
+            <!-- 性能指标 -->
+            <div class="col-md-12">
+                <p>CPU 内存 硬盘 堆内存 </p>
+            </div>
+
+            <!-- 线程任务 -->
+            <div class="col-md-12">
+                <table class="table table-hover">
+                    <caption>线程任务</caption>
+                    <thead>
+                    <tr>
+                        <th>序号</th>
+                        <th>类型</th>
+                        <th>指标</th>
+                    </tr>
+                    </thead>
+                    <tr th:each="item,userStat:${threadInfo}">
+                        <td th:text="${userStat.index}+1"></td>
+                        <td th:text="${userStat.current.key}"></td><!-- key-->
+                        <td th:text="${userStat.current.value}"></td><!-- value-->
+                    </tr>
+                </table>
+            </div>
+
+            <!-- 日志 -->
+            <div class="col-md-12">
+                <div class="form-group">
+                    <div class="col-sm-4">
+                        <input class="form-control" type="text" maxlength="32" placeholder="请输入内容关键字(最多32个字)." />
+                    </div>
+                    <div class="col-md-1">
+                        <button type="button" class="btn btn-primary">查询日志</button>
+                    </div>
+                    <div class="col-md-4"></div>
+                    <div class="col-md-3 text-right">
+                        <button th:id="${metaId}" type="button" class="btn btn-default clearLogBtn">清空日志</button>
+                    </div>
+                </div>
+
+                <table class="table table-hover">
+                    <thead>
+                    <tr>
+                        <th>序号</th>
+                        <th>内容</th>
+                        <th>时间</th>
+                    </tr>
+                    </thead>
+                    <tr th:each="l,s : ${log}">
+                        <td th:text="${s.index}+1"></td>
+                        <td th:text="${l?.json}"></td>
+                        <td th:text="${#dates.format(l?.createTime, 'yyyy-MM-dd HH:mm:ss')}"></td>
+                    </tr>
+                </table>
+            </div>
+
+        </form>
+    </div>
 </div>
 </div>
 
 
+<script th:src="@{/js/monitor/index.js}"></script>
 </html>
 </html>