AE86 4 years ago
parent
commit
9538dc8283

+ 6 - 0
dbsyncer-listener/pom.xml

@@ -24,5 +24,11 @@
             <artifactId>mysql-binlog-connector-java</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
     </dependencies>
 </project>

+ 25 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/BinaryLogClient.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.listener.mysql;
 
 import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
 import com.github.shyiko.mysql.binlog.network.AuthenticationException;
 import com.github.shyiko.mysql.binlog.network.ServerException;
 
@@ -67,4 +68,28 @@ public interface BinaryLogClient {
      */
     void setBinlogPosition(long binlogPosition);
 
+    /**
+     * @return event deserializer
+     * @see #setEventDeserializer(EventDeserializer)
+     */
+    EventDeserializer getEventDeserializer();
+
+    /**
+     * @param eventDeserializer custom event deserializer
+     */
+    void setEventDeserializer(EventDeserializer eventDeserializer);
+
+    /**
+     * SimpleEventModel
+     * @return
+     */
+    boolean isSimpleEventModel();
+
+    /**
+     * <p>true: ROTATE > FORMAT_DESCRIPTION > TABLE_MAP > WRITE_ROWS > UPDATE_ROWS > DELETE_ROWS > XID
+     * <p>false: Support all events
+     *
+     * @param simpleEventModel
+     */
+    void setSimpleEventModel(boolean simpleEventModel);
 }

+ 88 - 44
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/BinaryLogRemoteClient.java

@@ -20,9 +20,7 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.security.GeneralSecurityException;
 import java.security.cert.X509Certificate;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -35,14 +33,16 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
 
         @Override
         protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
-            sc.init(null, new TrustManager[] {
+            sc.init(null, new TrustManager[]{
                     new X509TrustManager() {
 
                         @Override
-                        public void checkClientTrusted(X509Certificate[] x509Certificates, String s) { }
+                        public void checkClientTrusted(X509Certificate[] x509Certificates, String s) {
+                        }
 
                         @Override
-                        public void checkServerTrusted(X509Certificate[] x509Certificates, String s) { }
+                        public void checkServerTrusted(X509Certificate[] x509Certificates, String s) {
+                        }
 
                         @Override
                         public X509Certificate[] getAcceptedIssuers() {
@@ -56,36 +56,33 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
     // https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
     private static final int MAX_PACKET_LENGTH = 16777215;
 
-    private final String  hostname;
-    private final int     port;
-    private final String  schema;
-    private final String  username;
-    private final String  password;
-    private       SSLMode sslMode = SSLMode.DISABLED;
-
-    private          boolean blocking       = true;
-    private          long    serverId       = 65535;
-    private volatile String  binlogFilename;
-    private volatile long    binlogPosition = 4;
-    private volatile long    connectionId;
-
-    private final Object  gtidSetAccessLock = new Object();
-    private       GtidSet gtidSet;
-    private       boolean gtidSetFallbackToPurged;
-    private       boolean useBinlogFilenamePositionInGtidMode;
-    private       String  gtid;
-    private       boolean tx;
-
-    private       EventDeserializer                             eventDeserializer  = new EventDeserializer();
-    private final List<BinaryLogRemoteClient.EventListener>     eventListeners     = new CopyOnWriteArrayList<>();
-    private final List<BinaryLogRemoteClient.LifecycleListener> lifecycleListeners = new CopyOnWriteArrayList<>();
-
+    private final String hostname;
+    private final int port;
+    private final String schema;
+    private final String username;
+    private final String password;
+    private SSLMode sslMode = SSLMode.DISABLED;
+
+    private EventDeserializer eventDeserializer;
+    private boolean blocking = true;
+    private boolean simpleEventModel = false;
+    private long serverId = 65535;
+    private volatile String binlogFilename;
+    private volatile long binlogPosition = 4;
+    private volatile long connectionId;
     private volatile PacketChannel channel;
-    private volatile boolean       connected;
-
-    private int timeout = 3000;
+    private volatile boolean connected;
 
     private final Lock connectLock = new ReentrantLock();
+    private final Object gtidSetAccessLock = new Object();
+    private GtidSet gtidSet;
+    private String gtid;
+    private boolean tx;
+    private boolean gtidSetFallbackToPurged;
+    private boolean useBinlogFilenamePositionInGtidMode;
+
+    private final List<BinaryLogRemoteClient.EventListener> eventListeners = new CopyOnWriteArrayList<>();
+    private final List<BinaryLogRemoteClient.LifecycleListener> lifecycleListeners = new CopyOnWriteArrayList<>();
 
     /**
      * Alias for BinaryLogRemoteClient(hostname, port, &lt;no schema&gt; = null, username, password).
@@ -119,6 +116,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
             if (isConnected()) {
                 throw new IllegalStateException("BinaryLogRemoteClient is already connected");
             }
+            setConfig();
             openChannel();
             // dump binary log
             requestBinaryLogStream(channel);
@@ -161,7 +159,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
     private void openChannel() throws IOException {
         try {
             Socket socket = new Socket();
-            socket.connect(new InetSocketAddress(hostname, port), timeout);
+            socket.connect(new InetSocketAddress(hostname, port), 3000);
             channel = new PacketChannel(socket);
             if (channel.getInputStream().peek() == -1) {
                 throw new EOFException();
@@ -523,6 +521,39 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         }
     }
 
+    private void setConfig() {
+        IdentityHashMap eventDataDeserializers = new IdentityHashMap();
+        Map<Long, TableMapEventData> tableMapEventByTableId = new HashMap();
+        if(null == eventDeserializer){
+            this.eventDeserializer = new EventDeserializer(new EventHeaderV4Deserializer(), new NullEventDataDeserializer(), eventDataDeserializers, tableMapEventByTableId);
+            eventDeserializer.setCompatibilityMode(
+                    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
+                    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
+            );
+        }
+
+        // Process event priority: RotateEvent > FormatDescriptionEvent > TableMapEvent > RowsEvent > XidEvent
+        eventDataDeserializers.put(EventType.ROTATE, new RotateEventDataDeserializer());
+        eventDataDeserializers.put(EventType.FORMAT_DESCRIPTION, new FormatDescriptionEventDataDeserializer());
+        eventDataDeserializers.put(EventType.TABLE_MAP, new TableMapEventDataDeserializer());
+        eventDataDeserializers.put(EventType.UPDATE_ROWS, new UpdateRowsEventDataDeserializer(tableMapEventByTableId));
+        eventDataDeserializers.put(EventType.WRITE_ROWS, new WriteRowsEventDataDeserializer(tableMapEventByTableId));
+        eventDataDeserializers.put(EventType.DELETE_ROWS, new DeleteRowsEventDataDeserializer(tableMapEventByTableId));
+        eventDataDeserializers.put(EventType.XID, new XidEventDataDeserializer());
+
+        if(!simpleEventModel){
+            eventDataDeserializers.put(EventType.INTVAR, new IntVarEventDataDeserializer());
+            eventDataDeserializers.put(EventType.QUERY, new QueryEventDataDeserializer());
+            eventDataDeserializers.put(EventType.EXT_WRITE_ROWS, (new WriteRowsEventDataDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
+            eventDataDeserializers.put(EventType.EXT_UPDATE_ROWS, (new UpdateRowsEventDataDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
+            eventDataDeserializers.put(EventType.EXT_DELETE_ROWS, (new DeleteRowsEventDataDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));
+            eventDataDeserializers.put(EventType.ROWS_QUERY, new RowsQueryEventDataDeserializer());
+            eventDataDeserializers.put(EventType.GTID, new GtidEventDataDeserializer());
+            eventDataDeserializers.put(EventType.PREVIOUS_GTIDS, new PreviousGtidSetDeserializer());
+            eventDataDeserializers.put(EventType.XA_PREPARE, new XAPrepareEventDataDeserializer());
+        }
+    }
+
     private void notifyEventListeners(Event event) {
         if (event.getData() instanceof EventDeserializer.EventDataWrapper) {
             event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper) event.getData()).getExternal());
@@ -570,6 +601,29 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         this.binlogPosition = binlogPosition;
     }
 
+    @Override
+    public EventDeserializer getEventDeserializer() {
+        return eventDeserializer;
+    }
+
+    @Override
+    public void setEventDeserializer(EventDeserializer eventDeserializer) {
+        if (eventDeserializer == null) {
+            throw new IllegalArgumentException("Event deserializer cannot be NULL");
+        }
+        this.eventDeserializer = eventDeserializer;
+    }
+
+    @Override
+    public boolean isSimpleEventModel() {
+        return simpleEventModel;
+    }
+
+    @Override
+    public void setSimpleEventModel(boolean simpleEventModel) {
+        this.simpleEventModel = simpleEventModel;
+    }
+
     public SSLMode getSSLMode() {
         return sslMode;
     }
@@ -641,16 +695,6 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         this.useBinlogFilenamePositionInGtidMode = useBinlogFilenamePositionInGtidMode;
     }
 
-    /**
-     * @param eventDeserializer custom event deserializer
-     */
-    public void setEventDeserializer(EventDeserializer eventDeserializer) {
-        if (eventDeserializer == null) {
-            throw new IllegalArgumentException("Event deserializer cannot be NULL");
-        }
-        this.eventDeserializer = eventDeserializer;
-    }
-
     public interface EventListener {
 
         void onEvent(Event event);

+ 58 - 0
dbsyncer-listener/src/main/test/BinaryLogRemoteClientTest.java

@@ -0,0 +1,58 @@
+import com.github.shyiko.mysql.binlog.event.Event;
+import org.dbsyncer.listener.mysql.BinaryLogClient;
+import org.dbsyncer.listener.mysql.BinaryLogRemoteClient;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-11-13 22:25
+ */
+public class BinaryLogRemoteClientTest {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Test
+    public void testConnect() throws Exception {
+        String hostname = "127.0.0.1";
+        int port = 3306;
+        String username = "root";
+        String password = "123";
+
+        BinaryLogClient client = new BinaryLogRemoteClient(hostname, port, username, password);
+        client.setBinlogFilename("mysql_bin.000029");
+        client.setBinlogPosition(154);
+        client.registerEventListener(new BinaryLogRemoteClient.EventListener() {
+            @Override
+            public void onEvent(Event event) {
+                logger.info(event.toString());
+            }
+        });
+        client.registerLifecycleListener(new BinaryLogRemoteClient.LifecycleListener() {
+            @Override
+            public void onConnect(BinaryLogRemoteClient client) {
+                logger.info("建立连接");
+            }
+
+            @Override
+            public void onCommunicationFailure(BinaryLogRemoteClient client, Exception ex) {
+                logger.error("连接异常", ex);
+            }
+
+            @Override
+            public void onEventDeserializationFailure(BinaryLogRemoteClient client, Exception ex) {
+                logger.error("解析异常", ex);
+            }
+
+            @Override
+            public void onDisconnect(BinaryLogRemoteClient client) {
+                logger.error("断开连接");
+            }
+        });
+
+        client.connect();
+    }
+
+}