1
0
AE86 3 жил өмнө
parent
commit
b5391e34e2

+ 26 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java

@@ -15,13 +15,36 @@ import java.util.Map;
  */
 public interface Connector {
 
+    /**
+     * 建立连接
+     *
+     * @param config
+     * @return
+     */
+    ConnectorMapper connect(ConnectorConfig config);
+
+    /**
+     * 断开连接
+     *
+     * @param connectorMapper
+     */
+    void disconnect(ConnectorMapper connectorMapper);
+
     /**
      * 检查连接器是否连接正常
      *
-     * @param config 连接器配置
+     * @param connectorMapper
+     * @return
+     */
+    boolean isAlive(ConnectorMapper connectorMapper);
+
+    /**
+     * 获取连接缓存key
+     *
+     * @param config
      * @return
      */
-    boolean isAlive(ConnectorConfig config);
+    String getConnectorMapperCacheKey(ConnectorConfig config);
 
     /**
      * 获取所有表名
@@ -88,4 +111,5 @@ public interface Connector {
      * @return
      */
     Result writer(WriterSingleConfig config);
+
 }

+ 72 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -3,11 +3,14 @@ package org.dbsyncer.connector;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.springframework.beans.factory.DisposableBean;
 import org.springframework.util.Assert;
 
+import java.sql.Connection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * 连接器工厂
@@ -16,7 +19,56 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2019/9/18 23:30
  */
-public class ConnectorFactory {
+public class ConnectorFactory implements DisposableBean {
+
+    private final Map<String, ConnectorMapper> connectorCache = new ConcurrentHashMap<>();
+
+    @Override
+    public void destroy() {
+        connectorCache.values().forEach(this::disconnect);
+        connectorCache.clear();
+    }
+
+    /**
+     * 建立连接
+     *
+     * @param config
+     */
+    public Connection connect(ConnectorConfig config) {
+        return connect(config, Connection.class);
+    }
+
+    public <T> T connect(ConnectorConfig config, Class<T> valueType) {
+        Assert.notNull(config, "ConnectorConfig can not be null.");
+        String type = config.getConnectorType();
+        Connector connector = getConnector(type);
+        String cacheKey = connector.getConnectorMapperCacheKey(config);
+        ConnectorMapper mapper = connectorCache.get(cacheKey);
+        if (null == mapper) {
+            mapper = connector.connect(config);
+            connectorCache.putIfAbsent(mapper.getCacheKey(), mapper);
+        }
+        return (T) mapper.getConnection();
+    }
+
+    /**
+     * TODO 更新连接配置
+     *
+     * @param config
+     * @return
+     */
+    public void updateConnectorConfig(ConnectorConfig config) {
+        Assert.notNull(config, "ConnectorConfig can not be null.");
+        String type = config.getConnectorType();
+        Connector connector = getConnector(type);
+        String cacheKey = connector.getConnectorMapperCacheKey(config);
+        if (connectorCache.containsKey(cacheKey)) {
+            ConnectorMapper mapper = connectorCache.get(cacheKey);
+            connector.disconnect(mapper);
+            connectorCache.remove(cacheKey);
+        }
+        connect(config);
+    }
 
     /**
      * 检查连接配置是否可用
@@ -27,7 +79,13 @@ public class ConnectorFactory {
     public boolean isAlive(ConnectorConfig config) {
         Assert.notNull(config, "ConnectorConfig can not be null.");
         String type = config.getConnectorType();
-        return getConnector(type).isAlive(config);
+        Connector connector = getConnector(type);
+        String cacheKey = connector.getConnectorMapperCacheKey(config);
+        if (!connectorCache.containsKey(cacheKey)) {
+            connect(config);
+        }
+        final ConnectorMapper connectorMapper = connectorCache.get(cacheKey);
+        return connector.isAlive(connectorMapper);
     }
 
     /**
@@ -78,7 +136,7 @@ public class ConnectorFactory {
      * @param command
      * @return
      */
-    public long getCount(ConnectorConfig config, Map<String, String> command){
+    public long getCount(ConnectorConfig config, Map<String, String> command) {
         Connector connector = getConnector(config.getConnectorType());
         return connector.getCount(config, command);
     }
@@ -116,4 +174,15 @@ public class ConnectorFactory {
         return ConnectorEnum.getConnector(connectorType);
     }
 
+    /**
+     * 断开连接
+     *
+     * @param connectorMapper
+     */
+    private void disconnect(ConnectorMapper connectorMapper) {
+        Assert.notNull(connectorMapper, "ConnectorMapper can not be null.");
+        String type = connectorMapper.getConfig().getConnectorType();
+        getConnector(type).disconnect(connectorMapper);
+    }
+
 }

+ 27 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorMapper.java

@@ -0,0 +1,27 @@
+package org.dbsyncer.connector;
+
+import org.dbsyncer.connector.config.ConnectorConfig;
+
+public class ConnectorMapper {
+    private ConnectorConfig config;
+    private String          cacheKey;
+    private Object          connection;
+
+    public ConnectorMapper(ConnectorConfig config, String cacheKey, Object connection) {
+        this.config = config;
+        this.cacheKey = cacheKey;
+        this.connection = connection;
+    }
+
+    public ConnectorConfig getConfig() {
+        return config;
+    }
+
+    public String getCacheKey() {
+        return cacheKey;
+    }
+
+    public Object getConnection() {
+        return connection;
+    }
+}

+ 27 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -4,6 +4,7 @@ import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorException;
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.OperationEnum;
@@ -30,17 +31,37 @@ public abstract class AbstractDatabaseConnector implements Database {
     protected abstract String getTablesSql(DatabaseConfig config);
 
     @Override
-    public boolean isAlive(ConnectorConfig config) {
+    public ConnectorMapper connect(ConnectorConfig config) {
         DatabaseConfig cfg = (DatabaseConfig) config;
-        Connection connection = null;
         try {
-            connection = JDBCUtil.getConnection(cfg.getDriverClassName(), cfg.getUrl(), cfg.getUsername(), cfg.getPassword());
+            String cacheKey = getConnectorMapperCacheKey(config);
+            return new ConnectorMapper(config, cacheKey, JDBCUtil.getConnection(cfg.getDriverClassName(), cfg.getUrl(), cfg.getUsername(), cfg.getPassword()));
         } catch (Exception e) {
             logger.error("Failed to connect:{}, message:{}", cfg.getUrl(), e.getMessage());
-        } finally {
-            JDBCUtil.close(connection);
         }
-        return null != connection;
+        throw new ConnectorException(String.format("Failed to connect:%s", cfg.getUrl()));
+    }
+
+    @Override
+    public void disconnect(ConnectorMapper connectorMapper) {
+        JDBCUtil.close((Connection) connectorMapper.getConnection());
+    }
+
+    @Override
+    public boolean isAlive(ConnectorMapper connectorMapper) {
+        try {
+            Connection connection = (Connection) connectorMapper.getConnection();
+            return null != connection && !connection.isClosed();
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+        }
+        return false;
+    }
+
+    @Override
+    public String getConnectorMapperCacheKey(ConnectorConfig config) {
+        DatabaseConfig cfg = (DatabaseConfig) config;
+        return String.format("%s-%s", cfg.getUrl(), cfg.getUsername());
     }
 
     @Override

+ 18 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/RedisConnector.java

@@ -2,6 +2,7 @@ package org.dbsyncer.connector.redis;
 
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.util.RedisUtil;
 import org.slf4j.Logger;
@@ -18,8 +19,18 @@ public final class RedisConnector implements Redis {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Override
-    public boolean isAlive(ConnectorConfig config) {
-        RedisConfig cfg = (RedisConfig) config;
+    public ConnectorMapper connect(ConnectorConfig config) {
+        return null;
+    }
+
+    @Override
+    public void disconnect(ConnectorMapper connectorMapper) {
+
+    }
+
+    @Override
+    public boolean isAlive(ConnectorMapper connectorMapper) {
+        RedisConfig cfg = (RedisConfig) connectorMapper.getConfig();
         RedisTemplate template = null;
         boolean r = false;
         try {
@@ -37,6 +48,11 @@ public final class RedisConnector implements Redis {
         return r;
     }
 
+    @Override
+    public String getConnectorMapperCacheKey(ConnectorConfig config) {
+        return null;
+    }
+
     @Override
     public List<String> getTable(ConnectorConfig config) {
         return null;

+ 12 - 6
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -3,6 +3,7 @@ package org.dbsyncer.listener;
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.quartz.ScheduledTaskService;
@@ -27,10 +28,11 @@ public abstract class AbstractExtractor implements Extractor {
     private final Logger logger = LoggerFactory.getLogger(getClass());
     protected BlockingQueue queue = new LinkedBlockingQueue<>(100);
     protected Executor taskExecutor;
+    protected ConnectorFactory connectorFactory;
     protected ScheduledTaskService scheduledTaskService;
     protected ConnectorConfig connectorConfig;
     protected ListenerConfig listenerConfig;
-    protected Map<String, String> map;
+    protected Map<String, String> snapshot;
     protected Set<String> filterTable;
     private List<Event> watcher;
 
@@ -69,15 +71,15 @@ public abstract class AbstractExtractor implements Extractor {
     @Override
     public void flushEvent() {
         if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.flushEvent(map));
+            watcher.forEach(w -> w.flushEvent(snapshot));
         }
     }
 
     @Override
     public void forceFlushEvent() {
         if (!CollectionUtils.isEmpty(watcher)) {
-            logger.info("Force flush:{}", map);
-            watcher.forEach(w -> w.forceFlushEvent(map));
+            logger.info("Force flush:{}", snapshot);
+            watcher.forEach(w -> w.forceFlushEvent(snapshot));
         }
     }
 
@@ -103,6 +105,10 @@ public abstract class AbstractExtractor implements Extractor {
         this.taskExecutor = taskExecutor;
     }
 
+    public void setConnectorFactory(ConnectorFactory connectorFactory) {
+        this.connectorFactory = connectorFactory;
+    }
+
     public void setScheduledTaskService(ScheduledTaskService scheduledTaskService) {
         this.scheduledTaskService = scheduledTaskService;
     }
@@ -115,8 +121,8 @@ public abstract class AbstractExtractor implements Extractor {
         this.listenerConfig = listenerConfig;
     }
 
-    public void setMap(Map<String, String> map) {
-        this.map = map;
+    public void setSnapshot(Map<String, String> snapshot) {
+        this.snapshot = snapshot;
     }
 
     public void setFilterTable(Set<String> filterTable) {

+ 4 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -83,9 +83,9 @@ public class MysqlExtractor extends AbstractExtractor {
         final Host host = cluster.get(MASTER);
         final String username = config.getUsername();
         final String password = config.getPassword();
-        final String pos = map.get(BINLOG_POSITION);
+        final String pos = snapshot.get(BINLOG_POSITION);
         client = new BinaryLogRemoteClient(host.getIp(), host.getPort(), username, password);
-        client.setBinlogFilename(map.get(BINLOG_FILENAME));
+        client.setBinlogFilename(snapshot.get(BINLOG_FILENAME));
         client.setBinlogPosition(StringUtils.isBlank(pos) ? 0 : Long.parseLong(pos));
         client.setTableMapEventByTableId(tables);
         client.registerEventListener(new MysqlEventListener());
@@ -151,11 +151,11 @@ public class MysqlExtractor extends AbstractExtractor {
     private void refresh(String binlogFilename, long nextPosition) {
         if (StringUtils.isNotBlank(binlogFilename)) {
             client.setBinlogFilename(binlogFilename);
-            map.put(BINLOG_FILENAME, binlogFilename);
+            snapshot.put(BINLOG_FILENAME, binlogFilename);
         }
         if (0 < nextPosition) {
             client.setBinlogPosition(nextPosition);
-            map.put(BINLOG_POSITION, String.valueOf(nextPosition));
+            snapshot.put(BINLOG_POSITION, String.valueOf(nextPosition));
         }
     }
 

+ 4 - 11
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java

@@ -5,7 +5,6 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.UUIDUtil;
-import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
@@ -31,7 +30,6 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private ConnectorFactory connectorFactory;
     private List<Map<String, String>> commands;
     private int commandSize;
 
@@ -110,8 +108,7 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
 
         // 持久化
         if (point.refreshed()) {
-            map.putAll(point.getPosition());
-            logger.info("增量点:{}", map);
+            snapshot.putAll(point.getPosition());
         }
 
     }
@@ -142,15 +139,15 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
 
             // 开始位置
             if(f.begin()){
-                if (!map.containsKey(key)) {
+                if (!snapshot.containsKey(key)) {
                     final Object val = f.getObject();
                     point.addArg(val);
-                    map.put(key, f.toString(val));
+                    snapshot.put(key, f.toString(val));
                     continue;
                 }
 
                 // 读取历史增量点
-                Object val = f.getObject(map.get(key));
+                Object val = f.getObject(snapshot.get(key));
                 point.addArg(val);
                 point.setBeginKey(key);
                 point.setBeginValue(f.toString(f.getObject()));
@@ -184,10 +181,6 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
         return StringUtils.indexOf(str, searchStr) == StringUtils.lastIndexOf(str, searchStr);
     }
 
-    public void setConnectorFactory(ConnectorFactory connectorFactory) {
-        this.connectorFactory = connectorFactory;
-    }
-
     public void setCommands(List<Map<String, String>> commands) {
         this.commands = commands;
     }

+ 40 - 35
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -7,7 +7,6 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
-import org.dbsyncer.connector.util.JDBCUtil;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.slf4j.Logger;
@@ -18,6 +17,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.*;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -48,7 +48,9 @@ public class SqlServerExtractor extends AbstractExtractor {
 
     private static final String LSN_POSITION = "position";
     private static final long DEFAULT_POLL_INTERVAL_MILLIS = 36000;
+    private static final int PREPARED_STATEMENT_CACHE_CAPACITY = 500;
     private static final int OFFSET_COLUMNS = 4;
+    private final Map<String, PreparedStatement> preparedStatementCache = new ConcurrentHashMap<>(PREPARED_STATEMENT_CACHE_CAPACITY);
     private final Lock connectLock = new ReentrantLock();
     private volatile boolean connected;
     private static Set<String> tables;
@@ -56,6 +58,7 @@ public class SqlServerExtractor extends AbstractExtractor {
     private Connection connection;
     private Worker worker;
     private Lsn lastLsn;
+    private String serverName;
 
     @Override
     public void start() {
@@ -77,10 +80,10 @@ public class SqlServerExtractor extends AbstractExtractor {
             enableTableCDC();
             readChangeTables();
             readLastLsn();
-            JDBCUtil.close(connection);
+            getTrustedServerNameAE();
 
             worker = new Worker();
-            worker.setName(new StringBuilder("cdc-parser-").append(getTrustedServerNameAE()).append("_").append(RandomUtils.nextInt(100)).toString());
+            worker.setName(new StringBuilder("cdc-parser-").append(serverName).append("_").append(RandomUtils.nextInt(100)).toString());
             worker.setDaemon(false);
             worker.start();
         } catch (Exception e) {
@@ -95,18 +98,14 @@ public class SqlServerExtractor extends AbstractExtractor {
     @Override
     public void close() {
         if (connected) {
-            try {
-                connectLock.lock();
-                if (null != worker && !worker.isInterrupted()) {
-                    worker.interrupt();
-                    worker = null;
-                }
-                disableTableCDC();
-                JDBCUtil.close(connection);
-                connected = false;
-            } finally {
-                connectLock.unlock();
+            if (null != worker && !worker.isInterrupted()) {
+                worker.interrupt();
+                worker = null;
             }
+            disableTableCDC();
+            preparedStatementCache.values().forEach(this::close);
+            preparedStatementCache.clear();
+            connected = false;
         }
     }
 
@@ -120,34 +119,29 @@ public class SqlServerExtractor extends AbstractExtractor {
         }
     }
 
-    private void connect() throws SQLException {
-        final DatabaseConfig config = (DatabaseConfig) connectorConfig;
-        String driverClassName = config.getDriverClassName();
-        String username = config.getUsername();
-        String password = config.getPassword();
-        String url = config.getUrl();
-        connection = JDBCUtil.getConnection(driverClassName, url, username, password);
+    private void connect() {
+        connection = connectorFactory.connect((DatabaseConfig) connectorConfig);
     }
 
-    private String getTrustedServerNameAE() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    private void getTrustedServerNameAE() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
         SQLServerConnection conn = (SQLServerConnection) connection;
         Class clazz = conn.getClass();
         Method method = clazz.getDeclaredMethod("getTrustedServerNameAE");
         method.setAccessible(true);
-        return (String) method.invoke(conn, new Object[]{});
+        serverName = (String) method.invoke(conn, new Object[]{});
     }
 
     private void readLastLsn() {
-        if (!map.containsKey(LSN_POSITION)) {
+        if (!snapshot.containsKey(LSN_POSITION)) {
             lastLsn = queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
             if (null != lastLsn && lastLsn.isAvailable()) {
-                map.put(LSN_POSITION, lastLsn.toString());
+                snapshot.put(LSN_POSITION, lastLsn.toString());
                 return;
             }
             // Shouldn't happen if the agent is running, but it is better to guard against such situation
             throw new ListenerException("No maximum LSN recorded in the database");
         }
-        lastLsn = Lsn.valueOf(map.get(LSN_POSITION));
+        lastLsn = Lsn.valueOf(snapshot.get(LSN_POSITION));
     }
 
     private void readTables() {
@@ -315,11 +309,10 @@ public class SqlServerExtractor extends AbstractExtractor {
     }
 
     private <T> T query(String sql, StatementPreparer statementPreparer, ResultSetMapper<T> mapper) {
-        PreparedStatement ps = null;
         ResultSet rs = null;
         T apply = null;
         try {
-            ps = connection.prepareStatement(sql);
+            final PreparedStatement ps = createPreparedStatement(sql);
             if (null != statementPreparer) {
                 statementPreparer.accept(ps);
             }
@@ -330,12 +323,29 @@ public class SqlServerExtractor extends AbstractExtractor {
         } catch (Exception e) {
             logger.error(e.getMessage());
         } finally {
-            close(ps);
             close(rs);
         }
         return apply;
     }
 
+    private PreparedStatement createPreparedStatement(String preparedQueryString) {
+        try {
+            if(connection.isClosed()){
+                connect();
+            }
+            return preparedStatementCache.computeIfAbsent(preparedQueryString, query -> {
+                try {
+                    return connection.prepareStatement(query);
+                } catch (SQLException e) {
+                    throw new ListenerException(e);
+                }
+            });
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+            throw new ListenerException(e.getCause());
+        }
+    }
+
     enum TableOperation {
         /**
          * 插入
@@ -387,7 +397,6 @@ public class SqlServerExtractor extends AbstractExtractor {
         public void run() {
             while (!isInterrupted() && connected) {
                 try {
-                    connect();
                     Lsn stopLsn = queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
                     if (!stopLsn.isAvailable()) {
                         TimeUnit.MICROSECONDS.sleep(DEFAULT_POLL_INTERVAL_MILLIS);
@@ -402,13 +411,9 @@ public class SqlServerExtractor extends AbstractExtractor {
                     pull(stopLsn);
 
                     lastLsn = stopLsn;
-                    map.put(LSN_POSITION, lastLsn.toString());
+                    snapshot.put(LSN_POSITION, lastLsn.toString());
                 } catch (InterruptedException e) {
                     break;
-                } catch (Exception e) {
-                    logger.error(e.getMessage());
-                } finally {
-                    JDBCUtil.close(connection);
                 }
             }
         }

+ 8 - 11
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -19,6 +19,7 @@ import org.dbsyncer.listener.quartz.QuartzExtractor;
 import org.dbsyncer.listener.quartz.ScheduledTaskJob;
 import org.dbsyncer.listener.quartz.ScheduledTaskService;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.manager.ManagerException;
 import org.dbsyncer.manager.config.FieldPicker;
 import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.parser.Parser;
@@ -100,7 +101,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             Meta meta = manager.getMeta(metaId);
             Assert.notNull(meta, "Meta不能为空.");
             AbstractExtractor extractor = getExtractor(mapping, connector, list, meta);
-            Assert.notNull(extractor, "未知的监听配置.");
 
             long now = Instant.now().toEpochMilli();
             meta.setBeginTime(now);
@@ -153,19 +153,14 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         if (ListenerTypeEnum.isTiming(listenerType)) {
             QuartzExtractor extractor = listener.getExtractor(listenerType, QuartzExtractor.class);
             List<Map<String, String>> commands = list.stream().map(t -> t.getCommand()).collect(Collectors.toList());
-
-            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list));
-            extractor.setConnectorFactory(connectorFactory);
-            extractor.setScheduledTaskService(scheduledTaskService);
             extractor.setCommands(commands);
+            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list));
             return extractor;
         }
 
         // 基于日志抽取
         if (ListenerTypeEnum.isLog(listenerType)) {
-            final String connectorType = connectorConfig.getConnectorType();
-            AbstractExtractor extractor = listener.getExtractor(connectorType, AbstractExtractor.class);
-
+            AbstractExtractor extractor = listener.getExtractor(connectorConfig.getConnectorType(), AbstractExtractor.class);
             LogListener logListener = new LogListener(mapping, list, extractor);
             Set<String> filterTable = new HashSet<>();
             logListener.getTablePicker().forEach((k, fieldPickers) -> filterTable.add(k));
@@ -173,16 +168,18 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), logListener);
             return extractor;
         }
-        return null;
+
+        throw new ManagerException("未知的监听配置.");
     }
 
     private void setExtractorConfig(AbstractExtractor extractor, ConnectorConfig connector, ListenerConfig listener,
-                                    Map<String, String> map, Event event) {
+                                    Map<String, String> snapshot, Event event) {
         extractor.setTaskExecutor(taskExecutor);
+        extractor.setConnectorFactory(connectorFactory);
         extractor.setScheduledTaskService(scheduledTaskService);
         extractor.setConnectorConfig(connector);
         extractor.setListenerConfig(listener);
-        extractor.setMap(map);
+        extractor.setSnapshot(snapshot);
         extractor.addListener(event);
     }