|
@@ -1,7 +1,7 @@
|
|
|
package org.dbsyncer.connector.postgresql;
|
|
|
|
|
|
import org.dbsyncer.common.util.BooleanUtil;
|
|
|
-import org.dbsyncer.connector.AbstractDatabaseListener;
|
|
|
+import org.dbsyncer.sdk.listener.AbstractDatabaseListener;
|
|
|
import org.dbsyncer.connector.ConnectorException;
|
|
|
import org.dbsyncer.connector.postgresql.enums.MessageDecoderEnum;
|
|
|
import org.dbsyncer.sdk.config.DatabaseConfig;
|
|
@@ -52,7 +52,7 @@ public class PostgreSQLListener extends AbstractDatabaseListener {
|
|
|
private final Lock connectLock = new ReentrantLock();
|
|
|
private volatile boolean connected;
|
|
|
private DatabaseConfig config;
|
|
|
- private DatabaseConnectorInstance connectorInstance;
|
|
|
+ private DatabaseConnectorInstance instance;
|
|
|
private Connection connection;
|
|
|
private PGReplicationStream stream;
|
|
|
private boolean dropSlotOnClose;
|
|
@@ -70,15 +70,15 @@ public class PostgreSQLListener extends AbstractDatabaseListener {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- connectorInstance = (DatabaseConnectorInstance) connectorFactory.connect(connectorConfig);
|
|
|
- config = connectorInstance.getConfig();
|
|
|
+ instance = (DatabaseConnectorInstance) connectorInstance;
|
|
|
+ config = instance.getConfig();
|
|
|
|
|
|
- final String walLevel = connectorInstance.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_WAL_LEVEL, String.class));
|
|
|
+ final String walLevel = instance.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_WAL_LEVEL, String.class));
|
|
|
if (!DEFAULT_WAL_LEVEL.equals(walLevel)) {
|
|
|
throw new ConnectorException(String.format("Postgres server wal_level property must be \"%s\" but is: %s", DEFAULT_WAL_LEVEL, walLevel));
|
|
|
}
|
|
|
|
|
|
- final boolean hasAuth = connectorInstance.execute(databaseTemplate -> {
|
|
|
+ final boolean hasAuth = instance.execute(databaseTemplate -> {
|
|
|
Map rs = databaseTemplate.queryForMap(GET_ROLE);
|
|
|
Boolean login = (Boolean) rs.getOrDefault("login", false);
|
|
|
Boolean replication = (Boolean) rs.getOrDefault("replication", false);
|
|
@@ -91,11 +91,11 @@ public class PostgreSQLListener extends AbstractDatabaseListener {
|
|
|
throw new ConnectorException(String.format("Postgres roles LOGIN and REPLICATION are not assigned to user: %s", config.getUsername()));
|
|
|
}
|
|
|
|
|
|
- database = connectorInstance.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_DATABASE, String.class));
|
|
|
+ database = instance.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_DATABASE, String.class));
|
|
|
messageDecoder = MessageDecoderEnum.getMessageDecoder(config.getProperty(PLUGIN_NAME));
|
|
|
messageDecoder.setMetaId(metaId);
|
|
|
messageDecoder.setConfig(config);
|
|
|
- messageDecoder.postProcessBeforeInitialization(connectorFactory, connectorInstance);
|
|
|
+ messageDecoder.postProcessBeforeInitialization(connectorService, instance);
|
|
|
dropSlotOnClose = BooleanUtil.toBoolean(config.getProperty(DROP_SLOT_ON_CLOSE, "true"));
|
|
|
|
|
|
connect();
|
|
@@ -170,7 +170,7 @@ public class PostgreSQLListener extends AbstractDatabaseListener {
|
|
|
private void createReplicationSlot(PGConnection pgConnection) throws SQLException {
|
|
|
String slotName = messageDecoder.getSlotName();
|
|
|
String plugin = messageDecoder.getOutputPlugin();
|
|
|
- boolean existSlot = connectorInstance.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_SLOT, new Object[]{database, slotName, plugin}, Integer.class) > 0);
|
|
|
+ boolean existSlot = instance.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_SLOT, new Object[]{database, slotName, plugin}, Integer.class) > 0);
|
|
|
if (!existSlot) {
|
|
|
pgConnection.getReplicationAPI()
|
|
|
.createReplicationSlot()
|
|
@@ -184,7 +184,7 @@ public class PostgreSQLListener extends AbstractDatabaseListener {
|
|
|
}
|
|
|
|
|
|
if (!snapshot.containsKey(LSN_POSITION)) {
|
|
|
- LogSequenceNumber lsn = connectorInstance.execute(databaseTemplate -> LogSequenceNumber.valueOf(databaseTemplate.queryForObject(GET_RESTART_LSN, new Object[]{database, slotName, plugin}, String.class)));
|
|
|
+ LogSequenceNumber lsn = instance.execute(databaseTemplate -> LogSequenceNumber.valueOf(databaseTemplate.queryForObject(GET_RESTART_LSN, new Object[]{database, slotName, plugin}, String.class)));
|
|
|
if (null == lsn || lsn.asLong() == 0) {
|
|
|
throw new ConnectorException("No maximum LSN recorded in the database");
|
|
|
}
|
|
@@ -204,7 +204,7 @@ public class PostgreSQLListener extends AbstractDatabaseListener {
|
|
|
final int ATTEMPTS = 3;
|
|
|
for (int i = 0; i < ATTEMPTS; i++) {
|
|
|
try {
|
|
|
- connectorInstance.execute(databaseTemplate -> {
|
|
|
+ instance.execute(databaseTemplate -> {
|
|
|
databaseTemplate.execute(String.format("select pg_drop_replication_slot('%s')", slotName));
|
|
|
return true;
|
|
|
});
|