AE86 před 3 roky
rodič
revize
69727665f5

+ 24 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/AbstractMessageDecoder.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.listener.postgresql;
 
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.postgresql.replication.LogSequenceNumber;
 
 import java.nio.ByteBuffer;
@@ -16,7 +17,29 @@ public abstract class AbstractMessageDecoder implements MessageDecoder {
 
     @Override
     public boolean skipMessage(ByteBuffer buffer, LogSequenceNumber startLsn, LogSequenceNumber lastReceiveLsn) {
-        return null == lastReceiveLsn || lastReceiveLsn.asLong() == 0 || startLsn.equals(lastReceiveLsn);
+        if (null == lastReceiveLsn || lastReceiveLsn.asLong() == 0 || startLsn.equals(lastReceiveLsn)) {
+            return true;
+        }
+
+        int position = buffer.position();
+        try {
+            MessageTypeEnum type = MessageTypeEnum.getType((char) buffer.get());
+            switch (type) {
+                case BEGIN:
+                case COMMIT:
+                case RELATION:
+                case TRUNCATE:
+                case TYPE:
+                case ORIGIN:
+                case NONE:
+                    return true;
+                default:
+                    // TABLE|INSERT|UPDATE|DELETE
+                    return false;
+            }
+        } finally {
+            buffer.position(position);
+        }
     }
 
     @Override

+ 0 - 27
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/PgOutputMessageDecoder.java

@@ -7,7 +7,6 @@ import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
 import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
-import org.postgresql.replication.LogSequenceNumber;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,32 +52,6 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
         // TODO read table schema
     }
 
-    @Override
-    public boolean skipMessage(ByteBuffer buffer, LogSequenceNumber startLsn, LogSequenceNumber lastReceiveLsn) {
-        if (super.skipMessage(buffer, startLsn, lastReceiveLsn)) {
-            return true;
-        }
-        int position = buffer.position();
-        try {
-            MessageTypeEnum type = MessageTypeEnum.getType((char) buffer.get());
-            switch (type) {
-                case BEGIN:
-                case COMMIT:
-                case RELATION:
-                case TRUNCATE:
-                case TYPE:
-                case ORIGIN:
-                case NONE:
-                    return true;
-                default:
-                    // TABLE|INSERT|UPDATE|DELETE
-                    return false;
-            }
-        } finally {
-            buffer.position(position);
-        }
-    }
-
     @Override
     public RowChangedEvent processMessage(ByteBuffer buffer) {
         if (!buffer.hasArray()) {

+ 0 - 30
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/TestDecodingMessageDecoder.java

@@ -8,7 +8,6 @@ import org.dbsyncer.listener.postgresql.column.Lexer;
 import org.dbsyncer.listener.postgresql.column.TestDecodingColumnValue;
 import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
-import org.postgresql.replication.LogSequenceNumber;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,35 +27,6 @@ public class TestDecodingMessageDecoder extends AbstractMessageDecoder {
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private static final ColumnValueResolver resolver = new ColumnValueResolver();
 
-    @Override
-    public boolean skipMessage(ByteBuffer buffer, LogSequenceNumber startLsn, LogSequenceNumber lastReceiveLsn) {
-        if (super.skipMessage(buffer, startLsn, lastReceiveLsn)) {
-            return true;
-        }
-        int position = buffer.position();
-        try {
-            MessageTypeEnum type = MessageTypeEnum.getType((char) buffer.get());
-            switch (type) {
-                case BEGIN:
-                case COMMIT:
-                case RELATION:
-                case TRUNCATE:
-                case TYPE:
-                case ORIGIN:
-                case INSERT:
-                case UPDATE:
-                case DELETE:
-                case NONE:
-                    return true;
-                default:
-                    // TABLE
-                    return false;
-            }
-        } finally {
-            buffer.position(position);
-        }
-    }
-
     @Override
     public RowChangedEvent processMessage(ByteBuffer buffer) {
         if (!buffer.hasArray()) {