AE86 %!s(int64=4) %!d(string=hai) anos
pai
achega
37a1a10f80

+ 17 - 3
dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java

@@ -2,9 +2,6 @@ package org.dbsyncer.common.util;
 
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 
 
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-
 public abstract class StringUtil {
 public abstract class StringUtil {
     public StringUtil() {
     public StringUtil() {
     }
     }
@@ -22,4 +19,21 @@ public abstract class StringUtil {
         return new StringBuilder().append(Character.toLowerCase(s.charAt(0))).append(s.substring(1)).toString();
         return new StringBuilder().append(Character.toLowerCase(s.charAt(0))).append(s.substring(1)).toString();
     }
     }
 
 
+    /**
+     * Restores a byte array that is encoded as a hex string.
+     */
+    public static byte[] hexStringToByteArray(String hexString) {
+        if (hexString == null) {
+            return null;
+        }
+
+        int length = hexString.length();
+        byte[] bytes = new byte[length / 2];
+        for (int i = 0; i < length; i += 2) {
+            bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4)
+                    + Character.digit(hexString.charAt(i + 1), 16));
+        }
+        return bytes;
+    }
+
 }
 }

+ 159 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/Lsn.java

@@ -0,0 +1,159 @@
+package org.dbsyncer.listener.sqlserver;
+
+import java.util.Arrays;
+
+import org.dbsyncer.common.util.StringUtil;
+
+/**
+ * SQL Server LSN(日志序列号)位置的逻辑表示, LSN不可用时为NULL。
+ *
+ * @Author AE86
+ * @Date 2021-06-04 22:25
+ */
+public class Lsn implements Comparable<Lsn> {
+    private static final String NULL_STRING = "NULL";
+
+    public static final Lsn NULL = new Lsn(null);
+
+    private final byte[] binary;
+    private int[] unsignedBinary;
+
+    private String string;
+
+    public Lsn(byte[] binary) {
+        this.binary = binary;
+    }
+
+    /**
+     * @return binary representation of the stored LSN
+     */
+    public byte[] getBinary() {
+        return binary;
+    }
+
+    /**
+     * @return true if this is a real LSN or false it it is {@code NULL}
+     */
+    public boolean isAvailable() {
+        return binary != null;
+    }
+
+    private int[] getUnsignedBinary() {
+        if (unsignedBinary != null || binary == null) {
+            return unsignedBinary;
+        }
+
+        unsignedBinary = new int[binary.length];
+        for (int i = 0; i < binary.length; i++) {
+            unsignedBinary[i] = Byte.toUnsignedInt(binary[i]);
+        }
+        return unsignedBinary;
+    }
+
+    /**
+     * @return textual representation of the stored LSN
+     */
+    public String toString() {
+        if (string != null) {
+            return string;
+        }
+        final StringBuilder sb = new StringBuilder();
+        if (binary == null) {
+            return NULL_STRING;
+        }
+        final int[] unsigned = getUnsignedBinary();
+        for (int i = 0; i < unsigned.length; i++) {
+            final String byteStr = Integer.toHexString(unsigned[i]);
+            if (byteStr.length() == 1) {
+                sb.append('0');
+            }
+            sb.append(byteStr);
+            if (i == 3 || i == 7) {
+                sb.append(':');
+            }
+        }
+        string = sb.toString();
+        return string;
+    }
+
+    /**
+     * @param lsnString - textual representation of Lsn
+     * @return LSN converted from its textual representation
+     */
+    public static Lsn valueOf(String lsnString) {
+        return (lsnString == null || NULL_STRING.equals(lsnString)) ? NULL : new Lsn(StringUtil.hexStringToByteArray(lsnString.replace(":", "")));
+    }
+
+    /**
+     * @param lsnBinary - binary representation of Lsn
+     * @return LSN converted from its binary representation
+     */
+    public static Lsn valueOf(byte[] lsnBinary) {
+        return (lsnBinary == null) ? NULL : new Lsn(lsnBinary);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + Arrays.hashCode(binary);
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        Lsn other = (Lsn) obj;
+        if (!Arrays.equals(binary, other.binary)) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Enables ordering of LSNs. The {@code NULL} LSN is always the smallest one.
+     */
+    @Override
+    public int compareTo(Lsn o) {
+        if (this == o) {
+            return 0;
+        }
+        if (!this.isAvailable()) {
+            if (!o.isAvailable()) {
+                return 0;
+            }
+            return -1;
+        }
+        if (!o.isAvailable()) {
+            return 1;
+        }
+        final int[] thisU = getUnsignedBinary();
+        final int[] thatU = o.getUnsignedBinary();
+        for (int i = 0; i < thisU.length; i++) {
+            final int diff = thisU[i] - thatU[i];
+            if (diff != 0) {
+                return diff;
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Verifies whether the LSN falls into a LSN interval
+     *
+     * @param from start of the interval (included)
+     * @param to   end of the interval (excluded)
+     * @return true if the LSN falls into the interval
+     */
+    public boolean isBetween(Lsn from, Lsn to) {
+        return this.compareTo(from) >= 0 && this.compareTo(to) < 0;
+    }
+}

+ 129 - 34
dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java

@@ -1,9 +1,11 @@
+import org.dbsyncer.listener.sqlserver.Lsn;
 import org.junit.Test;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
 import java.sql.*;
 import java.sql.*;
 import java.util.*;
 import java.util.*;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
 
 
 /**
 /**
@@ -16,14 +18,32 @@ public class ChangeDataCaptureTest {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
-    private static final String GET_STATE_AGENT_SERVER = "EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'";
+    private static final String GET_AGENT_SERVER_STATE = "EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'";
     private static final String GET_DATABASE_NAME = "SELECT db_name()";
     private static final String GET_DATABASE_NAME = "SELECT db_name()";
-    private static final String GET_ENABLED_CDC_DATABASE = "SELECT is_cdc_enabled FROM sys.databases WHERE name = 'test'";
+    private static final String IS_CDC_ENABLED = "SELECT is_cdc_enabled FROM sys.databases WHERE name = 'test'";
+
     private static final String GET_MAX_TRANSACTION_LSN = "SELECT MAX(start_lsn) FROM cdc.lsn_time_mapping WHERE tran_id <> 0x00";
     private static final String GET_MAX_TRANSACTION_LSN = "SELECT MAX(start_lsn) FROM cdc.lsn_time_mapping WHERE tran_id <> 0x00";
+    private static final String GET_MIN_LSN = "SELECT sys.fn_cdc_get_min_lsn('#')";
+
+    private static final String LOCK_TABLE = "SELECT * FROM [#] WITH (TABLOCKX)";
+    private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT *# FROM cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
     private static final String GET_LIST_OF_CDC_ENABLED_TABLES = "EXEC sys.sp_cdc_help_change_data_capture";
     private static final String GET_LIST_OF_CDC_ENABLED_TABLES = "EXEC sys.sp_cdc_help_change_data_capture";
+    private static final String SQL_SERVER_VERSION = "SELECT @@VERSION AS 'SQL Server Version'";
+    private static final String LSN_TIMESTAMP_SELECT_STATEMENT = "sys.fn_cdc_map_lsn_to_time([__$start_lsn])";
+    private static final String AT_TIME_ZONE_UTC = "AT TIME ZONE 'UTC'";
+
+    private static final String STATEMENTS_PLACEHOLDER = "#";
     private static final Pattern BRACKET_PATTERN = Pattern.compile("[\\[\\]]");
     private static final Pattern BRACKET_PATTERN = Pattern.compile("[\\[\\]]");
 
 
-    private Connection conn = null;
+    /**
+     * 数据库的实际名称,可能与连接器配置中给定的数据库名称不同
+     */
+    private String realDatabaseName;
+    private String getAllChangesForTable;
+    private String agentState;
+    private boolean enabledCDC;
+
+    private Connection connection = null;
 
 
     /**
     /**
      * <p>cdc.captured_columns – 此表返回捕获列列表的结果。</p>
      * <p>cdc.captured_columns – 此表返回捕获列列表的结果。</p>
@@ -44,17 +64,32 @@ public class ChangeDataCaptureTest {
         cdc.start();
         cdc.start();
 
 
         // 获取数据库名 test
         // 获取数据库名 test
-        cdc.queryAndMap(GET_DATABASE_NAME);
+        realDatabaseName = cdc.queryAndMap(GET_DATABASE_NAME, rs -> rs.getString(1));
+        logger.info("数据库名:{}", realDatabaseName);
+
+        boolean supportsAtTimeZone = supportsAtTimeZone();
+        logger.info("支持时区:{}", supportsAtTimeZone);
+        getAllChangesForTable = GET_ALL_CHANGES_FOR_TABLE.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(lsnTimestampSelectStatement(supportsAtTimeZone)));
+
         // 获取Agent服务状态 Stopped. Running.
         // 获取Agent服务状态 Stopped. Running.
-        cdc.queryAndMap(GET_STATE_AGENT_SERVER);
+        agentState = cdc.queryAndMap(GET_AGENT_SERVER_STATE, rs -> rs.getString(1));
+        logger.info("Agent服务状态:{}", agentState);
         // 获取数据库CDC状态 false 0 true 1
         // 获取数据库CDC状态 false 0 true 1
-        cdc.queryAndMap(GET_ENABLED_CDC_DATABASE);
+        enabledCDC = cdc.queryAndMap(IS_CDC_ENABLED.replace(STATEMENTS_PLACEHOLDER, realDatabaseName), rs -> rs.getBoolean(1));
+        logger.info("CDC状态:{}", enabledCDC);
+
+        // 从只读复制副本读取时,始终启用默认和唯一事务隔离是快照。这意味着CDC元数据对于长时间运行的事务不可见。
+        //因此,有必要在每次读取之前重新启动事务。对于R/W数据库,执行常规提交以保持TempDB的大小是很重要的
+//        connection.commit();
+
+        // 读取LSN 00000017:0000080d:0008
+        byte[] bytes = cdc.queryAndMap(GET_MAX_TRANSACTION_LSN, rs -> rs.getBytes(1));
+        Lsn lsn = new Lsn(bytes);
+        logger.info("最新LSN:{}", lsn);
 
 
-        // 读取事务
-        cdc.queryAndMap(GET_MAX_TRANSACTION_LSN);
         // 读取增量
         // 读取增量
-        cdc.queryAndMap(GET_LIST_OF_CDC_ENABLED_TABLES, rs -> {
-            final Set<SqlServerChangeTable> changeTables = new HashSet<>();
+        Set<SqlServerChangeTable> changeTables = cdc.queryAndMapList(GET_LIST_OF_CDC_ENABLED_TABLES, rs -> {
+            final Set<SqlServerChangeTable> tables = new HashSet<>();
             while (rs.next()) {
             while (rs.next()) {
                 SqlServerChangeTable changeTable = new SqlServerChangeTable(
                 SqlServerChangeTable changeTable = new SqlServerChangeTable(
                         // schemaName
                         // schemaName
@@ -71,11 +106,15 @@ public class ChangeDataCaptureTest {
                         rs.getBytes(7),
                         rs.getBytes(7),
                         // capturedColumns
                         // capturedColumns
                         rs.getString(15));
                         rs.getString(15));
-                changeTables.add(changeTable);
+                tables.add(changeTable);
             }
             }
-            logger.info("changeTables:{} ", changeTables.size());
-            return changeTables;
+            return tables;
         });
         });
+        logger.info("监听表数:{} ", changeTables.size());
+        changeTables.forEach(t -> logger.info(t.toString()));
+
+        // Terminate the transaction otherwise CDC could not be disabled for tables
+//        connection.rollback();
 
 
         cdc.close();
         cdc.close();
     }
     }
@@ -84,9 +123,9 @@ public class ChangeDataCaptureTest {
         String username = "sa";
         String username = "sa";
         String password = "123";
         String password = "123";
         String url = "jdbc:sqlserver://127.0.0.1:1434;DatabaseName=test";
         String url = "jdbc:sqlserver://127.0.0.1:1434;DatabaseName=test";
-        conn = DriverManager.getConnection(url, username, password);
-        if (conn != null) {
-            DatabaseMetaData dm = (DatabaseMetaData) conn.getMetaData();
+        connection = DriverManager.getConnection(url, username, password);
+        if (connection != null) {
+            DatabaseMetaData dm = (DatabaseMetaData) connection.getMetaData();
             System.out.println("Driver name: " + dm.getDriverName());
             System.out.println("Driver name: " + dm.getDriverName());
             System.out.println("Driver version: " + dm.getDriverVersion());
             System.out.println("Driver version: " + dm.getDriverVersion());
             System.out.println("Product name: " + dm.getDatabaseProductName());
             System.out.println("Product name: " + dm.getDatabaseProductName());
@@ -95,8 +134,8 @@ public class ChangeDataCaptureTest {
     }
     }
 
 
     private void close() {
     private void close() {
-        if (null != conn) {
-            close(conn);
+        if (null != connection) {
+            close(connection);
         }
         }
     }
     }
 
 
@@ -104,25 +143,26 @@ public class ChangeDataCaptureTest {
         T apply(ResultSet rs) throws SQLException;
         T apply(ResultSet rs) throws SQLException;
     }
     }
 
 
-    public <T> T queryAndMap(String sql) throws SQLException {
-        return (T) queryAndMap(sql, rs -> {
-            ResultSetMetaData metaData = rs.getMetaData();
-            int columnCount = metaData.getColumnCount();
-            List<Map> data = new ArrayList<>();
-            while (rs.next()) {
-                Map<String, Object> row = new LinkedHashMap<>();
-                for (int i = 1; i <= columnCount; i++) {
-                    row.put(metaData.getColumnLabel(i), rs.getObject(i));
-                }
-                data.add(row);
-                logger.info(row.toString());
+    public <T> T queryAndMap(String sql, ResultSetMapper<T> mapper) throws SQLException {
+        Statement statement = connection.createStatement();
+        ResultSet rs = null;
+        T apply = null;
+        try {
+            rs = statement.executeQuery(sql);
+            if (rs.next()) {
+                apply = mapper.apply(rs);
             }
             }
-            return data;
-        });
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        } finally {
+            close(rs);
+            close(statement);
+        }
+        return apply;
     }
     }
 
 
-    public <T> T queryAndMap(String sql, ResultSetMapper<T> mapper) throws SQLException {
-        Statement statement = conn.createStatement();
+    public <T> T queryAndMapList(String sql, ResultSetMapper<T> mapper) throws SQLException {
+        Statement statement = connection.createStatement();
         ResultSet rs = null;
         ResultSet rs = null;
         T apply = null;
         T apply = null;
         try {
         try {
@@ -147,6 +187,49 @@ public class ChangeDataCaptureTest {
         }
         }
     }
     }
 
 
+    /**
+     * Returns the query for obtaining the LSN-to-TIMESTAMP query. On SQL Server
+     * 2016 and newer, the query will normalize the value to UTC. This means that
+     * the SERVER_TIMEZONE is not necessary to be given. The returned TIMESTAMP will
+     * be adjusted by the JDBC driver using this VM's TZ (as required by the JDBC
+     * spec), and that same TZ will be applied when converting
+     * the TIMESTAMP value into an {@code Instant}.
+     */
+    private String lsnTimestampSelectStatement(boolean supportsAtTimeZone) {
+        String result = ", " + LSN_TIMESTAMP_SELECT_STATEMENT;
+        if (supportsAtTimeZone) {
+            result += " " + AT_TIME_ZONE_UTC;
+        }
+        return result;
+    }
+
+    /**
+     * SELECT ... AT TIME ZONE only works on SQL Server 2016 and newer.
+     */
+    private boolean supportsAtTimeZone() {
+        try {
+            // Always expect the support if database is not standalone SQL Server, e.g. Azure
+            return getSqlServerVersion().orElse(Integer.MAX_VALUE) > 2016;
+        } catch (Exception e) {
+            logger.error("Couldn't obtain database server version; assuming 'AT TIME ZONE' is not supported.", e);
+            return false;
+        }
+    }
+
+    private Optional<Integer> getSqlServerVersion() {
+        try {
+            // As per https://www.mssqltips.com/sqlservertip/1140/how-to-tell-what-sql-server-version-you-are-running/
+            // Always beginning with 'Microsoft SQL Server NNNN' but only in case SQL Server is standalone
+            String version = queryAndMap(SQL_SERVER_VERSION, rs -> rs.getString(1));
+            if (!version.startsWith("Microsoft SQL Server ")) {
+                return Optional.empty();
+            }
+            return Optional.of(Integer.valueOf(version.substring(21, 25)));
+        } catch (Exception e) {
+            throw new RuntimeException("Couldn't obtain database server version", e);
+        }
+    }
+
     final class SqlServerChangeTable {
     final class SqlServerChangeTable {
         String schemaName;
         String schemaName;
         String tableName;
         String tableName;
@@ -168,6 +251,18 @@ public class ChangeDataCaptureTest {
             this.stopLsn = stopLsn;
             this.stopLsn = stopLsn;
         }
         }
 
 
+        @Override
+        public String toString() {
+            return "SqlServerChangeTable{" +
+                    "schemaName='" + schemaName + '\'' +
+                    ", tableName='" + tableName + '\'' +
+                    ", captureInstance='" + captureInstance + '\'' +
+                    ", changeTableObjectId=" + changeTableObjectId +
+                    ", startLsn=" + Arrays.toString(startLsn) +
+                    ", stopLsn=" + Arrays.toString(stopLsn) +
+                    ", capturedColumns='" + capturedColumns + '\'' +
+                    '}';
+        }
     }
     }
 
 
 }
 }