|
@@ -28,28 +28,15 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
|
|
|
private static final LocalDateTime PG_EPOCH = LocalDateTime.of(2000, 1, 1, 0, 0, 0);
|
|
|
|
|
|
+ private static final String GET_TABLE_SCHEMA = "select pg.oid, pg.typname, pg.typtype from pg_catalog.pg_type pg "
|
|
|
+ + "inner join (select ns.oid as nspoid, ns.nspname from pg_namespace ns where ns.nspname = "
|
|
|
+ + "(select (current_schemas(false))[s.r] from generate_series(1, array_upper(current_schemas(false), 1)) as s(r))"
|
|
|
+ + ") as n on n.nspoid = pg.typnamespace";
|
|
|
+
|
|
|
@Override
|
|
|
public void postProcessBeforeInitialization(DatabaseConnectorMapper connectorMapper) {
|
|
|
- String pubName = getPubName();
|
|
|
- String selectPublication = String.format("SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", pubName);
|
|
|
- Integer count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(selectPublication, Integer.class));
|
|
|
- if (0 < count) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- logger.info("Creating new publication '{}' for plugin '{}'", pubName, getOutputPlugin());
|
|
|
- try {
|
|
|
- String createPublication = String.format("CREATE PUBLICATION %s FOR ALL TABLES", pubName);
|
|
|
- logger.info("Creating Publication with statement '{}'", createPublication);
|
|
|
- connectorMapper.execute(databaseTemplate -> {
|
|
|
- databaseTemplate.execute(createPublication);
|
|
|
- return true;
|
|
|
- });
|
|
|
- } catch (Exception e) {
|
|
|
- throw new ListenerException(e.getCause());
|
|
|
- }
|
|
|
-
|
|
|
- // TODO read table schema
|
|
|
+ initPublication(connectorMapper);
|
|
|
+ readSchema(connectorMapper);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -99,6 +86,46 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public String getOutputPlugin() {
|
|
|
+ return MessageDecoderEnum.PG_OUTPUT.getType();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void withSlotOption(ChainedLogicalStreamBuilder builder) {
|
|
|
+ builder.withSlotOption("proto_version", 1);
|
|
|
+ builder.withSlotOption("publication_names", getPubName());
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getPubName() {
|
|
|
+ return String.format("dbs_pub_%s_%s", config.getSchema(), config.getUsername());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initPublication(DatabaseConnectorMapper connectorMapper) {
|
|
|
+ String pubName = getPubName();
|
|
|
+ String selectPublication = String.format("SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", pubName);
|
|
|
+ Integer count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(selectPublication, Integer.class));
|
|
|
+ if (0 < count) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.info("Creating new publication '{}' for plugin '{}'", pubName, getOutputPlugin());
|
|
|
+ try {
|
|
|
+ String createPublication = String.format("CREATE PUBLICATION %s FOR ALL TABLES", pubName);
|
|
|
+ logger.info("Creating Publication with statement '{}'", createPublication);
|
|
|
+ connectorMapper.execute(databaseTemplate -> {
|
|
|
+ databaseTemplate.execute(createPublication);
|
|
|
+ return true;
|
|
|
+ });
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new ListenerException(e.getCause());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void readSchema(DatabaseConnectorMapper connectorMapper){
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
private RowChangedEvent parseDelete(ByteBuffer buffer) {
|
|
|
int relationId = buffer.getInt();
|
|
|
logger.info("Delete table {}", relationId);
|
|
@@ -159,21 +186,6 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, data);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public String getOutputPlugin() {
|
|
|
- return MessageDecoderEnum.PG_OUTPUT.getType();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void withSlotOption(ChainedLogicalStreamBuilder builder) {
|
|
|
- builder.withSlotOption("proto_version", 1);
|
|
|
- builder.withSlotOption("publication_names", getPubName());
|
|
|
- }
|
|
|
-
|
|
|
- private String getPubName() {
|
|
|
- return String.format("dbs_pub_%s_%s", config.getSchema(), config.getUsername());
|
|
|
- }
|
|
|
-
|
|
|
private void readTupleData(ByteBuffer msg, List<Object> data) {
|
|
|
short nColumn = msg.getShort();
|
|
|
for (int n = 0; n < nColumn; n++) {
|
|
@@ -201,4 +213,15 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ final class TableId{
|
|
|
+
|
|
|
+ private String tableId;
|
|
|
+ private String tableName;
|
|
|
+
|
|
|
+ public TableId(String tableId, String tableName) {
|
|
|
+ this.tableId = tableId;
|
|
|
+ this.tableName = tableName;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|