AE86 4 éve
szülő
commit
710de4dabf
1 módosított fájl, 37 hozzáadás és 17 törlés
  1. 37 17
      dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java

+ 37 - 17
dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java

@@ -3,23 +3,25 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.*;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 import java.util.regex.Pattern;
 
 /**
  * @version 1.0.0
  * @Author AE86
- * @see https://www.red-gate.com/simple-talk/sql/learn-sql-server/introduction-to-change-data-capture-cdc-in-sql-server-2008/
  * @Date 2021-05-18 22:25
+ * @see https://www.red-gate.com/simple-talk/sql/learn-sql-server/introduction-to-change-data-capture-cdc-in-sql-server-2008/
  */
 public class ChangeDataCaptureTest {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private static final String  GET_MAX_TRANSACTION_LSN        = "SELECT MAX(start_lsn) FROM cdc.lsn_time_mapping WHERE tran_id <> 0x00";
-    private static final String  GET_LIST_OF_CDC_ENABLED_TABLES = "EXEC sys.sp_cdc_help_change_data_capture";
-    private static final Pattern BRACKET_PATTERN                = Pattern.compile("[\\[\\]]");
+    private static final String GET_STATE_AGENT_SERVER = "EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'";
+    private static final String GET_DATABASE_NAME = "SELECT db_name()";
+    private static final String GET_ENABLED_CDC_DATABASE = "SELECT is_cdc_enabled FROM sys.databases WHERE name = 'test'";
+    private static final String GET_MAX_TRANSACTION_LSN = "SELECT MAX(start_lsn) FROM cdc.lsn_time_mapping WHERE tran_id <> 0x00";
+    private static final String GET_LIST_OF_CDC_ENABLED_TABLES = "EXEC sys.sp_cdc_help_change_data_capture";
+    private static final Pattern BRACKET_PATTERN = Pattern.compile("[\\[\\]]");
 
     private Connection conn = null;
 
@@ -41,6 +43,15 @@ public class ChangeDataCaptureTest {
         ChangeDataCaptureTest cdc = new ChangeDataCaptureTest();
         cdc.start();
 
+        // 获取数据库名 test
+        cdc.queryAndMap(GET_DATABASE_NAME);
+        // 获取Agent服务状态 Stopped. Running.
+        cdc.queryAndMap(GET_STATE_AGENT_SERVER);
+        // 获取数据库CDC状态 false 0 true 1
+        cdc.queryAndMap(GET_ENABLED_CDC_DATABASE);
+
+        // 读取事务
+        cdc.queryAndMap(GET_MAX_TRANSACTION_LSN);
         // 读取增量
         cdc.queryAndMap(GET_LIST_OF_CDC_ENABLED_TABLES, rs -> {
             final Set<SqlServerChangeTable> changeTables = new HashSet<>();
@@ -66,21 +77,13 @@ public class ChangeDataCaptureTest {
             return changeTables;
         });
 
-        // 读取事务
-        cdc.queryAndMap(GET_MAX_TRANSACTION_LSN, rs -> {
-            while (rs.next()) {
-                logger.info("[{}],[{}]", rs.getString(1), rs.getString(2));
-            }
-            return null;
-        });
-
         cdc.close();
     }
 
     private void start() throws SQLException {
         String username = "sa";
-        String password = "Sa123456";
-        String url = "jdbc:sqlserver://127.0.0.1:1434;DatabaseName=dbsyncer";
+        String password = "123";
+        String url = "jdbc:sqlserver://127.0.0.1:1434;DatabaseName=test";
         conn = DriverManager.getConnection(url, username, password);
         if (conn != null) {
             DatabaseMetaData dm = (DatabaseMetaData) conn.getMetaData();
@@ -101,6 +104,23 @@ public class ChangeDataCaptureTest {
         T apply(ResultSet rs) throws SQLException;
     }
 
+    public <T> T queryAndMap(String sql) throws SQLException {
+        return (T) queryAndMap(sql, rs -> {
+            ResultSetMetaData metaData = rs.getMetaData();
+            int columnCount = metaData.getColumnCount();
+            List<Map> data = new ArrayList<>();
+            while (rs.next()) {
+                Map<String, Object> row = new LinkedHashMap<>();
+                for (int i = 1; i <= columnCount; i++) {
+                    row.put(metaData.getColumnLabel(i), rs.getObject(i));
+                }
+                data.add(row);
+                logger.info(row.toString());
+            }
+            return data;
+        });
+    }
+
     public <T> T queryAndMap(String sql, ResultSetMapper<T> mapper) throws SQLException {
         Statement statement = conn.createStatement();
         ResultSet rs = null;
@@ -131,7 +151,7 @@ public class ChangeDataCaptureTest {
         String schemaName;
         String tableName;
         String captureInstance;
-        int    changeTableObjectId;
+        int changeTableObjectId;
         byte[] startLsn;
         byte[] stopLsn;
         String capturedColumns;