Browse Source

Merge remote-tracking branch 'origin/V_1.0.0_Beta' into yjwang

yjwang 3 years ago
parent
commit
1af545178a

+ 91 - 44
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/PgOutputMessageDecoder.java

@@ -1,21 +1,24 @@
 package org.dbsyncer.listener.postgresql.decoder;
 
 import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
 import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
+import org.postgresql.jdbc.PgResultSet;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
 import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 
 /**
  * @author AE86
@@ -28,28 +31,14 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
 
     private static final LocalDateTime PG_EPOCH = LocalDateTime.of(2000, 1, 1, 0, 0, 0);
 
-    @Override
-    public void postProcessBeforeInitialization(DatabaseConnectorMapper connectorMapper) {
-        String pubName = getPubName();
-        String selectPublication = String.format("SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", pubName);
-        Integer count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(selectPublication, Integer.class));
-        if (0 < count) {
-            return;
-        }
+    private static final String GET_TABLE_SCHEMA = "select oid,relname as tableName from pg_class t inner join (select ns.oid as nspoid, ns.nspname from pg_namespace ns where ns.nspname = (select (current_schemas(false))[s.r] from generate_series(1, array_upper(current_schemas(false), 1)) as s(r))) as n on n.nspoid = t.relnamespace";
 
-        logger.info("Creating new publication '{}' for plugin '{}'", pubName, getOutputPlugin());
-        try {
-            String createPublication = String.format("CREATE PUBLICATION %s FOR ALL TABLES", pubName);
-            logger.info("Creating Publication with statement '{}'", createPublication);
-            connectorMapper.execute(databaseTemplate -> {
-                databaseTemplate.execute(createPublication);
-                return true;
-            });
-        } catch (Exception e) {
-            throw new ListenerException(e.getCause());
-        }
+    private static final Map<Integer, TableId> tables = new LinkedHashMap<>();
 
-        // TODO read table schema
+    @Override
+    public void postProcessBeforeInitialization(DatabaseConnectorMapper connectorMapper) {
+        initPublication(connectorMapper);
+        readSchema(connectorMapper);
     }
 
     @Override
@@ -99,9 +88,72 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
         return null;
     }
 
+    @Override
+    public String getOutputPlugin() {
+        return MessageDecoderEnum.PG_OUTPUT.getType();
+    }
+
+    @Override
+    public void withSlotOption(ChainedLogicalStreamBuilder builder) {
+        builder.withSlotOption("proto_version", 1);
+        builder.withSlotOption("publication_names", getPubName());
+    }
+
+    private String getPubName() {
+        return String.format("dbs_pub_%s_%s", config.getSchema(), config.getUsername());
+    }
+
+    private void initPublication(DatabaseConnectorMapper connectorMapper) {
+        String pubName = getPubName();
+        String selectPublication = String.format("SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", pubName);
+        Integer count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(selectPublication, Integer.class));
+        if (0 < count) {
+            return;
+        }
+
+        logger.info("Creating new publication '{}' for plugin '{}'", pubName, getOutputPlugin());
+        try {
+            String createPublication = String.format("CREATE PUBLICATION %s FOR ALL TABLES", pubName);
+            logger.info("Creating Publication with statement '{}'", createPublication);
+            connectorMapper.execute(databaseTemplate -> {
+                databaseTemplate.execute(createPublication);
+                return true;
+            });
+        } catch (Exception e) {
+            throw new ListenerException(e.getCause());
+        }
+    }
+
+    private void readSchema(DatabaseConnectorMapper connectorMapper) {
+        List<Map> schemas = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(GET_TABLE_SCHEMA));
+        if (!CollectionUtils.isEmpty(schemas)) {
+            schemas.forEach(map -> {
+                Long oid = (Long) map.get("oid");
+                String tableName = (String) map.get("tableName");
+                tables.put(oid.intValue(), new TableId(oid.intValue(), tableName));
+            });
+        }
+
+        try {
+            Connection connection = connectorMapper.getConnection();
+            DatabaseMetaData metaData = connection.getMetaData();
+
+            // read column
+            for (TableId tableId : tables.values()) {
+                ResultSet rs = metaData.getColumns("", config.getSchema(), tableId.tableName, null);
+                PgResultSet pgResultSet = (PgResultSet) rs;
+                while (pgResultSet.next()) {
+                    pgResultSet.getRow();
+                }
+            }
+        } catch (Exception e) {
+            throw new ListenerException(e.getCause());
+        }
+    }
+
     private RowChangedEvent parseDelete(ByteBuffer buffer) {
         int relationId = buffer.getInt();
-        logger.info("Delete table {}", relationId);
+        logger.info("Delete table {}", tables.get(relationId).tableName);
 
         List<Object> data = new ArrayList<>();
         String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
@@ -113,12 +165,12 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
             default:
                 logger.info("K not set, got instead {}", newTuple);
         }
-        return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_INSERT, data, Collections.EMPTY_LIST);
+        return new RowChangedEvent(tables.get(relationId).tableName, ConnectorConstant.OPERTION_INSERT, data, Collections.EMPTY_LIST);
     }
 
     private RowChangedEvent parseInsert(ByteBuffer buffer) {
         int relationId = buffer.getInt();
-        logger.info("Insert table {}", relationId);
+        logger.info("Insert table {}", tables.get(relationId).tableName);
 
         List<Object> data = new ArrayList<>();
         String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
@@ -129,12 +181,12 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
             default:
                 logger.info("N not set, got instead {}", newTuple);
         }
-        return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, data);
+        return new RowChangedEvent(tables.get(relationId).tableName, ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, data);
     }
 
     private RowChangedEvent parseUpdate(ByteBuffer buffer) {
         int relationId = buffer.getInt();
-        logger.info("Update table {}", relationId);
+        logger.info("Update table {}", tables.get(relationId).tableName);
 
         List<Object> data = new ArrayList<>();
         String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
@@ -156,22 +208,7 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
                 logger.info("K or O Byte1 not set, got instead {}", newTuple);
         }
 
-        return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, data);
-    }
-
-    @Override
-    public String getOutputPlugin() {
-        return MessageDecoderEnum.PG_OUTPUT.getType();
-    }
-
-    @Override
-    public void withSlotOption(ChainedLogicalStreamBuilder builder) {
-        builder.withSlotOption("proto_version", 1);
-        builder.withSlotOption("publication_names", getPubName());
-    }
-
-    private String getPubName() {
-        return String.format("dbs_pub_%s_%s", config.getSchema(), config.getUsername());
+        return new RowChangedEvent(tables.get(relationId).tableName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, data);
     }
 
     private void readTupleData(ByteBuffer msg, List<Object> data) {
@@ -201,4 +238,14 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
         }
     }
 
+    final class TableId {
+        Integer oid;
+        String tableName;
+
+        public TableId(Integer oid, String tableName) {
+            this.oid = oid;
+            this.tableName = tableName;
+        }
+    }
+
 }

+ 1 - 1
dbsyncer-web/src/main/resources/public/connector/addDqlMysql.html

@@ -16,7 +16,7 @@
     <div class="form-group">
         <label class="col-sm-2 control-label">SQL <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-10">
-            <textarea id="sql" name="sql" class="form-control" maxlength="1024" dbsyncer-valid="require" rows="10" th:text="${connector?.config?.sql}?:'SELECT T1.* FROM USER T1'"></textarea>
+            <textarea id="sql" name="sql" class="form-control" maxlength="8192" dbsyncer-valid="require" rows="10" th:text="${connector?.config?.sql}?:'SELECT T1.* FROM USER T1'"></textarea>
         </div>
     </div>
     <div class="form-group">

+ 1 - 1
dbsyncer-web/src/main/resources/public/connector/addDqlOracle.html

@@ -16,7 +16,7 @@
     <div class="form-group">
         <label class="col-sm-2 control-label">SQL <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-10">
-            <textarea id="sql" name="sql" class="form-control dbsyncer_textarea_resize_none" maxlength="1024" dbsyncer-valid="require" rows="10" th:text="${connector?.config?.sql}?:'SELECT T1.*,ROWIDTOCHAR(ROWID) as RID FROM &quot;USER&quot; T1'"></textarea>
+            <textarea id="sql" name="sql" class="form-control dbsyncer_textarea_resize_none" maxlength="8192" dbsyncer-valid="require" rows="10" th:text="${connector?.config?.sql}?:'SELECT T1.*,ROWIDTOCHAR(ROWID) as RID FROM &quot;USER&quot; T1'"></textarea>
         </div>
     </div>
     <div class="form-group">

+ 2 - 5
dbsyncer-web/src/main/resources/public/connector/addDqlPostgreSQL.html

@@ -18,9 +18,7 @@
     <div class="form-group">
         <label class="col-sm-2 control-label">SQL <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-10">
-            <textarea class="form-control dbsyncer_textarea_resize_none" dbsyncer-valid="require" id="sql"
-                      maxlength="1024" name="sql" rows="10"
-                      th:text="${connector?.config?.sql}?:'SELECT T1.* FROM &quot;USER&quot; T1'"></textarea>
+            <textarea class="form-control dbsyncer_textarea_resize_none" maxlength="8192" dbsyncer-valid="require" id="sql" name="sql" rows="10" th:text="${connector?.config?.sql}?:'SELECT T1.* FROM &quot;USER&quot; T1'"></textarea>
         </div>
     </div>
     <div class="form-group">
@@ -37,8 +35,7 @@
     <div class="form-group">
         <label class="col-sm-2 control-label">URL <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-10">
-            <textarea class="form-control dbsyncer_textarea_resize_none" dbsyncer-valid="require" maxlength="1024"
-                      name="url" rows="5"
+            <textarea class="form-control dbsyncer_textarea_resize_none" dbsyncer-valid="require" maxlength="1024" name="url" rows="5"
                       th:text="${connector?.config?.url} ?: 'jdbc:postgresql://127.0.0.1:5432/test'"></textarea>
         </div>
     </div>

+ 1 - 1
dbsyncer-web/src/main/resources/public/connector/addDqlSqlServer.html

@@ -31,7 +31,7 @@
     <div class="form-group">
         <label class="col-sm-2 control-label">URL <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-10">
-            <textarea name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="1024" dbsyncer-valid="require" rows="5" th:text="${connector?.config?.url} ?: 'jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test'"></textarea>
+            <textarea name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="8192" dbsyncer-valid="require" rows="5" th:text="${connector?.config?.url} ?: 'jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test'"></textarea>
         </div>
     </div>
     <div class="form-group">