Преглед изворни кода

支持PG多实例增量监听

AE86 пре 3 година
родитељ
комит
5f36345879

+ 5 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 public abstract class AbstractExtractor implements Extractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
+    protected String metaId;
     protected Executor taskExecutor;
     protected ConnectorFactory connectorFactory;
     protected ScheduledTaskService scheduledTaskService;
@@ -81,6 +82,10 @@ public abstract class AbstractExtractor implements Extractor {
         }
     }
 
+    public void setMetaId(String metaId) {
+        this.metaId = metaId;
+    }
+
     public void setTaskExecutor(Executor taskExecutor) {
         this.taskExecutor = taskExecutor;
     }

+ 8 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/AbstractMessageDecoder.java

@@ -16,6 +16,8 @@ import java.nio.ByteBuffer;
  */
 public abstract class AbstractMessageDecoder implements MessageDecoder {
 
+    protected String metaId;
+
     protected DatabaseConfig config;
 
     private ColumnValue value = new PgColumnValue();
@@ -49,7 +51,12 @@ public abstract class AbstractMessageDecoder implements MessageDecoder {
 
     @Override
     public String getSlotName() {
-        return String.format("dbs_slot_%s_%s", config.getSchema(), config.getUsername());
+        return String.format("dbs_slot_%s_%s_%s", config.getSchema(), config.getUsername(), metaId);
+    }
+
+    @Override
+    public void setMetaId(String metaId) {
+        this.metaId = metaId;
     }
 
     @Override

+ 2 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/MessageDecoder.java

@@ -29,6 +29,8 @@ public interface MessageDecoder {
 
     void withSlotOption(ChainedLogicalStreamBuilder builder);
 
+    void setMetaId(String metaId);
+
     void setConfig(DatabaseConfig config);
 
 }

+ 1 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java

@@ -93,6 +93,7 @@ public class PostgreSQLExtractor extends AbstractDatabaseExtractor {
 
             database = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_DATABASE, String.class));
             messageDecoder = MessageDecoderEnum.getMessageDecoder(config.getProperty(PLUGIN_NAME));
+            messageDecoder.setMetaId(metaId);
             messageDecoder.setConfig(config);
             messageDecoder.postProcessBeforeInitialization(connectorFactory, connectorMapper);
             dropSlotOnClose = BooleanUtil.toBoolean(config.getProperty(DROP_SLOT_ON_CLOSE, "true"));

+ 2 - 1
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -164,7 +164,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
     }
 
     private void setExtractorConfig(AbstractExtractor extractor, ConnectorConfig connector, ListenerConfig listener,
-                                    Map<String, String> snapshot, Event event) {
+                                    Map<String, String> snapshot, AbstractListener event) {
         extractor.setTaskExecutor(taskExecutor);
         extractor.setConnectorFactory(connectorFactory);
         extractor.setScheduledTaskService(scheduledTaskService);
@@ -172,6 +172,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         extractor.setListenerConfig(listener);
         extractor.setSnapshot(snapshot);
         extractor.addListener(event);
+        extractor.setMetaId(event.metaId);
     }
 
     abstract class AbstractListener implements Event {