Browse Source

Support in decoder solution for PostgreSQL plugin 'pgoutput'

AE86 3 năm trước cách đây
mục cha
commit
3c6f34dd67

+ 147 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/AbstractMessageDecoder.java

@@ -1,8 +1,11 @@
 package org.dbsyncer.listener.postgresql;
 package org.dbsyncer.listener.postgresql;
 
 
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.listener.postgresql.column.ColumnValue;
+import org.dbsyncer.listener.postgresql.column.PgColumnValue;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.postgresql.replication.LogSequenceNumber;
 import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.util.PGmoney;
 
 
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 
 
@@ -15,6 +18,8 @@ public abstract class AbstractMessageDecoder implements MessageDecoder {
 
 
     protected DatabaseConfig config;
     protected DatabaseConfig config;
 
 
+    private ColumnValue value = new PgColumnValue();
+
     @Override
     @Override
     public boolean skipMessage(ByteBuffer buffer, LogSequenceNumber startLsn, LogSequenceNumber lastReceiveLsn) {
     public boolean skipMessage(ByteBuffer buffer, LogSequenceNumber startLsn, LogSequenceNumber lastReceiveLsn) {
         if (null == lastReceiveLsn || lastReceiveLsn.asLong() == 0 || startLsn.equals(lastReceiveLsn)) {
         if (null == lastReceiveLsn || lastReceiveLsn.asLong() == 0 || startLsn.equals(lastReceiveLsn)) {
@@ -51,4 +56,146 @@ public abstract class AbstractMessageDecoder implements MessageDecoder {
     public void setConfig(DatabaseConfig config) {
     public void setConfig(DatabaseConfig config) {
         this.config = config;
         this.config = config;
     }
     }
+
+    /**
+     * Resolve the value of a {@link ColumnValue}.
+     *
+     * @param typeName
+     * @param columnValue
+     * @return
+     */
+    protected Object resolveValue(String typeName, String columnValue) {
+        value.setValue(columnValue);
+
+        if (value.isNull()) {
+            // nulls are null
+            return null;
+        }
+
+        switch (typeName) {
+            // include all types from https://www.postgresql.org/docs/current/static/datatype.html#DATATYPE-TABLE
+            case "boolean":
+            case "bool":
+                return value.asBoolean();
+
+            case "integer":
+            case "int":
+            case "int4":
+            case "smallint":
+            case "int2":
+            case "smallserial":
+            case "serial":
+            case "serial2":
+            case "serial4":
+                return value.asInteger();
+
+            case "bigint":
+            case "bigserial":
+            case "int8":
+            case "oid":
+                return value.asLong();
+
+            case "real":
+            case "float4":
+                return value.asFloat();
+
+            case "double precision":
+            case "float8":
+                return value.asDouble();
+
+            case "numeric":
+            case "decimal":
+                return value.asDecimal();
+
+            case "character":
+            case "char":
+            case "character varying":
+            case "varchar":
+            case "bpchar":
+            case "text":
+            case "hstore":
+                return value.asString();
+
+            case "date":
+                return value.asLocalDate();
+
+            case "timestamp with time zone":
+            case "timestamptz":
+                return value.asOffsetDateTimeAtUtc();
+
+            case "timestamp":
+            case "timestamp without time zone":
+                return value.asInstant();
+
+            case "time":
+                return value.asTime();
+
+            case "time without time zone":
+                return value.asLocalTime();
+
+            case "time with time zone":
+            case "timetz":
+                return value.asOffsetTimeUtc();
+
+            case "bytea":
+                return value.asByteArray();
+
+            // these are all PG-specific types and we use the JDBC representations
+            // note that, with the exception of point, no converters for these types are implemented yet,
+            // i.e. those values won't actually be propagated to the outbound message until that's the case
+            case "box":
+                return value.asBox();
+            case "circle":
+                return value.asCircle();
+            case "interval":
+                return value.asInterval();
+            case "line":
+                return value.asLine();
+            case "lseg":
+                return value.asLseg();
+            case "money":
+                final Object v = value.asMoney();
+                return (v instanceof PGmoney) ? ((PGmoney) v).val : v;
+            case "path":
+                return value.asPath();
+            case "point":
+                return value.asPoint();
+            case "polygon":
+                return value.asPolygon();
+
+            // PostGIS types are HexEWKB strings
+            // ValueConverter turns them into the correct types
+            case "geometry":
+            case "geography":
+            case "citext":
+            case "bit":
+            case "bit varying":
+            case "varbit":
+            case "json":
+            case "jsonb":
+            case "xml":
+            case "uuid":
+            case "tsrange":
+            case "tstzrange":
+            case "daterange":
+            case "inet":
+            case "cidr":
+            case "macaddr":
+            case "macaddr8":
+            case "int4range":
+            case "numrange":
+            case "int8range":
+                return value.asString();
+
+            // catch-all for other known/builtin PG types
+            case "pg_lsn":
+            case "tsquery":
+            case "tsvector":
+            case "txid_snapshot":
+                // catch-all for unknown (extension module/custom) types
+            default:
+                return null;
+        }
+
+    }
 }
 }

+ 1 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java

@@ -284,13 +284,12 @@ public class PostgreSQLExtractor extends AbstractExtractor {
                     flushLsn(lsn);
                     flushLsn(lsn);
                     // process decoder
                     // process decoder
                     changedEvent(messageDecoder.processMessage(msg));
                     changedEvent(messageDecoder.processMessage(msg));
-                    forceFlushEvent();
 
 
                     // feedback
                     // feedback
                     stream.setAppliedLSN(lsn);
                     stream.setAppliedLSN(lsn);
                     stream.setFlushedLSN(lsn);
                     stream.setFlushedLSN(lsn);
                     stream.forceUpdateStatus();
                     stream.forceUpdateStatus();
-                } catch (IllegalStateException e) {
+                } catch (IllegalStateException | ListenerException e) {
                     logger.error(e.getMessage());
                     logger.error(e.getMessage());
                 } catch (Exception e) {
                 } catch (Exception e) {
                     logger.error(e.getMessage());
                     logger.error(e.getMessage());

+ 2 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/ColumnValue.java

@@ -16,6 +16,8 @@ import java.time.OffsetTime;
  */
  */
 public interface ColumnValue {
 public interface ColumnValue {
 
 
+    void setValue(String value);
+
     boolean isNull();
     boolean isNull();
 
 
     String asString();
     String asString();

+ 0 - 158
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/ColumnValueResolver.java

@@ -1,158 +0,0 @@
-package org.dbsyncer.listener.postgresql.column;
-
-import org.postgresql.util.PGmoney;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/4/23 22:45
- */
-public class ColumnValueResolver {
-
-    /**
-     * Resolve the value of a {@link ColumnValue}.
-     *
-     * @param type
-     * @param value
-     * @return
-     */
-    public Object resolveValue(String type, ColumnValue value) {
-        if (value.isNull()) {
-            // nulls are null
-            return null;
-        }
-
-        switch (type) {
-            // include all types from https://www.postgresql.org/docs/current/static/datatype.html#DATATYPE-TABLE
-            // plus aliases from the shorter names produced by older wal2json
-            case "boolean":
-            case "bool":
-                return value.asBoolean();
-
-            case "hstore":
-                return value.asString();
-
-            case "integer":
-            case "int":
-            case "int4":
-            case "smallint":
-            case "int2":
-            case "smallserial":
-            case "serial":
-            case "serial2":
-            case "serial4":
-                return value.asInteger();
-
-            case "bigint":
-            case "bigserial":
-            case "int8":
-            case "oid":
-                return value.asLong();
-
-            case "real":
-            case "float4":
-                return value.asFloat();
-
-            case "double precision":
-            case "float8":
-                return value.asDouble();
-
-            case "numeric":
-            case "decimal":
-                return value.asDecimal();
-
-            case "character":
-            case "char":
-            case "character varying":
-            case "varchar":
-            case "bpchar":
-            case "text":
-                return value.asString();
-
-            case "date":
-                return value.asLocalDate();
-
-            case "timestamp with time zone":
-            case "timestamptz":
-                return value.asOffsetDateTimeAtUtc();
-
-            case "timestamp":
-            case "timestamp without time zone":
-                return value.asInstant();
-
-            case "time":
-                return value.asTime();
-
-            case "time without time zone":
-                return value.asLocalTime();
-
-            case "time with time zone":
-            case "timetz":
-                return value.asOffsetTimeUtc();
-
-            case "bytea":
-                return value.asByteArray();
-
-            // these are all PG-specific types and we use the JDBC representations
-            // note that, with the exception of point, no converters for these types are implemented yet,
-            // i.e. those values won't actually be propagated to the outbound message until that's the case
-            case "box":
-                return value.asBox();
-            case "circle":
-                return value.asCircle();
-            case "interval":
-                return value.asInterval();
-            case "line":
-                return value.asLine();
-            case "lseg":
-                return value.asLseg();
-            case "money":
-                final Object v = value.asMoney();
-                return (v instanceof PGmoney) ? ((PGmoney) v).val : v;
-            case "path":
-                return value.asPath();
-            case "point":
-                return value.asPoint();
-            case "polygon":
-                return value.asPolygon();
-
-            // PostGIS types are HexEWKB strings
-            // ValueConverter turns them into the correct types
-            case "geometry":
-            case "geography":
-                return value.asString();
-
-            case "citext":
-            case "bit":
-            case "bit varying":
-            case "varbit":
-            case "json":
-            case "jsonb":
-            case "xml":
-            case "uuid":
-            case "tsrange":
-            case "tstzrange":
-            case "daterange":
-            case "inet":
-            case "cidr":
-            case "macaddr":
-            case "macaddr8":
-            case "int4range":
-            case "numrange":
-            case "int8range":
-                return value.asString();
-
-            // catch-all for other known/builtin PG types
-            // TODO: improve with more specific/useful classes here?
-            case "pg_lsn":
-            case "tsquery":
-            case "tsvector":
-            case "txid_snapshot":
-                // catch-all for unknown (extension module/custom) types
-            default:
-                return null;
-        }
-
-    }
-
-}

+ 58 - 58
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/TestDecodingColumnValue.java → dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/PgColumnValue.java

@@ -1,59 +1,59 @@
-package org.dbsyncer.listener.postgresql.column;
-
-import org.dbsyncer.common.util.StringUtil;
-
-import java.math.BigDecimal;
-
-public final class TestDecodingColumnValue extends AbstractColumnValue {
-
-    private String value;
-
-    public TestDecodingColumnValue(String value) {
-        this.value = value;
-    }
-
-    @Override
-    public boolean isNull() {
-        return value == null;
-    }
-
-    @Override
-    public String asString() {
-        return value;
-    }
-
-    @Override
-    public Boolean asBoolean() {
-        return "t".equalsIgnoreCase(value);
-    }
-
-    @Override
-    public Integer asInteger() {
-        return Integer.valueOf(value);
-    }
-
-    @Override
-    public Long asLong() {
-        return Long.valueOf(value);
-    }
-
-    @Override
-    public Float asFloat() {
-        return Float.valueOf(value);
-    }
-
-    @Override
-    public Double asDouble() {
-        return Double.valueOf(value);
-    }
-
-    @Override
-    public Object asDecimal() {
-        return new BigDecimal(value);
-    }
-
-    @Override
-    public byte[] asByteArray() {
-        return StringUtil.hexStringToByteArray(value.substring(2));
-    }
+package org.dbsyncer.listener.postgresql.column;
+
+import org.dbsyncer.common.util.StringUtil;
+
+import java.math.BigDecimal;
+
+public final class PgColumnValue extends AbstractColumnValue {
+
+    private String value;
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+
+    @Override
+    public boolean isNull() {
+        return value == null;
+    }
+
+    @Override
+    public String asString() {
+        return value;
+    }
+
+    @Override
+    public Boolean asBoolean() {
+        return "t".equalsIgnoreCase(value);
+    }
+
+    @Override
+    public Integer asInteger() {
+        return Integer.valueOf(value);
+    }
+
+    @Override
+    public Long asLong() {
+        return Long.valueOf(value);
+    }
+
+    @Override
+    public Float asFloat() {
+        return Float.valueOf(value);
+    }
+
+    @Override
+    public Double asDouble() {
+        return Double.valueOf(value);
+    }
+
+    @Override
+    public Object asDecimal() {
+        return new BigDecimal(value);
+    }
+
+    @Override
+    public byte[] asByteArray() {
+        return StringUtil.hexStringToByteArray(value.substring(2));
+    }
 }
 }

+ 68 - 101
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/PgOutputMessageDecoder.java

@@ -5,7 +5,6 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.config.MetaInfo;
-import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
@@ -14,6 +13,7 @@ import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
 
 
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.time.LocalDateTime;
 import java.time.LocalDateTime;
@@ -29,15 +29,17 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
     private static final LocalDateTime PG_EPOCH = LocalDateTime.of(2000, 1, 1, 0, 0, 0);
     private static final LocalDateTime PG_EPOCH = LocalDateTime.of(2000, 1, 1, 0, 0, 0);
-
     private static final String GET_TABLE_SCHEMA = "select oid,relname as tableName from pg_class t inner join (select ns.oid as nspoid, ns.nspname from pg_namespace ns where ns.nspname = (select (current_schemas(false))[s.r] from generate_series(1, array_upper(current_schemas(false), 1)) as s(r))) as n on n.nspoid = t.relnamespace where relkind = 'r'";
     private static final String GET_TABLE_SCHEMA = "select oid,relname as tableName from pg_class t inner join (select ns.oid as nspoid, ns.nspname from pg_namespace ns where ns.nspname = (select (current_schemas(false))[s.r] from generate_series(1, array_upper(current_schemas(false), 1)) as s(r))) as n on n.nspoid = t.relnamespace where relkind = 'r'";
-
     private static final Map<Integer, TableId> tables = new LinkedHashMap<>();
     private static final Map<Integer, TableId> tables = new LinkedHashMap<>();
+    private ConnectorFactory connectorFactory;
+    private DatabaseConnectorMapper connectorMapper;
 
 
     @Override
     @Override
     public void postProcessBeforeInitialization(ConnectorFactory connectorFactory, DatabaseConnectorMapper connectorMapper) {
     public void postProcessBeforeInitialization(ConnectorFactory connectorFactory, DatabaseConnectorMapper connectorMapper) {
-        initPublication(connectorMapper);
-        readSchema(connectorFactory, connectorMapper);
+        this.connectorFactory = connectorFactory;
+        this.connectorMapper = connectorMapper;
+        initPublication();
+        readSchema();
     }
     }
 
 
     @Override
     @Override
@@ -46,20 +48,12 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
             throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
             throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
         }
         }
 
 
-        RowChangedEvent event = null;
         MessageTypeEnum type = MessageTypeEnum.getType((char) buffer.get());
         MessageTypeEnum type = MessageTypeEnum.getType((char) buffer.get());
         switch (type) {
         switch (type) {
             case UPDATE:
             case UPDATE:
-                event = parseUpdate(buffer);
-                break;
-
             case INSERT:
             case INSERT:
-                event = parseInsert(buffer);
-                break;
-
             case DELETE:
             case DELETE:
-                event = parseDelete(buffer);
-                break;
+                return parseData(type, buffer);
 
 
             case BEGIN:
             case BEGIN:
                 long beginLsn = buffer.getLong();
                 long beginLsn = buffer.getLong();
@@ -80,10 +74,6 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
                 logger.info("Type {} not implemented", type.name());
                 logger.info("Type {} not implemented", type.name());
         }
         }
 
 
-        if (null != event) {
-            logger.info(event.toString());
-        }
-
         return null;
         return null;
     }
     }
 
 
@@ -102,7 +92,7 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
         return String.format("dbs_pub_%s_%s", config.getSchema(), config.getUsername());
         return String.format("dbs_pub_%s_%s", config.getSchema(), config.getUsername());
     }
     }
 
 
-    private void initPublication(DatabaseConnectorMapper connectorMapper) {
+    private void initPublication() {
         String pubName = getPubName();
         String pubName = getPubName();
         String selectPublication = String.format("SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", pubName);
         String selectPublication = String.format("SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", pubName);
         Integer count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(selectPublication, Integer.class));
         Integer count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(selectPublication, Integer.class));
@@ -123,113 +113,90 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
         }
         }
     }
     }
 
 
-    private void readSchema(ConnectorFactory connectorFactory, DatabaseConnectorMapper connectorMapper) {
+    private void readSchema() {
         List<Map> schemas = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(GET_TABLE_SCHEMA));
         List<Map> schemas = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(GET_TABLE_SCHEMA));
         if (!CollectionUtils.isEmpty(schemas)) {
         if (!CollectionUtils.isEmpty(schemas)) {
             schemas.forEach(map -> {
             schemas.forEach(map -> {
                 Long oid = (Long) map.get("oid");
                 Long oid = (Long) map.get("oid");
                 String tableName = (String) map.get("tableName");
                 String tableName = (String) map.get("tableName");
-                tables.put(oid.intValue(), new TableId(oid.intValue(), tableName));
+                MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableName);
+                Assert.notEmpty(metaInfo.getColumn(), String.format("The table column for '%s' must not be empty.", tableName));
+                tables.put(oid.intValue(), new TableId(oid.intValue(), tableName, metaInfo.getColumn()));
             });
             });
         }
         }
-
-        // read column
-        for (TableId tableId : tables.values()) {
-            MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableId.tableName);
-            if(!CollectionUtils.isEmpty(metaInfo.getColumn())){
-                tableId.fields = metaInfo.getColumn();
-            }
-        }
-    }
-
-    private RowChangedEvent parseDelete(ByteBuffer buffer) {
-        int relationId = buffer.getInt();
-
-        List<Object> data = new ArrayList<>();
-        String newTuple = new String(new byte[] {buffer.get()}, 0, 1);
-
-        switch (newTuple) {
-            case "K":
-                readTupleData(buffer, data);
-                break;
-            default:
-                logger.info("K not set, got instead {}", newTuple);
-        }
-        return new RowChangedEvent(tables.get(relationId).tableName, ConnectorConstant.OPERTION_INSERT, data, Collections.EMPTY_LIST);
     }
     }
 
 
-    private RowChangedEvent parseInsert(ByteBuffer buffer) {
-        int relationId = buffer.getInt();
-
-        List<Object> data = new ArrayList<>();
-        String newTuple = new String(new byte[] {buffer.get()}, 0, 1);
-        switch (newTuple) {
-            case "N":
-                readTupleData(buffer, data);
-                break;
-            default:
-                logger.info("N not set, got instead {}", newTuple);
-        }
-        return new RowChangedEvent(tables.get(relationId).tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, data);
-    }
-
-    private RowChangedEvent parseUpdate(ByteBuffer buffer) {
-        int relationId = buffer.getInt();
-
-        List<Object> data = new ArrayList<>();
-        String newTuple = new String(new byte[] {buffer.get()}, 0, 1);
-        switch (newTuple) {
-            case "K":
-                readTupleData(buffer, data);
-                break;
-            case "O":
-                readTupleData(buffer, data);
-                break;
-            case "N":
-                readTupleData(buffer, data);
-                break;
-            default:
-                logger.info("K or O Byte1 not set, got instead {}", newTuple);
+    private RowChangedEvent parseData(MessageTypeEnum type, ByteBuffer buffer) {
+        final int relationId = buffer.getInt();
+        final TableId tableId = tables.get(relationId);
+        if (null != tableId) {
+            String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
+            switch (newTuple) {
+                case "N":
+                case "K":
+                case "O":
+                    List<Object> data = new ArrayList<>();
+                    readTupleData(tableId, buffer, data);
+                    if (MessageTypeEnum.DELETE == type) {
+                        return new RowChangedEvent(tableId.tableName, type.name(), data, Collections.EMPTY_LIST);
+                    }
+                    return new RowChangedEvent(tableId.tableName, type.name(), Collections.EMPTY_LIST, data);
+
+                default:
+                    logger.info("N, K, O not set, got instead {}", newTuple);
+            }
         }
         }
-
-        return new RowChangedEvent(tables.get(relationId).tableName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, data);
+        return null;
     }
     }
 
 
-    private void readTupleData(ByteBuffer msg, List<Object> data) {
+    private void readTupleData(TableId tableId, ByteBuffer msg, List<Object> data) {
         short nColumn = msg.getShort();
         short nColumn = msg.getShort();
-        for (int n = 0; n < nColumn; n++) {
-            String tupleContentType = new String(new byte[] {msg.get()}, 0, 1);
-            if (tupleContentType.equals("t")) {
-                int size = msg.getInt();
-                byte[] text = new byte[size];
-
-                for (int z = 0; z < size; z++) {
-                    text[z] = msg.get();
-                }
-                String content = new String(text, 0, size);
-                data.add(content);
-                continue;
-            }
+        if (nColumn != tableId.fields.size()) {
+            logger.warn("The column size of table '{}' is {}, but we has been received column size is {}.", tableId.tableName, tableId.fields.size(), nColumn);
 
 
-            if (tupleContentType.equals("n")) {
-                data.add(null);
-                continue;
+            // The table schema has been changed, we should be get a new table schema from db.
+            MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableId.tableName);
+            if (CollectionUtils.isEmpty(metaInfo.getColumn())) {
+                throw new ListenerException(String.format("The table column for '%s' is empty.", tableId.tableName));
             }
             }
+            tableId.fields = metaInfo.getColumn();
+            return;
+        }
 
 
-            if (tupleContentType.equals("u")) {
-                data.add("TOASTED");
+        for (int n = 0; n < nColumn; n++) {
+            String type = new String(new byte[]{msg.get()}, 0, 1);
+            switch (type) {
+                case "t":
+                    int size = msg.getInt();
+                    byte[] text = new byte[size];
+                    for (int z = 0; z < size; z++) {
+                        text[z] = msg.get();
+                    }
+                    data.add(resolveValue(tableId.fields.get(n).getTypeName(), new String(text, 0, size)));
+                    break;
+
+                case "n":
+                    data.add(null);
+                    break;
+
+                case "u":
+                    data.add("TOASTED");
+                    break;
+                default:
+                    logger.info("t, n, u not set, got instead {}", type);
             }
             }
         }
         }
     }
     }
 
 
     final class TableId {
     final class TableId {
-        Integer     oid;
-        String      tableName;
+        Integer oid;
+        String tableName;
         List<Field> fields = new LinkedList<>();
         List<Field> fields = new LinkedList<>();
 
 
-        public TableId(Integer oid, String tableName) {
+        public TableId(Integer oid, String tableName, List<Field> fields) {
             this.oid = oid;
             this.oid = oid;
             this.tableName = tableName;
             this.tableName = tableName;
+            this.fields = fields;
         }
         }
     }
     }
 
 

+ 1 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/TestDecodingMessageDecoder.java

@@ -3,9 +3,7 @@ package org.dbsyncer.listener.postgresql.decoder;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
-import org.dbsyncer.listener.postgresql.column.ColumnValueResolver;
 import org.dbsyncer.listener.postgresql.column.Lexer;
 import org.dbsyncer.listener.postgresql.column.Lexer;
-import org.dbsyncer.listener.postgresql.column.TestDecodingColumnValue;
 import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
@@ -25,7 +23,6 @@ import java.util.List;
 public class TestDecodingMessageDecoder extends AbstractMessageDecoder {
 public class TestDecodingMessageDecoder extends AbstractMessageDecoder {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
-    private static final ColumnValueResolver resolver = new ColumnValueResolver();
 
 
     @Override
     @Override
     public RowChangedEvent processMessage(ByteBuffer buffer) {
     public RowChangedEvent processMessage(ByteBuffer buffer) {
@@ -77,7 +74,7 @@ public class TestDecodingMessageDecoder extends AbstractMessageDecoder {
             String type = parseType(lexer);
             String type = parseType(lexer);
             lexer.skip(1);
             lexer.skip(1);
             String value = parseValue(lexer);
             String value = parseValue(lexer);
-            data.add(resolver.resolveValue(type, new TestDecodingColumnValue(value)));
+            data.add(resolveValue(type, value));
         }
         }
 
 
         RowChangedEvent event = null;
         RowChangedEvent event = null;