AE86 5 years ago
parent
commit
3545dadd2d

+ 4 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Listener.java

@@ -14,6 +14,9 @@ public interface Listener {
      * @param listenerConfig  监听器配置
      * @param map             增量参数
      * @return
+     * @throws IllegalAccessException
+     * @throws InstantiationException
      */
-    DefaultExtractor createExtractor(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map);
+    DefaultExtractor createExtractor(ConnectorConfig connectorConfig, ListenerConfig listenerConfig, Map<String, String> map)
+            throws IllegalAccessException, InstantiationException;
 }

+ 4 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/ListenerFactory.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.listener;
 
-
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.listener.enums.ListenerEnum;
@@ -13,8 +12,10 @@ import java.util.Map;
 public class ListenerFactory implements Listener {
 
     @Override
-    public DefaultExtractor createExtractor(ConnectorConfig config, ListenerConfig listenerConfig, Map<String, String> map) {
-        DefaultExtractor extractor = ListenerEnum.getExtractor(config.getConnectorType());
+    public DefaultExtractor createExtractor(ConnectorConfig config, ListenerConfig listenerConfig, Map<String, String> map)
+            throws IllegalAccessException, InstantiationException {
+        Class<DefaultExtractor> clazz = (Class<DefaultExtractor>) ListenerEnum.getExtractor(config.getConnectorType());
+        DefaultExtractor extractor = clazz.newInstance();
         // log/timing
         extractor.setAction(ListenerTypeEnum.getAction(listenerConfig.getListenerType()));
         extractor.setConnectorConfig(config);

+ 9 - 10
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java

@@ -2,7 +2,6 @@ package org.dbsyncer.listener.enums;
 
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.enums.ConnectorEnum;
-import org.dbsyncer.listener.DefaultExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.extractor.MysqlExtractor;
 
@@ -16,15 +15,15 @@ public enum ListenerEnum {
     /**
      * Mysql
      */
-    MYSQL(ConnectorEnum.MYSQL.getType(), new MysqlExtractor()),
+    MYSQL(ConnectorEnum.MYSQL.getType(), MysqlExtractor.class),
     ;
 
     private String type;
-    private DefaultExtractor extractor;
+    private Class<?> clazz;
 
-    ListenerEnum(String type, DefaultExtractor extractor) {
+    ListenerEnum(String type, Class<?> clazz) {
         this.type = type;
-        this.extractor = extractor;
+        this.clazz = clazz;
     }
 
     /**
@@ -34,10 +33,10 @@ public enum ListenerEnum {
      * @return
      * @throws ListenerException
      */
-    public static DefaultExtractor getExtractor(String type) throws ListenerException {
+    public static Class<?> getExtractor(String type) throws ListenerException {
         for (ListenerEnum e : ListenerEnum.values()) {
             if (StringUtils.equals(type, e.getType())) {
-                return e.getExtractor();
+                return e.getClazz();
             }
         }
         throw new ListenerException(String.format("Extractor type \"%s\" does not exist.", type));
@@ -47,7 +46,7 @@ public enum ListenerEnum {
         return type;
     }
 
-    public DefaultExtractor getExtractor() {
-        return extractor;
+    public Class<?> getClazz() {
+        return clazz;
     }
-}
+}

+ 6 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/extractor/MysqlExtractor.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.listener.extractor;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
@@ -48,7 +49,11 @@ public class MysqlExtractor extends DefaultExtractor {
             final Host host = cluster.get(master);
             final String username = config.getUsername();
             final String password = config.getPassword();
-            final String threadSuffixName = "mysql-binlog";
+            // mysql-binlog-127.0.0.1:3306-654321
+            final String threadSuffixName = new StringBuilder("mysql-binlog-")
+                    .append(host.getIp()).append(":").append(host.getPort()).append("-")
+                    .append(RandomStringUtils.randomNumeric(6))
+                    .toString();
 
             client = new BinlogRemoteClient(host.getIp(), host.getPort(), username, password, threadSuffixName);
             client.setBinlogFileName(map.get(BINLOG_FILENAME));

+ 7 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -2,12 +2,9 @@ package org.dbsyncer.manager.puller.impl;
 
 import org.dbsyncer.common.event.Event;
 import org.dbsyncer.listener.DefaultExtractor;
-import org.dbsyncer.listener.Extractor;
 import org.dbsyncer.listener.Listener;
-import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.manager.puller.AbstractPuller;
-import org.dbsyncer.manager.puller.Increment;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
@@ -54,11 +51,15 @@ public class IncrementPuller extends AbstractPuller {
             Connector connector = manager.getConnector(mapping.getSourceConnectorId());
             Assert.notNull(connector, "连接器不能为空.");
             List<TableGroup> list = manager.getTableGroupAll(mappingId);
-            Assert.notEmpty(list, "映射关系不能为空");
+            Assert.notEmpty(list, "映射关系不能为空.");
             Meta meta = manager.getMeta(metaId);
             Assert.notNull(meta, "Meta不能为空.");
             DefaultExtractor extractor = listener.createExtractor(connector.getConfig(), mapping.getListener(), meta.getMap());
             Assert.notNull(extractor, "未知的监听配置.");
+            long now = System.currentTimeMillis();
+            meta.setBeginTime(now);
+            meta.setEndTime(now);
+            manager.editMeta(meta);
 
             // 监听数据变更事件
             extractor.addListener(new DefaultListener(mapping, list));
@@ -80,6 +81,7 @@ public class IncrementPuller extends AbstractPuller {
             extractor.clearAllListener();
             extractor.close();
             finished(metaId);
+            logger.info("关闭成功:{}", metaId);
         }
     }
 
@@ -110,6 +112,7 @@ public class IncrementPuller extends AbstractPuller {
         @Override
         public void flushEvent() {
             // TODO 更新待优化,存在性能问题
+            logger.info("flushEvent");
             DefaultExtractor extractor = map.get(metaId);
             if (null != extractor) {
                 Meta meta = manager.getMeta(metaId);