瀏覽代碼

add schema

AE86 3 年之前
父節點
當前提交
53e57c9d5f

+ 2 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/MessageDecoder.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.listener.postgresql;
 
 import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.postgresql.replication.LogSequenceNumber;
@@ -15,7 +16,7 @@ import java.nio.ByteBuffer;
  */
 public interface MessageDecoder {
 
-    default void postProcessBeforeInitialization(DatabaseConnectorMapper connectorMapper) {
+    default void postProcessBeforeInitialization(ConnectorFactory connectorFactory, DatabaseConnectorMapper connectorMapper) {
     }
 
     boolean skipMessage(ByteBuffer buffer, LogSequenceNumber startLsn, LogSequenceNumber lastReceiveLsn);

+ 2 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java

@@ -17,6 +17,7 @@ import org.postgresql.util.PSQLException;
 import org.postgresql.util.PSQLState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.Assert;
 
 import java.nio.ByteBuffer;
@@ -90,7 +91,7 @@ public class PostgreSQLExtractor extends AbstractExtractor {
 
             messageDecoder = MessageDecoderEnum.getMessageDecoder(config.getProperty(PLUGIN_NAME));
             messageDecoder.setConfig(config);
-            messageDecoder.postProcessBeforeInitialization(connectorMapper);
+            messageDecoder.postProcessBeforeInitialization(connectorFactory, connectorMapper);
             dropSlotOnClose = BooleanUtil.toBoolean(config.getProperty(DROP_SLOT_ON_CLOSE, "true"));
 
             connect();

+ 19 - 34
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/PgOutputMessageDecoder.java

@@ -2,21 +2,20 @@ package org.dbsyncer.listener.postgresql.decoder;
 
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
 import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
-import org.postgresql.jdbc.PgResultSet;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
 import java.time.LocalDateTime;
 import java.util.*;
 
@@ -31,14 +30,14 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
 
     private static final LocalDateTime PG_EPOCH = LocalDateTime.of(2000, 1, 1, 0, 0, 0);
 
-    private static final String GET_TABLE_SCHEMA = "select oid,relname as tableName from pg_class t inner join (select ns.oid as nspoid, ns.nspname from pg_namespace ns where ns.nspname = (select (current_schemas(false))[s.r] from generate_series(1, array_upper(current_schemas(false), 1)) as s(r))) as n on n.nspoid = t.relnamespace";
+    private static final String GET_TABLE_SCHEMA = "select oid,relname as tableName from pg_class t inner join (select ns.oid as nspoid, ns.nspname from pg_namespace ns where ns.nspname = (select (current_schemas(false))[s.r] from generate_series(1, array_upper(current_schemas(false), 1)) as s(r))) as n on n.nspoid = t.relnamespace where relkind = 'r'";
 
     private static final Map<Integer, TableId> tables = new LinkedHashMap<>();
 
     @Override
-    public void postProcessBeforeInitialization(DatabaseConnectorMapper connectorMapper) {
+    public void postProcessBeforeInitialization(ConnectorFactory connectorFactory, DatabaseConnectorMapper connectorMapper) {
         initPublication(connectorMapper);
-        readSchema(connectorMapper);
+        readSchema(connectorFactory, connectorMapper);
     }
 
     @Override
@@ -124,7 +123,7 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
         }
     }
 
-    private void readSchema(DatabaseConnectorMapper connectorMapper) {
+    private void readSchema(ConnectorFactory connectorFactory, DatabaseConnectorMapper connectorMapper) {
         List<Map> schemas = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(GET_TABLE_SCHEMA));
         if (!CollectionUtils.isEmpty(schemas)) {
             schemas.forEach(map -> {
@@ -134,29 +133,20 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
             });
         }
 
-        try {
-            Connection connection = connectorMapper.getConnection();
-            DatabaseMetaData metaData = connection.getMetaData();
-
-            // read column
-            for (TableId tableId : tables.values()) {
-                ResultSet rs = metaData.getColumns("", config.getSchema(), tableId.tableName, null);
-                PgResultSet pgResultSet = (PgResultSet) rs;
-                while (pgResultSet.next()) {
-                    pgResultSet.getRow();
-                }
+        // read column
+        for (TableId tableId : tables.values()) {
+            MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableId.tableName);
+            if(!CollectionUtils.isEmpty(metaInfo.getColumn())){
+                tableId.fields = metaInfo.getColumn();
             }
-        } catch (Exception e) {
-            throw new ListenerException(e.getCause());
         }
     }
 
     private RowChangedEvent parseDelete(ByteBuffer buffer) {
         int relationId = buffer.getInt();
-        logger.info("Delete table {}", tables.get(relationId).tableName);
 
         List<Object> data = new ArrayList<>();
-        String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
+        String newTuple = new String(new byte[] {buffer.get()}, 0, 1);
 
         switch (newTuple) {
             case "K":
@@ -170,10 +160,9 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
 
     private RowChangedEvent parseInsert(ByteBuffer buffer) {
         int relationId = buffer.getInt();
-        logger.info("Insert table {}", tables.get(relationId).tableName);
 
         List<Object> data = new ArrayList<>();
-        String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
+        String newTuple = new String(new byte[] {buffer.get()}, 0, 1);
         switch (newTuple) {
             case "N":
                 readTupleData(buffer, data);
@@ -186,19 +175,14 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
 
     private RowChangedEvent parseUpdate(ByteBuffer buffer) {
         int relationId = buffer.getInt();
-        logger.info("Update table {}", tables.get(relationId).tableName);
 
         List<Object> data = new ArrayList<>();
-        String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
+        String newTuple = new String(new byte[] {buffer.get()}, 0, 1);
         switch (newTuple) {
             case "K":
-                logger.info("Key update");
-                logger.info("Old Key");
                 readTupleData(buffer, data);
                 break;
             case "O":
-                logger.info("Value update");
-                logger.info("Old Value");
                 readTupleData(buffer, data);
                 break;
             case "N":
@@ -214,7 +198,7 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
     private void readTupleData(ByteBuffer msg, List<Object> data) {
         short nColumn = msg.getShort();
         for (int n = 0; n < nColumn; n++) {
-            String tupleContentType = new String(new byte[]{msg.get()}, 0, 1);
+            String tupleContentType = new String(new byte[] {msg.get()}, 0, 1);
             if (tupleContentType.equals("t")) {
                 int size = msg.getInt();
                 byte[] text = new byte[size];
@@ -239,8 +223,9 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
     }
 
     final class TableId {
-        Integer oid;
-        String tableName;
+        Integer     oid;
+        String      tableName;
+        List<Field> fields = new LinkedList<>();
 
         public TableId(Integer oid, String tableName) {
             this.oid = oid;