AE86 5 anni fa
parent
commit
9180d2f474

+ 21 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/DefaultExtractor.java

@@ -2,6 +2,8 @@ package org.dbsyncer.listener;
 
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.config.ListenerConfig;
 
 import java.util.List;
 import java.util.Map;
@@ -14,8 +16,10 @@ import java.util.concurrent.CopyOnWriteArrayList;
  */
 public abstract class DefaultExtractor implements Extractor {
 
+    private ConnectorConfig connectorConfig;
+    private ListenerConfig listenerConfig;
     private Map<String, String> map;
-    private List<Event>         watcher;
+    private List<Event> watcher;
 
     public void addListener(Event event) {
         if (null != event) {
@@ -32,6 +36,22 @@ public abstract class DefaultExtractor implements Extractor {
         }
     }
 
+    public ConnectorConfig getConnectorConfig() {
+        return connectorConfig;
+    }
+
+    public void setConnectorConfig(ConnectorConfig connectorConfig) {
+        this.connectorConfig = connectorConfig;
+    }
+
+    public ListenerConfig getListenerConfig() {
+        return listenerConfig;
+    }
+
+    public void setListenerConfig(ListenerConfig listenerConfig) {
+        this.listenerConfig = listenerConfig;
+    }
+
     public Map<String, String> getMap() {
         return map;
     }

+ 5 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Listener.java

@@ -1,8 +1,11 @@
 package org.dbsyncer.listener;
 
 import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.config.ListenerConfig;
+
+import java.util.Map;
 
 public interface Listener {
 
-    Extractor createExtractor(ConnectorConfig connectorConfig);
-}
+    DefaultExtractor createExtractor(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map);
+}

+ 7 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -2,17 +2,20 @@ package org.dbsyncer.listener;
 
 
 import org.dbsyncer.connector.config.ConnectorConfig;
+import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerEnum;
 import org.springframework.stereotype.Component;
 
+import java.util.Map;
+
 @Component
 public class ListenerFactory implements Listener {
 
     @Override
-    public Extractor createExtractor(ConnectorConfig connectorConfig) {
-        String type = connectorConfig.getConnectorType();
-        Extractor extractor = ListenerEnum.getExtractor(type);
-
+    public DefaultExtractor createExtractor(ConnectorConfig config, ListenerConfig listenerConfig, Map<String, String> map) {
+        DefaultExtractor extractor = ListenerEnum.getExtractor(config.getConnectorType());
+        extractor.setConnectorConfig(config);
+        extractor.setListenerConfig(listenerConfig);
         return extractor;
     }
 }

+ 5 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java

@@ -2,7 +2,7 @@ package org.dbsyncer.listener.enums;
 
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.enums.ConnectorEnum;
-import org.dbsyncer.listener.Extractor;
+import org.dbsyncer.listener.DefaultExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.extractor.MysqlExtractor;
 
@@ -20,9 +20,9 @@ public enum ListenerEnum {
     ;
 
     private String type;
-    private Extractor extractor;
+    private DefaultExtractor extractor;
 
-    ListenerEnum(String type, Extractor extractor) {
+    ListenerEnum(String type, DefaultExtractor extractor) {
         this.type = type;
         this.extractor = extractor;
     }
@@ -34,7 +34,7 @@ public enum ListenerEnum {
      * @return
      * @throws ListenerException
      */
-    public static Extractor getExtractor(String type) throws ListenerException {
+    public static DefaultExtractor getExtractor(String type) throws ListenerException {
         for (ListenerEnum e : ListenerEnum.values()) {
             if (StringUtils.equals(type, e.getType())) {
                 return e.getExtractor();
@@ -47,7 +47,7 @@ public enum ListenerEnum {
         return type;
     }
 
-    public Extractor getExtractor() {
+    public DefaultExtractor getExtractor() {
         return extractor;
     }
 }

+ 7 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.manager.puller.impl;
 
 import org.dbsyncer.common.event.Event;
+import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.DefaultExtractor;
 import org.dbsyncer.listener.Extractor;
 import org.dbsyncer.listener.Listener;
@@ -10,7 +11,10 @@ import org.dbsyncer.manager.enums.IncrementEnum;
 import org.dbsyncer.manager.puller.AbstractPuller;
 import org.dbsyncer.manager.puller.Increment;
 import org.dbsyncer.parser.Parser;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.parser.model.TableGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -59,12 +63,11 @@ public class IncrementPuller extends AbstractPuller {
             Assert.notEmpty(list, "映射关系不能为空");
             Meta meta = manager.getMeta(metaId);
             Assert.notNull(meta, "Meta不能为空.");
-            DefaultExtractor extractor = (DefaultExtractor) listener.createExtractor(connector.getConfig());
+            DefaultExtractor extractor = listener.createExtractor(connector.getConfig(), listenerConfig, meta.getMap());
             Assert.notNull(extractor, "未知的监听配置.");
 
             // 监听数据变更事件
             extractor.addListener(new DefaultListener(mapping, list));
-            extractor.setMap(meta.getMap());
             map.putIfAbsent(metaId, extractor);
 
             // 执行任务
@@ -106,7 +109,7 @@ public class IncrementPuller extends AbstractPuller {
 
     final class DefaultListener implements Event {
 
-        private Mapping          mapping;
+        private Mapping mapping;
         private List<TableGroup> list;
 
         public DefaultListener(Mapping mapping, List<TableGroup> list) {