瀏覽代碼

!325 merge
Merge pull request !325 from AE86/v_2.0

AE86 4 月之前
父節點
當前提交
538a3a3a57

+ 19 - 2
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/binlog/BinaryLogRemoteClient.java

@@ -74,7 +74,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
 
 
     private EventDeserializer eventDeserializer;
     private EventDeserializer eventDeserializer;
     private Map<Long, TableMapEventData> tableMapEventByTableId;
     private Map<Long, TableMapEventData> tableMapEventByTableId;
-    private boolean blocking = true;
+    private final boolean blocking = true;
     private long serverId = 65535L;
     private long serverId = 65535L;
     private volatile String binlogFilename;
     private volatile String binlogFilename;
     private volatile long binlogPosition = 4;
     private volatile long binlogPosition = 4;
@@ -96,6 +96,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
     private boolean useBinlogFilenamePositionInGtidMode;
     private boolean useBinlogFilenamePositionInGtidMode;
     private Boolean isMariaDB;
     private Boolean isMariaDB;
 
 
+    private static final int MYSQL_VERSION_8_4 = 840000;
     private final List<BinaryLogRemoteClient.EventListener> eventListeners = new CopyOnWriteArrayList<>();
     private final List<BinaryLogRemoteClient.EventListener> eventListeners = new CopyOnWriteArrayList<>();
     private final List<BinaryLogRemoteClient.LifecycleListener> lifecycleListeners = new CopyOnWriteArrayList<>();
     private final List<BinaryLogRemoteClient.LifecycleListener> lifecycleListeners = new CopyOnWriteArrayList<>();
 
 
@@ -425,8 +426,24 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
 
 
     }
     }
 
 
+    private Integer getVersion() throws IOException {
+        channel.write(new QueryCommand("SELECT VERSION()"));
+        ResultSetRowPacket[] resultSet = readResultSet();
+        if (resultSet.length == 0) {
+            throw new IOException("Failed to getVersion, command SELECT VERSION()");
+        }
+        ResultSetRowPacket resultSetRow = resultSet[0];
+        String version = resultSetRow.getValue(0).replace(".", "");
+        return Integer.parseInt(String.format("%-6s", version).replace(" ", "0")) ;
+    }
+
     private void fetchBinlogFilenameAndPosition() throws IOException {
     private void fetchBinlogFilenameAndPosition() throws IOException {
-        channel.write(new QueryCommand("show master status"));
+        if (getVersion() >= MYSQL_VERSION_8_4) {
+            channel.write(new QueryCommand("SHOW BINARY LOG STATUS"));
+        } else {
+            channel.write(new QueryCommand("show master status"));
+        }
+
         ResultSetRowPacket[] resultSet = readResultSet();
         ResultSetRowPacket[] resultSet = readResultSet();
         if (resultSet.length == 0) {
         if (resultSet.length == 0) {
             throw new IOException("Failed to determine binlog filename/position");
             throw new IOException("Failed to determine binlog filename/position");