|
@@ -17,7 +17,6 @@ import org.postgresql.util.PSQLException;
|
|
|
import org.postgresql.util.PSQLState;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.util.Assert;
|
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
@@ -41,6 +40,7 @@ public class PostgreSQLExtractor extends AbstractExtractor {
|
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
private static final String GET_SLOT = "select count(1) from pg_replication_slots where database = ? and slot_name = ? and plugin = ?";
|
|
|
+ private static final String GET_RESTART_LSN = "select restart_lsn from pg_replication_slots where database = ? and slot_name = ? and plugin = ?";
|
|
|
private static final String GET_ROLE = "SELECT r.rolcanlogin AS login, r.rolreplication AS replication, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rds_superuser') AS BOOL) IS TRUE AS superuser, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rdsadmin') AS BOOL) IS TRUE AS admin, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rdsrepladmin') AS BOOL) IS TRUE AS rep_admin FROM pg_roles r WHERE r.rolname = current_user";
|
|
|
private static final String GET_DATABASE = "SELECT current_database()";
|
|
|
private static final String GET_WAL_LEVEL = "SHOW WAL_LEVEL";
|
|
@@ -58,6 +58,7 @@ public class PostgreSQLExtractor extends AbstractExtractor {
|
|
|
private MessageDecoder messageDecoder;
|
|
|
private Worker worker;
|
|
|
private LogSequenceNumber startLsn;
|
|
|
+ private String database;
|
|
|
|
|
|
@Override
|
|
|
public void start() {
|
|
@@ -89,6 +90,7 @@ public class PostgreSQLExtractor extends AbstractExtractor {
|
|
|
throw new ListenerException(String.format("Postgres roles LOGIN and REPLICATION are not assigned to user: %s", config.getUsername()));
|
|
|
}
|
|
|
|
|
|
+ database = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_DATABASE, String.class));
|
|
|
messageDecoder = MessageDecoderEnum.getMessageDecoder(config.getProperty(PLUGIN_NAME));
|
|
|
messageDecoder.setConfig(config);
|
|
|
messageDecoder.postProcessBeforeInitialization(connectorFactory, connectorMapper);
|
|
@@ -145,20 +147,7 @@ public class PostgreSQLExtractor extends AbstractExtractor {
|
|
|
sleepInMills(10L);
|
|
|
}
|
|
|
|
|
|
- private LogSequenceNumber readLastLsn() throws SQLException {
|
|
|
- if (!snapshot.containsKey(LSN_POSITION)) {
|
|
|
- LogSequenceNumber lsn = currentXLogLocation();
|
|
|
- if (null == lsn || lsn.asLong() == 0) {
|
|
|
- throw new ListenerException("No maximum LSN recorded in the database");
|
|
|
- }
|
|
|
- snapshot.put(LSN_POSITION, lsn.asString());
|
|
|
- }
|
|
|
-
|
|
|
- return LogSequenceNumber.valueOf(snapshot.get(LSN_POSITION));
|
|
|
- }
|
|
|
-
|
|
|
private void createReplicationStream(PGConnection pgConnection) throws SQLException {
|
|
|
- this.startLsn = readLastLsn();
|
|
|
ChainedLogicalStreamBuilder streamBuilder = pgConnection
|
|
|
.getReplicationAPI()
|
|
|
.replicationStream()
|
|
@@ -172,7 +161,6 @@ public class PostgreSQLExtractor extends AbstractExtractor {
|
|
|
}
|
|
|
|
|
|
private void createReplicationSlot(PGConnection pgConnection) throws SQLException {
|
|
|
- String database = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_DATABASE, String.class));
|
|
|
String slotName = messageDecoder.getSlotName();
|
|
|
String plugin = messageDecoder.getOutputPlugin();
|
|
|
boolean existSlot = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_SLOT, new Object[]{database, slotName, plugin}, Integer.class) > 0);
|
|
@@ -183,7 +171,20 @@ public class PostgreSQLExtractor extends AbstractExtractor {
|
|
|
.withSlotName(slotName)
|
|
|
.withOutputPlugin(plugin)
|
|
|
.make();
|
|
|
+
|
|
|
+ // wait for create replication slot to have finished
|
|
|
+ sleepInMills(300);
|
|
|
}
|
|
|
+
|
|
|
+ if (!snapshot.containsKey(LSN_POSITION)) {
|
|
|
+ LogSequenceNumber lsn = connectorMapper.execute(databaseTemplate -> LogSequenceNumber.valueOf(databaseTemplate.queryForObject(GET_RESTART_LSN, new Object[] {database, slotName, plugin}, String.class)));
|
|
|
+ if (null == lsn || lsn.asLong() == 0) {
|
|
|
+ throw new ListenerException("No maximum LSN recorded in the database");
|
|
|
+ }
|
|
|
+ snapshot.put(LSN_POSITION, lsn.asString());
|
|
|
+ }
|
|
|
+
|
|
|
+ this.startLsn = LogSequenceNumber.valueOf(snapshot.get(LSN_POSITION));
|
|
|
}
|
|
|
|
|
|
private void dropReplicationSlot() {
|
|
@@ -223,12 +224,6 @@ public class PostgreSQLExtractor extends AbstractExtractor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private LogSequenceNumber currentXLogLocation() throws SQLException {
|
|
|
- int majorVersion = connection.getMetaData().getDatabaseMajorVersion();
|
|
|
- String sql = majorVersion >= 10 ? "select * from pg_current_wal_lsn()" : "select * from pg_current_xlog_location()";
|
|
|
- return connectorMapper.execute(databaseTemplate -> LogSequenceNumber.valueOf(databaseTemplate.queryForObject(sql, String.class)));
|
|
|
- }
|
|
|
-
|
|
|
private void recover() {
|
|
|
connectLock.lock();
|
|
|
try {
|