AE86 преди 4 години
родител
ревизия
f0c1788b9c

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -547,7 +547,7 @@ public abstract class AbstractDatabaseConnector implements Database {
             jdbcTemplate = getJdbcTemplate(config);
             rowNum = jdbcTemplate.queryForObject(sql, new Object[] {value}, Integer.class);
         } catch (Exception e) {
-            logger.error("检查数据行是否存在异常:", e.getMessage());
+            logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, value);
         } finally {
             this.close(jdbcTemplate);
         }

+ 5 - 0
dbsyncer-listener/pom.xml

@@ -24,6 +24,11 @@
             <artifactId>mysql-binlog-connector-java</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-log4j2</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>

+ 28 - 24
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -107,7 +107,7 @@ public class DBChangeNotification {
         }
     }
 
-    private void close(AutoCloseable rs) {
+    public void close(AutoCloseable rs) {
         if (null != rs) {
             try {
                 rs.close();
@@ -119,6 +119,33 @@ public class DBChangeNotification {
         }
     }
 
+    public void read(String tableName, String rowId, List<Object> data) {
+        OracleStatement os = null;
+        ResultSet rs = null;
+        try {
+            os = (OracleStatement) conn.createStatement();
+            rs = os.executeQuery(String.format(QUERY_ROW_DATA_SQL, tableName, rowId));
+            if (rs.next()) {
+                final int size = rs.getMetaData().getColumnCount();
+                do {
+                    data.add(rowId);
+                    for (int i = 1; i <= size; i++) {
+                        data.add(rs.getObject(i));
+                    }
+                } while (rs.next());
+            }
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+        } finally {
+            close(rs);
+            close(os);
+        }
+    }
+
+    public OracleConnection getOracleConnection() {
+        return conn;
+    }
+
     private void readTables() {
         tables = new LinkedHashMap<>();
         ResultSet rs = null;
@@ -225,29 +252,6 @@ public class DBChangeNotification {
             }
         }
 
-        private void read(String tableName, String rowId, List<Object> data) {
-            OracleStatement os = null;
-            ResultSet rs = null;
-            try {
-                os = (OracleStatement) conn.createStatement();
-                rs = os.executeQuery(String.format(QUERY_ROW_DATA_SQL, tableName, rowId));
-                if (rs.next()) {
-                    final int size = rs.getMetaData().getColumnCount();
-                    do {
-                        data.add(rowId);
-                        for (int i = 1; i <= size; i++) {
-                            data.add(rs.getObject(i));
-                        }
-                    } while (rs.next());
-                }
-            } catch (SQLException e) {
-                logger.error(e.getMessage());
-            } finally {
-                close(rs);
-                close(os);
-            }
-        }
-
     }
 
 }

+ 101 - 0
dbsyncer-listener/src/main/test/DBChangeNotificationTest.java

@@ -0,0 +1,101 @@
+import oracle.jdbc.OracleStatement;
+import oracle.jdbc.driver.OracleConnection;
+import org.dbsyncer.listener.oracle.dcn.DBChangeNotification;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * @version 1.0.0
+ * @Author AE86
+ * @Date 2021-05-10 22:25
+ */
+public class DBChangeNotificationTest {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Test
+    public void testConnect() throws Exception {
+        String username = "ae86";
+        String password = "123";
+        String url = "jdbc:oracle:thin:@127.0.0.1:1521:XE";
+
+        final DBChangeNotification dcn = new DBChangeNotification(username, password, url);
+        dcn.addRowEventListener((e) ->
+            logger.info("{}触发{}, before:{}, after:{}", e.getTableName(), e.getEvent(), e.getBeforeData(), e.getAfterData())
+        );
+        dcn.start();
+
+        // 模拟并发
+        final int threadSize = 301;
+        final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
+        final CyclicBarrier barrier = new CyclicBarrier(threadSize);
+        final CountDownLatch latch = new CountDownLatch(threadSize);
+
+        for (int i = 0; i < threadSize; i++) {
+            final int k = i + 3;
+            pool.submit(() -> {
+                try {
+                    barrier.await();
+                    //read(k, dcn);
+
+                    // 模拟写入操作
+                    insert(k, dcn);
+
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } catch (BrokenBarrierException e) {
+                    e.printStackTrace();
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        try {
+            latch.await();
+            logger.info("try to close");
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        }
+        pool.shutdown();
+
+        TimeUnit.SECONDS.sleep(20);
+        dcn.close();
+        logger.info("test end");
+
+    }
+
+    private void insert(int k, DBChangeNotification dcn) {
+        OracleConnection conn = dcn.getOracleConnection();
+        OracleStatement os = null;
+        ResultSet rs = null;
+        try {
+            os = (OracleStatement) conn.createStatement();
+            String sql = "INSERT INTO \"AE86\".\"my_user\"(\"id\", \"name\", \"age\", \"phone\", \"create_date\", \"last_time\", \"money\", \"car\", \"big\", \"clo\", \"rel\") VALUES (" + k + ", '红包', '2', '18200001111', TO_DATE('2015-10-23 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), TO_TIMESTAMP('2021-01-23 00:00:00.000000', 'SYYYY-MM-DD HH24:MI:SS:FF6'), '200.00000000000000', '4', null, '888', '3.0000000000000000')";
+
+            int i = os.executeUpdate(sql);
+            logger.info("insert:{}, {}", k, i);
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+        } finally {
+            dcn.close(rs);
+            dcn.close(os);
+        }
+    }
+
+    private void read(final int k, DBChangeNotification dcn) {
+        final String tableName = "my_user";
+        final String rowId = "AAAE5fAABAAALCJAAx";
+        List<Object> data = new ArrayList<>();
+        dcn.read(tableName, rowId, data);
+        logger.info("{}, 【{}】, data:{}", k, data.size(), data);
+    }
+
+}