|
@@ -152,6 +152,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
|
|
|
try {
|
|
|
connectLock.lock();
|
|
|
closeChannel(channel);
|
|
|
+ connected = false;
|
|
|
if (null != this.worker && !worker.isInterrupted()) {
|
|
|
this.worker.interrupt();
|
|
|
this.worker = null;
|
|
@@ -160,7 +161,6 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
|
|
|
this.keepAlive.interrupt();
|
|
|
this.keepAlive = null;
|
|
|
}
|
|
|
- connected = false;
|
|
|
lifecycleListeners.forEach(listener -> listener.onDisconnect(this));
|
|
|
} finally {
|
|
|
connectLock.unlock();
|
|
@@ -613,7 +613,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
|
|
|
}
|
|
|
|
|
|
private void spawnKeepAliveThread() {
|
|
|
- long keepAliveInterval = TimeUnit.MINUTES.toMillis(1);
|
|
|
+ long keepAliveInterval = TimeUnit.SECONDS.toMillis(30);
|
|
|
String clientId = createClientId();
|
|
|
this.keepAlive = new Thread(() -> {
|
|
|
while (connected) {
|
|
@@ -629,12 +629,14 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
|
|
|
connectionLost = true;
|
|
|
}
|
|
|
if (connectionLost) {
|
|
|
- String error = String.format("keepalive: Trying to restore lost connection to %s", clientId);
|
|
|
- logger.info(error);
|
|
|
- try {
|
|
|
- lifecycleListeners.forEach(listener -> listener.onCommunicationFailure(this, new ListenerException(error)));
|
|
|
- } catch (Exception e) {
|
|
|
- logger.warn("keepalive error", e);
|
|
|
+ if (connected) {
|
|
|
+ String error = String.format("keepalive: Trying to restore lost connection to %s", clientId);
|
|
|
+ logger.info(error);
|
|
|
+ try {
|
|
|
+ lifecycleListeners.forEach(listener -> listener.onCommunicationFailure(this, new ListenerException(error)));
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.warn("keepalive error", e);
|
|
|
+ }
|
|
|
}
|
|
|
break;
|
|
|
}
|