AE86 3 ani în urmă
părinte
comite
f2cb2e94e0

+ 64 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/Lexer.java

@@ -0,0 +1,64 @@
+package org.dbsyncer.listener.postgresql.column;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/4/24 18:22
+ */
+public final class Lexer {
+    private final char[] array;
+    private final int length;
+    private int pos = 0;
+    private String token;
+
+    public Lexer(String input) {
+        this.array = input.toCharArray();
+        this.length = this.array.length;
+    }
+
+    public String token() {
+        return token;
+    }
+
+    public String nextToken(char comma) {
+        if (pos < length) {
+            StringBuilder out = new StringBuilder(16);
+            while (pos < length && array[pos] != comma) {
+                out.append(array[pos]);
+                pos++;
+            }
+            pos++;
+            return token = out.toString();
+        }
+        return token = null;
+    }
+
+    public String nextTokenToQuote() {
+        if (pos < length) {
+            int commaCount = 1;
+            StringBuilder out = new StringBuilder(16);
+            while (!((pos == length - 1 || (array[pos + 1] == ' ' && commaCount % 2 == 1)) && array[pos] == '\'')) {
+                if (array[pos] == '\'') {
+                    commaCount++;
+                }
+                out.append(array[pos]);
+                pos++;
+            }
+            pos++;
+            return token = out.toString();
+        }
+        return token = null;
+    }
+
+    public void skip(int skip) {
+        this.pos += skip;
+    }
+
+    public char current() {
+        return array[pos];
+    }
+
+    public boolean hasNext() {
+        return pos < length;
+    }
+}

+ 47 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/PgOutputMessageDecoder.java

@@ -3,7 +3,11 @@ package org.dbsyncer.listener.postgresql.decoder;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
 import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
+import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
+import org.postgresql.replication.LogSequenceNumber;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 
@@ -14,8 +18,45 @@ import java.nio.ByteBuffer;
  */
 public class PgOutputMessageDecoder extends AbstractMessageDecoder {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public boolean skipMessage(ByteBuffer buffer, LogSequenceNumber startLsn, LogSequenceNumber lastReceiveLsn) {
+        if (super.skipMessage(buffer, startLsn, lastReceiveLsn)) {
+            return true;
+        }
+        int position = buffer.position();
+        try {
+            MessageTypeEnum type = MessageTypeEnum.getType((char) buffer.get());
+            switch (type) {
+                case BEGIN:
+                case COMMIT:
+                case RELATION:
+                case TRUNCATE:
+                case TYPE:
+                case ORIGIN:
+                case NONE:
+                    return true;
+                default:
+                    // TABLE|INSERT|UPDATE|DELETE
+                    return false;
+            }
+        } finally {
+            buffer.position(position);
+        }
+    }
+
     @Override
     public RowChangedEvent processMessage(ByteBuffer buffer) {
+        if (!buffer.hasArray()) {
+            throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
+        }
+        MessageTypeEnum type = MessageTypeEnum.getType((char) buffer.get());
+        if (MessageTypeEnum.TABLE == type) {
+            int offset = buffer.arrayOffset();
+            byte[] source = buffer.array();
+            return parseMessage(new String(source, offset, (source.length - offset)));
+        }
         return null;
     }
 
@@ -30,4 +71,10 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
         builder.withSlotOption("publication_names", String.format("dbs_pub_%s_%s", config.getSchema(), config.getUsername()));
     }
 
+    private RowChangedEvent parseMessage(String message) {
+        logger.info(message);
+
+        return null;
+    }
+
 }

+ 1 - 60
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/TestDecodingMessageDecoder.java

@@ -5,6 +5,7 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
 import org.dbsyncer.listener.postgresql.column.ColumnValueResolver;
+import org.dbsyncer.listener.postgresql.column.Lexer;
 import org.dbsyncer.listener.postgresql.column.TestDecodingColumnValue;
 import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
@@ -149,64 +150,4 @@ public class TestDecodingMessageDecoder extends AbstractMessageDecoder {
         return lexer.token();
     }
 
-    final class Lexer {
-        private final String input;
-        private final char[] array;
-        private final int length;
-        private int pos = 0;
-        private String token;
-
-        public Lexer(String input) {
-            this.input = input;
-            this.array = input.toCharArray();
-            this.length = this.array.length;
-        }
-
-        public String token() {
-            return token;
-        }
-
-        public String nextToken(char comma) {
-            if (pos < length) {
-                StringBuilder out = new StringBuilder(16);
-                while (pos < length && array[pos] != comma) {
-                    out.append(array[pos]);
-                    pos++;
-                }
-                pos++;
-                return token = out.toString();
-            }
-            return token = null;
-        }
-
-        public String nextTokenToQuote() {
-            if (pos < length) {
-                int commaCount = 1;
-                StringBuilder out = new StringBuilder(16);
-                while (!((pos == length - 1 || (array[pos + 1] == ' ' && commaCount % 2 == 1)) && array[pos] == '\'')) {
-                    if (array[pos] == '\'') {
-                        commaCount++;
-                    }
-                    out.append(array[pos]);
-                    pos++;
-                }
-                pos++;
-                return token = out.toString();
-            }
-            return token = null;
-        }
-
-        public void skip(int skip) {
-            this.pos += skip;
-        }
-
-        public char current() {
-            return array[pos];
-        }
-
-        public boolean hasNext() {
-            return pos < length;
-        }
-    }
-
 }