AE86 3 роки тому
батько
коміт
2fe927aba5

+ 124 - 17
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/PgOutputMessageDecoder.java

@@ -13,7 +13,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * @author AE86
@@ -24,6 +27,8 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private static final LocalDateTime PG_EPOCH = LocalDateTime.of(2000, 1, 1, 0, 0, 0);
+
     @Override
     public void postProcessBeforeInitialization(DatabaseConnectorMapper connectorMapper) {
         String pubName = getPubName();
@@ -44,6 +49,8 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
         } catch (Exception e) {
             throw new ListenerException(e.getCause());
         }
+
+        // TODO read table schema
     }
 
     @Override
@@ -78,26 +85,105 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
             throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
         }
 
+        RowChangedEvent event = null;
         MessageTypeEnum type = MessageTypeEnum.getType((char) buffer.get());
-        if (MessageTypeEnum.UPDATE == type) {
-            final byte[] source = buffer.array();
-            final byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
-            return parseMessage(ConnectorConstant.OPERTION_UPDATE, new String(content));
+        switch (type) {
+            case BEGIN:
+                long beginLsn = buffer.getLong();
+                long beginTs = buffer.getLong();
+                long xid = buffer.getInt();
+                logger.info("Begin LSN {}, timestamp {}, xid {} - {}", beginLsn, PG_EPOCH.plusNanos(beginTs * 1000L), xid, beginTs);
+                break;
+
+            case COMMIT:
+                buffer.get();
+                long commitLsn = buffer.getLong();
+                long commitEndLsn = buffer.getLong();
+                long commitTs = buffer.getLong();
+                logger.info("Commit: LSN {}, end LSN {}, ts {}", commitLsn, commitEndLsn, PG_EPOCH.plusNanos(commitTs * 1000L));
+                break;
+
+            case UPDATE:
+                event = parseUpdate(buffer);
+                break;
+
+            case INSERT:
+                event = parseInsert(buffer);
+                break;
+
+            case DELETE:
+                event = parseDelete(buffer);
+                break;
+
+            default:
+                logger.info("Type {} not implemented", type.name());
         }
 
-        if (MessageTypeEnum.INSERT == type) {
-            final byte[] source = buffer.array();
-            final byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
-            return parseMessage(ConnectorConstant.OPERTION_INSERT, new String(content));
+        if (null != event) {
+            logger.info(event.toString());
         }
 
-        if (MessageTypeEnum.DELETE == type) {
-            final byte[] source = buffer.array();
-            final byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
-            return parseMessage(ConnectorConstant.OPERTION_DELETE, new String(content));
+        return null;
+    }
+
+    private RowChangedEvent parseDelete(ByteBuffer buffer) {
+        int relationId = buffer.getInt();
+        logger.info("Delete table {}", relationId);
+
+        List<Object> data = new ArrayList<>();
+        String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
+
+        switch (newTuple) {
+            case "K":
+                readTupleData(buffer, data);
+                break;
+            default:
+                logger.info("K not set, got instead {}", newTuple);
         }
+        return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_INSERT, data, Collections.EMPTY_LIST);
+    }
 
-        return null;
+    private RowChangedEvent parseInsert(ByteBuffer buffer) {
+        int relationId = buffer.getInt();
+        logger.info("Insert table {}", relationId);
+
+        List<Object> data = new ArrayList<>();
+        String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
+        switch (newTuple) {
+            case "N":
+                readTupleData(buffer, data);
+                break;
+            default:
+                logger.info("N not set, got instead {}", newTuple);
+        }
+        return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, data);
+    }
+
+    private RowChangedEvent parseUpdate(ByteBuffer buffer) {
+        int relationId = buffer.getInt();
+        logger.info("Update table {}", relationId);
+
+        List<Object> data = new ArrayList<>();
+        String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
+        switch (newTuple) {
+            case "K":
+                logger.info("Key update");
+                logger.info("Old Key");
+                readTupleData(buffer, data);
+                break;
+            case "O":
+                logger.info("Value update");
+                logger.info("Old Value");
+                readTupleData(buffer, data);
+                break;
+            case "N":
+                readTupleData(buffer, data);
+                break;
+            default:
+                logger.info("K or O Byte1 not set, got instead {}", newTuple);
+        }
+
+        return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, data);
     }
 
     @Override
@@ -115,10 +201,31 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
         return String.format("dbs_pub_%s_%s", config.getSchema(), config.getUsername());
     }
 
-    private RowChangedEvent parseMessage(String event, String message) {
-        logger.info(message);
+    private void readTupleData(ByteBuffer msg, List<Object> data) {
+        short nColumn = msg.getShort();
+        for (int n = 0; n < nColumn; n++) {
+            String tupleContentType = new String(new byte[]{msg.get()}, 0, 1);
+            if (tupleContentType.equals("t")) {
+                int size = msg.getInt();
+                byte[] text = new byte[size];
+
+                for (int z = 0; z < size; z++) {
+                    text[z] = msg.get();
+                }
+                String content = new String(text, 0, size);
+                data.add(content);
+                continue;
+            }
+
+            if (tupleContentType.equals("n")) {
+                data.add(null);
+                continue;
+            }
 
-        return null;
+            if (tupleContentType.equals("u")) {
+                data.add("TOASTED");
+            }
+        }
     }
 
 }

+ 16 - 9
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/TestDecodingMessageDecoder.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.listener.postgresql.decoder;
 
 import org.dbsyncer.common.event.RowChangedEvent;
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
 import org.dbsyncer.listener.postgresql.column.ColumnValueResolver;
@@ -11,6 +10,8 @@ import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.postgresql.replication.LogSequenceNumber;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -24,6 +25,7 @@ import java.util.List;
  */
 public class TestDecodingMessageDecoder extends AbstractMessageDecoder {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
     private static final ColumnValueResolver resolver = new ColumnValueResolver();
 
     @Override
@@ -109,16 +111,21 @@ public class TestDecodingMessageDecoder extends AbstractMessageDecoder {
         }
 
         RowChangedEvent event = null;
-        if (StringUtil.equals(ConnectorConstant.OPERTION_UPDATE, eventType)) {
-            event = new RowChangedEvent(table, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, data);
-        }
+        switch (eventType) {
+            case ConnectorConstant.OPERTION_UPDATE:
+                event = new RowChangedEvent(table, eventType, Collections.EMPTY_LIST, data);
+                break;
 
-        if (StringUtil.equals(ConnectorConstant.OPERTION_INSERT, eventType)) {
-            event = new RowChangedEvent(table, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, data);
-        }
+            case ConnectorConstant.OPERTION_INSERT:
+                event = new RowChangedEvent(table, eventType, Collections.EMPTY_LIST, data);
+                break;
+
+            case ConnectorConstant.OPERTION_DELETE:
+                event = new RowChangedEvent(table, eventType, data, Collections.EMPTY_LIST);
+                break;
 
-        if (StringUtil.equals(ConnectorConstant.OPERTION_DELETE, eventType)) {
-            event = new RowChangedEvent(table, ConnectorConstant.OPERTION_DELETE, data, Collections.EMPTY_LIST);
+            default:
+                logger.info("Type {} not implemented", eventType);
         }
         return event;
     }