Преглед изворни кода

修复mysql增量同步运行长时间后断连,增加心跳检测https://gitee.com/ghi/dbsyncer/issues/I69US8

Signed-off-by: AE86 <836391306@qq.com>
AE86 пре 1 година
родитељ
комит
1243d9f1fe

+ 59 - 8
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/BinaryLogRemoteClient.java

@@ -7,6 +7,7 @@ import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
 import com.github.shyiko.mysql.binlog.network.*;
 import com.github.shyiko.mysql.binlog.network.protocol.*;
 import com.github.shyiko.mysql.binlog.network.protocol.command.*;
+import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.mysql.deserializer.DeleteDeserializer;
 import org.dbsyncer.listener.mysql.deserializer.UpdateDeserializer;
 import org.dbsyncer.listener.mysql.deserializer.WriteDeserializer;
@@ -26,6 +27,7 @@ import java.security.GeneralSecurityException;
 import java.security.cert.X509Certificate;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -78,6 +80,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
     private volatile PacketChannel channel;
     private volatile boolean connected;
     private Thread worker;
+    private Thread keepAlive;
     private String workerThreadName;
 
     private final Lock connectLock = new ReentrantLock();
@@ -127,16 +130,16 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
             }
             setConfig();
             openChannel();
+            connected = true;
+            // new keepalive thread
+            spawnKeepAliveThread();
+
             // dump binary log
             requestBinaryLogStream(channel);
             ensureEventDeserializerHasRequiredEDDs();
 
-            // new Thread
-            this.worker = new Thread(() -> listenForEventPackets(channel));
-            this.worker.setDaemon(false);
-            this.workerThreadName = new StringBuilder("binlog-parser-").append(hostname).append(":").append(port).append("_").append(connectionId).toString();
-            this.worker.setName(workerThreadName);
-            this.worker.start();
+            // new listen thread
+            spawnWorkerThread();
             lifecycleListeners.forEach(listener -> listener.onConnect(this));
         } finally {
             connectLock.unlock();
@@ -153,6 +156,11 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
                     this.worker.interrupt();
                     this.worker = null;
                 }
+                if (null != this.keepAlive && !keepAlive.isInterrupted()) {
+                    this.keepAlive.interrupt();
+                    this.keepAlive = null;
+                }
+                connected = false;
                 lifecycleListeners.forEach(listener -> listener.onDisconnect(this));
             } finally {
                 connectLock.unlock();
@@ -175,6 +183,10 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         lifecycleListeners.add(lifecycleListener);
     }
 
+    private String createClientId() {
+        return new StringBuilder(hostname).append(":").append(port).append("_").append(connectionId).toString();
+    }
+
     private void openChannel() throws IOException {
         try {
             Socket socket = new Socket();
@@ -212,7 +224,6 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
             String position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
             logger.info("Connected to {}:{} at {} (sid:{}, cid:{})", hostname, port, position, serverId, connectionId);
         }
-        connected = true;
     }
 
     private void listenForEventPackets(final PacketChannel channel) {
@@ -525,7 +536,6 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
     }
 
     private void closeChannel(final PacketChannel channel) throws IOException {
-        connected = false;
         if (channel != null && channel.isOpen()) {
             channel.close();
         }
@@ -594,6 +604,47 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         return serverId;
     }
 
+    private void spawnWorkerThread() {
+        this.worker = new Thread(() -> listenForEventPackets(channel));
+        this.worker.setDaemon(false);
+        this.workerThreadName = new StringBuilder("binlog-parser-").append(createClientId()).toString();
+        this.worker.setName(workerThreadName);
+        this.worker.start();
+    }
+
+    private void spawnKeepAliveThread() {
+        long keepAliveInterval = TimeUnit.MINUTES.toMillis(1);
+        String clientId = createClientId();
+        this.keepAlive = new Thread(() -> {
+            while (connected) {
+                try {
+                    Thread.sleep(keepAliveInterval);
+                } catch (InterruptedException e) {
+                    // expected in case of disconnect
+                }
+                boolean connectionLost = false;
+                try {
+                    channel.write(new PingCommand());
+                } catch (IOException e) {
+                    connectionLost = true;
+                }
+                if (connectionLost) {
+                    String error = String.format("keepalive: Trying to restore lost connection to %s", clientId);
+                    logger.info(error);
+                    try {
+                        lifecycleListeners.forEach(listener -> listener.onCommunicationFailure(this, new ListenerException(error)));
+                    } catch (Exception e) {
+                        logger.warn("keepalive error", e);
+                    }
+                    break;
+                }
+            }
+        });
+        this.keepAlive.setDaemon(false);
+        this.keepAlive.setName(new StringBuilder("binlog-keepalive-").append(clientId).toString());
+        this.keepAlive.start();
+    }
+
     @Override
     public String getBinlogFilename() {
         return binlogFilename;

+ 12 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -43,6 +43,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
     private String database;
     private final Lock connectLock = new ReentrantLock();
     private volatile boolean connected;
+    private volatile boolean recovery;
 
     @Override
     public void start() {
@@ -121,6 +122,16 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
     }
 
     private void reStart() {
+        try {
+            connectLock.tryLock();
+            if (recovery) {
+                return;
+            }
+            recovery = true;
+        } finally {
+            connectLock.unlock();
+        }
+
         for (int i = 1; i <= RETRY_TIMES; i++) {
             try {
                 if (null != client) {
@@ -130,6 +141,7 @@ public class MysqlExtractor extends AbstractDatabaseExtractor {
 
                 errorEvent(new ListenerException(String.format("重启成功, %s", client.getWorkerThreadName())));
                 logger.error("第{}次重启成功, ThreadName:{} ", i, client.getWorkerThreadName());
+                recovery = false;
                 break;
             } catch (Exception e) {
                 logger.error("第{}次重启异常, ThreadName:{}, {}", i, client.getWorkerThreadName(), e.getMessage());