|
@@ -1,21 +1,24 @@
|
|
|
package org.dbsyncer.listener.postgresql.decoder;
|
|
|
|
|
|
import org.dbsyncer.common.event.RowChangedEvent;
|
|
|
+import org.dbsyncer.common.util.CollectionUtils;
|
|
|
import org.dbsyncer.connector.constant.ConnectorConstant;
|
|
|
import org.dbsyncer.connector.database.DatabaseConnectorMapper;
|
|
|
import org.dbsyncer.listener.ListenerException;
|
|
|
import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
|
|
|
import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
|
|
|
import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
|
|
|
+import org.postgresql.jdbc.PgResultSet;
|
|
|
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
|
+import java.sql.Connection;
|
|
|
+import java.sql.DatabaseMetaData;
|
|
|
+import java.sql.ResultSet;
|
|
|
import java.time.LocalDateTime;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.List;
|
|
|
+import java.util.*;
|
|
|
|
|
|
/**
|
|
|
* @author AE86
|
|
@@ -28,10 +31,9 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
|
|
|
private static final LocalDateTime PG_EPOCH = LocalDateTime.of(2000, 1, 1, 0, 0, 0);
|
|
|
|
|
|
- private static final String GET_TABLE_SCHEMA = "select pg.oid, pg.typname, pg.typtype from pg_catalog.pg_type pg "
|
|
|
- + "inner join (select ns.oid as nspoid, ns.nspname from pg_namespace ns where ns.nspname = "
|
|
|
- + "(select (current_schemas(false))[s.r] from generate_series(1, array_upper(current_schemas(false), 1)) as s(r))"
|
|
|
- + ") as n on n.nspoid = pg.typnamespace";
|
|
|
+ private static final String GET_TABLE_SCHEMA = "select oid,relname as tableName from pg_class t inner join (select ns.oid as nspoid, ns.nspname from pg_namespace ns where ns.nspname = (select (current_schemas(false))[s.r] from generate_series(1, array_upper(current_schemas(false), 1)) as s(r))) as n on n.nspoid = t.relnamespace";
|
|
|
+
|
|
|
+ private static final Map<Integer, TableId> tables = new LinkedHashMap<>();
|
|
|
|
|
|
@Override
|
|
|
public void postProcessBeforeInitialization(DatabaseConnectorMapper connectorMapper) {
|
|
@@ -122,13 +124,36 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void readSchema(DatabaseConnectorMapper connectorMapper){
|
|
|
+ private void readSchema(DatabaseConnectorMapper connectorMapper) {
|
|
|
+ List<Map> schemas = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(GET_TABLE_SCHEMA));
|
|
|
+ if (!CollectionUtils.isEmpty(schemas)) {
|
|
|
+ schemas.forEach(map -> {
|
|
|
+ Long oid = (Long) map.get("oid");
|
|
|
+ String tableName = (String) map.get("tableName");
|
|
|
+ tables.put(oid.intValue(), new TableId(oid.intValue(), tableName));
|
|
|
+ });
|
|
|
+ }
|
|
|
|
|
|
+ try {
|
|
|
+ Connection connection = connectorMapper.getConnection();
|
|
|
+ DatabaseMetaData metaData = connection.getMetaData();
|
|
|
+
|
|
|
+ // read column
|
|
|
+ for (TableId tableId : tables.values()) {
|
|
|
+ ResultSet rs = metaData.getColumns("", config.getSchema(), tableId.tableName, null);
|
|
|
+ PgResultSet pgResultSet = (PgResultSet) rs;
|
|
|
+ while (pgResultSet.next()) {
|
|
|
+ pgResultSet.getRow();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new ListenerException(e.getCause());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private RowChangedEvent parseDelete(ByteBuffer buffer) {
|
|
|
int relationId = buffer.getInt();
|
|
|
- logger.info("Delete table {}", relationId);
|
|
|
+ logger.info("Delete table {}", tables.get(relationId).tableName);
|
|
|
|
|
|
List<Object> data = new ArrayList<>();
|
|
|
String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
|
|
@@ -140,12 +165,12 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
default:
|
|
|
logger.info("K not set, got instead {}", newTuple);
|
|
|
}
|
|
|
- return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_INSERT, data, Collections.EMPTY_LIST);
|
|
|
+ return new RowChangedEvent(tables.get(relationId).tableName, ConnectorConstant.OPERTION_INSERT, data, Collections.EMPTY_LIST);
|
|
|
}
|
|
|
|
|
|
private RowChangedEvent parseInsert(ByteBuffer buffer) {
|
|
|
int relationId = buffer.getInt();
|
|
|
- logger.info("Insert table {}", relationId);
|
|
|
+ logger.info("Insert table {}", tables.get(relationId).tableName);
|
|
|
|
|
|
List<Object> data = new ArrayList<>();
|
|
|
String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
|
|
@@ -156,12 +181,12 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
default:
|
|
|
logger.info("N not set, got instead {}", newTuple);
|
|
|
}
|
|
|
- return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, data);
|
|
|
+ return new RowChangedEvent(tables.get(relationId).tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, data);
|
|
|
}
|
|
|
|
|
|
private RowChangedEvent parseUpdate(ByteBuffer buffer) {
|
|
|
int relationId = buffer.getInt();
|
|
|
- logger.info("Update table {}", relationId);
|
|
|
+ logger.info("Update table {}", tables.get(relationId).tableName);
|
|
|
|
|
|
List<Object> data = new ArrayList<>();
|
|
|
String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
|
|
@@ -183,7 +208,7 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
logger.info("K or O Byte1 not set, got instead {}", newTuple);
|
|
|
}
|
|
|
|
|
|
- return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, data);
|
|
|
+ return new RowChangedEvent(tables.get(relationId).tableName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, data);
|
|
|
}
|
|
|
|
|
|
private void readTupleData(ByteBuffer msg, List<Object> data) {
|
|
@@ -213,13 +238,12 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- final class TableId{
|
|
|
-
|
|
|
- private String tableId;
|
|
|
- private String tableName;
|
|
|
+ final class TableId {
|
|
|
+ Integer oid;
|
|
|
+ String tableName;
|
|
|
|
|
|
- public TableId(String tableId, String tableName) {
|
|
|
- this.tableId = tableId;
|
|
|
+ public TableId(Integer oid, String tableName) {
|
|
|
+ this.oid = oid;
|
|
|
this.tableName = tableName;
|
|
|
}
|
|
|
}
|