穿云 il y a 3 mois
Parent
commit
cdc24c495c

+ 48 - 81
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java

@@ -19,7 +19,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 /**
  * @Author AE86
@@ -35,10 +34,7 @@ public class LogMiner {
     private final String url;
     private final String schema;
     private final String driverClassName;
-    private final String miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG";
-    /** LogMiner执行查询SQL的超时参数,单位秒 */
     private final int queryTimeout = 300;
-    /** LogMiner从v$logmnr_contents视图中批量拉取条数,值越大,消费存量数据越快 */
     private final int fetchSize = 1000;
     private volatile boolean connected = false;
     private Connection connection;
@@ -65,6 +61,7 @@ public class LogMiner {
     }
 
     private void closeQuietly() {
+        LogMinerHelper.endLogMiner(connection);
         if (null != worker && !worker.isInterrupted()) {
             worker.interrupt();
             worker = null;
@@ -95,18 +92,39 @@ public class LogMiner {
         return DatabaseUtil.getConnection(driverClassName, url, username, password);
     }
 
+    private Connection validateConnection() throws SQLException {
+        Connection conn = null;
+        try {
+            conn = DatabaseUtil.getConnection(driverClassName, url, username, password);
+            LogMinerHelper.setSessionParameter(conn);
+            int version = conn.getMetaData().getDatabaseMajorVersion();
+            // 19支持cdb模式
+            if (version == 19) {
+                LogMinerHelper.setSessionContainerIfCdbMode(conn);
+            }
+            // 低于10不支持
+            else if (version < 10) {
+                throw new IllegalArgumentException(String.format("Unsupported database version: %d(current) < 10", version));
+            }
+            // 检查账号权限
+            LogMinerHelper.checkPermissions(conn, version);
+        } catch (Exception e) {
+            close(conn);
+            throw e;
+        }
+        return conn;
+    }
+
     private void connect() throws SQLException {
-        this.connection = createConnection();
+        this.connection = validateConnection();
         // 判断是否第一次读取
         if (startScn == 0) {
-            startScn = getCurrentScn(connection);
+            startScn = LogMinerHelper.getCurrentScn(connection);
+            restartLogMiner(startScn);
+        } else {
+            restartLogMiner(LogMinerHelper.getCurrentScn(connection));
         }
-        logger.info("start LogMiner, scn={}", startScn);
-        LogMinerHelper.setSessionParameter(connection);
-        // 1.记录当前redoLog,用于下文判断redoLog 是否切换
-        currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
-        // 2.构建数据字典 && add redo / archived log
-        initializeLogMiner();
+        logger.info("Start log miner, scn={}", startScn);
         worker = new Worker();
         worker.setName("log-miner-parser-" + url + "_" + worker.hashCode());
         worker.setDaemon(false);
@@ -138,21 +156,9 @@ public class LogMiner {
         }
     }
 
-    public long getCurrentScn(Connection connection) throws SQLException {
-        try (Statement statement = connection.createStatement()) {
-            ResultSet rs = statement.executeQuery("select CURRENT_SCN from V$DATABASE");
-
-            if (!rs.next()) {
-                throw new IllegalStateException("Couldn't get SCN");
-            }
-
-            return rs.getLong(1);
-        }
-    }
-
-    private void restartLogMiner() throws SQLException {
-        LogMinerHelper.endLogMiner(connection);
-        initializeLogMiner();
+    private void restartLogMiner(long endScn) throws SQLException {
+        LogMinerHelper.startLogMiner(connection, startScn, endScn);
+        currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
     }
 
     private boolean redoLogSwitchOccurred() throws SQLException {
@@ -164,41 +170,6 @@ public class LogMiner {
         return false;
     }
 
-    private BigInteger determineEndScn() throws SQLException {
-        return BigInteger.valueOf(getCurrentScn(connection));
-    }
-
-    private void initializeLogMiner() throws SQLException {
-        // 默认使用在线数据字典,所以此处不做数据字典相关操作
-        LogMinerHelper.buildDataDictionary(connection, miningStrategy);
-
-        setRedoLog();
-    }
-
-    private void setRedoLog() throws SQLException {
-        LogMinerHelper.removeLogFilesFromMining(connection);
-        List<LogFile> onlineLogFiles = LogMinerHelper.getOnlineLogFilesForOffsetScn(connection, BigInteger.valueOf(startScn));
-        List<LogFile> archivedLogFiles = LogMinerHelper.getArchivedLogFilesForOffsetScn(connection, BigInteger.valueOf(startScn));
-        List<String> logFilesNames = archivedLogFiles.stream().map(LogFile::getFileName).collect(Collectors.toList());
-        for (LogFile onlineLogFile : onlineLogFiles) {
-            boolean found = false;
-            for (LogFile archivedLogFile : archivedLogFiles) {
-                if (onlineLogFile.isSameRange(archivedLogFile)) {
-                    // 如果redo 已经被归档,那么就不需要加载这个redo了
-                    found = true;
-                    break;
-                }
-            }
-            if (!found)
-                logFilesNames.add(onlineLogFile.getFileName());
-        }
-
-        // 加载所需要的redo / archived
-        for (String fileName : logFilesNames) {
-            LogMinerHelper.addLogFile(connection, fileName);
-        }
-    }
-
     private void logMinerViewProcessor(ResultSet rs) throws SQLException {
         while (rs.next()) {
             BigInteger scn = rs.getBigDecimal("SCN").toBigInteger();
@@ -350,41 +321,37 @@ public class LogMiner {
                         connection = createConnection();
                     }
                     closeResources(rs, statement);
-                    statement = connection.prepareStatement(minerViewQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
-                    statement.setFetchSize(fetchSize);
-                    statement.setFetchDirection(ResultSet.FETCH_FORWARD);
-                    statement.setQueryTimeout(queryTimeout);
                     // 1.确定 endScn
-                    BigInteger endScn = determineEndScn();
+                    long endScn = LogMinerHelper.getCurrentScn(connection);
 
                     // 2.是否发生redoLog切换
                     if (redoLogSwitchOccurred()) {
-                        // 如果切换则重启logMiner会话
-                        restartLogMiner();
-                        currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
+                        logger.info("Switch to new redo log");
+                        restartLogMiner(endScn);
                     }
 
-                    // 3.start logMiner
-                    LogMinerHelper.startLogMiner(connection, BigInteger.valueOf(startScn), endScn, miningStrategy);
-
-                    // 4.查询 logMiner view, 处理结果集
+                    // 3.查询 logMiner view, 处理结果集
+                    statement = connection.prepareStatement(minerViewQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
+                    statement.setFetchSize(fetchSize);
+                    statement.setFetchDirection(ResultSet.FETCH_FORWARD);
+                    statement.setQueryTimeout(queryTimeout);
                     statement.setString(1, String.valueOf(startScn));
-                    statement.setString(2, endScn.toString());
+                    statement.setString(2, String.valueOf(endScn));
                     try {
                         rs = statement.executeQuery();
                         logMinerViewProcessor(rs);
                     } catch (SQLException e) {
                         if (e.getMessage().contains("ORA-00310")) {
-                            logger.error("ORA-00310 try continue");
-                            restartLogMiner();
-                            currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection);
+                            logger.info("ORA-00310 restart log miner");
+                            LogMinerHelper.endLogMiner(connection);
+                            restartLogMiner(endScn);
                             continue;
                         }
                         throw e;
                     }
-                    // 5.确定新的SCN
-                    startScn = Long.parseLong(endScn.toString());
-                    sleepSeconds(3);
+                    // 4.确定新的SCN
+                    startScn = endScn;
+                    sleepSeconds(1);
                 }
             } catch (Exception e) {
                 if (connected) {

+ 137 - 131
dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMinerHelper.java

@@ -3,7 +3,9 @@
  */
 package org.dbsyncer.connector.oracle.logminer;
 
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.oracle.OracleException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -15,10 +17,10 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.LinkedHashSet;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 
 /**
  * @Author AE86
@@ -26,7 +28,7 @@ import java.util.Set;
  * @Date 2023-12-09 20:23
  */
 public class LogMinerHelper {
-    private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerHelper.class);
+    private static final Logger logger = LoggerFactory.getLogger(LogMinerHelper.class);
     public static final int LOG_MINER_OC_INSERT = 1;
     public static final int LOG_MINER_OC_DELETE = 2;
     public static final int LOG_MINER_OC_UPDATE = 3;
@@ -34,21 +36,46 @@ public class LogMinerHelper {
     public static final int LOG_MINER_OC_COMMIT = 7;
     public static final int LOG_MINER_OC_MISSING_SCN = 34;
     public static final int LOG_MINER_OC_ROLLBACK = 36;
-
-    public static void removeLogFilesFromMining(Connection conn) throws SQLException {
-        try (PreparedStatement ps = conn.prepareStatement("SELECT FILENAME AS NAME FROM V$LOGMNR_LOGS");
-             ResultSet result = ps.executeQuery()) {
-            Set<String> files = new LinkedHashSet<>();
-            while (result.next()) {
-                files.add(result.getString(1));
-            }
-            for (String fileName : files) {
-                String sql = String.format("BEGIN SYS.DBMS_LOGMNR.REMOVE_LOGFILE(LOGFILENAME => '%s');END;", fileName);
-                executeCallableStatement(conn, sql);
-                LOGGER.debug("File {} was removed from mining", fileName);
-            }
-        }
-    }
+    private static final String LOG_MINER_SQL_QUERY_ROLES = "SELECT * FROM USER_ROLE_PRIVS";
+    private static final String LOG_MINER_KEY_GRANTED_ROLE = "GRANTED_ROLE";
+    private static final String LOG_MINER_SQL_QUERY_PRIVILEGES = "SELECT * FROM SESSION_PRIVS";
+    private static final String LOG_MINER_KEY_PRIVILEGE = "PRIVILEGE";
+    private static final List<String> LOG_MINER_PRIVILEGES_NEEDED = Arrays.asList("EXECUTE_CATALOG_ROLE", "CREATE SESSION", "SELECT ANY TRANSACTION", "SELECT ANY DICTIONARY", "LOGMINING");
+    private static final List<String> LOG_MINER_ORACLE_11_PRIVILEGES_NEEDED = Arrays.asList("EXECUTE_CATALOG_ROLE", "CREATE SESSION", "SELECT ANY TRANSACTION", "SELECT ANY DICTIONARY");
+    private static final String LOG_MINER_DBA_ROLE = "DBA";
+    private static final String LOG_MINER_SQL_GET_CURRENT_SCN = "select CURRENT_SCN from V$DATABASE";
+    private static final String LOG_MINER_SQL_IS_CDB = "select cdb from v$database";
+    private static final String LOG_MINER_SQL_ALTER_SESSION_CONTAINER = "alter session set container=CDB$ROOT";
+    private static final String LOG_MINER_SQL_ALTER_NLS_SESSION_PARAMETERS = "ALTER SESSION SET "
+            + "  NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'"
+            + "  NLS_TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF'"
+            + "  NLS_TIMESTAMP_TZ_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'"
+            + "  NLS_NUMERIC_CHARACTERS = '.,'"
+            + "  TIME_ZONE = '00:00'";
+    private static final String LOG_MINER_SQL_CURRENT_REDO_SEQUENCE = "SELECT SEQUENCE# FROM V$LOG WHERE STATUS = 'CURRENT'";
+    private static final String LOG_MINER_SQL_END_LOG_MINER = "BEGIN SYS.DBMS_LOGMNR.END_LOGMNR(); END;";
+    private static final String LOG_MINER_SQL_START_LOG_MINER = "DECLARE\n" +
+            "start_scn NUMBER := ?; end_scn NUMBER := ?; first_file BOOLEAN := true; \n" +
+            "BEGIN \n" +
+            "FOR log_file IN\n" +
+            " (\n" +
+            "  SELECT MIN(name) name, first_change# FROM \n" +
+            "  (\n" +
+            "   SELECT member AS name, first_change# FROM v$log l INNER JOIN v$logfile f ON l.group# = f.group# WHERE (l.STATUS = 'CURRENT' OR l.STATUS = 'ACTIVE') AND first_change# < end_scn\n" +
+            "   UNION\n" +
+            "   SELECT name, first_change# FROM v$archived_log WHERE name IS NOT NULL AND STANDBY_DEST='NO' AND first_change# < end_scn AND next_change# > start_scn \n" +
+            "  ) group by first_change# ORDER BY first_change# \n" +
+            " ) LOOP \n" +
+            " IF first_file THEN\n" +
+            "  SYS.DBMS_LOGMNR.add_logfile(log_file.name, SYS.DBMS_LOGMNR.NEW);\n" +
+            "  first_file := false;\n" +
+            " ELSE\n" +
+            "  SYS.DBMS_LOGMNR.add_logfile(log_file.name, SYS.DBMS_LOGMNR.ADDFILE);\n" +
+            " END IF;\n" +
+            "END LOOP;\n" +
+            "\n" +
+            "SYS.DBMS_LOGMNR.start_logmnr( options => SYS.DBMS_LOGMNR.SKIP_CORRUPTION + SYS.DBMS_LOGMNR.NO_SQL_DELIMITER + SYS.DBMS_LOGMNR.NO_ROWID_IN_STMT + SYS.DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG);\n" +
+            "END;";
 
     public static void executeCallableStatement(Connection connection, String statement) throws SQLException {
         Objects.requireNonNull(statement);
@@ -57,117 +84,29 @@ public class LogMinerHelper {
         }
     }
 
-    public static List<LogFile> getOnlineLogFilesForOffsetScn(Connection connection, BigInteger offsetScn) throws SQLException {
-        List<LogFile> redoLogFiles = new ArrayList<>();
-        String onlineLogQuery = "SELECT MIN(F.MEMBER) AS FILE_NAME, L.STATUS, L.FIRST_CHANGE# AS FIRST_CHANGE, L.NEXT_CHANGE# AS NEXT_CHANGE FROM V$LOG L, V$LOGFILE F WHERE F.GROUP# = L.GROUP# AND L.NEXT_CHANGE# > 0 GROUP BY F.GROUP#, L.NEXT_CHANGE#, L.FIRST_CHANGE#, L.STATUS";
-        try (PreparedStatement s = connection.prepareStatement(onlineLogQuery)) {
-            try (ResultSet rs = s.executeQuery()) {
-                while (rs.next()) {
-                    String fileName = rs.getString(1);// FILE_NAME
-                    String status = rs.getString(2); // STATUS
-                    BigInteger firstChangeNumber = new BigInteger(rs.getString(3));//FIRST_CHANGE
-                    BigInteger nextChangeNumber = new BigInteger(rs.getString(4));//NEXT_CHANGE
-                    LogFile logFile = new LogFile(fileName, firstChangeNumber, nextChangeNumber, "CURRENT".equalsIgnoreCase(status));
-                    // 添加Current Redo || scn 范围符合的
-                    if (logFile.isCurrent() || logFile.getNextScn().compareTo(offsetScn) >= 0) {
-                        redoLogFiles.add(logFile);
-                    }
-                }
-            }
-        }
-        return redoLogFiles;
-    }
-
-    public static List<LogFile> get10GOnlineLogFilesForOffsetScn(Connection connection, BigInteger offsetScn) throws SQLException {
-        List<LogFile> redoLogFiles = new ArrayList<>();
-        String onlineLogQuery = "SELECT MIN(F.MEMBER) AS FILE_NAME, L.STATUS, L.FIRST_CHANGE# AS FIRST_CHANGE FROM V$LOG L, V$LOGFILE F WHERE F.GROUP# = L.GROUP# GROUP BY F.GROUP#, L.FIRST_CHANGE#, L.STATUS";
-        try (PreparedStatement s = connection.prepareStatement(onlineLogQuery)) {
-            try (ResultSet rs = s.executeQuery()) {
-                while (rs.next()) {
-                    String fileName = rs.getString(1);// FILE_NAME
-                    String status = rs.getString(2); // STATUS
-                    BigInteger firstChangeNumber = new BigInteger(rs.getString(3));//FIRST_CHANGE
-                    LogFile logFile = new LogFile(fileName, firstChangeNumber, null, "CURRENT".equalsIgnoreCase(status));
-                    // 添加Current Redo || scn 范围符合的
-                    if (logFile.isCurrent() || logFile.getFirstScn().compareTo(offsetScn) >= 0) {
-                        redoLogFiles.add(logFile);
-                    }
-                }
-            }
-        }
-        return redoLogFiles;
-    }
-
-    public static List<LogFile> getArchivedLogFilesForOffsetScn(Connection connection, BigInteger offsetScn) throws SQLException {
-        String archiveLogsQuery = String.format("SELECT NAME AS FILE_NAME, NEXT_CHANGE# AS NEXT_CHANGE, FIRST_CHANGE# AS FIRST_CHANGE FROM V$ARCHIVED_LOG " +
-                "WHERE NAME IS NOT NULL AND ARCHIVED = 'YES' " +
-                "AND STATUS = 'A' AND NEXT_CHANGE# > %s ORDER BY 2", offsetScn);
-
-        final List<LogFile> archiveLogFiles = new ArrayList<>();
-        try (PreparedStatement s = connection.prepareStatement(archiveLogsQuery)) {
-            try (ResultSet rs = s.executeQuery()) {
-                while (rs.next()) {
-                    String fileName = rs.getString(1);
-                    BigInteger firstChangeNumber = new BigInteger(rs.getString(3));
-                    BigInteger nextChangeNumber = new BigInteger(rs.getString(2));
-                    archiveLogFiles.add(new LogFile(fileName, firstChangeNumber, nextChangeNumber, false));
-                }
-            }
-        }
-        return archiveLogFiles;
-    }
-
-    public static void addLogFile(Connection connection, String fileName) throws SQLException {
-        String addLogFile = "BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '%s', OPTIONS => DBMS_LOGMNR.ADDFILE);END;";
-        executeCallableStatement(connection, String.format(addLogFile, fileName));
-    }
-
     public static List<BigInteger> getCurrentRedoLogSequences(Connection connection) throws SQLException {
-        String currentRedoSequence = "SELECT SEQUENCE# FROM V$LOG WHERE STATUS = 'CURRENT'";
-        try (Statement statement = connection.createStatement();
-             ResultSet rs = statement.executeQuery(currentRedoSequence)) {
-            List<BigInteger> sequences = new ArrayList<>();
-            if (rs.next()) {
-                sequences.add(new BigInteger(rs.getString(1)));
+        try (Statement statement = connection.createStatement()) {
+            try (ResultSet rs = statement.executeQuery(LOG_MINER_SQL_CURRENT_REDO_SEQUENCE)) {
+                List<BigInteger> sequences = new ArrayList<>();
+                if (rs.next()) {
+                    sequences.add(new BigInteger(rs.getString(1)));
+                }
+                // 如果是RAC则会返回多个SEQUENCE
+                return sequences;
             }
-            // 如果是RAC则会返回多个SEQUENCE
-            return sequences;
-        }
-    }
-
-    public static void buildDataDictionary(Connection connection, String miningStrategy) throws SQLException {
-        if (StringUtil.isBlank(miningStrategy)) {
-            // default
-            String sql = "BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;";
-            executeCallableStatement(connection, sql);
-        }
-    }
-
-    public static void startLogMiner(Connection connection, BigInteger startScn, BigInteger endScn, String miningStrategy) throws SQLException {
-        LOGGER.debug("startLogMiner... startScn {}, endScn {}", startScn, endScn);
-        // default
-        if (StringUtil.isBlank(miningStrategy)) {
-            miningStrategy = "DBMS_LOGMNR.DICT_FROM_REDO_LOGS + DBMS_LOGMNR.DDL_DICT_TRACKING ";
         }
-
-        String startLogMiner = "BEGIN sys.dbms_logmnr.start_logmnr(" +
-                "startScn => '" + startScn + "', " +
-                "endScn => '" + endScn + "', " +
-                "OPTIONS => " + miningStrategy +
-                " + DBMS_LOGMNR.NO_ROWID_IN_STMT);" +
-                "END;";
-
-        executeCallableStatement(connection, startLogMiner);
     }
 
     public static void endLogMiner(Connection connection) {
-        try {
-            executeCallableStatement(connection, "BEGIN SYS.DBMS_LOGMNR.END_LOGMNR(); END;");
-        } catch (SQLException e) {
-            if (e.getMessage().toUpperCase().contains("ORA-01307")) {
-                LOGGER.info("LogMiner session was already closed");
-            } else {
-                LOGGER.error("Cannot close LogMiner session gracefully: {}", e);
+        if (connection != null) {
+            try {
+                executeCallableStatement(connection, LOG_MINER_SQL_END_LOG_MINER);
+            } catch (SQLException e) {
+                if (e.getMessage().toUpperCase().contains("ORA-01307")) {
+                    logger.info("LogMiner session was already closed");
+                } else {
+                    logger.warn("Cannot close log miner session gracefully", e);
+                }
             }
         }
     }
@@ -215,14 +154,81 @@ public class LogMinerHelper {
     }
 
     public static void setSessionParameter(Connection connection) throws SQLException {
-        String sql = "ALTER SESSION SET "
-                + "  NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'"
-                + "  NLS_TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF'"
-                + "  NLS_TIMESTAMP_TZ_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'"
-                + "  NLS_NUMERIC_CHARACTERS = '.,'";
-
-        executeCallableStatement(connection, sql);
-        executeCallableStatement(connection, "ALTER SESSION SET TIME_ZONE = '00:00'");
+        executeCallableStatement(connection, LOG_MINER_SQL_ALTER_NLS_SESSION_PARAMETERS);
     }
 
+    public static void startLogMiner(Connection connection, long startScn, long endScn) throws SQLException {
+        try (PreparedStatement logMinerStartStmt = connection.prepareCall(LOG_MINER_SQL_START_LOG_MINER)) {
+            logMinerStartStmt.setString(1, String.valueOf(startScn));
+            logMinerStartStmt.setString(2, String.valueOf(endScn));
+            logMinerStartStmt.execute();
+        }
+    }
+
+    public static long getCurrentScn(Connection connection) throws SQLException {
+        try (Statement statement = connection.createStatement()) {
+            try (ResultSet rs = statement.executeQuery(LOG_MINER_SQL_GET_CURRENT_SCN)) {
+                if (!rs.next()) {
+                    throw new IllegalStateException("Couldn't get SCN");
+                }
+                return rs.getLong(1);
+            }
+        }
+    }
+
+    public static void setSessionContainerIfCdbMode(Connection connection) throws SQLException {
+        try (Statement statement = connection.createStatement()) {
+            try (ResultSet rs = statement.executeQuery(LOG_MINER_SQL_IS_CDB)) {
+                rs.next();
+                // cdb模式 需要切换到根容器
+                if (rs.getString(1).equalsIgnoreCase("YES")) {
+                    try (PreparedStatement ps = connection.prepareStatement(LOG_MINER_SQL_ALTER_SESSION_CONTAINER)) {
+                        try {
+                            ps.execute();
+                        } catch (SQLException e) {
+                            throw new OracleException(String.format("sql=%s error=%s", LOG_MINER_SQL_ALTER_SESSION_CONTAINER, e.getMessage()));
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private static List<String> queryList(Connection connection, String querySql, String key) throws SQLException {
+        List<String> list = new ArrayList<>();
+        try (Statement statement = connection.createStatement()) {
+            try (ResultSet rs = statement.executeQuery(querySql)) {
+                while (rs.next()) {
+                    String k = rs.getString(key);
+                    if (StringUtil.isNotBlank(k)) {
+                        list.add(k.toUpperCase());
+                    }
+                }
+            }
+        }
+        return list;
+    }
+
+    public static void checkPermissions(Connection connection, int version) throws SQLException {
+        List<String> roles = queryList(connection, LOG_MINER_SQL_QUERY_ROLES, LOG_MINER_KEY_GRANTED_ROLE);
+        if (CollectionUtils.isEmpty(roles)) {
+            throw new RuntimeException("No permissions");
+        }
+
+        // DBA
+        if (roles.contains(LOG_MINER_DBA_ROLE)) {
+            return;
+        }
+
+        List<String> privileges = queryList(connection, LOG_MINER_SQL_QUERY_PRIVILEGES, LOG_MINER_KEY_PRIVILEGE);
+        if (CollectionUtils.isEmpty(privileges)) {
+            throw new RuntimeException("No permissions");
+        }
+        List<String> checkPrivileges = version <= 11 ? LOG_MINER_ORACLE_11_PRIVILEGES_NEEDED : LOG_MINER_PRIVILEGES_NEEDED;
+        long count = privileges.stream().filter(checkPrivileges::contains).count();
+        if (count != checkPrivileges.size()) {
+            String log = StringUtil.join(Collections.singleton(checkPrivileges), StringUtil.COMMA);
+            throw new IllegalArgumentException(String.format("No permission, please execute sql authorization:GRANT %s TO USER_ROLE;", log));
+        }
+    }
 }

+ 9 - 12
dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/IncrementPuller.java

@@ -93,10 +93,8 @@ public final class IncrementPuller extends AbstractPuller implements Application
                     meta.setEndTime(now);
                     profileComponent.editConfigModel(meta);
                     tableGroupContext.put(mapping, list);
-                    Listener listener = getListener(mapping, connector, list, meta);
-                    listener.start();
-                    return listener;
-                });
+                    return getListener(mapping, connector, list, meta);
+                }).start();
             } catch (Exception e) {
                 close(metaId);
                 logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
@@ -110,15 +108,14 @@ public final class IncrementPuller extends AbstractPuller implements Application
 
     @Override
     public void close(String metaId) {
-        Listener listener = map.get(metaId);
-        if (null != listener) {
-            bufferActuatorRouter.unbind(metaId);
+        map.computeIfPresent(metaId, (k, listener)->{
             listener.close();
-        }
-        map.remove(metaId);
-        publishClosedEvent(metaId);
-        tableGroupContext.clear(metaId);
-        logger.info("关闭成功:{}", metaId);
+            bufferActuatorRouter.unbind(metaId);
+            tableGroupContext.clear(metaId);
+            publishClosedEvent(metaId);
+            logger.info("关闭成功:{}", metaId);
+            return null;
+        });
     }
 
     @Override