|
@@ -88,6 +88,18 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
RowChangedEvent event = null;
|
|
|
MessageTypeEnum type = MessageTypeEnum.getType((char) buffer.get());
|
|
|
switch (type) {
|
|
|
+ case UPDATE:
|
|
|
+ event = parseUpdate(buffer);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case INSERT:
|
|
|
+ event = parseInsert(buffer);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case DELETE:
|
|
|
+ event = parseDelete(buffer);
|
|
|
+ break;
|
|
|
+
|
|
|
case BEGIN:
|
|
|
long beginLsn = buffer.getLong();
|
|
|
long beginTs = buffer.getLong();
|
|
@@ -103,18 +115,6 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
logger.info("Commit: LSN {}, end LSN {}, ts {}", commitLsn, commitEndLsn, PG_EPOCH.plusNanos(commitTs * 1000L));
|
|
|
break;
|
|
|
|
|
|
- case UPDATE:
|
|
|
- event = parseUpdate(buffer);
|
|
|
- break;
|
|
|
-
|
|
|
- case INSERT:
|
|
|
- event = parseInsert(buffer);
|
|
|
- break;
|
|
|
-
|
|
|
- case DELETE:
|
|
|
- event = parseDelete(buffer);
|
|
|
- break;
|
|
|
-
|
|
|
default:
|
|
|
logger.info("Type {} not implemented", type.name());
|
|
|
}
|