AE86 5 years ago
parent
commit
88389241d7

+ 45 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/OracleExtractor.java

@@ -1,11 +1,19 @@
 package org.dbsyncer.listener.oracle;
 
+import oracle.jdbc.dcn.TableChangeDescription;
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.listener.oracle.dcn.DBChangeNotification;
+import org.dbsyncer.listener.oracle.dcn.RowChangeEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
 /**
  * @version 1.0.0
  * @Author AE86
@@ -15,10 +23,18 @@ public class OracleExtractor extends AbstractExtractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private DBChangeNotification client;
+
     @Override
     public void start() {
         try {
             final DatabaseConfig config = (DatabaseConfig) connectorConfig;
+            String username = config.getUsername();
+            String password = config.getPassword();
+            String url = config.getUrl();
+            client = new DBChangeNotification(username, password, url);
+            client.addRowEventListener((e) -> onEvent(e));
+            client.start();
         } catch (Exception e) {
             logger.error("启动失败:{}", e.getMessage());
             throw new ListenerException(e);
@@ -27,6 +43,35 @@ public class OracleExtractor extends AbstractExtractor {
 
     @Override
     public void close() {
+        if(null != client){
+            client.close();
+        }
+    }
+
+    private void onEvent(RowChangeEvent event){
+        logger.info(event.toString());
+        if(event.getEvent() == TableChangeDescription.TableOperation.UPDATE.getCode()){
+            changedLogEvent(event.getTableName(), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, event.getData());
+            return;
+        }
+
+        if(event.getEvent() == TableChangeDescription.TableOperation.INSERT.getCode()){
+            changedLogEvent(event.getTableName(), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, event.getData());
+            return;
+        }
+
+        if(event.getEvent() == TableChangeDescription.TableOperation.DELETE.getCode()){
+            changedLogEvent(event.getTableName(), ConnectorConstant.OPERTION_DELETE, event.getData(), Collections.EMPTY_LIST);
+            return;
+        }
+    }
+
+    public static void main(String[] args) throws SQLException, InterruptedException {
+        DBChangeNotification client = new DBChangeNotification("ae86", "123", "jdbc:oracle:thin:@127.0.0.1:1521:xe");
+        client.addRowEventListener((event) -> System.out.println(event));
+        client.start();
+        TimeUnit.SECONDS.sleep(120);
+        client.close();
     }
 
 }

+ 54 - 52
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -13,11 +13,9 @@ import org.slf4j.LoggerFactory;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
 
 /**
  * 授予登录账号监听事件权限
@@ -33,19 +31,27 @@ public class DBChangeNotification {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private static final String QUERY_ROW_DATA_SQL  = "SELECT * FROM \"%s\" WHERE ROWID = \"%s\"";
+    private static final String QUERY_ROW_DATA_SQL  = "SELECT * FROM \"%s\" WHERE ROWID = '%s'";
     private static final String QUERY_TABLE_ALL_SQL = "SELECT DATA_OBJECT_ID, OBJECT_NAME FROM DBA_OBJECTS WHERE OWNER='%S' AND OBJECT_TYPE = 'TABLE'";
     private static final String QUERY_TABLE_SQL     = "SELECT 1 FROM \"%s\" WHERE 1=2";
     private static final String QUERY_CALLBACK_SQL  = "SELECT REGID,CALLBACK FROM USER_CHANGE_NOTIFICATION_REGS";
     private static final String CALLBACK            = "net8://(ADDRESS=(PROTOCOL=tcp)(HOST=%s)(PORT=%s))?PR=0";
-    private static String username;
-    private static String password;
-    private static String url;
 
+    private String                     username;
+    private String                     password;
+    private String                     url;
     private OracleConnection           conn;
     private OracleStatement            statement;
     private DatabaseChangeRegistration dcr;
     private Map<Integer, String>       tables;
+    private List<RowEventListener>     listeners;
+
+    public DBChangeNotification(String username, String password, String url) {
+        this.username = username;
+        this.password = password;
+        this.url = url;
+        this.listeners = new ArrayList<>();
+    }
 
     public void start() throws SQLException {
         try {
@@ -76,10 +82,6 @@ public class DBChangeNotification {
             for (Map.Entry<Integer, String> m : tables.entrySet()) {
                 statement.executeQuery(String.format(QUERY_TABLE_SQL, m.getValue()));
             }
-            String[] tableNames = dcr.getTables();
-            int tableSize = tableNames.length;
-            for (int i = 0; i < tableSize; i++) { logger.info("{} is part of the registration.", tableNames[i]); }
-            logger.info("数据库更改通知开启");
         } catch (SQLException ex) {
             // if an exception occurs, we need to close the registration in order
             // to interrupt the thread otherwise it will be hanging around.
@@ -107,7 +109,17 @@ public class DBChangeNotification {
         }
     }
 
-    private void readTables() throws SQLException {
+    private void close(ResultSet rs){
+        if (null != rs) {
+            try {
+                rs.close();
+            } catch (SQLException e) {
+                logger.error(e.getMessage());
+            }
+        }
+    }
+
+    private void readTables() {
         tables = new LinkedHashMap<>();
         ResultSet rs = null;
         try {
@@ -121,9 +133,7 @@ public class DBChangeNotification {
         } catch (SQLException e) {
             logger.error(e.getMessage());
         } finally {
-            if (null != rs) {
-                rs.close();
-            }
+            close(rs);
         }
     }
 
@@ -154,7 +164,7 @@ public class DBChangeNotification {
         return null == obj ? 0 : Integer.parseInt(String.valueOf(obj));
     }
 
-    private void clean(OracleStatement statement, long excludeRegId, String excludeCallback) throws SQLException {
+    private void clean(OracleStatement statement, long excludeRegId, String excludeCallback) {
         ResultSet rs = null;
         try {
             rs = statement.executeQuery(QUERY_CALLBACK_SQL);
@@ -170,18 +180,11 @@ public class DBChangeNotification {
         } catch (SQLException e) {
             logger.error(e.getMessage());
         } finally {
-            if (null != rs) {
-                rs.close();
-            }
+            close(rs);
         }
     }
 
     private OracleConnection connect() throws SQLException {
-        // 加载连接参数
-        username = "AE86".toUpperCase();
-        password = "123".toUpperCase();
-        url = "jdbc:oracle:thin:@127.0.0.1:1521:xe";
-
         OracleDriver dr = new OracleDriver();
         Properties prop = new Properties();
         prop.setProperty("user", username);
@@ -189,49 +192,48 @@ public class DBChangeNotification {
         return (OracleConnection) dr.connect(url, prop);
     }
 
+    public void addRowEventListener(RowEventListener rowEventListener) {
+        this.listeners.add(rowEventListener);
+    }
+
     final class DCNListener implements DatabaseChangeListener {
 
         @Override
         public void onDatabaseChangeNotification(DatabaseChangeEvent event) {
             TableChangeDescription[] tds = event.getTableChangeDescription();
-            logger.info("=============================");
 
             for (TableChangeDescription td : tds) {
-                /**
-                 * Select userenv('language') from dual;
-                 * >SIMPLIFIED CHINESE_CHINA.AL32UTF8
-                 *
-                 * select * from V$NLS_PARAMETERS
-                 * >AL32UTF8
-                 *
-                 */
-                // oracle 11g XE 响应编码:AL32UTF8
-                logger.info("数据库表id:{}", td.getObjectNumber());
-                logger.info("数据表名称:{}", td.getTableName());
-                logger.info("数据表名称:{}", tables.get(td.getObjectNumber()));
-
-                // 获得返回的行级变化描述通知 行id、影响这一行的DML操作(行是插入、更新或删除的一种)
                 RowChangeDescription[] rds = td.getRowChangeDescription();
                 for (RowChangeDescription rd : rds) {
                     RowChangeDescription.RowOperation opr = rd.getRowOperation();
-                    parseEvent(opr.name(), rd.getRowid().stringValue(), tables.get(td.getObjectNumber()));
+                    parseEvent(tables.get(td.getObjectNumber()), rd.getRowid().stringValue(), opr);
                 }
             }
         }
 
-        private void parseEvent(String event, String rowId, String tableName){
-            logger.info("event:{}, rowid:{}, tableName:{}", event, rowId, tableName);
-            // QUERY_ROW_DATA_SQL
+        private void parseEvent(String tableName, String rowId, RowChangeDescription.RowOperation event) {
+            List<Object> data = new ArrayList<>();
+            data.add(rowId);
+
+            if(event.getCode() != TableChangeDescription.TableOperation.DELETE.getCode()){
+                ResultSet rs = null;
+                try {
+                    rs = statement.executeQuery(String.format(QUERY_ROW_DATA_SQL, tableName, rowId));
+                    final int size = rs.getMetaData().getColumnCount();
+                    while (rs.next()) {
+                        for (int i = 1; i <= size; i++) {
+                            data.add(rs.getObject(i));
+                        }
+                    }
+                } catch (SQLException e) {
+                    logger.error(e.getMessage());
+                } finally {
+                    close(rs);
+                }
+            }
 
+            listeners.forEach(e -> e.onEvents(new RowChangeEvent(tableName, event.getCode(), data)));
         }
     }
 
-    public static void main(String[] args) throws SQLException, InterruptedException {
-        DBChangeNotification client = new DBChangeNotification();
-        client.start();
-        // sleep 30s
-        TimeUnit.SECONDS.sleep(120);
-        client.close();
-    }
-
 }

+ 47 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/RowChangeEvent.java

@@ -0,0 +1,47 @@
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package org.dbsyncer.listener.oracle.dcn;
+
+import java.util.List;
+
+/**
+ * 监听行变更事件
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-06-15 20:00
+ */
+public class RowChangeEvent {
+
+    private String tableName;
+    private int event;
+    private List<Object> data;
+
+    public RowChangeEvent(String tableName, int event, List<Object> data) {
+        this.tableName = tableName;
+        this.event = event;
+        this.data = data;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public int getEvent() {
+        return event;
+    }
+
+    public List<Object> getData() {
+        return data;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder().append("RowChangeEvent{")
+                .append("tableName='").append(tableName).append('\'')
+                .append("event='").append(event).append('\'')
+                .append(", data=").append(data)
+                .append('}').toString();
+    }
+}

+ 17 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/RowEventListener.java

@@ -0,0 +1,17 @@
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package org.dbsyncer.listener.oracle.dcn;
+
+/**
+ * 行变更监听器
+ *
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2020-06-15 20:00
+ */
+public interface RowEventListener {
+
+    void onEvents(RowChangeEvent event);
+
+}