Explorar el Código

add scheduled

AE86 hace 5 años
padre
commit
c5665c5be6

+ 4 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.common.event;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * @version 1.0.0
@@ -21,7 +22,9 @@ public interface Event {
 
     /**
      * 写入增量点事件
+     *
+     * @param map
      */
-    void flushEvent();
+    void flushEvent(Map<String, String> map);
 
 }

+ 44 - 18
dbsyncer-listener/src/main/java/org/dbsyncer/listener/DefaultExtractor.java

@@ -2,29 +2,62 @@ package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.UUIDUtil;
+import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
+import org.dbsyncer.listener.quartz.ScheduledTaskJob;
+import org.dbsyncer.listener.quartz.ScheduledTaskService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
+ * 默认定时抽取
+ *
  * @version 1.0.0
  * @Author AE86
  * @Date 2020-05-12 20:35
  */
-public abstract class DefaultExtractor implements Extractor {
+public class DefaultExtractor implements Extractor, ScheduledTaskJob {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
 
     protected ConnectorConfig connectorConfig;
     protected ListenerConfig listenerConfig;
+    protected ConnectorFactory connectorFactory;
+    protected ScheduledTaskService scheduledTaskService;
     protected Map<String, String> map;
-
     private List<Event> watcher;
-    private Action action;
+    private String key;
+    private AtomicBoolean running = new AtomicBoolean();
+
+    @Override
+    public void start() {
+        key = UUIDUtil.getUUID();
+        String cron = listenerConfig.getCronExpression();
+        logger.info("启动定时任务:{} >> {}", key, cron);
+        scheduledTaskService.start(key, cron, this);
+    }
 
+    @Override
     public void run() {
-        action.execute(this);
+        if(running.compareAndSet(false, true)){
+            // TODO 获取tableGroup
+            Map<String, String> command = null;
+            int pageIndex = 1;
+            int pageSize = 20;
+            connectorFactory.reader(connectorConfig, command, pageIndex, pageSize);
+        }
+    }
+
+    @Override
+    public void close() {
+        scheduledTaskService.stop(key);
     }
 
     public void addListener(Event event) {
@@ -51,35 +84,28 @@ public abstract class DefaultExtractor implements Extractor {
 
     public void flushEvent() {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.flushEvent());
+            watcher.forEach(w -> w.flushEvent(map));
         }
     }
 
-    public ConnectorConfig getConnectorConfig() {
-        return connectorConfig;
-    }
-
     public void setConnectorConfig(ConnectorConfig connectorConfig) {
         this.connectorConfig = connectorConfig;
     }
 
-    public ListenerConfig getListenerConfig() {
-        return listenerConfig;
-    }
-
     public void setListenerConfig(ListenerConfig listenerConfig) {
         this.listenerConfig = listenerConfig;
     }
 
-    public Map<String, String> getMap() {
-        return map;
+    public void setConnectorFactory(ConnectorFactory connectorFactory) {
+        this.connectorFactory = connectorFactory;
+    }
+
+    public void setScheduledTaskService(ScheduledTaskService scheduledTaskService) {
+        this.scheduledTaskService = scheduledTaskService;
     }
 
     public void setMap(Map<String, String> map) {
         this.map = map;
     }
 
-    public void setAction(Action action) {
-        this.action = action;
-    }
 }

+ 3 - 8
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -3,18 +3,13 @@ package org.dbsyncer.listener;
 public interface Extractor {
 
     /**
-     * 根据日志/事件等抽取
+     * 启动定时/日志等方式抽取增量数据
      */
-    void extract();
-
-    /**
-     * 定时抽取
-     */
-    void extractTiming();
+    void start();
 
     /**
      * 关闭任务
      */
     void close();
 
-}
+}

+ 34 - 6
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -1,26 +1,54 @@
 package org.dbsyncer.listener;
 
+import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerEnum;
 import org.dbsyncer.listener.enums.ListenerTypeEnum;
+import org.dbsyncer.listener.quartz.ScheduledTaskService;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
 
 import java.util.Map;
 
 @Component
 public class ListenerFactory implements Listener {
 
+    @Autowired
+    private ConnectorFactory connectorFactory;
+
+    @Autowired
+    private ScheduledTaskService scheduledTaskService;
+
     @Override
-    public DefaultExtractor createExtractor(ConnectorConfig config, ListenerConfig listenerConfig, Map<String, String> map)
+    public DefaultExtractor createExtractor(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map)
             throws IllegalAccessException, InstantiationException {
-        Class<DefaultExtractor> clazz = (Class<DefaultExtractor>) ListenerEnum.getExtractor(config.getConnectorType());
-        DefaultExtractor extractor = clazz.newInstance();
-        // log/timing
-        extractor.setAction(ListenerTypeEnum.getAction(listenerConfig.getListenerType()));
-        extractor.setConnectorConfig(config);
+        String listenerType = listenerConfig.getListenerType();
+        String connectorType = connectorConfig.getConnectorType();
+        DefaultExtractor extractor = getDefaultExtractor(listenerType, connectorType);
+
+        extractor.setConnectorConfig(connectorConfig);
         extractor.setListenerConfig(listenerConfig);
         extractor.setMap(map);
         return extractor;
     }
+
+    private DefaultExtractor getDefaultExtractor(String listenerType, String connectorType)
+            throws IllegalAccessException, InstantiationException {
+        // 默认定时抽取
+        if (ListenerTypeEnum.isTiming(listenerType)) {
+            Class<DefaultExtractor> clazz = (Class<DefaultExtractor>) ListenerEnum.DEFAULT.getClazz();
+            DefaultExtractor extractor = clazz.newInstance();
+            extractor.setConnectorFactory(connectorFactory);
+            extractor.setScheduledTaskService(scheduledTaskService);
+            return extractor;
+        }
+
+        // 基于日志抽取
+        Assert.isTrue(ListenerTypeEnum.isLog(listenerType), "未知的同步方式.");
+        Class<DefaultExtractor> clazz = (Class<DefaultExtractor>) ListenerEnum.getExtractor(connectorType);
+        return clazz.newInstance();
+    }
+
 }

+ 7 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java

@@ -2,8 +2,9 @@ package org.dbsyncer.listener.enums;
 
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.listener.DefaultExtractor;
 import org.dbsyncer.listener.ListenerException;
-import org.dbsyncer.listener.extractor.MysqlExtractor;
+import org.dbsyncer.listener.mysql.MysqlExtractor;
 
 /**
  * @author AE86
@@ -12,11 +13,14 @@ import org.dbsyncer.listener.extractor.MysqlExtractor;
  */
 public enum ListenerEnum {
 
+    /**
+     * 定时
+     */
+    DEFAULT(ListenerTypeEnum.TIMING.getType(), DefaultExtractor.class),
     /**
      * Mysql
      */
-    MYSQL(ConnectorEnum.MYSQL.getType(), MysqlExtractor.class),
-    ;
+    MYSQL(ConnectorEnum.MYSQL.getType(), MysqlExtractor.class);
 
     private String type;
     private Class<?> clazz;

+ 11 - 19
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerTypeEnum.java

@@ -1,8 +1,6 @@
 package org.dbsyncer.listener.enums;
 
 import org.apache.commons.lang.StringUtils;
-import org.dbsyncer.listener.Action;
-import org.dbsyncer.listener.ListenerException;
 
 /**
  * @author AE86
@@ -12,36 +10,30 @@ import org.dbsyncer.listener.ListenerException;
 public enum ListenerTypeEnum {
 
     /**
-     * 日志
+     * 定时
      */
-    LOG("log", extractor -> extractor.extract()),
+    TIMING("timing"),
     /**
-     * 定时
+     * 日志
      */
-    TIMING("timing", extractor -> extractor.extractTiming());
+    LOG("log");
 
     private String type;
-    private Action action;
 
-    ListenerTypeEnum(String type, Action action) {
+    ListenerTypeEnum(String type) {
         this.type = type;
-        this.action = action;
     }
 
-    public static Action getAction(String type) throws ListenerException {
-        for (ListenerTypeEnum e : ListenerTypeEnum.values()) {
-            if (StringUtils.equals(type, e.getType())) {
-                return e.getAction();
-            }
-        }
-        throw new ListenerException(String.format("Action type \"%s\" does not exist.", type));
+    public static boolean isTiming(String type) {
+        return StringUtils.equals(TIMING.getType(), type);
+    }
+
+    public static boolean isLog(String type) {
+        return StringUtils.equals(LOG.getType(), type);
     }
 
     public String getType() {
         return type;
     }
 
-    public Action getAction() {
-        return action;
-    }
 }

+ 202 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -0,0 +1,202 @@
+package org.dbsyncer.listener.mysql;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.listener.DefaultExtractor;
+import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.listener.config.Host;
+import org.dbsyncer.listener.mysql.binlog.BinlogEventListener;
+import org.dbsyncer.listener.mysql.binlog.BinlogEventV4;
+import org.dbsyncer.listener.mysql.binlog.BinlogRemoteClient;
+import org.dbsyncer.listener.mysql.binlog.impl.event.*;
+import org.dbsyncer.listener.mysql.common.glossary.Column;
+import org.dbsyncer.listener.mysql.common.glossary.Pair;
+import org.dbsyncer.listener.mysql.common.glossary.Row;
+import org.dbsyncer.listener.mysql.common.glossary.column.StringColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import java.util.*;
+import java.util.regex.Matcher;
+
+import static java.util.regex.Pattern.compile;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-12 21:14
+ */
+public class MysqlExtractor extends DefaultExtractor {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private static final String BINLOG_FILENAME = "fileName";
+    private static final String BINLOG_POSITION = "position";
+    private BinlogRemoteClient client;
+    private List<Host> cluster;
+    private int master = 0;
+
+    @Override
+    public void start() {
+        try {
+            final DatabaseConfig config = (DatabaseConfig) connectorConfig;
+            cluster = readNodes(config.getUrl());
+            Assert.notEmpty(cluster, "Mysql连接地址有误.");
+
+            final Host host = cluster.get(master);
+            final String username = config.getUsername();
+            final String password = config.getPassword();
+            // mysql-binlog-127.0.0.1:3306-654321
+            final String threadSuffixName = new StringBuilder("mysql-binlog-")
+                    .append(host.getIp()).append(":").append(host.getPort()).append("-")
+                    .append(RandomStringUtils.randomNumeric(6))
+                    .toString();
+
+            client = new BinlogRemoteClient(host.getIp(), host.getPort(), username, password, threadSuffixName);
+            client.setBinlogFileName(map.get(BINLOG_FILENAME));
+            String pos = map.get(BINLOG_POSITION);
+            client.setBinlogPosition(StringUtils.isBlank(pos) ? 0 : Long.parseLong(pos));
+            client.setBinlogEventListener(new MysqlEventListener());
+            client.start();
+        } catch (Exception e) {
+            logger.error("启动失败:{}", e.getMessage());
+            throw new ListenerException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            if(null != client){
+                client.stopQuietly();
+            }
+        } catch (Exception e) {
+            logger.error("关闭失败:{}", e.getMessage());
+        }
+    }
+
+    private List<Host> readNodes(String url) {
+        if (StringUtils.isBlank(url)) {
+            return Collections.EMPTY_LIST;
+        }
+        Matcher matcher = compile("(//)(?!(/)).+?(/)").matcher(url);
+        while (matcher.find()) {
+            url = matcher.group(0);
+            break;
+        }
+        url = StringUtils.replace(url, "/", "");
+
+        List<Host> cluster = new ArrayList<>();
+        String[] arr = StringUtils.split(url, ",");
+        int size = arr.length;
+        for (int i = 0; i < size; i++) {
+            String[] host = StringUtils.split(arr[i], ":");
+            if (2 == host.length) {
+                cluster.add(new Host(host[0], Integer.parseInt(host[1])));
+            }
+        }
+        return cluster;
+    }
+
+    /**
+     * 有变化触发刷新binlog增量事件
+     *
+     * @param event
+     */
+    private void refresh(AbstractBinlogEventV4 event) {
+        String binlogFilename = event.getBinlogFilename();
+        long nextPosition = event.getHeader().getNextPosition();
+
+        // binlogFileName
+        if (StringUtils.isNotBlank(binlogFilename) && !StringUtils.equals(binlogFilename, client.getBinlogFileName())) {
+            client.setBinlogFileName(binlogFilename);
+        }
+        client.setBinlogPosition(nextPosition);
+
+        // nextPosition
+        map.put(BINLOG_FILENAME, client.getBinlogFileName());
+        map.put(BINLOG_POSITION, String.valueOf(client.getBinlogPosition()));
+    }
+
+    final class MysqlEventListener implements BinlogEventListener {
+
+        private Map<Long, String> table = new HashMap<>();
+
+        @Override
+        public void onEvents(BinlogEventV4 event) {
+            if (event == null) {
+                logger.error("binlog event is null");
+                return;
+            }
+
+            if (event instanceof TableMapEvent) {
+                TableMapEvent tableEvent = (TableMapEvent) event;
+                table.putIfAbsent(tableEvent.getTableId(), tableEvent.getTableName().toString());
+                return;
+            }
+
+            if (event instanceof UpdateRowsEventV2) {
+                UpdateRowsEventV2 e = (UpdateRowsEventV2) event;
+                final String tableName = table.get(e.getTableId());
+                List<Pair<Row>> rows = e.getRows();
+                for (Pair<Row> p : rows) {
+                    List<Object> before = new ArrayList<>();
+                    List<Object> after = new ArrayList<>();
+                    addAll(before, p.getBefore().getColumns());
+                    addAll(after, p.getAfter().getColumns());
+                    changedEvent(tableName, ConnectorConstant.OPERTION_UPDATE, before, after);
+                    break;
+                }
+                return;
+            }
+
+            if (event instanceof WriteRowsEventV2) {
+                WriteRowsEventV2 e = (WriteRowsEventV2) event;
+                final String tableName = table.get(e.getTableId());
+                List<Row> rows = e.getRows();
+                for (Row row : rows) {
+                    List<Object> after = new ArrayList<>();
+                    addAll(after, row.getColumns());
+                    changedEvent(tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, after);
+                    break;
+                }
+                return;
+            }
+
+            if (event instanceof DeleteRowsEventV2) {
+                DeleteRowsEventV2 e = (DeleteRowsEventV2) event;
+                final String tableName = table.get(e.getTableId());
+                List<Row> rows = e.getRows();
+                for (Row row : rows) {
+                    List<Object> before = new ArrayList<>();
+                    addAll(before, row.getColumns());
+                    changedEvent(tableName, ConnectorConstant.OPERTION_DELETE, before, Collections.EMPTY_LIST);
+                    break;
+                }
+                return;
+            }
+
+            // 处理事件优先级:RotateEvent > FormatDescriptionEvent > TableMapEvent > RowsEvent > XidEvent
+            if (event instanceof XidEvent) {
+                refresh((XidEvent) event);
+                return;
+            }
+
+            // 切换binlog
+            if (event instanceof RotateEvent) {
+                refresh((RotateEvent) event);
+                return;
+            }
+
+        }
+
+        private void addAll(List<Object> before, List<Column> columns) {
+            columns.forEach(c -> before.add((c instanceof StringColumn) ? c.toString() : c.getValue()));
+        }
+
+    }
+
+}

+ 15 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskService.java

@@ -2,8 +2,21 @@ package org.dbsyncer.listener.quartz;
 
 public interface ScheduledTaskService {
 
-    void start(ScheduledTask task);
+    /**
+     * 第一位,表示秒,取值0-59
+     * 第二位,表示分,取值0-59
+     * 第三位,表示小时,取值0-23
+     * 第四位,日期天/日,取值1-31
+     * 第五位,日期月份,取值1-12
+     * 第六位,星期,取值1-7
+     * [秒 分 时 日 月 星期]
+     *
+     * @param key 任务唯一key
+     * @param cron 任务表达式
+     * @param job 任务实现
+     */
+    void start(String key, String cron, ScheduledTaskJob job);
 
-    void stop(String taskKey);
+    void stop(String key);
 
 }

+ 9 - 32
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ScheduledTaskServiceImpl.java

@@ -27,50 +27,27 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService {
     @Autowired
     private ThreadPoolTaskScheduler taskScheduler;
 
-    /**
-     * 存放已经启动的任务map
-     */
     private Map<String, ScheduledFuture> map = new ConcurrentHashMap<>();
 
-    /**
-     * 根据任务key 启动任务
-     */
     @Override
-    public void start(ScheduledTask task) {
-        logger.info(">>>>>> 启动任务 {} 开始 >>>>>>", task);
+    public void start(String key, String cron, ScheduledTaskJob job) {
         //校验任务key是否已经启动
-        String key = task.getKey();
-        if (this.isStart(key)) {
-            logger.info(">>>>>> 当前任务已经启动,无需重复启动!");
+        final ScheduledFuture scheduledFuture = map.get(key);
+        if (null != scheduledFuture && !scheduledFuture.isCancelled()) {
+            logger.warn(">>>>>> 当前任务已经启动,无需重复启动!");
+            return;
         }
-        // 定时表达式
-        String taskCron = task.getCron();
-        // delegate
-        ScheduledTaskJob job = task.getJob();
         //获取需要定时调度的接口
-        map.putIfAbsent(key, taskScheduler.schedule(job,
-                (triggerContext) -> new CronTrigger(taskCron).nextExecutionTime(triggerContext)
-        ));
+        map.putIfAbsent(key, taskScheduler.schedule(job, (trigger) -> new CronTrigger(cron).nextExecutionTime(trigger) ));
     }
 
-    /**
-     * 根据 key 停止任务
-     */
     @Override
-    public void stop(String taskKey) {
-        logger.info(">>>>>> 进入停止任务 {}  >>>>>>", taskKey);
-        ScheduledFuture job = map.get(taskKey);
+    public void stop(String key) {
+        ScheduledFuture job = map.get(key);
         if (null != job) {
+            logger.info(">>>>>> 进入停止任务 {}  >>>>>>", key);
             job.cancel(true);
         }
     }
 
-    /**
-     * 任务是否已经启动
-     */
-    private boolean isStart(String key) {
-        final ScheduledFuture job = map.get(key);
-        return null != job && job.isCancelled();
-    }
-
 }

+ 0 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/AbstractPuller.java

@@ -1,15 +1,11 @@
 package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.common.event.ClosedEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 
 public abstract class AbstractPuller implements Puller {
 
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
     @Autowired
     private ApplicationContext applicationContext;
 

+ 38 - 10
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -2,9 +2,12 @@ package org.dbsyncer.manager.puller.impl;
 
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.listener.DefaultExtractor;
 import org.dbsyncer.listener.Listener;
+import org.dbsyncer.listener.quartz.ScheduledTaskJob;
+import org.dbsyncer.listener.quartz.ScheduledTaskService;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.config.FieldPicker;
 import org.dbsyncer.manager.puller.AbstractPuller;
@@ -12,6 +15,8 @@ import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
@@ -21,6 +26,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * 增量同步
@@ -30,7 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementPuller extends AbstractPuller {
+public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob, InitializingBean, DisposableBean {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -43,6 +49,11 @@ public class IncrementPuller extends AbstractPuller {
     @Autowired
     private Manager manager;
 
+    @Autowired
+    private ScheduledTaskService scheduledTaskService;
+
+    private String key;
+
     private Map<String, DefaultExtractor> map = new ConcurrentHashMap<>();
 
     @Override
@@ -69,7 +80,7 @@ public class IncrementPuller extends AbstractPuller {
 
             // 执行任务
             logger.info("启动成功:{}", metaId);
-            map.get(metaId).run();
+            map.get(metaId).start();
         } catch (Exception e) {
             close(metaId);
             logger.error("运行异常,结束任务{}:{}", metaId, e.getMessage());
@@ -88,16 +99,32 @@ public class IncrementPuller extends AbstractPuller {
         }
     }
 
+    @Override
+    public void run() {
+        // 定时同步增量信息
+        map.forEach((k, v) -> v.flushEvent());
+    }
+
+    @Override
+    public void afterPropertiesSet() {
+        key = UUIDUtil.getUUID();
+        scheduledTaskService.start(key, "*/10 * * * * ?", this);
+    }
+
+    @Override
+    public void destroy() {
+        scheduledTaskService.stop(key);
+    }
+
     final class DefaultListener implements Event {
 
         private Mapping mapping;
-        private List<TableGroup> list;
         private String metaId;
         private Map<String, List<FieldPicker>> tablePicker;
+        private AtomicBoolean changed = new AtomicBoolean();
 
         public DefaultListener(Mapping mapping, List<TableGroup> list) {
             this.mapping = mapping;
-            this.list = list;
             this.metaId = mapping.getMetaId();
             this.tablePicker = new LinkedHashMap<>();
             list.forEach(t -> {
@@ -121,17 +148,18 @@ public class IncrementPuller extends AbstractPuller {
                 });
             }
 
+            // 标记有变更记录
+            changed.compareAndSet(false, true);
         }
 
         @Override
-        public void flushEvent() {
-            // TODO 更新待优化,存在性能问题
-            DefaultExtractor extractor = map.get(metaId);
-            if (null != extractor) {
-                logger.debug("flushEvent map:{}", extractor.getMap());
+        public void flushEvent(Map<String, String> map) {
+            // 如果有变更,执行更新
+            if (changed.compareAndSet(true, false)) {
                 Meta meta = manager.getMeta(metaId);
                 if (null != meta) {
-                    meta.setMap(extractor.getMap());
+                    logger.info("同步增量信息:{}>>{}", metaId, map);
+                    meta.setMap(map);
                     manager.editMeta(meta);
                 }
             }