1
0
Эх сурвалжийг харах

支持SqlServer切换dbo架构名

AE86 3 жил өмнө
parent
commit
3dd863ee58
17 өөрчлөгдсөн 108 нэмэгдсэн , 39 устгасан
  1. 7 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/DqlSqlServerConfigChecker.java
  2. 13 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/SqlServerConfigChecker.java
  3. 8 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/CommandConfig.java
  4. 22 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlServerDatabaseConfig.java
  5. 2 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  6. 3 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/ConnectorEnum.java
  7. 2 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java
  8. 3 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java
  9. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLMysqlConnector.java
  10. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java
  11. 3 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java
  12. 6 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java
  13. 10 7
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java
  14. 10 8
      dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java
  15. 3 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  16. 7 0
      dbsyncer-web/src/main/resources/public/connector/addDqlSqlServer.html
  17. 7 0
      dbsyncer-web/src/main/resources/public/connector/addSqlServer.html

+ 7 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/DqlSqlServerConfigChecker.java

@@ -1,7 +1,9 @@
 package org.dbsyncer.biz.checker.impl.connector;
 
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.SqlServerDatabaseConfig;
 import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
 
 import java.util.Map;
 
@@ -17,5 +19,10 @@ public class DqlSqlServerConfigChecker extends AbstractDataBaseConfigChecker {
     public void modify(DatabaseConfig connectorConfig, Map<String, String> params) {
         super.modify(connectorConfig, params);
         super.modifyDql(connectorConfig, params);
+        String schema = params.get("schema");
+        Assert.hasText(schema, "Schema is empty.");
+
+        SqlServerDatabaseConfig config = (SqlServerDatabaseConfig) connectorConfig;
+        config.setSchema(schema);
     }
 }

+ 13 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/SqlServerConfigChecker.java

@@ -1,6 +1,11 @@
 package org.dbsyncer.biz.checker.impl.connector;
 
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.SqlServerDatabaseConfig;
 import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import java.util.Map;
 
 /**
  * @author AE86
@@ -9,5 +14,13 @@ import org.springframework.stereotype.Component;
  */
 @Component
 public class SqlServerConfigChecker extends AbstractDataBaseConfigChecker {
+    @Override
+    public void modify(DatabaseConfig connectorConfig, Map<String, String> params) {
+        super.modify(connectorConfig, params);
+        String schema = params.get("schema");
+        Assert.hasText(schema, "Schema is empty.");
 
+        SqlServerDatabaseConfig config = (SqlServerDatabaseConfig) connectorConfig;
+        config.setSchema(schema);
+    }
 }

+ 8 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/CommandConfig.java

@@ -19,17 +19,20 @@ public class CommandConfig {
 
     private List<Filter> filter;
 
+    private ConnectorConfig connectorConfig;
+
     public CommandConfig(String type, Table table, Table originalTable) {
         this.type = type;
         this.table = table;
         this.originalTable = originalTable;
     }
 
-    public CommandConfig(String type, Table table, Table originalTable, List<Filter> filter) {
+    public CommandConfig(String type, Table table, Table originalTable, List<Filter> filter, ConnectorConfig connectorConfig) {
         this.type = type;
         this.table = table;
         this.originalTable = originalTable;
         this.filter = filter;
+        this.connectorConfig = connectorConfig;
     }
 
     public String getType() {
@@ -47,4 +50,8 @@ public class CommandConfig {
     public Table getOriginalTable() {
         return originalTable;
     }
+
+    public ConnectorConfig getConnectorConfig() {
+        return connectorConfig;
+    }
 }

+ 22 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlServerDatabaseConfig.java

@@ -0,0 +1,22 @@
+package org.dbsyncer.connector.config;
+
+/**
+ * SqlServer连接配置
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/1/10 23:57
+ */
+public class SqlServerDatabaseConfig extends DatabaseConfig {
+
+    // 构架名
+    private String schema;
+
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+}

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -27,7 +27,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    protected abstract String getTableSql();
+    protected abstract String getTableSql(DatabaseConfig config);
 
     @Override
     public ConnectorMapper connect(DatabaseConfig config) {
@@ -57,7 +57,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
     @Override
     public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
-        String sql = getTableSql();
+        String sql = getTableSql(connectorMapper.getConfig());
         List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
         if (!CollectionUtils.isEmpty(tableNames)) {
             return tableNames.stream().map(name -> new Table(name)).collect(Collectors.toList());

+ 3 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/ConnectorEnum.java

@@ -3,9 +3,7 @@ package org.dbsyncer.connector.enums;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.ESConfig;
-import org.dbsyncer.connector.config.KafkaConfig;
+import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.es.ESConnector;
 import org.dbsyncer.connector.kafka.KafkaConnector;
 import org.dbsyncer.connector.mysql.MysqlConnector;
@@ -35,7 +33,7 @@ public enum ConnectorEnum {
     /**
      * SqlServer 连接器
      */
-    SQL_SERVER("SqlServer", new SqlServerConnector(), DatabaseConfig.class),
+    SQL_SERVER("SqlServer", new SqlServerConnector(), SqlServerDatabaseConfig.class),
     /**
      * Elasticsearch 连接器
      */
@@ -55,7 +53,7 @@ public enum ConnectorEnum {
     /**
      * DqlSqlServer 连接器
      */
-    DQL_SQL_SERVER("DqlSqlServer", new DQLSqlServerConnector(), DatabaseConfig.class);
+    DQL_SQL_SERVER("DqlSqlServer", new DQLSqlServerConnector(), SqlServerDatabaseConfig.class);
 
     // 连接器名称
     private String type;

+ 2 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.mysql;
 
+import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.PageSqlConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
@@ -7,7 +8,7 @@ import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 public final class MysqlConnector extends AbstractDatabaseConnector {
 
     @Override
-    protected String getTableSql() {
+    protected String getTableSql(DatabaseConfig config) {
         return "show tables";
     }
 

+ 3 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.connector.oracle;
 
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.PageSqlConfig;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.constant.DatabaseConstant;
@@ -15,13 +16,13 @@ import java.util.stream.Collectors;
 public final class OracleConnector extends AbstractDatabaseConnector {
 
     @Override
-    protected String getTableSql() {
+    protected String getTableSql(DatabaseConfig config) {
         return "SELECT TABLE_NAME,TABLE_TYPE FROM USER_TAB_COMMENTS";
     }
 
     @Override
     public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
-        String sql = getTableSql();
+        String sql = getTableSql(connectorMapper.getConfig());
         List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql));
         if (!CollectionUtils.isEmpty(list)) {
             return list.stream().map(r -> new Table(r.get("TABLE_NAME").toString(), r.get("TABLE_TYPE").toString())).collect(Collectors.toList());

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLMysqlConnector.java

@@ -11,7 +11,7 @@ import java.util.Map;
 public final class DQLMysqlConnector extends AbstractDatabaseConnector {
 
     @Override
-    protected String getTableSql() {
+    protected String getTableSql(DatabaseConfig config) {
         return "show tables";
     }
 

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java

@@ -11,7 +11,7 @@ import java.util.Map;
 public final class DQLOracleConnector extends AbstractDatabaseConnector {
 
     @Override
-    protected String getTableSql() {
+    protected String getTableSql(DatabaseConfig config) {
         return "SELECT TABLE_NAME,TABLE_TYPE FROM USER_TAB_COMMENTS";
     }
 

+ 3 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java

@@ -30,8 +30,9 @@ public final class DQLSqlServerConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    protected String getTableSql() {
-        return "SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('dbo')";
+    protected String getTableSql(DatabaseConfig config) {
+        SqlServerDatabaseConfig cfg = (SqlServerDatabaseConfig) config;
+        return String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s')", cfg.getSchema());
     }
 
     @Override

+ 6 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -3,10 +3,7 @@ package org.dbsyncer.connector.sqlserver;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.PageSqlConfig;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
@@ -32,8 +29,9 @@ public final class SqlServerConnector extends AbstractDatabaseConnector implemen
     }
 
     @Override
-    protected String getTableSql() {
-        return "SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('dbo') AND IS_MS_SHIPPED = 0";
+    protected String getTableSql(DatabaseConfig config) {
+        SqlServerDatabaseConfig cfg = (SqlServerDatabaseConfig) config;
+        return String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s') AND IS_MS_SHIPPED = 0", cfg.getSchema());
     }
 
     @Override
@@ -67,8 +65,9 @@ public final class SqlServerConnector extends AbstractDatabaseConnector implemen
         if (StringUtil.isNotBlank(queryFilterSql)) {
             queryCount.append("SELECT COUNT(*) FROM ").append(table.getName()).append(queryFilterSql);
         } else {
+            SqlServerDatabaseConfig cfg = (SqlServerDatabaseConfig) commandConfig.getConnectorConfig();
             // 从存储过程查询(定时更新总数,可能存在误差)
-            queryCount.append("SELECT ROWS FROM SYSINDEXES WHERE ID = OBJECT_ID('").append("dbo.").append(table.getName()).append(
+            queryCount.append("SELECT ROWS FROM SYSINDEXES WHERE ID = OBJECT_ID('").append(cfg.getSchema()).append(".").append(table.getName()).append(
                     "') AND INDID IN (0, 1)");
         }
         map.put(ConnectorConstant.OPERTION_QUERY_COUNT, queryCount.toString());

+ 10 - 7
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -5,6 +5,7 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.RandomUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.SqlServerDatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.listener.AbstractExtractor;
@@ -34,12 +35,12 @@ public class SqlServerExtractor extends AbstractExtractor {
 
     private static final String STATEMENTS_PLACEHOLDER = "#";
     private static final String GET_DATABASE_NAME = "SELECT db_name()";
-    private static final String GET_TABLE_LIST = "SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('dbo') AND IS_MS_SHIPPED = 0";
-    private static final String IS_SERVER_AGENT_RUNNING = "EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'";
+    private static final String GET_TABLE_LIST = "SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('#') AND IS_MS_SHIPPED = 0";
+    private static final String IS_SERVER_AGENT_RUNNING = "EXEC master.#.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'";
     private static final String IS_DB_CDC_ENABLED = "SELECT is_cdc_enabled FROM sys.databases WHERE name = '#'";
     private static final String IS_TABLE_CDC_ENABLED = "SELECT COUNT(*) FROM sys.tables tb WHERE tb.is_tracked_by_cdc = 1 AND tb.name='#'";
     private static final String ENABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name = '#' AND is_cdc_enabled=0) EXEC sys.sp_cdc_enable_db";
-    private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0) EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
+    private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0) EXEC sys.sp_cdc_enable_table @source_schema = N'%s', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
     private static final String GET_TABLES_CDC_ENABLED = "EXEC sys.sp_cdc_help_change_data_capture";
     private static final String GET_MAX_LSN = "SELECT sys.fn_cdc_get_max_lsn()";
     private static final String GET_MIN_LSN = "SELECT sys.fn_cdc_get_min_lsn('#')";
@@ -60,6 +61,7 @@ public class SqlServerExtractor extends AbstractExtractor {
     private Worker worker;
     private Lsn lastLsn;
     private String serverName;
+    private String schema;
 
     @Override
     public void start() {
@@ -74,7 +76,7 @@ public class SqlServerExtractor extends AbstractExtractor {
             readTables();
             Assert.notEmpty(tables, "No tables available");
 
-            boolean enabledServerAgent = queryAndMap(IS_SERVER_AGENT_RUNNING, rs -> "Running.".equals(rs.getString(1)));
+            boolean enabledServerAgent = queryAndMap(IS_SERVER_AGENT_RUNNING.replace(STATEMENTS_PLACEHOLDER, schema), rs -> "Running.".equals(rs.getString(1)));
             Assert.isTrue(enabledServerAgent, "Please ensure that the SQL Server Agent is running");
 
             enableDBCDC();
@@ -119,10 +121,11 @@ public class SqlServerExtractor extends AbstractExtractor {
     }
 
     private void connect() {
-        DatabaseConfig cfg = (DatabaseConfig) connectorConfig;
+        SqlServerDatabaseConfig cfg = (SqlServerDatabaseConfig) connectorConfig;
         if (connectorFactory.isAlive(cfg)) {
             connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(cfg);
             serverName = cfg.getUrl();
+            schema = cfg.getSchema();
             connectionClosed = false;
         }
     }
@@ -141,7 +144,7 @@ public class SqlServerExtractor extends AbstractExtractor {
     }
 
     private void readTables() {
-        tables = queryAndMapList(GET_TABLE_LIST, rs -> {
+        tables = queryAndMapList(GET_TABLE_LIST.replace(STATEMENTS_PLACEHOLDER, schema), rs -> {
             Set<String> table = new LinkedHashSet<>();
             while (rs.next()) {
                 if (filterTable.contains(rs.getString(1))) {
@@ -182,7 +185,7 @@ public class SqlServerExtractor extends AbstractExtractor {
             tables.forEach(table -> {
                 boolean enabledTableCDC = queryAndMap(IS_TABLE_CDC_ENABLED.replace(STATEMENTS_PLACEHOLDER, table), rs -> rs.getInt(1) > 0);
                 if (!enabledTableCDC) {
-                    execute(ENABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, table));
+                    execute(String.format(ENABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, table), schema));
                     Lsn minLsn = queryAndMap(GET_MIN_LSN.replace(STATEMENTS_PLACEHOLDER, table), rs -> new Lsn(rs.getBytes(1)));
                     logger.info("启用CDC表[{}]:{}", table, minLsn.isAvailable());
                 }

+ 10 - 8
dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java

@@ -26,13 +26,13 @@ public class ChangeDataCaptureTest {
     private static final String STATEMENTS_PLACEHOLDER = "#";
     private static final String GET_DATABASE_NAME = "SELECT db_name()";
     private static final String GET_DATABASE_VERSION = "SELECT @@VERSION AS 'SQL Server Version'";
-    private static final String GET_TABLE_LIST = "SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('dbo') AND IS_MS_SHIPPED = 0";
-    private static final String IS_SERVER_AGENT_RUNNING = "EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'";
+    private static final String GET_TABLE_LIST = "SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('#') AND IS_MS_SHIPPED = 0";
+    private static final String IS_SERVER_AGENT_RUNNING = "EXEC master.#.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'";
     private static final String IS_DB_CDC_ENABLED = "SELECT is_cdc_enabled FROM sys.databases WHERE name = '#'";
     private static final String IS_TABLE_CDC_ENABLED = "SELECT COUNT(*) FROM sys.tables tb WHERE tb.is_tracked_by_cdc = 1 AND tb.name='#'";
     private static final String ENABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name = '#' AND is_cdc_enabled=0) EXEC sys.sp_cdc_enable_db";
-    private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0) EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
-    private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'";
+    private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0) EXEC sys.sp_cdc_enable_table @source_schema = N'%s', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
+    private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'%s', @source_name = N'#', @capture_instance = 'all'";
 
     private static final String AT_TIME_ZONE_UTC = " AT TIME ZONE 'UTC'";
     private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT sys.fn_cdc_map_lsn_to_time([__$start_lsn])#, * FROM cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
@@ -45,6 +45,7 @@ public class ChangeDataCaptureTest {
     private String getAllChangesForTable;
     private Connection connection = null;
     private Set<String> tables;
+    private String schema;
 
     /**
      * <p>cdc.captured_columns – 此表返回捕获列列表的结果。</p>
@@ -75,7 +76,7 @@ public class ChangeDataCaptureTest {
             supportsAtTimeZone = 2016 < Integer.valueOf(version.substring(21, 25));
         }
         logger.info("数据库版本:{}", version);
-        tables = cdc.queryAndMapList(GET_TABLE_LIST, rs -> {
+        tables = cdc.queryAndMapList(GET_TABLE_LIST.replace(STATEMENTS_PLACEHOLDER, realDatabaseName), rs -> {
             Set<String> table = new LinkedHashSet<>();
             while (rs.next()) {
                 table.add(rs.getString(1));
@@ -84,7 +85,7 @@ public class ChangeDataCaptureTest {
         });
         logger.info("所有表:{}", tables);
         // 获取Agent服务状态 Stopped. Running.
-        boolean enabledServerAgent = cdc.queryAndMap(IS_SERVER_AGENT_RUNNING, rs -> "Running.".equals(rs.getString(1)));
+        boolean enabledServerAgent = cdc.queryAndMap(IS_SERVER_AGENT_RUNNING.replace(STATEMENTS_PLACEHOLDER, realDatabaseName), rs -> "Running.".equals(rs.getString(1)));
         logger.info("是否启动Agent服务:{}", enabledServerAgent);
         Assert.assertTrue("The agent server is not running", enabledServerAgent);
         boolean enabledCDC = cdc.queryAndMap(IS_DB_CDC_ENABLED.replace(STATEMENTS_PLACEHOLDER, realDatabaseName), rs -> rs.getBoolean(1));
@@ -101,7 +102,7 @@ public class ChangeDataCaptureTest {
                 boolean enabledTableCDC = cdc.queryAndMap(IS_TABLE_CDC_ENABLED.replace(STATEMENTS_PLACEHOLDER, table), rs -> rs.getInt(1) > 0);
                 logger.info("是否启用CDC表[{}]:{}", table, enabledTableCDC);
                 if (!enabledTableCDC) {
-                    cdc.execute(ENABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, table));
+                    cdc.execute(String.format(ENABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, table), schema));
                     Lsn minLsn = cdc.queryAndMap(GET_MIN_LSN.replace(STATEMENTS_PLACEHOLDER, table), rs -> new Lsn(rs.getBytes(1)));
                     logger.info("启用CDC表[{}]:{}", table, minLsn.isAvailable());
                 }
@@ -190,7 +191,7 @@ public class ChangeDataCaptureTest {
 
         // 注销CDC表
         for (String table : tables) {
-            cdc.execute(DISABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, table));
+            cdc.execute(String.format(DISABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, table), schema));
         }
         cdc.close();
     }
@@ -199,6 +200,7 @@ public class ChangeDataCaptureTest {
         String username = "sa";
         String password = "123";
         String url = "jdbc:sqlserver://127.0.0.1:1434;DatabaseName=test";
+        schema = "dbo";
         connection = DriverManager.getConnection(url, username, password);
         if (connection != null) {
             DatabaseMetaData dm = (DatabaseMetaData) connection.getMetaData();

+ 3 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -132,7 +132,8 @@ public class ParserFactory implements Parser {
 
     @Override
     public Map<String, String> getCommand(Mapping mapping, TableGroup tableGroup) {
-        String sType = getConnectorConfig(mapping.getSourceConnectorId()).getConnectorType();
+        ConnectorConfig connectorConfig = getConnectorConfig(mapping.getSourceConnectorId());
+        String sType = connectorConfig.getConnectorType();
         String tType = getConnectorConfig(mapping.getTargetConnectorId()).getConnectorType();
         Table sourceTable = tableGroup.getSourceTable();
         Table targetTable = tableGroup.getTargetTable();
@@ -149,7 +150,7 @@ public class ParserFactory implements Parser {
                 }
             });
         }
-        final CommandConfig sourceConfig = new CommandConfig(sType, sTable, sourceTable, tableGroup.getFilter());
+        final CommandConfig sourceConfig = new CommandConfig(sType, sTable, sourceTable, tableGroup.getFilter(), connectorConfig);
         final CommandConfig targetConfig = new CommandConfig(tType, tTable, targetTable);
         // 获取连接器同步参数
         Map<String, String> command = connectorFactory.getCommand(sourceConfig, targetConfig);

+ 7 - 0
dbsyncer-web/src/main/resources/public/connector/addDqlSqlServer.html

@@ -34,6 +34,13 @@
             <textarea name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="1024" dbsyncer-valid="require" rows="5" th:text="${connector?.config?.url} ?: 'jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test'"></textarea>
         </div>
     </div>
+    <div class="form-group">
+        <label class="col-sm-2 control-label">架构名 <strong class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-4">
+            <input class="form-control" name="schema" type="text" maxlength="32" dbsyncer-valid="require" placeholder="dbo" th:value="${connector?.config?.schema} ?: 'dbo'"/>
+        </div>
+        <div class="col-sm-6"></div>
+    </div>
     <div class="form-group">
         <label class="col-sm-2 control-label">驱动 </label>
         <div class="col-sm-10">

+ 7 - 0
dbsyncer-web/src/main/resources/public/connector/addSqlServer.html

@@ -19,6 +19,13 @@
             <textarea name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="1024" dbsyncer-valid="require" rows="5" th:text="${connector?.config?.url} ?: 'jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test'"></textarea>
         </div>
     </div>
+    <div class="form-group">
+        <label class="col-sm-2 control-label">架构名 <strong class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-4">
+            <input class="form-control" name="schema" type="text" maxlength="32" dbsyncer-valid="require" placeholder="dbo" th:value="${connector?.config?.schema} ?: 'dbo'"/>
+        </div>
+        <div class="col-sm-6"></div>
+    </div>
     <div class="form-group">
         <label class="col-sm-2 control-label">驱动 </label>
         <div class="col-sm-10">