1
0
AE86 5 жил өмнө
parent
commit
479f31bb81

+ 143 - 85
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -1,10 +1,12 @@
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
 package org.dbsyncer.listener.oracle.dcn;
 
 import oracle.jdbc.OracleDriver;
 import oracle.jdbc.OracleStatement;
 import oracle.jdbc.dcn.*;
 import oracle.jdbc.driver.OracleConnection;
-import oracle.sql.ROWID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -12,17 +14,17 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 /**
  * 授予登录账号监听事件权限
+ * <p>sqlplus/as sysdba
+ * <p>
  * <p>grant change notification to AE86
  *
- * <p>select regid,callback from USER_CHANGE_NOTIFICATION_REGS
- * <p>select * from dba_change_notification_regs where username='AE86';
- *
  * @version 1.0.0
  * @Author AE86
  * @Date 2020-06-08 21:53
@@ -31,113 +33,155 @@ public class DBChangeNotification {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private static String username = "AE86";
-    private static String password = "123";
-    private static String url = "jdbc:oracle:thin:@127.0.0.1:1521:orcl";
-    private static String host = "127.0.0.1";
-
-    private OracleConnection conn;
+    private static final String QUERY_ROW_DATA_SQL  = "SELECT * FROM \"%s\" WHERE ROWID = \"%s\"";
+    private static final String QUERY_TABLE_ALL_SQL = "SELECT DATA_OBJECT_ID, OBJECT_NAME FROM DBA_OBJECTS WHERE OWNER='%S' AND OBJECT_TYPE = 'TABLE'";
+    private static final String QUERY_TABLE_SQL     = "SELECT 1 FROM \"%s\" WHERE 1=2";
+    private static final String QUERY_CALLBACK_SQL  = "SELECT REGID,CALLBACK FROM USER_CHANGE_NOTIFICATION_REGS";
+    private static final String CALLBACK            = "net8://(ADDRESS=(PROTOCOL=tcp)(HOST=%s)(PORT=%s))?PR=0";
+    private static String username;
+    private static String password;
+    private static String url;
+
+    private OracleConnection           conn;
+    private OracleStatement            statement;
     private DatabaseChangeRegistration dcr;
+    private Map<Integer, String>       tables;
 
-    public DBChangeNotification() throws SQLException {
-        init();
-    }
-
-    public void init() throws SQLException {
-        conn = connect();
+    public void start() throws SQLException {
+        try {
+            conn = connect();
+            statement = (OracleStatement) conn.createStatement();
+            readTables();
 
-        // 配置监听参数
-        Properties prop = new Properties();
-        prop.setProperty(OracleConnection.NTF_TIMEOUT, "0");
-        prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
-        prop.setProperty(OracleConnection.DCN_IGNORE_UPDATEOP, "false");
-        prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "false");
-        prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "false");
+            Properties prop = new Properties();
+            prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
+            prop.setProperty(OracleConnection.DCN_IGNORE_UPDATEOP, "false");
+            prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "false");
+            prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "false");
 
-        try {
             // add the listener:NTFDCNRegistration
             dcr = conn.registerDatabaseChangeNotification(prop);
-
-//            int port = getPort(dcr);
-//            logger.info("当前regId:{}, port:{}", dcr.getRegId(), port);
-
-            // 清空历史注册记录:
-            // (ADDRESS=(PROTOCOL=tcp)(HOST=127.0.0.1)(PORT=47632))?PR=0
-//            cleanRegistrations(conn);
-
             dcr.addListener(new DCNListener());
 
-            Statement stmt = conn.createStatement();
-            // associate the statement with the registration:
-            ((OracleStatement) stmt).setDatabaseChangeRegistration(dcr);
-            // 配置监听表
-            stmt.executeQuery("select * from \"my_user\" t where 1=2");
-            stmt.executeQuery("select * from \"my_org\" t where 1=2");
+            final long regId = dcr.getRegId();
+            final String host = getHost();
+            final int port = getPort(dcr);
+            final String callback = String.format(CALLBACK, host, port);
+            logger.info("regId:{}, callback:{}", regId, callback);
+            // clean the registrations
+            clean(statement, regId, callback);
+            statement.setDatabaseChangeRegistration(dcr);
 
+            // 配置监听表
+            for (Map.Entry<Integer, String> m : tables.entrySet()) {
+                statement.executeQuery(String.format(QUERY_TABLE_SQL, m.getValue()));
+            }
             String[] tableNames = dcr.getTables();
-            for (int i = 0; i < tableNames.length; i++)
-                System.out.println(tableNames[i] + " is part of the registration.");
-
-            stmt.close();
+            int tableSize = tableNames.length;
+            for (int i = 0; i < tableSize; i++) { logger.info("{} is part of the registration.", tableNames[i]); }
             logger.info("数据库更改通知开启");
         } catch (SQLException ex) {
             // if an exception occurs, we need to close the registration in order
             // to interrupt the thread otherwise it will be hanging around.
-            if (conn != null) {
+            close();
+            throw ex;
+        }
+    }
+
+    public void close() {
+        try {
+            if (null != statement) {
+                statement.close();
+            }
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+        }
+
+        try {
+            if (null != conn) {
                 conn.unregisterDatabaseChangeNotification(dcr);
+                conn.close();
             }
-            throw ex;
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    private void readTables() throws SQLException {
+        tables = new LinkedHashMap<>();
+        ResultSet rs = null;
+        try {
+            String sql = String.format(QUERY_TABLE_ALL_SQL, username);
+            rs = statement.executeQuery(sql);
+            while (rs.next()) {
+                int tableId = rs.getInt(1);
+                String tableName = rs.getString(2);
+                tables.put(tableId, tableName);
+            }
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+        } finally {
+            if (null != rs) {
+                rs.close();
+            }
+        }
+    }
+
+    private String getHost() {
+        if (url != null) {
+            String host = url.substring(url.indexOf("@") + 1);
+            host = host.substring(0, host.indexOf(":"));
+            return host;
         }
+        return "127.0.0.1";
     }
 
     private int getPort(DatabaseChangeRegistration dcr) {
         Object obj = null;
         try {
-            Method method = dcr.getClass().getMethod("getClientTCPPort");
-            obj = method.invoke(dcr, new Object[]{});
+            // 反射获取抽象属性 NTFRegistration
+            Class clazz = dcr.getClass().getSuperclass();
+            Method method = clazz.getDeclaredMethod("getClientTCPPort");
+            method.setAccessible(true);
+            obj = method.invoke(dcr, new Object[] {});
         } catch (NoSuchMethodException e) {
-            e.printStackTrace();
+            logger.error(e.getMessage());
         } catch (IllegalAccessException e) {
-            e.printStackTrace();
+            logger.error(e.getMessage());
         } catch (InvocationTargetException e) {
-            e.printStackTrace();
+            logger.error(e.getMessage());
         }
-
         return null == obj ? 0 : Integer.parseInt(String.valueOf(obj));
     }
 
-    public void close() throws SQLException {
-        if (null != conn) {
-            conn.unregisterDatabaseChangeNotification(dcr);
-            conn.close();
-        }
-    }
-
-    /**
-     * 清空历史注册
-     *
-     * @param conn
-     * @throws SQLException
-     */
-    private void cleanRegistrations(OracleConnection conn) throws SQLException {
-        Statement stmt = conn.createStatement();
-        ResultSet rs = stmt.executeQuery("select regid,callback from USER_CHANGE_NOTIFICATION_REGS");
-        while (rs.next()) {
-            long regId = rs.getLong(1);
-            String callback = rs.getString(2);
-            logger.info("regid:{}, callback:{}", regId, callback);
-
-            // TODO 排除当前注册
-            conn.unregisterDatabaseChangeNotification(regId, callback);
+    private void clean(OracleStatement statement, long excludeRegId, String excludeCallback) throws SQLException {
+        ResultSet rs = null;
+        try {
+            rs = statement.executeQuery(QUERY_CALLBACK_SQL);
+            while (rs.next()) {
+                long regId = rs.getLong(1);
+                String callback = rs.getString(2);
+
+                if (regId != excludeRegId && callback.equals(excludeCallback)) {
+                    logger.info("Clean regid:{}, callback:{}", regId, callback);
+                    conn.unregisterDatabaseChangeNotification(regId, callback);
+                }
+            }
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+        } finally {
+            if (null != rs) {
+                rs.close();
+            }
         }
-        rs.close();
-        stmt.close();
     }
 
-    /**
-     * Creates a connection the database.
-     */
     private OracleConnection connect() throws SQLException {
+        // 加载连接参数
+        username = "AE86".toUpperCase();
+        password = "123".toUpperCase();
+        url = "jdbc:oracle:thin:@127.0.0.1:1521:xe";
+
         OracleDriver dr = new OracleDriver();
         Properties prop = new Properties();
         prop.setProperty("user", username);
@@ -153,27 +197,41 @@ public class DBChangeNotification {
             logger.info("=============================");
 
             for (TableChangeDescription td : tds) {
+                /**
+                 * Select userenv('language') from dual;
+                 * >SIMPLIFIED CHINESE_CHINA.AL32UTF8
+                 *
+                 * select * from V$NLS_PARAMETERS
+                 * >AL32UTF8
+                 *
+                 */
+                // oracle 11g XE 响应编码:AL32UTF8
                 logger.info("数据库表id:{}", td.getObjectNumber());
                 logger.info("数据表名称:{}", td.getTableName());
+                logger.info("数据表名称:{}", tables.get(td.getObjectNumber()));
 
                 // 获得返回的行级变化描述通知 行id、影响这一行的DML操作(行是插入、更新或删除的一种)
                 RowChangeDescription[] rds = td.getRowChangeDescription();
                 for (RowChangeDescription rd : rds) {
-                    RowChangeDescription.RowOperation rowOperation = rd.getRowOperation();
-                    logger.info("数据库表行级变化:", rowOperation.toString());
-
-                    ROWID rowid = rd.getRowid();
-                    logger.info("事件:{},ROWID:{}", rowOperation.name(), rowid.stringValue());
+                    RowChangeDescription.RowOperation opr = rd.getRowOperation();
+                    parseEvent(opr.name(), rd.getRowid().stringValue(), tables.get(td.getObjectNumber()));
                 }
             }
         }
+
+        private void parseEvent(String event, String rowId, String tableName){
+            logger.info("event:{}, rowid:{}, tableName:{}", event, rowId, tableName);
+            // QUERY_ROW_DATA_SQL
+
+        }
     }
 
     public static void main(String[] args) throws SQLException, InterruptedException {
         DBChangeNotification client = new DBChangeNotification();
+        client.start();
         // sleep 30s
         TimeUnit.SECONDS.sleep(120);
-//        client.close();
+        client.close();
     }
 
 }