Ver código fonte

fix expired binlog error

AE86 4 anos atrás
pai
commit
5ee88a01f9

+ 9 - 29
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/BinaryLogRemoteClient.java

@@ -33,7 +33,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
 
         @Override
         protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
-            sc.init(null, new TrustManager[]{
+            sc.init(null, new TrustManager[] {
                     new X509TrustManager() {
 
                         @Override
@@ -126,12 +126,12 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
             ensureEventDeserializerHasRequiredEDDs();
 
             // new Thread
-            this.worker = new Thread(()-> listenForEventPackets(channel));
+            this.worker = new Thread(() -> listenForEventPackets(channel));
             this.worker.setDaemon(false);
             this.workerThreadName = new StringBuilder("binlog-parser-").append(hostname).append(":").append(port).append("_").append(connectionId).toString();
             this.worker.setName(workerThreadName);
             this.worker.start();
-            notifyConnectEvent();
+            lifecycleListeners.forEach(listener -> listener.onConnect(this));
         } finally {
             connectLock.unlock();
         }
@@ -143,11 +143,11 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
             try {
                 connectLock.lock();
                 closeChannel(channel);
-                if(null != this.worker && !worker.isInterrupted()){
+                if (null != this.worker && !worker.isInterrupted()) {
                     this.worker.interrupt();
                     this.worker = null;
                 }
-                notifyDisconnectEvent();
+                lifecycleListeners.forEach(listener -> listener.onDisconnect(this));
             } finally {
                 connectLock.unlock();
             }
@@ -242,11 +242,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
                     if (cause instanceof EOFException || cause instanceof SocketException) {
                         throw e;
                     }
-                    if (connected) {
-                        for (BinaryLogRemoteClient.LifecycleListener lifecycleListener : lifecycleListeners) {
-                            lifecycleListener.onEventDeserializationFailure(this, e);
-                        }
-                    }
+                    lifecycleListeners.forEach(listener -> listener.onEventDeserializationFailure(this, e));
                     continue;
                 }
                 if (connected) {
@@ -256,11 +252,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
                 }
             }
         } catch (Exception e) {
-            if (connected) {
-                for (BinaryLogRemoteClient.LifecycleListener lifecycleListener : lifecycleListeners) {
-                    lifecycleListener.onCommunicationFailure(this, e);
-                }
-            }
+            lifecycleListeners.forEach(listener -> listener.onCommunicationFailure(this, e));
         }
     }
 
@@ -535,7 +527,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
     }
 
     private void setConfig() {
-        if(null == tableMapEventByTableId){
+        if (null == tableMapEventByTableId) {
             tableMapEventByTableId = new HashMap<>();
         }
 
@@ -556,7 +548,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         eventDataDeserializers.put(EventType.EXT_DELETE_ROWS, (new DeleteRowsEventDataDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
         eventDataDeserializers.put(EventType.XID, new XidEventDataDeserializer());
 
-        if(simpleEventModel){
+        if (simpleEventModel) {
             eventDataDeserializers.put(EventType.INTVAR, new IntVarEventDataDeserializer());
             eventDataDeserializers.put(EventType.QUERY, new QueryEventDataDeserializer());
             eventDataDeserializers.put(EventType.ROWS_QUERY, new RowsQueryEventDataDeserializer());
@@ -582,18 +574,6 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         }
     }
 
-    private void notifyConnectEvent() {
-        for (BinaryLogRemoteClient.LifecycleListener lifecycleListener : lifecycleListeners) {
-            lifecycleListener.onConnect(this);
-        }
-    }
-
-    private void notifyDisconnectEvent() {
-        for (BinaryLogRemoteClient.LifecycleListener lifecycleListener : lifecycleListeners) {
-            lifecycleListener.onDisconnect(this);
-        }
-    }
-
     @Override
     public String getBinlogFilename() {
         return binlogFilename;

+ 21 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/MysqlExtractor.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.listener.mysql;
 
 import com.github.shyiko.mysql.binlog.event.*;
+import com.github.shyiko.mysql.binlog.network.ServerException;
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.connector.config.DatabaseConfig;
@@ -56,7 +57,6 @@ public class MysqlExtractor extends AbstractExtractor {
         } catch (Exception e) {
             logger.error("关闭失败:{}", e.getMessage());
         }
-
     }
 
     private void run() throws Exception {
@@ -102,8 +102,6 @@ public class MysqlExtractor extends AbstractExtractor {
     }
 
     private void reStart() {
-        this.close();
-
         for (int i = 1; i <= RETRY_TIMES; i++) {
             try {
                 if (null != client) {
@@ -154,7 +152,26 @@ public class MysqlExtractor extends AbstractExtractor {
         }
 
         @Override
-        public void onCommunicationFailure(BinaryLogRemoteClient client, Exception ex) {
+        public void onCommunicationFailure(BinaryLogRemoteClient client, Exception e) {
+            logger.error(e.getMessage());
+            /**
+             * e:
+             * case1> Due to the automatic expiration and deletion mechanism of MySQL binlog files, the binlog file cannot be found.
+             * case2> Got fatal error 1236 from master when reading data from binary log.
+             * case3> Log event entry exceeded max_allowed_packet; Increase max_allowed_packet on master.
+             */
+            if(e instanceof ServerException){
+                ServerException serverException = (ServerException) e;
+                if(serverException.getErrorCode() == 1236){
+                    close();
+                    String log = String.format("线程[%s]执行异常。由于MySQL配置了过期binlog文件自动删除机制,已无法找到原binlog文件%s。建议先保存驱动(加载最新的binlog文件),再启动驱动。",
+                            client.getWorkerThreadName(),
+                            client.getBinlogFilename());
+                    interruptException(new ListenerException(log));
+                    return;
+                }
+            }
+
             reStart();
         }
 

+ 1 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -87,6 +87,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             field.setAccessible(true);
             database = (String) field.get(delegate);
         } catch (Exception e) {
+            logger.error("无法连接Mysql,URL:{}", config.getUrl());
             throw new StorageException(e.getMessage());
         } finally {
             JDBCUtil.close(conn);