|
@@ -2,21 +2,20 @@ package org.dbsyncer.listener.postgresql.decoder;
|
|
|
|
|
|
import org.dbsyncer.common.event.RowChangedEvent;
|
|
|
import org.dbsyncer.common.util.CollectionUtils;
|
|
|
-import org.dbsyncer.connector.constant.ConnectorConstant;
|
|
|
+import org.dbsyncer.connector.ConnectorFactory;
|
|
|
+import org.dbsyncer.connector.model.Field;
|
|
|
+import org.dbsyncer.connector.model.MetaInfo;
|
|
|
import org.dbsyncer.connector.database.DatabaseConnectorMapper;
|
|
|
import org.dbsyncer.listener.ListenerException;
|
|
|
import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
|
|
|
import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
|
|
|
import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
|
|
|
-import org.postgresql.jdbc.PgResultSet;
|
|
|
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.util.Assert;
|
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
|
-import java.sql.Connection;
|
|
|
-import java.sql.DatabaseMetaData;
|
|
|
-import java.sql.ResultSet;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.util.*;
|
|
|
|
|
@@ -30,15 +29,17 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
private static final LocalDateTime PG_EPOCH = LocalDateTime.of(2000, 1, 1, 0, 0, 0);
|
|
|
-
|
|
|
- private static final String GET_TABLE_SCHEMA = "select oid,relname as tableName from pg_class t inner join (select ns.oid as nspoid, ns.nspname from pg_namespace ns where ns.nspname = (select (current_schemas(false))[s.r] from generate_series(1, array_upper(current_schemas(false), 1)) as s(r))) as n on n.nspoid = t.relnamespace";
|
|
|
-
|
|
|
+ private static final String GET_TABLE_SCHEMA = "select oid,relname as tableName from pg_class t inner join (select ns.oid as nspoid, ns.nspname from pg_namespace ns where ns.nspname = (select (current_schemas(false))[s.r] from generate_series(1, array_upper(current_schemas(false), 1)) as s(r))) as n on n.nspoid = t.relnamespace where relkind = 'r'";
|
|
|
private static final Map<Integer, TableId> tables = new LinkedHashMap<>();
|
|
|
+ private ConnectorFactory connectorFactory;
|
|
|
+ private DatabaseConnectorMapper connectorMapper;
|
|
|
|
|
|
@Override
|
|
|
- public void postProcessBeforeInitialization(DatabaseConnectorMapper connectorMapper) {
|
|
|
- initPublication(connectorMapper);
|
|
|
- readSchema(connectorMapper);
|
|
|
+ public void postProcessBeforeInitialization(ConnectorFactory connectorFactory, DatabaseConnectorMapper connectorMapper) {
|
|
|
+ this.connectorFactory = connectorFactory;
|
|
|
+ this.connectorMapper = connectorMapper;
|
|
|
+ initPublication();
|
|
|
+ readSchema();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -47,20 +48,12 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
|
|
|
}
|
|
|
|
|
|
- RowChangedEvent event = null;
|
|
|
MessageTypeEnum type = MessageTypeEnum.getType((char) buffer.get());
|
|
|
switch (type) {
|
|
|
case UPDATE:
|
|
|
- event = parseUpdate(buffer);
|
|
|
- break;
|
|
|
-
|
|
|
case INSERT:
|
|
|
- event = parseInsert(buffer);
|
|
|
- break;
|
|
|
-
|
|
|
case DELETE:
|
|
|
- event = parseDelete(buffer);
|
|
|
- break;
|
|
|
+ return parseData(type, buffer);
|
|
|
|
|
|
case BEGIN:
|
|
|
long beginLsn = buffer.getLong();
|
|
@@ -81,10 +74,6 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
logger.info("Type {} not implemented", type.name());
|
|
|
}
|
|
|
|
|
|
- if (null != event) {
|
|
|
- logger.info(event.toString());
|
|
|
- }
|
|
|
-
|
|
|
return null;
|
|
|
}
|
|
|
|
|
@@ -103,7 +92,7 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
return String.format("dbs_pub_%s_%s", config.getSchema(), config.getUsername());
|
|
|
}
|
|
|
|
|
|
- private void initPublication(DatabaseConnectorMapper connectorMapper) {
|
|
|
+ private void initPublication() {
|
|
|
String pubName = getPubName();
|
|
|
String selectPublication = String.format("SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", pubName);
|
|
|
Integer count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(selectPublication, Integer.class));
|
|
@@ -124,116 +113,77 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void readSchema(DatabaseConnectorMapper connectorMapper) {
|
|
|
+ private void readSchema() {
|
|
|
List<Map> schemas = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(GET_TABLE_SCHEMA));
|
|
|
if (!CollectionUtils.isEmpty(schemas)) {
|
|
|
schemas.forEach(map -> {
|
|
|
Long oid = (Long) map.get("oid");
|
|
|
String tableName = (String) map.get("tableName");
|
|
|
- tables.put(oid.intValue(), new TableId(oid.intValue(), tableName));
|
|
|
+ MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableName);
|
|
|
+ Assert.notEmpty(metaInfo.getColumn(), String.format("The table column for '%s' must not be empty.", tableName));
|
|
|
+ tables.put(oid.intValue(), new TableId(oid.intValue(), tableName, metaInfo.getColumn()));
|
|
|
});
|
|
|
}
|
|
|
-
|
|
|
- try {
|
|
|
- Connection connection = connectorMapper.getConnection();
|
|
|
- DatabaseMetaData metaData = connection.getMetaData();
|
|
|
-
|
|
|
- // read column
|
|
|
- for (TableId tableId : tables.values()) {
|
|
|
- ResultSet rs = metaData.getColumns("", config.getSchema(), tableId.tableName, null);
|
|
|
- PgResultSet pgResultSet = (PgResultSet) rs;
|
|
|
- while (pgResultSet.next()) {
|
|
|
- pgResultSet.getRow();
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- throw new ListenerException(e.getCause());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private RowChangedEvent parseDelete(ByteBuffer buffer) {
|
|
|
- int relationId = buffer.getInt();
|
|
|
- logger.info("Delete table {}", tables.get(relationId).tableName);
|
|
|
-
|
|
|
- List<Object> data = new ArrayList<>();
|
|
|
- String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
|
|
|
-
|
|
|
- switch (newTuple) {
|
|
|
- case "K":
|
|
|
- readTupleData(buffer, data);
|
|
|
- break;
|
|
|
- default:
|
|
|
- logger.info("K not set, got instead {}", newTuple);
|
|
|
- }
|
|
|
- return new RowChangedEvent(tables.get(relationId).tableName, ConnectorConstant.OPERTION_INSERT, data, Collections.EMPTY_LIST);
|
|
|
- }
|
|
|
-
|
|
|
- private RowChangedEvent parseInsert(ByteBuffer buffer) {
|
|
|
- int relationId = buffer.getInt();
|
|
|
- logger.info("Insert table {}", tables.get(relationId).tableName);
|
|
|
-
|
|
|
- List<Object> data = new ArrayList<>();
|
|
|
- String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
|
|
|
- switch (newTuple) {
|
|
|
- case "N":
|
|
|
- readTupleData(buffer, data);
|
|
|
- break;
|
|
|
- default:
|
|
|
- logger.info("N not set, got instead {}", newTuple);
|
|
|
- }
|
|
|
- return new RowChangedEvent(tables.get(relationId).tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, data);
|
|
|
}
|
|
|
|
|
|
- private RowChangedEvent parseUpdate(ByteBuffer buffer) {
|
|
|
- int relationId = buffer.getInt();
|
|
|
- logger.info("Update table {}", tables.get(relationId).tableName);
|
|
|
-
|
|
|
- List<Object> data = new ArrayList<>();
|
|
|
- String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
|
|
|
- switch (newTuple) {
|
|
|
- case "K":
|
|
|
- logger.info("Key update");
|
|
|
- logger.info("Old Key");
|
|
|
- readTupleData(buffer, data);
|
|
|
- break;
|
|
|
- case "O":
|
|
|
- logger.info("Value update");
|
|
|
- logger.info("Old Value");
|
|
|
- readTupleData(buffer, data);
|
|
|
- break;
|
|
|
- case "N":
|
|
|
- readTupleData(buffer, data);
|
|
|
- break;
|
|
|
- default:
|
|
|
- logger.info("K or O Byte1 not set, got instead {}", newTuple);
|
|
|
+ private RowChangedEvent parseData(MessageTypeEnum type, ByteBuffer buffer) {
|
|
|
+ final int relationId = buffer.getInt();
|
|
|
+ final TableId tableId = tables.get(relationId);
|
|
|
+ if (null != tableId) {
|
|
|
+ String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
|
|
|
+ switch (newTuple) {
|
|
|
+ case "N":
|
|
|
+ case "K":
|
|
|
+ case "O":
|
|
|
+ List<Object> data = new ArrayList<>();
|
|
|
+ readTupleData(tableId, buffer, data);
|
|
|
+ if (MessageTypeEnum.DELETE == type) {
|
|
|
+ return new RowChangedEvent(tableId.tableName, type.name(), data, Collections.EMPTY_LIST);
|
|
|
+ }
|
|
|
+ return new RowChangedEvent(tableId.tableName, type.name(), Collections.EMPTY_LIST, data);
|
|
|
+
|
|
|
+ default:
|
|
|
+ logger.info("N, K, O not set, got instead {}", newTuple);
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- return new RowChangedEvent(tables.get(relationId).tableName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, data);
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
- private void readTupleData(ByteBuffer msg, List<Object> data) {
|
|
|
+ private void readTupleData(TableId tableId, ByteBuffer msg, List<Object> data) {
|
|
|
short nColumn = msg.getShort();
|
|
|
- for (int n = 0; n < nColumn; n++) {
|
|
|
- String tupleContentType = new String(new byte[]{msg.get()}, 0, 1);
|
|
|
- if (tupleContentType.equals("t")) {
|
|
|
- int size = msg.getInt();
|
|
|
- byte[] text = new byte[size];
|
|
|
-
|
|
|
- for (int z = 0; z < size; z++) {
|
|
|
- text[z] = msg.get();
|
|
|
- }
|
|
|
- String content = new String(text, 0, size);
|
|
|
- data.add(content);
|
|
|
- continue;
|
|
|
- }
|
|
|
+ if (nColumn != tableId.fields.size()) {
|
|
|
+ logger.warn("The column size of table '{}' is {}, but we has been received column size is {}.", tableId.tableName, tableId.fields.size(), nColumn);
|
|
|
|
|
|
- if (tupleContentType.equals("n")) {
|
|
|
- data.add(null);
|
|
|
- continue;
|
|
|
+ // The table schema has been changed, we should be get a new table schema from db.
|
|
|
+ MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableId.tableName);
|
|
|
+ if (CollectionUtils.isEmpty(metaInfo.getColumn())) {
|
|
|
+ throw new ListenerException(String.format("The table column for '%s' is empty.", tableId.tableName));
|
|
|
}
|
|
|
+ tableId.fields = metaInfo.getColumn();
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- if (tupleContentType.equals("u")) {
|
|
|
- data.add("TOASTED");
|
|
|
+ for (int n = 0; n < nColumn; n++) {
|
|
|
+ String type = new String(new byte[]{msg.get()}, 0, 1);
|
|
|
+ switch (type) {
|
|
|
+ case "t":
|
|
|
+ int size = msg.getInt();
|
|
|
+ byte[] text = new byte[size];
|
|
|
+ for (int z = 0; z < size; z++) {
|
|
|
+ text[z] = msg.get();
|
|
|
+ }
|
|
|
+ data.add(resolveValue(tableId.fields.get(n).getTypeName(), new String(text, 0, size)));
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "n":
|
|
|
+ data.add(null);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "u":
|
|
|
+ data.add("TOASTED");
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ logger.info("t, n, u not set, got instead {}", type);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -241,10 +191,12 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
final class TableId {
|
|
|
Integer oid;
|
|
|
String tableName;
|
|
|
+ List<Field> fields;
|
|
|
|
|
|
- public TableId(Integer oid, String tableName) {
|
|
|
+ public TableId(Integer oid, String tableName, List<Field> fields) {
|
|
|
this.oid = oid;
|
|
|
this.tableName = tableName;
|
|
|
+ this.fields = fields;
|
|
|
}
|
|
|
}
|
|
|
|