AE86 3 yıl önce
ebeveyn
işleme
fa1acaccd9

+ 3 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Listener.java

@@ -1,7 +1,9 @@
 package org.dbsyncer.listener;
 
+import org.dbsyncer.listener.enums.ListenerTypeEnum;
+
 public interface Listener {
 
-    <T> T getExtractor(String groupType, String listenerType, Class<T> valueType) throws IllegalAccessException, InstantiationException;
+    <T> T getExtractor(ListenerTypeEnum listenerTypeEnum, String connectorType, Class<T> valueType) throws IllegalAccessException, InstantiationException;
 
 }

+ 26 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -1,15 +1,38 @@
 package org.dbsyncer.listener;
 
-import org.dbsyncer.listener.enums.ListenerEnum;
+import org.dbsyncer.listener.enums.ListenerTypeEnum;
+import org.dbsyncer.listener.enums.LogExtractorEnum;
+import org.dbsyncer.listener.enums.TimingExtractorEnum;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 @Component
 public class ListenerFactory implements Listener {
 
+    private Map<ListenerTypeEnum, ExtractorMapper> map = new LinkedHashMap<>();
+
+    @PostConstruct
+    private void init() {
+        map.putIfAbsent(ListenerTypeEnum.LOG, (connectorType) -> LogExtractorEnum.getExtractor(connectorType));
+        map.putIfAbsent(ListenerTypeEnum.TIMING, (connectorType) -> TimingExtractorEnum.getExtractor(connectorType));
+    }
+
     @Override
-    public <T> T getExtractor(String groupType, String listenerType, Class<T> valueType) throws IllegalAccessException, InstantiationException {
-        Class<T> clazz = (Class<T>) ListenerEnum.getExtractor(groupType + listenerType);
+    public <T> T getExtractor(ListenerTypeEnum listenerTypeEnum, String connectorType, Class<T> valueType) throws IllegalAccessException, InstantiationException {
+        ExtractorMapper mapper = map.get(listenerTypeEnum);
+        if (null == mapper) {
+            throw new ListenerException(String.format("Unsupported type \"%s\" for extractor \"%s\".", listenerTypeEnum, connectorType));
+        }
+
+        Class<T> clazz = (Class<T>) mapper.getExtractor(connectorType);
         return clazz.newInstance();
     }
 
+    interface ExtractorMapper {
+        Class getExtractor(String connectorType);
+    }
+
 }

+ 0 - 100
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java

@@ -1,100 +0,0 @@
-package org.dbsyncer.listener.enums;
-
-import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.enums.ConnectorEnum;
-import org.dbsyncer.listener.ListenerException;
-import org.dbsyncer.listener.file.FileExtractor;
-import org.dbsyncer.listener.kafka.KafkaExtractor;
-import org.dbsyncer.listener.mysql.MysqlExtractor;
-import org.dbsyncer.listener.oracle.OracleExtractor;
-import org.dbsyncer.listener.postgresql.PostgreSQLExtractor;
-import org.dbsyncer.listener.quartz.DatabaseQuartzExtractor;
-import org.dbsyncer.listener.quartz.ESQuartzExtractor;
-import org.dbsyncer.listener.sqlserver.SqlServerExtractor;
-
-/**
- * 监听器Extractor支持日志和定时模式
- *
- * @author AE86
- * @version 1.0.0
- * @date 2020/04/24 14:19
- */
-public enum ListenerEnum {
-
-    /**
-     * log_Mysql
-     */
-    LOG_MYSQL(ListenerTypeEnum.LOG.getType() + ConnectorEnum.MYSQL.getType(), MysqlExtractor.class),
-    /**
-     * log_Oracle
-     */
-    LOG_ORACLE(ListenerTypeEnum.LOG.getType() + ConnectorEnum.ORACLE.getType(), OracleExtractor.class),
-    /**
-     * log_SqlServer
-     */
-    LOG_SQL_SERVER(ListenerTypeEnum.LOG.getType() + ConnectorEnum.SQL_SERVER.getType(), SqlServerExtractor.class),
-    /**
-     * log_PostgreSQL
-     */
-    LOG_POSTGRE_SQL(ListenerTypeEnum.LOG.getType() + ConnectorEnum.POSTGRE_SQL.getType(), PostgreSQLExtractor.class),
-    /**
-     * log_Kafka
-     */
-    LOG_KAFKA(ListenerTypeEnum.LOG.getType() + ConnectorEnum.KAFKA.getType(), KafkaExtractor.class),
-    /**
-     * log_File
-     */
-    LOG_FILE(ListenerTypeEnum.LOG.getType() + ConnectorEnum.FILE.getType(), FileExtractor.class),
-    /**
-     * timing_Mysql
-     */
-    TIMING_MYSQL(ListenerTypeEnum.TIMING.getType() + ConnectorEnum.MYSQL.getType(), DatabaseQuartzExtractor.class),
-    /**
-     * timing_Mysql
-     */
-    TIMING_ORACLE(ListenerTypeEnum.TIMING.getType() + ConnectorEnum.ORACLE.getType(), DatabaseQuartzExtractor.class),
-    /**
-     * timing_SqlServer
-     */
-    TIMING_SQL_SERVER(ListenerTypeEnum.TIMING.getType() + ConnectorEnum.SQL_SERVER.getType(), DatabaseQuartzExtractor.class),
-    /**
-     * timing_PostgreSQL
-     */
-    TIMING_POSTGRE_SQL(ListenerTypeEnum.TIMING.getType() + ConnectorEnum.POSTGRE_SQL.getType(), DatabaseQuartzExtractor.class),
-    /**
-     * timing_Elasticsearch
-     */
-    TIMING_ELASTIC_SEARCH(ListenerTypeEnum.TIMING.getType() + ConnectorEnum.ELASTIC_SEARCH.getType(), ESQuartzExtractor.class);
-
-    private String type;
-    private Class<?> clazz;
-
-    ListenerEnum(String type, Class<?> clazz) {
-        this.type = type;
-        this.clazz = clazz;
-    }
-
-    /**
-     * 获取抽取器
-     *
-     * @param type
-     * @return
-     * @throws ListenerException
-     */
-    public static Class<?> getExtractor(String type) throws ListenerException {
-        for (ListenerEnum e : ListenerEnum.values()) {
-            if (StringUtil.equals(type, e.getType())) {
-                return e.getClazz();
-            }
-        }
-        throw new ListenerException(String.format("Extractor type \"%s\" does not exist.", type));
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public Class<?> getClazz() {
-        return clazz;
-    }
-}

+ 78 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/LogExtractorEnum.java

@@ -0,0 +1,78 @@
+package org.dbsyncer.listener.enums;
+
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.listener.file.FileExtractor;
+import org.dbsyncer.listener.kafka.KafkaExtractor;
+import org.dbsyncer.listener.mysql.MysqlExtractor;
+import org.dbsyncer.listener.oracle.OracleExtractor;
+import org.dbsyncer.listener.postgresql.PostgreSQLExtractor;
+import org.dbsyncer.listener.sqlserver.SqlServerExtractor;
+
+/**
+ * 日志模式支持类型
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/24 14:19
+ */
+public enum LogExtractorEnum {
+
+    /**
+     * Mysql
+     */
+    MYSQL(ConnectorEnum.MYSQL.getType(), MysqlExtractor.class),
+    /**
+     * Oracle
+     */
+    ORACLE(ConnectorEnum.ORACLE.getType(), OracleExtractor.class),
+    /**
+     * SqlServer
+     */
+    SQL_SERVER(ConnectorEnum.SQL_SERVER.getType(), SqlServerExtractor.class),
+    /**
+     * PostgreSQL
+     */
+    POSTGRE_SQL(ConnectorEnum.POSTGRE_SQL.getType(), PostgreSQLExtractor.class),
+    /**
+     * Kafka
+     */
+    KAFKA(ConnectorEnum.KAFKA.getType(), KafkaExtractor.class),
+    /**
+     * File
+     */
+    FILE(ConnectorEnum.FILE.getType(), FileExtractor.class);
+
+    private String type;
+    private Class clazz;
+
+    LogExtractorEnum(String type, Class clazz) {
+        this.type = type;
+        this.clazz = clazz;
+    }
+
+    /**
+     * 获取抽取器
+     *
+     * @param type
+     * @return
+     * @throws ListenerException
+     */
+    public static Class getExtractor(String type) throws ListenerException {
+        for (LogExtractorEnum e : LogExtractorEnum.values()) {
+            if (StringUtil.equals(type, e.getType())) {
+                return e.getClazz();
+            }
+        }
+        throw new ListenerException(String.format("LogExtractorEnum type \"%s\" does not exist.", type));
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public Class getClazz() {
+        return clazz;
+    }
+}

+ 70 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/TimingExtractorEnum.java

@@ -0,0 +1,70 @@
+package org.dbsyncer.listener.enums;
+
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.listener.quartz.DatabaseQuartzExtractor;
+import org.dbsyncer.listener.quartz.ESQuartzExtractor;
+
+/**
+ * 定时模式支持类型
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/24 14:19
+ */
+public enum TimingExtractorEnum {
+
+    /**
+     * Mysql
+     */
+    MYSQL(ConnectorEnum.MYSQL.getType(), DatabaseQuartzExtractor.class),
+    /**
+     * Mysql
+     */
+    ORACLE(ConnectorEnum.ORACLE.getType(), DatabaseQuartzExtractor.class),
+    /**
+     * SqlServer
+     */
+    SQL_SERVER(ConnectorEnum.SQL_SERVER.getType(), DatabaseQuartzExtractor.class),
+    /**
+     * PostgreSQL
+     */
+    POSTGRE_SQL(ConnectorEnum.POSTGRE_SQL.getType(), DatabaseQuartzExtractor.class),
+    /**
+     * Elasticsearch
+     */
+    ELASTIC_SEARCH(ConnectorEnum.ELASTIC_SEARCH.getType(), ESQuartzExtractor.class);
+
+    private String type;
+    private Class clazz;
+
+    TimingExtractorEnum(String type, Class clazz) {
+        this.type = type;
+        this.clazz = clazz;
+    }
+
+    /**
+     * 获取抽取器
+     *
+     * @param type
+     * @return
+     * @throws ListenerException
+     */
+    public static Class getExtractor(String type) throws ListenerException {
+        for (TimingExtractorEnum e : TimingExtractorEnum.values()) {
+            if (StringUtil.equals(type, e.getType())) {
+                return e.getClazz();
+            }
+        }
+        throw new ListenerException(String.format("TimingListenerEnum type \"%s\" does not exist.", type));
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public Class getClazz() {
+        return clazz;
+    }
+}

+ 3 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -142,7 +142,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
 
         // 默认定时抽取
         if (ListenerTypeEnum.isTiming(listenerType)) {
-            AbstractQuartzExtractor extractor = listener.getExtractor(ListenerTypeEnum.TIMING.getType(), connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
+            AbstractQuartzExtractor extractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
             List<Map<String, String>> commands = list.stream().map(t -> t.getCommand()).collect(Collectors.toList());
             extractor.setCommands(commands);
             setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list));
@@ -151,9 +151,9 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
 
         // 基于日志抽取
         if (ListenerTypeEnum.isLog(listenerType)) {
-            AbstractExtractor extractor = listener.getExtractor(ListenerTypeEnum.LOG.getType(), connectorConfig.getConnectorType(), AbstractExtractor.class);
-            LogListener logListener = new LogListener(mapping, list, extractor);
+            AbstractExtractor extractor = listener.getExtractor(ListenerTypeEnum.LOG, connectorConfig.getConnectorType(), AbstractExtractor.class);
             Set<String> filterTable = new HashSet<>();
+            LogListener logListener = new LogListener(mapping, list, extractor);
             logListener.getTablePicker().forEach((k, fieldPickers) -> filterTable.add(k));
             extractor.setFilterTable(filterTable);
             setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), logListener);