瀏覽代碼

兼容阿里云rds低版本协议事件

穿云 1 月之前
父節點
當前提交
2a7a9fbfa9

+ 13 - 10
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/binlog/BinaryLogRemoteClient.java

@@ -12,9 +12,12 @@ import com.github.shyiko.mysql.binlog.network.*;
 import com.github.shyiko.mysql.binlog.network.protocol.*;
 import com.github.shyiko.mysql.binlog.network.protocol.command.*;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.mysql.deserializer.DeleteDeserializer;
-import org.dbsyncer.connector.mysql.deserializer.UpdateDeserializer;
-import org.dbsyncer.connector.mysql.deserializer.WriteDeserializer;
+import org.dbsyncer.connector.mysql.deserializer.DeleteDeserialize;
+import org.dbsyncer.connector.mysql.deserializer.ExtDeleteDeserializer;
+import org.dbsyncer.connector.mysql.deserializer.ExtUpdateDeserializer;
+import org.dbsyncer.connector.mysql.deserializer.UpdateDeserialize;
+import org.dbsyncer.connector.mysql.deserializer.ExtWriteDeserializer;
+import org.dbsyncer.connector.mysql.deserializer.WriteDeserialize;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -191,7 +194,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
     }
 
     private String createClientId() {
-        return new StringBuilder(hostname).append(":").append(port).append("_").append(connectionId).toString();
+        return hostname + ":" + port + "_" + connectionId;
     }
 
     private void openChannel() throws IOException {
@@ -584,12 +587,12 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         eventDataDeserializers.put(EventType.ROTATE, new RotateEventDataDeserializer());
         eventDataDeserializers.put(EventType.FORMAT_DESCRIPTION, new FormatDescriptionEventDataDeserializer());
         eventDataDeserializers.put(EventType.TABLE_MAP, new TableMapEventDataDeserializer());
-        eventDataDeserializers.put(EventType.UPDATE_ROWS, new UpdateRowsEventDataDeserializer(tableMapEventByTableId));
-        eventDataDeserializers.put(EventType.WRITE_ROWS, new WriteRowsEventDataDeserializer(tableMapEventByTableId));
-        eventDataDeserializers.put(EventType.DELETE_ROWS, new DeleteRowsEventDataDeserializer(tableMapEventByTableId));
-        eventDataDeserializers.put(EventType.EXT_WRITE_ROWS, (new WriteDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
-        eventDataDeserializers.put(EventType.EXT_UPDATE_ROWS, (new UpdateDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
-        eventDataDeserializers.put(EventType.EXT_DELETE_ROWS, (new DeleteDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
+        eventDataDeserializers.put(EventType.UPDATE_ROWS, new UpdateDeserialize(tableMapEventByTableId));
+        eventDataDeserializers.put(EventType.WRITE_ROWS, new WriteDeserialize(tableMapEventByTableId));
+        eventDataDeserializers.put(EventType.DELETE_ROWS, new DeleteDeserialize(tableMapEventByTableId));
+        eventDataDeserializers.put(EventType.EXT_WRITE_ROWS, (new ExtWriteDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
+        eventDataDeserializers.put(EventType.EXT_UPDATE_ROWS, (new ExtUpdateDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
+        eventDataDeserializers.put(EventType.EXT_DELETE_ROWS, (new ExtDeleteDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
         eventDataDeserializers.put(EventType.XID, new XidEventDataDeserializer());
         eventDataDeserializers.put(EventType.INTVAR, new IntVarEventDataDeserializer());
         eventDataDeserializers.put(EventType.QUERY, new QueryEventDataDeserializer());

+ 28 - 0
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/DeleteDeserialize.java

@@ -0,0 +1,28 @@
+/**
+ * DBSyncer Copyright 2020-2025 All Rights Reserved.
+ */
+package org.dbsyncer.connector.mysql.deserializer;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.DeleteRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * @Author 穿云
+ * @Version 1.0.0
+ * @Date 2025-04-12 15:21
+ */
+public class DeleteDeserialize extends DeleteRowsEventDataDeserializer {
+    private final JsonBinaryDeserialize jsonBinaryDeserialize = new JsonBinaryDeserialize();
+
+    public DeleteDeserialize(Map<Long, TableMapEventData> tableMapEventByTableId) {
+        super(tableMapEventByTableId);
+    }
+
+    protected byte[] deserializeJson(int meta, ByteArrayInputStream inputStream) throws IOException {
+        return jsonBinaryDeserialize.deserializeJson(meta, inputStream);
+    }
+}

+ 2 - 2
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/DeleteDeserializer.java → dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/ExtDeleteDeserializer.java

@@ -11,12 +11,12 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
-public final class DeleteDeserializer extends DeleteRowsEventDataDeserializer {
+public final class ExtDeleteDeserializer extends DeleteRowsEventDataDeserializer {
 
     private final DatetimeV2Deserialize datetimeV2Deserialize = new DatetimeV2Deserialize();
     private final JsonBinaryDeserialize jsonBinaryDeserialize = new JsonBinaryDeserialize();
 
-    public DeleteDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+    public ExtDeleteDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
         super(tableMapEventByTableId);
     }
 

+ 2 - 2
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/UpdateDeserializer.java → dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/ExtUpdateDeserializer.java

@@ -11,12 +11,12 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
-public final class UpdateDeserializer extends UpdateRowsEventDataDeserializer {
+public final class ExtUpdateDeserializer extends UpdateRowsEventDataDeserializer {
 
     private final DatetimeV2Deserialize datetimeV2Deserialize = new DatetimeV2Deserialize();
     private final JsonBinaryDeserialize jsonBinaryDeserialize = new JsonBinaryDeserialize();
 
-    public UpdateDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+    public ExtUpdateDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
         super(tableMapEventByTableId);
     }
 

+ 2 - 2
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/WriteDeserializer.java → dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/ExtWriteDeserializer.java

@@ -11,12 +11,12 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
-public final class WriteDeserializer extends WriteRowsEventDataDeserializer {
+public final class ExtWriteDeserializer extends WriteRowsEventDataDeserializer {
 
     private final DatetimeV2Deserialize datetimeV2Deserialize = new DatetimeV2Deserialize();
     private final JsonBinaryDeserialize jsonBinaryDeserialize = new JsonBinaryDeserialize();
 
-    public WriteDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+    public ExtWriteDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
         super(tableMapEventByTableId);
     }
 

+ 28 - 0
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/UpdateDeserialize.java

@@ -0,0 +1,28 @@
+/**
+ * DBSyncer Copyright 2020-2025 All Rights Reserved.
+ */
+package org.dbsyncer.connector.mysql.deserializer;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * @Author 穿云
+ * @Version 1.0.0
+ * @Date 2025-04-12 15:05
+ */
+public class UpdateDeserialize extends UpdateRowsEventDataDeserializer {
+    private final JsonBinaryDeserialize jsonBinaryDeserialize = new JsonBinaryDeserialize();
+    public UpdateDeserialize(Map<Long, TableMapEventData> tableMapEventByTableId) {
+        super(tableMapEventByTableId);
+    }
+
+    protected byte[] deserializeJson(int meta, ByteArrayInputStream inputStream) throws IOException {
+        return jsonBinaryDeserialize.deserializeJson(meta, inputStream);
+    }
+
+}

+ 28 - 0
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/WriteDeserialize.java

@@ -0,0 +1,28 @@
+/**
+ * DBSyncer Copyright 2020-2025 All Rights Reserved.
+ */
+package org.dbsyncer.connector.mysql.deserializer;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * @Author 穿云
+ * @Version 1.0.0
+ * @Date 2025-04-12 15:21
+ */
+public class WriteDeserialize extends WriteRowsEventDataDeserializer {
+    private final JsonBinaryDeserialize jsonBinaryDeserialize = new JsonBinaryDeserialize();
+
+    public WriteDeserialize(Map<Long, TableMapEventData> tableMapEventByTableId) {
+        super(tableMapEventByTableId);
+    }
+
+    protected byte[] deserializeJson(int meta, ByteArrayInputStream inputStream) throws IOException {
+        return jsonBinaryDeserialize.deserializeJson(meta, inputStream);
+    }
+}