Browse Source

fix mysql extractor retry

AE86 4 years ago
parent
commit
a80d37a233

+ 7 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/event/Event.java

@@ -44,4 +44,11 @@ public interface Event {
      */
     void errorEvent(Exception e);
 
+    /**
+     * 中断异常
+     *
+     * @param e
+     */
+    void interruptException(Exception e);
+
 }

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -213,7 +213,7 @@ public abstract class AbstractDatabaseConnector implements Database {
             // 记录错误数据
             result.getFailData().addAll(data);
             result.getFail().set(size);
-            result.getError().append(e.getMessage()).append("\r\n");
+            result.getError().append(e.getMessage()).append(System.lineSeparator());
             logger.error(e.getMessage());
         } finally {
             // 释放连接
@@ -268,7 +268,7 @@ public abstract class AbstractDatabaseConnector implements Database {
             // 记录错误数据
             result.getFailData().add(data);
             result.getFail().set(1);
-            result.getError().append(e.getMessage()).append("\r\n");
+            result.getError().append(e.getMessage()).append(System.lineSeparator());
             logger.error(e.getMessage());
         } finally {
             // 释放连接

+ 7 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -67,6 +67,13 @@ public abstract class AbstractExtractor implements Extractor {
         }
     }
 
+    @Override
+    public void interruptException(Exception e) {
+        if (!CollectionUtils.isEmpty(watcher)) {
+            watcher.forEach(w -> w.interruptException(e));
+        }
+    }
+
     public void setConnectorConfig(ConnectorConfig connectorConfig) {
         this.connectorConfig = connectorConfig;
     }

+ 7 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -61,4 +61,11 @@ public interface Extractor {
      */
     void errorEvent(Exception e);
 
+    /**
+     * 中断异常
+     *
+     * @param e
+     */
+    void interruptException(Exception e);
+
 }

+ 61 - 23
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -7,9 +7,7 @@ import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.config.Host;
-import org.dbsyncer.listener.mysql.binlog.BinlogEventListener;
-import org.dbsyncer.listener.mysql.binlog.BinlogEventV4;
-import org.dbsyncer.listener.mysql.binlog.BinlogRemoteClient;
+import org.dbsyncer.listener.mysql.binlog.*;
 import org.dbsyncer.listener.mysql.binlog.impl.event.*;
 import org.dbsyncer.listener.mysql.common.glossary.Column;
 import org.dbsyncer.listener.mysql.common.glossary.Pair;
@@ -20,6 +18,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 
 import static java.util.regex.Pattern.compile;
@@ -35,6 +34,7 @@ public class MysqlExtractor extends AbstractExtractor {
 
     private static final String BINLOG_FILENAME = "fileName";
     private static final String BINLOG_POSITION = "position";
+    private static final int RETRY_TIMES = 3;
     private BinlogRemoteClient client;
     private List<Host> cluster;
     private int master = 0;
@@ -42,25 +42,7 @@ public class MysqlExtractor extends AbstractExtractor {
     @Override
     public void start() {
         try {
-            final DatabaseConfig config = (DatabaseConfig) connectorConfig;
-            cluster = readNodes(config.getUrl());
-            Assert.notEmpty(cluster, "Mysql连接地址有误.");
-
-            final Host host = cluster.get(master);
-            final String username = config.getUsername();
-            final String password = config.getPassword();
-            // mysql-binlog-127.0.0.1:3306-654321
-            final String threadSuffixName = new StringBuilder("mysql-binlog-")
-                    .append(host.getIp()).append(":").append(host.getPort()).append("-")
-                    .append(RandomStringUtils.randomNumeric(6))
-                    .toString();
-
-            client = new BinlogRemoteClient(host.getIp(), host.getPort(), username, password, threadSuffixName);
-            client.setBinlogFileName(map.get(BINLOG_FILENAME));
-            String pos = map.get(BINLOG_POSITION);
-            client.setBinlogPosition(StringUtils.isBlank(pos) ? 0 : Long.parseLong(pos));
-            client.setBinlogEventListener(new MysqlEventListener());
-            client.start();
+            run();
         } catch (Exception e) {
             logger.error("启动失败:{}", e.getMessage());
             throw new ListenerException(e);
@@ -70,7 +52,7 @@ public class MysqlExtractor extends AbstractExtractor {
     @Override
     public void close() {
         try {
-            if(null != client){
+            if (null != client && client.isRunning()) {
                 client.stopQuietly();
             }
         } catch (Exception e) {
@@ -78,6 +60,62 @@ public class MysqlExtractor extends AbstractExtractor {
         }
     }
 
+    private void run() throws Exception {
+        final DatabaseConfig config = (DatabaseConfig) connectorConfig;
+        cluster = readNodes(config.getUrl());
+        Assert.notEmpty(cluster, "Mysql连接地址有误.");
+
+        final Host host = cluster.get(master);
+        final String username = config.getUsername();
+        final String password = config.getPassword();
+        // mysql-binlog-127.0.0.1:3306-654321
+        final String threadSuffixName = new StringBuilder("mysql-binlog-")
+                .append(host.getIp()).append(":").append(host.getPort()).append("-")
+                .append(RandomStringUtils.randomNumeric(6))
+                .toString();
+
+        client = new BinlogRemoteClient(host.getIp(), host.getPort(), username, password, threadSuffixName);
+        client.setBinlogFileName(map.get(BINLOG_FILENAME));
+        String pos = map.get(BINLOG_POSITION);
+        client.setBinlogPosition(StringUtils.isBlank(pos) ? 0 : Long.parseLong(pos));
+        client.setBinlogEventListener(new MysqlEventListener());
+        client.setBinlogParserListener(new BinlogParserListener.Adapter() {
+            @Override
+            public void onException(BinlogParser parser, Exception exception) {
+                logger.error("threadSuffixName:{}, error:{}", threadSuffixName, exception.getMessage());
+                // 异常中断
+                reStart(threadSuffixName);
+            }
+        });
+        client.start();
+    }
+
+    private void reStart(String threadSuffixName) {
+        for (int i = 1; i <= RETRY_TIMES; i++) {
+            try {
+                if(null != client){
+                    client.stopQuietly();
+                }
+                run();
+
+                errorEvent(new ListenerException(String.format("重启成功, %s", threadSuffixName)));
+                logger.error("第{}次重启成功, ThreadName:{} ", i, threadSuffixName);
+                break;
+            } catch (Exception e) {
+                logger.error("第{}次重启异常, ThreadName:{}, {}", i, threadSuffixName, e.getMessage());
+                // 无法连接,关闭任务
+                if(i == RETRY_TIMES){
+                    interruptException(new ListenerException(String.format("重启异常, %s, %s", threadSuffixName, e.getMessage())));
+                }
+            }
+            try {
+                TimeUnit.SECONDS.sleep(i * 2);
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage());
+            }
+        }
+    }
+
     private List<Host> readNodes(String url) {
         if (StringUtils.isBlank(url)) {
             return Collections.EMPTY_LIST;

+ 3 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/binlog/BinlogRemoteClient.java

@@ -136,7 +136,9 @@ public class BinlogRemoteClient {
         // exception thrown elsewhere in the code.
         this.transport.disconnect();
 
-        this.binlogParser.stop(timeout, unit);
+        if(null != this.binlogParser){
+            this.binlogParser.stop(timeout, unit);
+        }
     }
 
     public void stopQuietly() throws Exception {

+ 5 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/impl/IncrementPuller.java

@@ -202,6 +202,11 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
             logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
         }
 
+        @Override
+        public void interruptException(Exception e) {
+            errorEvent(e);
+            close(metaId);
+        }
     }
 
     /**

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -373,7 +373,7 @@ public class ParserFactory implements Parser {
                         result.getFail().getAndAdd(w.getFail().get());
                         result.getError().append(w.getError());
                     } catch (Exception e) {
-                        result.getError().append(e.getMessage()).append("\r\n");
+                        result.getError().append(e.getMessage()).append(System.lineSeparator());
                     } finally {
                         latch.countDown();
                     }