1
0
AE86 5 жил өмнө
parent
commit
6dfcfdde33

+ 64 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -0,0 +1,64 @@
+package org.dbsyncer.listener;
+
+import org.dbsyncer.common.event.Event;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.config.ListenerConfig;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-25 22:35
+ */
+public abstract class AbstractExtractor implements Extractor {
+
+    protected ConnectorConfig connectorConfig;
+    protected ListenerConfig listenerConfig;
+    protected Map<String, String> map;
+    private List<Event> watcher;
+
+    public void addListener(Event event) {
+        if (null != event) {
+            if (null == watcher) {
+                watcher = new CopyOnWriteArrayList<>();
+            }
+            watcher.add(event);
+        }
+    }
+
+    public void clearAllListener() {
+        if (null != watcher) {
+            watcher.clear();
+            watcher = null;
+        }
+    }
+
+    public void changedEvent(String tableName, String event, List<Object> before, List<Object> after) {
+        if (!CollectionUtils.isEmpty(watcher)) {
+            watcher.forEach(w -> w.changedEvent(tableName, event, before, after));
+        }
+    }
+
+    public void flushEvent() {
+        if (!CollectionUtils.isEmpty(watcher)) {
+            watcher.forEach(w -> w.flushEvent(map));
+        }
+    }
+
+    public void setConnectorConfig(ConnectorConfig connectorConfig) {
+        this.connectorConfig = connectorConfig;
+    }
+
+    public void setListenerConfig(ListenerConfig listenerConfig) {
+        this.listenerConfig = listenerConfig;
+    }
+
+    public void setMap(Map<String, String> map) {
+        this.map = map;
+    }
+
+}

+ 0 - 111
dbsyncer-listener/src/main/java/org/dbsyncer/listener/DefaultExtractor.java

@@ -1,111 +0,0 @@
-package org.dbsyncer.listener;
-
-import org.dbsyncer.common.event.Event;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.UUIDUtil;
-import org.dbsyncer.connector.ConnectorFactory;
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.listener.config.ListenerConfig;
-import org.dbsyncer.listener.quartz.ScheduledTaskJob;
-import org.dbsyncer.listener.quartz.ScheduledTaskService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * 默认定时抽取
- *
- * @version 1.0.0
- * @Author AE86
- * @Date 2020-05-12 20:35
- */
-public class DefaultExtractor implements Extractor, ScheduledTaskJob {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    protected ConnectorConfig connectorConfig;
-    protected ListenerConfig listenerConfig;
-    protected ConnectorFactory connectorFactory;
-    protected ScheduledTaskService scheduledTaskService;
-    protected Map<String, String> map;
-    private List<Event> watcher;
-    private String key;
-    private AtomicBoolean running = new AtomicBoolean();
-
-    @Override
-    public void start() {
-        key = UUIDUtil.getUUID();
-        String cron = listenerConfig.getCronExpression();
-        logger.info("启动定时任务:{} >> {}", key, cron);
-        scheduledTaskService.start(key, cron, this);
-    }
-
-    @Override
-    public void run() {
-        if(running.compareAndSet(false, true)){
-            // TODO 获取tableGroup
-            Map<String, String> command = null;
-            int pageIndex = 1;
-            int pageSize = 20;
-            connectorFactory.reader(connectorConfig, command, pageIndex, pageSize);
-        }
-    }
-
-    @Override
-    public void close() {
-        scheduledTaskService.stop(key);
-    }
-
-    public void addListener(Event event) {
-        if (null != event) {
-            if (null == watcher) {
-                watcher = new CopyOnWriteArrayList<>();
-            }
-            watcher.add(event);
-        }
-    }
-
-    public void clearAllListener() {
-        if (null != watcher) {
-            watcher.clear();
-            watcher = null;
-        }
-    }
-
-    public void changedEvent(String tableName, String event, List<Object> before, List<Object> after) {
-        if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.changedEvent(tableName, event, before, after));
-        }
-    }
-
-    public void flushEvent() {
-        if (!CollectionUtils.isEmpty(watcher)) {
-            watcher.forEach(w -> w.flushEvent(map));
-        }
-    }
-
-    public void setConnectorConfig(ConnectorConfig connectorConfig) {
-        this.connectorConfig = connectorConfig;
-    }
-
-    public void setListenerConfig(ListenerConfig listenerConfig) {
-        this.listenerConfig = listenerConfig;
-    }
-
-    public void setConnectorFactory(ConnectorFactory connectorFactory) {
-        this.connectorFactory = connectorFactory;
-    }
-
-    public void setScheduledTaskService(ScheduledTaskService scheduledTaskService) {
-        this.scheduledTaskService = scheduledTaskService;
-    }
-
-    public void setMap(Map<String, String> map) {
-        this.map = map;
-    }
-
-}

+ 3 - 9
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Listener.java

@@ -1,22 +1,16 @@
 package org.dbsyncer.listener;
 
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.listener.config.ListenerConfig;
-
-import java.util.Map;
+import org.dbsyncer.listener.config.ExtractorConfig;
 
 public interface Listener {
 
     /**
      * 创建抽取器
      *
-     * @param connectorConfig 连接器配置
-     * @param listenerConfig  监听器配置
-     * @param map             增量参数
+     * @param config 抽取器配置
      * @return
      * @throws IllegalAccessException
      * @throws InstantiationException
      */
-    DefaultExtractor createExtractor(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map)
-            throws IllegalAccessException, InstantiationException;
+    AbstractExtractor createExtractor(ExtractorConfig config) throws IllegalAccessException, InstantiationException;
 }

+ 16 - 9
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -2,14 +2,17 @@ package org.dbsyncer.listener;
 
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.config.ExtractorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerEnum;
 import org.dbsyncer.listener.enums.ListenerTypeEnum;
+import org.dbsyncer.listener.quartz.QuartzExtractor;
 import org.dbsyncer.listener.quartz.ScheduledTaskService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
+import java.util.List;
 import java.util.Map;
 
 @Component
@@ -22,32 +25,36 @@ public class ListenerFactory implements Listener {
     private ScheduledTaskService scheduledTaskService;
 
     @Override
-    public DefaultExtractor createExtractor(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map)
+    public AbstractExtractor createExtractor(ExtractorConfig config)
             throws IllegalAccessException, InstantiationException {
-        String listenerType = listenerConfig.getListenerType();
-        String connectorType = connectorConfig.getConnectorType();
-        DefaultExtractor extractor = getDefaultExtractor(listenerType, connectorType);
+        ConnectorConfig connectorConfig = config.getConnectorConfig();
+        ListenerConfig listenerConfig = config.getListenerConfig();
+
+        AbstractExtractor extractor = getDefaultExtractor(listenerConfig.getListenerType(), connectorConfig.getConnectorType(),
+                config.getCommands(), config.getTableNames());
 
         extractor.setConnectorConfig(connectorConfig);
         extractor.setListenerConfig(listenerConfig);
-        extractor.setMap(map);
+        extractor.setMap(config.getMap());
         return extractor;
     }
 
-    private DefaultExtractor getDefaultExtractor(String listenerType, String connectorType)
+    private AbstractExtractor getDefaultExtractor(String listenerType, String connectorType, List<Map<String, String>> commands, List<String> tableNames)
             throws IllegalAccessException, InstantiationException {
         // 默认定时抽取
         if (ListenerTypeEnum.isTiming(listenerType)) {
-            Class<DefaultExtractor> clazz = (Class<DefaultExtractor>) ListenerEnum.DEFAULT.getClazz();
-            DefaultExtractor extractor = clazz.newInstance();
+            Class<QuartzExtractor> clazz = (Class<QuartzExtractor>) ListenerEnum.DEFAULT.getClazz();
+            QuartzExtractor extractor = clazz.newInstance();
             extractor.setConnectorFactory(connectorFactory);
             extractor.setScheduledTaskService(scheduledTaskService);
+            extractor.setCommands(commands);
+            extractor.setTableNames(tableNames);
             return extractor;
         }
 
         // 基于日志抽取
         Assert.isTrue(ListenerTypeEnum.isLog(listenerType), "未知的同步方式.");
-        Class<DefaultExtractor> clazz = (Class<DefaultExtractor>) ListenerEnum.getExtractor(connectorType);
+        Class<AbstractExtractor> clazz = (Class<AbstractExtractor>) ListenerEnum.getExtractor(connectorType);
         return clazz.newInstance();
     }
 

+ 48 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/config/ExtractorConfig.java

@@ -0,0 +1,48 @@
+package org.dbsyncer.listener.config;
+
+import org.dbsyncer.connector.config.ConnectorConfig;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-25 23:52
+ */
+public class ExtractorConfig {
+
+    private ConnectorConfig connectorConfig;
+    private ListenerConfig listenerConfig;
+    private Map<String, String> map;
+    private List<Map<String, String>> commands;
+    private List<String> tableNames;
+
+    public ExtractorConfig(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map, List<Map<String, String>> commands, List<String> tableNames) {
+        this.connectorConfig = connectorConfig;
+        this.listenerConfig = listenerConfig;
+        this.map = map;
+        this.commands = commands;
+        this.tableNames = tableNames;
+    }
+
+    public ConnectorConfig getConnectorConfig() {
+        return connectorConfig;
+    }
+
+    public ListenerConfig getListenerConfig() {
+        return listenerConfig;
+    }
+
+    public Map<String, String> getMap() {
+        return map;
+    }
+
+    public List<Map<String, String>> getCommands() {
+        return commands;
+    }
+
+    public List<String> getTableNames() {
+        return tableNames;
+    }
+}

+ 2 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java

@@ -2,7 +2,7 @@ package org.dbsyncer.listener.enums;
 
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.enums.ConnectorEnum;
-import org.dbsyncer.listener.DefaultExtractor;
+import org.dbsyncer.listener.quartz.QuartzExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.mysql.MysqlExtractor;
 
@@ -16,7 +16,7 @@ public enum ListenerEnum {
     /**
      * 定时
      */
-    DEFAULT(ListenerTypeEnum.TIMING.getType(), DefaultExtractor.class),
+    DEFAULT(ListenerTypeEnum.TIMING.getType(), QuartzExtractor.class),
     /**
      * Mysql
      */

+ 2 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -4,7 +4,7 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
-import org.dbsyncer.listener.DefaultExtractor;
+import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.config.Host;
 import org.dbsyncer.listener.mysql.binlog.BinlogEventListener;
@@ -29,7 +29,7 @@ import static java.util.regex.Pattern.compile;
  * @Author AE86
  * @Date 2020-05-12 21:14
  */
-public class MysqlExtractor extends DefaultExtractor {
+public class MysqlExtractor extends AbstractExtractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 

+ 70 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java

@@ -0,0 +1,70 @@
+package org.dbsyncer.listener.quartz;
+
+import org.dbsyncer.common.util.UUIDUtil;
+import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.listener.AbstractExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * 默认定时抽取
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-05-12 20:35
+ */
+public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJob {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private ConnectorFactory connectorFactory;
+    private ScheduledTaskService scheduledTaskService;
+    private List<Map<String, String>> commands;
+    private List<String> tableNames;
+    private String key;
+    private AtomicBoolean running = new AtomicBoolean();
+
+    @Override
+    public void start() {
+        key = UUIDUtil.getUUID();
+        String cron = listenerConfig.getCronExpression();
+        logger.info("启动定时任务:{} >> {}", key, cron);
+        scheduledTaskService.start(key, cron, this);
+    }
+
+    @Override
+    public void run() {
+        if(running.compareAndSet(false, true)){
+            // TODO 获取tableGroup
+            Map<String, String> command = null;
+            int pageIndex = 1;
+            int pageSize = 20;
+            connectorFactory.reader(connectorConfig, command, pageIndex, pageSize);
+        }
+    }
+
+    @Override
+    public void close() {
+        scheduledTaskService.stop(key);
+    }
+
+    public void setConnectorFactory(ConnectorFactory connectorFactory) {
+        this.connectorFactory = connectorFactory;
+    }
+
+    public void setScheduledTaskService(ScheduledTaskService scheduledTaskService) {
+        this.scheduledTaskService = scheduledTaskService;
+    }
+
+    public void setCommands(List<Map<String, String>> commands) {
+        this.commands = commands;
+    }
+
+    public void setTableNames(List<String> tableNames) {
+        this.tableNames = tableNames;
+    }
+}

+ 13 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -3,8 +3,12 @@ package org.dbsyncer.manager.puller.impl;
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.UUIDUtil;
+import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.Table;
-import org.dbsyncer.listener.DefaultExtractor;
+import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.config.ExtractorConfig;
+import org.dbsyncer.listener.config.ListenerConfig;
+import org.dbsyncer.listener.quartz.QuartzExtractor;
 import org.dbsyncer.listener.Listener;
 import org.dbsyncer.listener.quartz.ScheduledTaskJob;
 import org.dbsyncer.listener.quartz.ScheduledTaskService;
@@ -27,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 /**
  * 增量同步
@@ -54,7 +59,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
 
     private String key;
 
-    private Map<String, DefaultExtractor> map = new ConcurrentHashMap<>();
+    private Map<String, AbstractExtractor> map = new ConcurrentHashMap<>();
 
     @Override
     public void asyncStart(Mapping mapping) {
@@ -65,9 +70,13 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             Assert.notNull(connector, "连接器不能为空.");
             List<TableGroup> list = manager.getTableGroupAll(mappingId);
             Assert.notEmpty(list, "映射关系不能为空.");
+            List<Map<String, String>> commands = list.stream().map(t -> t.getCommand()).collect(Collectors.toList());
+            List<String> tableNames = list.stream().map(t -> t.getSourceTable().getName()).collect(Collectors.toList());
             Meta meta = manager.getMeta(metaId);
             Assert.notNull(meta, "Meta不能为空.");
-            DefaultExtractor extractor = listener.createExtractor(connector.getConfig(), mapping.getListener(), meta.getMap());
+
+            ExtractorConfig config = new ExtractorConfig(connector.getConfig(), mapping.getListener(), meta.getMap(), commands, tableNames);
+            AbstractExtractor extractor = listener.createExtractor(config);
             Assert.notNull(extractor, "未知的监听配置.");
             long now = System.currentTimeMillis();
             meta.setBeginTime(now);
@@ -89,7 +98,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
 
     @Override
     public void close(String metaId) {
-        DefaultExtractor extractor = map.get(metaId);
+        AbstractExtractor extractor = map.get(metaId);
         if (null != extractor) {
             extractor.clearAllListener();
             extractor.close();