|
@@ -40,7 +40,6 @@ public class SqlServerExtractor extends AbstractExtractor {
|
|
private static final String IS_TABLE_CDC_ENABLED = "SELECT COUNT(*) FROM sys.tables tb WHERE tb.is_tracked_by_cdc = 1 AND tb.name='#'";
|
|
private static final String IS_TABLE_CDC_ENABLED = "SELECT COUNT(*) FROM sys.tables tb WHERE tb.is_tracked_by_cdc = 1 AND tb.name='#'";
|
|
private static final String ENABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name = '#' AND is_cdc_enabled=0) EXEC sys.sp_cdc_enable_db";
|
|
private static final String ENABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name = '#' AND is_cdc_enabled=0) EXEC sys.sp_cdc_enable_db";
|
|
private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0) EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
|
|
private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0) EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
|
|
- private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'";
|
|
|
|
private static final String GET_TABLES_CDC_ENABLED = "EXEC sys.sp_cdc_help_change_data_capture";
|
|
private static final String GET_TABLES_CDC_ENABLED = "EXEC sys.sp_cdc_help_change_data_capture";
|
|
private static final String GET_MAX_LSN = "SELECT sys.fn_cdc_get_max_lsn()";
|
|
private static final String GET_MAX_LSN = "SELECT sys.fn_cdc_get_max_lsn()";
|
|
private static final String GET_MIN_LSN = "SELECT sys.fn_cdc_get_min_lsn('#')";
|
|
private static final String GET_MIN_LSN = "SELECT sys.fn_cdc_get_min_lsn('#')";
|
|
@@ -103,7 +102,6 @@ public class SqlServerExtractor extends AbstractExtractor {
|
|
worker.interrupt();
|
|
worker.interrupt();
|
|
worker = null;
|
|
worker = null;
|
|
}
|
|
}
|
|
- disableTableCDC();
|
|
|
|
preparedStatementCache.values().forEach(this::close);
|
|
preparedStatementCache.values().forEach(this::close);
|
|
preparedStatementCache.clear();
|
|
preparedStatementCache.clear();
|
|
connected = false;
|
|
connected = false;
|
|
@@ -179,12 +177,6 @@ public class SqlServerExtractor extends AbstractExtractor {
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
- private void disableTableCDC() {
|
|
|
|
- if (!CollectionUtils.isEmpty(tables)) {
|
|
|
|
- tables.forEach(table -> execute(DISABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, table)));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
private void enableTableCDC() {
|
|
private void enableTableCDC() {
|
|
if (!CollectionUtils.isEmpty(tables)) {
|
|
if (!CollectionUtils.isEmpty(tables)) {
|
|
tables.forEach(table -> {
|
|
tables.forEach(table -> {
|