AE86 5 years ago
parent
commit
fdef6ce6cc

+ 128 - 117
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -1,168 +1,179 @@
 package org.dbsyncer.listener.oracle.dcn;
 
+import oracle.jdbc.OracleDriver;
+import oracle.jdbc.OracleStatement;
+import oracle.jdbc.dcn.*;
+import oracle.jdbc.driver.OracleConnection;
+import oracle.sql.ROWID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Properties;
-
-import oracle.jdbc.OracleConnection;
-import oracle.jdbc.OracleDriver;
-import oracle.jdbc.OracleStatement;
-import oracle.jdbc.dcn.DatabaseChangeEvent;
-import oracle.jdbc.dcn.DatabaseChangeListener;
-import oracle.jdbc.dcn.DatabaseChangeRegistration;
+import java.util.concurrent.TimeUnit;
 
 /**
+ * 授予登录账号监听事件权限
+ * <p>grant change notification to AE86
+ *
+ * <p>select regid,callback from USER_CHANGE_NOTIFICATION_REGS
+ * <p>select * from dba_change_notification_regs where username='AE86';
+ *
  * @version 1.0.0
  * @Author AE86
- * @Date 2020-06-11 00:59
+ * @Date 2020-06-08 21:53
  */
 public class DBChangeNotification {
-    static final String USERNAME = "scott";
-    static final String PASSWORD = "tiger";
-    static String URL;
-
-    public static void main(String[] argv) {
-        if (argv.length < 1) {
-            // grant change notification to scott;
-            System.out.println("Error: You need to provide the URL in the first argument.");
-            System.out.println("  For example: > java -classpath .:ojdbc5.jar DBChangeNotification \\\"jdbc:oracle:thin:\n" +
-                    "@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)(HOST=yourhost.yourdomain.com)(PORT=1521))(CONNECT_DATA=\n" +
-                    "(SERVICE_NAME=yourservicename)))\\\"");
-
-            System.exit(1);
-        }
-        URL = argv[0];
-        DBChangeNotification demo = new DBChangeNotification();
-        try {
-            demo.run();
-        } catch (SQLException mainSQLException) {
-            mainSQLException.printStackTrace();
-        }
-    }
 
-    void run() throws SQLException {
-        OracleConnection conn = connect();
+    private final Logger logger = LoggerFactory.getLogger(getClass());
 
-        // first step: create a registration on the server:
-        Properties prop = new Properties();
+    private static String username = "AE86";
+    private static String password = "123";
+    private static String url = "jdbc:oracle:thin:@127.0.0.1:1521:orcl";
+    private static String host = "127.0.0.1";
 
-        // if connected through the VPN, you need to provide the TCP address of the client.
-        // For example:
-        // prop.setProperty(OracleConnection.NTF_LOCAL_HOST,"14.14.13.12");
+    private OracleConnection conn;
+    private DatabaseChangeRegistration dcr;
 
-        // Ask the server to send the ROWIDs as part of the DCN events (small performance
-        // cost):
-        prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
+    public DBChangeNotification() throws SQLException {
+        init();
+    }
 
-        //Set the DCN_QUERY_CHANGE_NOTIFICATION option for query registration with finer granularity.
-        prop.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true");
+    public void init() throws SQLException {
+        conn = connect();
 
-        // The following operation does a roundtrip to the database to create a new
-        // registration for DCN. It sends the client address (ip address and port) that
-        // the server will use to connect to the client and send the notification
-        // when necessary. Note that for now the registration is empty (we haven't registered
-        // any table). This also opens a new thread in the drivers. This thread will be
-        // dedicated to DCN (accept connection to the server and dispatch the events to
-        // the listeners).
-        DatabaseChangeRegistration dcr = conn.registerDatabaseChangeNotification(prop);
+        // 配置监听参数
+        Properties prop = new Properties();
+        prop.setProperty(OracleConnection.NTF_TIMEOUT, "0");
+        prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
+        prop.setProperty(OracleConnection.DCN_IGNORE_UPDATEOP, "false");
+        prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "false");
+        prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "false");
 
         try {
-            // add the listenerr:
-            DCNDemoListener list = new DCNDemoListener(this);
-            dcr.addListener(list);
+            // add the listener:NTFDCNRegistration
+            dcr = conn.registerDatabaseChangeNotification(prop);
+
+//            int port = getPort(dcr);
+//            logger.info("当前regId:{}, port:{}", dcr.getRegId(), port);
+
+            // 清空历史注册记录:
+            // (ADDRESS=(PROTOCOL=tcp)(HOST=127.0.0.1)(PORT=47632))?PR=0
+//            cleanRegistrations(conn);
+
+            dcr.addListener(new DCNListener());
 
-            // second step: add objects in the registration:
             Statement stmt = conn.createStatement();
             // associate the statement with the registration:
             ((OracleStatement) stmt).setDatabaseChangeRegistration(dcr);
-            ResultSet rs = stmt.executeQuery("select * from dept where deptno='45'");
-            while (rs.next()) {
-            }
+            // 配置监听表
+            stmt.executeQuery("select * from \"my_user\" t where 1=2");
+            stmt.executeQuery("select * from \"my_org\" t where 1=2");
+
             String[] tableNames = dcr.getTables();
             for (int i = 0; i < tableNames.length; i++)
                 System.out.println(tableNames[i] + " is part of the registration.");
-            rs.close();
+
             stmt.close();
+            logger.info("数据库更改通知开启");
         } catch (SQLException ex) {
             // if an exception occurs, we need to close the registration in order
             // to interrupt the thread otherwise it will be hanging around.
-            if (conn != null)
+            if (conn != null) {
                 conn.unregisterDatabaseChangeNotification(dcr);
-            throw ex;
-        } finally {
-            try {
-                // Note that we close the connection!
-                conn.close();
-            } catch (Exception innerex) {
-                innerex.printStackTrace();
             }
+            throw ex;
         }
+    }
 
-        synchronized (this) {
-            // The following code modifies the dept table and commits:
-            try {
-                OracleConnection conn2 = connect();
-                conn2.setAutoCommit(false);
-                Statement stmt2 = conn2.createStatement();
-                stmt2.executeUpdate("insert into dept (deptno,dname) values ('45','cool dept')",
-                        Statement.RETURN_GENERATED_KEYS);
-                ResultSet autoGeneratedKey = stmt2.getGeneratedKeys();
-                if (autoGeneratedKey.next())
-                    System.out.println("inserted one row with ROWID=" + autoGeneratedKey.getString(1));
-                stmt2.executeUpdate("insert into dept (deptno,dname) values ('50','fun dept')", Statement.RETURN_GENERATED_KEYS);
-                autoGeneratedKey = stmt2.getGeneratedKeys();
-                if (autoGeneratedKey.next())
-                    System.out.println("inserted one row with ROWID=" + autoGeneratedKey.getString(1));
-                stmt2.close();
-                conn2.commit();
-                conn2.close();
-            } catch (SQLException ex) {
-                ex.printStackTrace();
-            }
+    private int getPort(DatabaseChangeRegistration dcr) {
+        Object obj = null;
+        try {
+            Method method = dcr.getClass().getMethod("getClientTCPPort");
+            obj = method.invoke(dcr, new Object[]{});
+        } catch (NoSuchMethodException e) {
+            e.printStackTrace();
+        } catch (IllegalAccessException e) {
+            e.printStackTrace();
+        } catch (InvocationTargetException e) {
+            e.printStackTrace();
+        }
 
-            // wait until we get the event
-            try {
-                this.wait();
-            } catch (InterruptedException ie) {
-            }
+        return null == obj ? 0 : Integer.parseInt(String.valueOf(obj));
+    }
+
+    public void close() throws SQLException {
+        if (null != conn) {
+            conn.unregisterDatabaseChangeNotification(dcr);
+            conn.close();
         }
+    }
 
-        // At the end: close the registration (comment out these 3 lines in order
-        // to leave the registration open).
-        OracleConnection conn3 = connect();
-        conn3.unregisterDatabaseChangeNotification(dcr);
-        conn3.close();
+    /**
+     * 清空历史注册
+     *
+     * @param conn
+     * @throws SQLException
+     */
+    private void cleanRegistrations(OracleConnection conn) throws SQLException {
+        Statement stmt = conn.createStatement();
+        ResultSet rs = stmt.executeQuery("select regid,callback from USER_CHANGE_NOTIFICATION_REGS");
+        while (rs.next()) {
+            long regId = rs.getLong(1);
+            String callback = rs.getString(2);
+            logger.info("regid:{}, callback:{}", regId, callback);
+
+            // TODO 排除当前注册
+            conn.unregisterDatabaseChangeNotification(regId, callback);
+        }
+        rs.close();
+        stmt.close();
     }
 
     /**
      * Creates a connection the database.
      */
-    OracleConnection connect() throws SQLException {
+    private OracleConnection connect() throws SQLException {
         OracleDriver dr = new OracleDriver();
         Properties prop = new Properties();
-        prop.setProperty("user", DBChangeNotification.USERNAME);
-        prop.setProperty("password", DBChangeNotification.PASSWORD);
-        return (OracleConnection) dr.connect(DBChangeNotification.URL, prop);
+        prop.setProperty("user", username);
+        prop.setProperty("password", password);
+        return (OracleConnection) dr.connect(url, prop);
     }
-}
 
-/**
- * DCN listener: it prints out the event details in stdout.
- */
-class DCNDemoListener implements DatabaseChangeListener {
-    DBChangeNotification demo;
+    final class DCNListener implements DatabaseChangeListener {
 
-    DCNDemoListener(DBChangeNotification dem) {
-        demo = dem;
-    }
+        @Override
+        public void onDatabaseChangeNotification(DatabaseChangeEvent event) {
+            TableChangeDescription[] tds = event.getTableChangeDescription();
+            logger.info("=============================");
+
+            for (TableChangeDescription td : tds) {
+                logger.info("数据库表id:{}", td.getObjectNumber());
+                logger.info("数据表名称:{}", td.getTableName());
 
-    public void onDatabaseChangeNotification(DatabaseChangeEvent e) {
-        Thread t = Thread.currentThread();
-        System.out.println("DCNDemoListener: got an event (" + this + " running on thread " + t + ")");
-        System.out.println(e.toString());
-        synchronized (demo) {
-            demo.notify();
+                // 获得返回的行级变化描述通知 行id、影响这一行的DML操作(行是插入、更新或删除的一种)
+                RowChangeDescription[] rds = td.getRowChangeDescription();
+                for (RowChangeDescription rd : rds) {
+                    RowChangeDescription.RowOperation rowOperation = rd.getRowOperation();
+                    logger.info("数据库表行级变化:", rowOperation.toString());
+
+                    ROWID rowid = rd.getRowid();
+                    logger.info("事件:{},ROWID:{}", rowOperation.name(), rowid.stringValue());
+                }
+            }
         }
     }
-}
+
+    public static void main(String[] args) throws SQLException, InterruptedException {
+        DBChangeNotification client = new DBChangeNotification();
+        // sleep 30s
+        TimeUnit.SECONDS.sleep(120);
+//        client.close();
+    }
+
+}

+ 0 - 105
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/OracleRemoteClient.java

@@ -1,105 +0,0 @@
-package org.dbsyncer.listener.oracle.dcn;
-
-import oracle.jdbc.OracleStatement;
-import oracle.jdbc.dcn.*;
-import oracle.jdbc.driver.OracleConnection;
-import oracle.jdbc.pool.OracleDataSource;
-import oracle.sql.ROWID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.SQLException;
-import java.util.EnumSet;
-import java.util.Properties;
-
-/**
- * @version 1.0.0
- * @Author AE86
- * @Date 2020-06-08 21:53
- */
-public class OracleRemoteClient {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    private OracleConnection conn;
-    private OracleStatement statement;
-    private DatabaseChangeRegistration dcr;
-
-    public void init() throws SQLException, InterruptedException {
-        OracleDataSource dataSource = new OracleDataSource();
-        dataSource.setUser("ae86");
-        dataSource.setPassword("123");
-        dataSource.setURL("jdbc:oracle:thin:@127.0.0.1:1521:orcl");
-        conn = (OracleConnection) dataSource.getConnection();
-
-        // 配置监听参数
-        Properties prop = new Properties();
-        prop.setProperty(OracleConnection.NTF_TIMEOUT, "0");
-        prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
-        prop.setProperty(OracleConnection.DCN_IGNORE_UPDATEOP, "false");
-        prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "false");
-        prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "false");
-        prop.setProperty(OracleConnection.CONNECTION_PROPERTY_CREATE_DESCRIPTOR_USE_CURRENT_SCHEMA_FOR_SCHEMA_NAME_DEFAULT, "true");
-
-        statement = (OracleStatement) conn.createStatement();
-        dcr = conn.registerDatabaseChangeNotification(prop);
-        dcr.addListener(new DataBaseChangeListener());
-        statement.setDatabaseChangeRegistration(dcr);
-
-        // 监听的表
-        statement.executeQuery("select * from \"my_user\" t where 1=2");
-        statement.executeQuery("select * from \"my_org\" t where 1=2");
-
-        long regId = dcr.getRegId();
-        String[] dcrTables = dcr.getTables();
-        logger.info("regId:{}", regId);
-        logger.info("dcrTables:{}", dcrTables);
-
-        logger.info("数据库更改通知开启");
-    }
-
-    public void close() throws SQLException {
-        if (null != statement) {
-            statement.close();
-        }
-
-        if (null != conn) {
-            conn.unregisterDatabaseChangeNotification(dcr);
-            conn.close();
-        }
-    }
-
-    final class DataBaseChangeListener implements DatabaseChangeListener {
-
-        @Override
-        public void onDatabaseChangeNotification(DatabaseChangeEvent event) {
-            TableChangeDescription[] tds = event.getTableChangeDescription();
-            logger.info("=============================");
-
-            for (TableChangeDescription td : tds) {
-                EnumSet<TableChangeDescription.TableOperation> operations = td.getTableOperations();
-                operations.contains("DELETE");
-                logger.info("数据库表id:{}", td.getObjectNumber());
-                logger.info("数据表名称:{}", td.getTableName());
-
-                // 获得返回的行级变化描述通知 行id、影响这一行的DML操作(行是插入、更新或删除的一种)
-                RowChangeDescription[] rds = td.getRowChangeDescription();
-                for (RowChangeDescription rd : rds) {
-                    RowChangeDescription.RowOperation rowOperation = rd.getRowOperation();
-                    logger.info("数据库表行级变化:", rowOperation.toString());
-
-                    ROWID rowid = rd.getRowid();
-                    logger.info("事件:{},ROWID:{}", rowOperation.name(), rowid.stringValue());
-                }
-            }
-        }
-    }
-
-    public static void main(String[] args) throws SQLException, InterruptedException {
-        OracleRemoteClient client = new OracleRemoteClient();
-        client.init();
-        Thread.currentThread().join();
-//        client.close();
-    }
-
-}