AE86 5 anni fa
parent
commit
308cc9f83d

+ 168 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -0,0 +1,168 @@
+package org.dbsyncer.listener.oracle.dcn;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import oracle.jdbc.OracleConnection;
+import oracle.jdbc.OracleDriver;
+import oracle.jdbc.OracleStatement;
+import oracle.jdbc.dcn.DatabaseChangeEvent;
+import oracle.jdbc.dcn.DatabaseChangeListener;
+import oracle.jdbc.dcn.DatabaseChangeRegistration;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-06-11 00:59
+ */
+public class DBChangeNotification {
+    static final String USERNAME = "scott";
+    static final String PASSWORD = "tiger";
+    static String URL;
+
+    public static void main(String[] argv) {
+        if (argv.length < 1) {
+            // grant change notification to scott;
+            System.out.println("Error: You need to provide the URL in the first argument.");
+            System.out.println("  For example: > java -classpath .:ojdbc5.jar DBChangeNotification \\\"jdbc:oracle:thin:\n" +
+                    "@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)(HOST=yourhost.yourdomain.com)(PORT=1521))(CONNECT_DATA=\n" +
+                    "(SERVICE_NAME=yourservicename)))\\\"");
+
+            System.exit(1);
+        }
+        URL = argv[0];
+        DBChangeNotification demo = new DBChangeNotification();
+        try {
+            demo.run();
+        } catch (SQLException mainSQLException) {
+            mainSQLException.printStackTrace();
+        }
+    }
+
+    void run() throws SQLException {
+        OracleConnection conn = connect();
+
+        // first step: create a registration on the server:
+        Properties prop = new Properties();
+
+        // if connected through the VPN, you need to provide the TCP address of the client.
+        // For example:
+        // prop.setProperty(OracleConnection.NTF_LOCAL_HOST,"14.14.13.12");
+
+        // Ask the server to send the ROWIDs as part of the DCN events (small performance
+        // cost):
+        prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
+
+        //Set the DCN_QUERY_CHANGE_NOTIFICATION option for query registration with finer granularity.
+        prop.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true");
+
+        // The following operation does a roundtrip to the database to create a new
+        // registration for DCN. It sends the client address (ip address and port) that
+        // the server will use to connect to the client and send the notification
+        // when necessary. Note that for now the registration is empty (we haven't registered
+        // any table). This also opens a new thread in the drivers. This thread will be
+        // dedicated to DCN (accept connection to the server and dispatch the events to
+        // the listeners).
+        DatabaseChangeRegistration dcr = conn.registerDatabaseChangeNotification(prop);
+
+        try {
+            // add the listenerr:
+            DCNDemoListener list = new DCNDemoListener(this);
+            dcr.addListener(list);
+
+            // second step: add objects in the registration:
+            Statement stmt = conn.createStatement();
+            // associate the statement with the registration:
+            ((OracleStatement) stmt).setDatabaseChangeRegistration(dcr);
+            ResultSet rs = stmt.executeQuery("select * from dept where deptno='45'");
+            while (rs.next()) {
+            }
+            String[] tableNames = dcr.getTables();
+            for (int i = 0; i < tableNames.length; i++)
+                System.out.println(tableNames[i] + " is part of the registration.");
+            rs.close();
+            stmt.close();
+        } catch (SQLException ex) {
+            // if an exception occurs, we need to close the registration in order
+            // to interrupt the thread otherwise it will be hanging around.
+            if (conn != null)
+                conn.unregisterDatabaseChangeNotification(dcr);
+            throw ex;
+        } finally {
+            try {
+                // Note that we close the connection!
+                conn.close();
+            } catch (Exception innerex) {
+                innerex.printStackTrace();
+            }
+        }
+
+        synchronized (this) {
+            // The following code modifies the dept table and commits:
+            try {
+                OracleConnection conn2 = connect();
+                conn2.setAutoCommit(false);
+                Statement stmt2 = conn2.createStatement();
+                stmt2.executeUpdate("insert into dept (deptno,dname) values ('45','cool dept')",
+                        Statement.RETURN_GENERATED_KEYS);
+                ResultSet autoGeneratedKey = stmt2.getGeneratedKeys();
+                if (autoGeneratedKey.next())
+                    System.out.println("inserted one row with ROWID=" + autoGeneratedKey.getString(1));
+                stmt2.executeUpdate("insert into dept (deptno,dname) values ('50','fun dept')", Statement.RETURN_GENERATED_KEYS);
+                autoGeneratedKey = stmt2.getGeneratedKeys();
+                if (autoGeneratedKey.next())
+                    System.out.println("inserted one row with ROWID=" + autoGeneratedKey.getString(1));
+                stmt2.close();
+                conn2.commit();
+                conn2.close();
+            } catch (SQLException ex) {
+                ex.printStackTrace();
+            }
+
+            // wait until we get the event
+            try {
+                this.wait();
+            } catch (InterruptedException ie) {
+            }
+        }
+
+        // At the end: close the registration (comment out these 3 lines in order
+        // to leave the registration open).
+        OracleConnection conn3 = connect();
+        conn3.unregisterDatabaseChangeNotification(dcr);
+        conn3.close();
+    }
+
+    /**
+     * Creates a connection the database.
+     */
+    OracleConnection connect() throws SQLException {
+        OracleDriver dr = new OracleDriver();
+        Properties prop = new Properties();
+        prop.setProperty("user", DBChangeNotification.USERNAME);
+        prop.setProperty("password", DBChangeNotification.PASSWORD);
+        return (OracleConnection) dr.connect(DBChangeNotification.URL, prop);
+    }
+}
+
+/**
+ * DCN listener: it prints out the event details in stdout.
+ */
+class DCNDemoListener implements DatabaseChangeListener {
+    DBChangeNotification demo;
+
+    DCNDemoListener(DBChangeNotification dem) {
+        demo = dem;
+    }
+
+    public void onDatabaseChangeNotification(DatabaseChangeEvent e) {
+        Thread t = Thread.currentThread();
+        System.out.println("DCNDemoListener: got an event (" + this + " running on thread " + t + ")");
+        System.out.println(e.toString());
+        synchronized (demo) {
+            demo.notify();
+        }
+    }
+}

+ 23 - 15
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/OracleRemoteClient.java

@@ -9,6 +9,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.SQLException;
+import java.util.EnumSet;
 import java.util.Properties;
 
 /**
@@ -24,10 +25,10 @@ public class OracleRemoteClient {
     private OracleStatement statement;
     private DatabaseChangeRegistration dcr;
 
-    public void init() throws SQLException {
+    public void init() throws SQLException, InterruptedException {
         OracleDataSource dataSource = new OracleDataSource();
-        dataSource.setUser("admin");
-        dataSource.setPassword("admin");
+        dataSource.setUser("ae86");
+        dataSource.setPassword("123");
         dataSource.setURL("jdbc:oracle:thin:@127.0.0.1:1521:orcl");
         conn = (OracleConnection) dataSource.getConnection();
 
@@ -35,9 +36,10 @@ public class OracleRemoteClient {
         Properties prop = new Properties();
         prop.setProperty(OracleConnection.NTF_TIMEOUT, "0");
         prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
-        prop.setProperty(OracleConnection.DCN_IGNORE_UPDATEOP,"true");
-        prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP,"true");
-        prop.setProperty(OracleConnection.DCN_IGNORE_DELETEOP, "true");
+        prop.setProperty(OracleConnection.DCN_IGNORE_UPDATEOP, "false");
+        prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "false");
+        prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "false");
+        prop.setProperty(OracleConnection.CONNECTION_PROPERTY_CREATE_DESCRIPTOR_USE_CURRENT_SCHEMA_FOR_SCHEMA_NAME_DEFAULT, "true");
 
         statement = (OracleStatement) conn.createStatement();
         dcr = conn.registerDatabaseChangeNotification(prop);
@@ -45,19 +47,23 @@ public class OracleRemoteClient {
         statement.setDatabaseChangeRegistration(dcr);
 
         // 监听的表
-        statement.executeQuery("select * from USER t where 1=2");
+        statement.executeQuery("select * from \"my_user\" t where 1=2");
+        statement.executeQuery("select * from \"my_org\" t where 1=2");
+
+        long regId = dcr.getRegId();
+        String[] dcrTables = dcr.getTables();
+        logger.info("regId:{}", regId);
+        logger.info("dcrTables:{}", dcrTables);
 
-        statement.close();
-        conn.close();
         logger.info("数据库更改通知开启");
     }
 
     public void close() throws SQLException {
-        if(null != statement){
+        if (null != statement) {
             statement.close();
         }
 
-        if(null != conn){
+        if (null != conn) {
             conn.unregisterDatabaseChangeNotification(dcr);
             conn.close();
         }
@@ -70,8 +76,9 @@ public class OracleRemoteClient {
             TableChangeDescription[] tds = event.getTableChangeDescription();
             logger.info("=============================");
 
-            logger.info("'TableChangeDescription'(数据表的变化次数):{}", tds.length);
             for (TableChangeDescription td : tds) {
+                EnumSet<TableChangeDescription.TableOperation> operations = td.getTableOperations();
+                operations.contains("DELETE");
                 logger.info("数据库表id:{}", td.getObjectNumber());
                 logger.info("数据表名称:{}", td.getTableName());
 
@@ -82,16 +89,17 @@ public class OracleRemoteClient {
                     logger.info("数据库表行级变化:", rowOperation.toString());
 
                     ROWID rowid = rd.getRowid();
-                    logger.info(rowid.stringValue());
+                    logger.info("事件:{},ROWID:{}", rowOperation.name(), rowid.stringValue());
                 }
             }
         }
     }
 
-    public static void main(String[] args) throws SQLException {
+    public static void main(String[] args) throws SQLException, InterruptedException {
         OracleRemoteClient client = new OracleRemoteClient();
         client.init();
-        client.close();
+        Thread.currentThread().join();
+//        client.close();
     }
 
 }