Explorar el Código

支持物化视图

AE86 hace 2 años
padre
commit
0fe66c3385

+ 26 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -9,9 +9,11 @@ import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.database.ds.SimpleConnection;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.enums.SetterEnum;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
+import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.connector.model.MetaInfo;
@@ -59,6 +61,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return String.format("%s-%s-%s", config.getConnectorType(), config.getUrl(), config.getUsername());
     }
 
+    @Override
+    public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
+        return getTable(connectorMapper, null, null, null);
+    }
+
     @Override
     public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
         String quotation = buildSqlWithQuotation();
@@ -281,15 +288,27 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
      * 获取表列表
      *
      * @param connectorMapper
-     * @param sql
+     * @param catalog
+     * @param schema
+     * @param tableNamePattern
      * @return
      */
-    protected List<Table> getTable(DatabaseConnectorMapper connectorMapper, String sql) {
-        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
-        if (!CollectionUtils.isEmpty(tableNames)) {
-            return tableNames.stream().map(name -> new Table(name)).collect(Collectors.toList());
-        }
-        return Collections.EMPTY_LIST;
+    protected List<Table> getTable(DatabaseConnectorMapper connectorMapper, String catalog, String schema, String tableNamePattern) {
+        return connectorMapper.execute(databaseTemplate -> {
+            List<Table> tables = new ArrayList<>();
+            SimpleConnection connection = (SimpleConnection) databaseTemplate.getConnection();
+            Connection conn = connection.getConnection();
+            String databaseCatalog = null == catalog ? conn.getCatalog() : catalog;
+            String schemaNamePattern = null == schema ? conn.getSchema() : schema;
+            String[] types = {TableTypeEnum.TABLE.getCode(), TableTypeEnum.VIEW.getCode(), TableTypeEnum.MATERIALIZED_VIEW.getCode()};
+            final ResultSet rs = conn.getMetaData().getTables(databaseCatalog, schemaNamePattern, tableNamePattern, types);
+            while (rs.next()) {
+                final String tableName = rs.getString("TABLE_NAME");
+                final String tableType = rs.getString("TABLE_TYPE");
+                tables.add(new Table(tableName, tableType));
+            }
+            return tables;
+        });
     }
 
     /**

+ 7 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/TableTypeEnum.java

@@ -13,10 +13,16 @@ public enum TableTypeEnum {
      * 表
      */
     TABLE("TABLE"),
+
     /**
      * 视图
      */
-    VIEW("VIEW");
+    VIEW("VIEW"),
+
+    /**
+     * 物化视图
+     */
+    MATERIALIZED_VIEW("MATERIALIZED VIEW");
 
     private String code;
 

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Table.java

@@ -17,7 +17,7 @@ public class Table {
     private String name;
 
     /**
-     * 表类型[TABLE、VIEW]
+     * 表类型[TABLE、VIEW、MATERIALIZED VIEW]
      */
     private String type;
 

+ 0 - 9
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -3,11 +3,7 @@ package org.dbsyncer.connector.mysql;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
-import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.model.PageSql;
-import org.dbsyncer.connector.model.Table;
-
-import java.util.List;
 
 public final class MysqlConnector extends AbstractDatabaseConnector {
 
@@ -56,9 +52,4 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
         return true;
     }
 
-    @Override
-    public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
-        return super.getTable(connectorMapper, "show tables");
-    }
-
 }

+ 3 - 11
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.connector.oracle;
 
-import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
@@ -11,21 +10,14 @@ import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.Table;
 
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 
 public final class OracleConnector extends AbstractDatabaseConnector {
 
     @Override
-    public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
-        final String sql = "SELECT TABLE_NAME,TABLE_TYPE FROM USER_TAB_COMMENTS";
-        List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql));
-        if (!CollectionUtils.isEmpty(list)) {
-            return list.stream().map(r -> new Table(r.get("TABLE_NAME").toString(), r.get("TABLE_TYPE").toString())).collect(Collectors.toList());
-        }
-        return Collections.EMPTY_LIST;
+    public List<Table> getTable(DatabaseConnectorMapper config) {
+        DatabaseConfig cfg = config.getConfig();
+        return super.getTable(config, null, cfg.getUsername().toUpperCase(), null);
     }
 
     @Override

+ 0 - 25
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -1,40 +1,15 @@
 package org.dbsyncer.connector.postgresql;
 
-import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
-import org.dbsyncer.connector.database.DatabaseConnectorMapper;
-import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.PageSql;
-import org.dbsyncer.connector.model.Table;
-
-import java.util.LinkedList;
-import java.util.List;
 
 public final class PostgreSQLConnector extends AbstractDatabaseConnector {
 
-    @Override
-    public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
-        List<Table> list = new LinkedList<>();
-        DatabaseConfig config = connectorMapper.getConfig();
-        final String queryTableSql = String.format("SELECT TABLENAME FROM PG_TABLES WHERE SCHEMANAME ='%s'", config.getSchema());
-        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(queryTableSql, String.class));
-        if (!CollectionUtils.isEmpty(tableNames)) {
-            tableNames.forEach(name -> list.add(new Table(name, TableTypeEnum.TABLE.getCode())));
-        }
-
-        final String queryTableViewSql = String.format("SELECT VIEWNAME FROM PG_VIEWS WHERE SCHEMANAME ='%s'", config.getSchema());
-        List<String> tableViewNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(queryTableViewSql, String.class));
-        if (!CollectionUtils.isEmpty(tableViewNames)) {
-            tableViewNames.forEach(name -> list.add(new Table(name, TableTypeEnum.VIEW.getCode())));
-        }
-        return list;
-    }
-
     @Override
     public String getPageSql(PageSql config) {
         return config.getQuerySql() + DatabaseConstant.POSTGRESQL_PAGE_SQL;

+ 3 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java

@@ -1,7 +1,8 @@
 package org.dbsyncer.connector.sql;
 
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.CommandConfig;
+import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
@@ -10,7 +11,6 @@ import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.Table;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -25,9 +25,7 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
     @Override
     public List<Table> getTable(DatabaseConnectorMapper config) {
         DatabaseConfig cfg = config.getConfig();
-        List<Table> tables = new ArrayList<>();
-        tables.add(new Table(cfg.getTable()));
-        return tables;
+        return super.getTable(config, null, null, cfg.getTable());
     }
 
     @Override

+ 11 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java

@@ -1,11 +1,22 @@
 package org.dbsyncer.connector.sql;
 
+import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
+import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.model.Table;
+
+import java.util.List;
 
 public final class DQLOracleConnector extends AbstractDQLConnector {
 
+    @Override
+    public List<Table> getTable(DatabaseConnectorMapper config) {
+        DatabaseConfig cfg = config.getConfig();
+        return super.getTable(config, null, cfg.getUsername().toUpperCase(), cfg.getTable());
+    }
+
     @Override
     public String getPageSql(PageSql config) {
         return DatabaseConstant.ORACLE_PAGE_SQL_START + config.getQuerySql() + DatabaseConstant.ORACLE_PAGE_SQL_END;

+ 19 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.sqlserver;
 
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
@@ -7,17 +8,26 @@ import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
+import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.Table;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public final class SqlServerConnector extends AbstractDatabaseConnector {
 
+    private static final String QUERY_VIEW = "select name from sysobjects where xtype in('v')";
+
+    private static final String QUERY_TABLE = "select name from sys.tables where schema_id = schema_id('%s') and is_ms_shipped = 0";
+
     @Override
     public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
         DatabaseConfig config = connectorMapper.getConfig();
-        return super.getTable(connectorMapper, String.format("select name from sys.tables where schema_id = schema_id('%s') and is_ms_shipped = 0", config.getSchema()));
+        List<Table> tables = getTables(connectorMapper, String.format(QUERY_TABLE, config.getSchema()), TableTypeEnum.TABLE);
+        tables.addAll(getTables(connectorMapper, QUERY_VIEW, TableTypeEnum.VIEW));
+        return tables;
     }
 
     @Override
@@ -45,4 +55,12 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
         // 从存储过程查询(定时更新总数,可能存在误差)
         return String.format("select rows from sysindexes where id = object_id('%s.%s') and indid in (0, 1)", cfg.getSchema(), table);
     }
+
+    private List<Table> getTables(DatabaseConnectorMapper connectorMapper, String sql, TableTypeEnum type) {
+        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
+        if (!CollectionUtils.isEmpty(tableNames)) {
+            return tableNames.stream().map(name -> new Table(name, type.getCode())).collect(Collectors.toList());
+        }
+        return new ArrayList<>();
+    }
 }

+ 85 - 17
dbsyncer-connector/src/main/test/SqlServerConnectionTest.java → dbsyncer-connector/src/main/test/ConnectionTest.java

@@ -2,16 +2,18 @@ import oracle.jdbc.OracleConnection;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
+import org.dbsyncer.connector.enums.TableTypeEnum;
+import org.dbsyncer.connector.model.Table;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.core.BatchPreparedStatementSetter;
 
 import java.nio.charset.Charset;
-import java.sql.Clob;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
+import java.sql.*;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.*;
 
 /**
@@ -19,20 +21,15 @@ import java.util.concurrent.*;
  * @version 1.0.0
  * @date 2022/4/11 20:19
  */
-public class SqlServerConnectionTest {
+public class ConnectionTest {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Test
     public void testByte() {
-        DatabaseConfig config = new DatabaseConfig();
-        config.setUrl("jdbc:oracle:thin:@127.0.0.1:1521:XE");
-        config.setUsername("ae86");
-        config.setPassword("123");
-        config.setDriverClassName("oracle.jdbc.OracleDriver");
-        final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(config);
+        final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(createOracleConfig());
 
-        String executeSql="UPDATE \"my_user\" SET \"name\"=?,\"clo\"=? WHERE \"id\"=?";
+        String executeSql = "UPDATE \"my_user\" SET \"name\"=?,\"clo\"=? WHERE \"id\"=?";
         int[] execute = connectorMapper.execute(databaseTemplate ->
                 databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
                     @Override
@@ -62,12 +59,7 @@ public class SqlServerConnectionTest {
 
     @Test
     public void testConnection() throws InterruptedException {
-        DatabaseConfig config = new DatabaseConfig();
-        config.setUrl("jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test");
-        config.setUsername("sa");
-        config.setPassword("123");
-        config.setDriverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
-        final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(config);
+        final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(createSqlServerConfig());
 
         // 模拟并发
         final int threadSize = 100;
@@ -109,4 +101,80 @@ public class SqlServerConnectionTest {
         TimeUnit.SECONDS.sleep(3);
         logger.info("test end");
     }
+
+    @Test
+    public void testReadSchema() {
+        getTables(createOracleConfig(), "test", "AE86", "MY_ORG");
+        getTables(createOracleConfig(), "test", "AE86", null);
+
+        getTables(createMysqlConfig());
+
+        getTables(createSqlServerConfig());
+
+        getTables(createPostgresConfig());
+    }
+
+    private List<Table> getTables(DatabaseConfig config) {
+        return getTables(config, null, null, null);
+    }
+
+    private List<Table> getTables(DatabaseConfig config, final String catalog, final String schema, final String tableNamePattern) {
+        final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(config);
+        List<Table> tables = new ArrayList<>();
+        connectorMapper.execute(databaseTemplate -> {
+            SimpleConnection connection = (SimpleConnection) databaseTemplate.getConnection();
+            Connection conn = connection.getConnection();
+            String databaseCatalog = null == catalog ? conn.getCatalog() : catalog;
+            String schemaNamePattern = null == schema ? conn.getSchema() : schema;
+            String[] types = {TableTypeEnum.TABLE.getCode(), TableTypeEnum.VIEW.getCode(), TableTypeEnum.MATERIALIZED_VIEW.getCode()};
+            final ResultSet rs = conn.getMetaData().getTables(databaseCatalog, schemaNamePattern, tableNamePattern, types);
+            while (rs.next()) {
+                final String tableName = rs.getString("TABLE_NAME");
+                final String tableType = rs.getString("TABLE_TYPE");
+                tables.add(new Table(tableName, tableType));
+            }
+            return tables;
+        });
+
+        logger.info("\r 表总数{}", tables.size());
+        tables.forEach(t -> logger.info("{} {}", t.getName(), t.getType()));
+
+        return tables;
+    }
+
+    private DatabaseConfig createSqlServerConfig() {
+        DatabaseConfig config = new DatabaseConfig();
+        config.setUrl("jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test");
+        config.setUsername("sa");
+        config.setPassword("123");
+        config.setDriverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+        return config;
+    }
+
+    private DatabaseConfig createOracleConfig() {
+        DatabaseConfig config = new DatabaseConfig();
+        config.setUrl("jdbc:oracle:thin:@127.0.0.1:1521:ORCL");
+        config.setUsername("ae86");
+        config.setPassword("123");
+        config.setDriverClassName("oracle.jdbc.OracleDriver");
+        return config;
+    }
+
+    private DatabaseConfig createMysqlConfig() {
+        DatabaseConfig config = new DatabaseConfig();
+        config.setUrl("jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false&autoReconnect=true&failOverReadOnly=false");
+        config.setUsername("root");
+        config.setPassword("123");
+        config.setDriverClassName("com.mysql.cj.jdbc.Driver");
+        return config;
+    }
+
+    private DatabaseConfig createPostgresConfig() {
+        DatabaseConfig config = new DatabaseConfig();
+        config.setUrl("jdbc:postgresql://127.0.0.1:5432/postgres");
+        config.setUsername("postgres");
+        config.setPassword("123456");
+        config.setDriverClassName("org.postgresql.Driver");
+        return config;
+    }
 }