AE86 %!s(int64=3) %!d(string=hai) anos
pai
achega
e021a12c9c

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleDataSource.java

@@ -14,7 +14,7 @@ import java.util.logging.Logger;
 
 public class SimpleDataSource implements DataSource, AutoCloseable {
 
-    private final BlockingQueue<SimpleConnection> pool = new LinkedBlockingQueue<>(2000);
+    private final BlockingQueue<SimpleConnection> pool = new LinkedBlockingQueue<>(500);
     private long lifeTime = 60 * 1000;
     private String driverClassName;
     private String url;

+ 14 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/dcn/DBChangeNotification.java

@@ -23,6 +23,8 @@ import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * 授予登录账号监听事件权限
@@ -56,6 +58,8 @@ public class DBChangeNotification {
     private Set<String> filterTable;
     private List<RowEventListener> listeners = new ArrayList<>();
     private BlockingQueue<DCNEvent> queue = new LinkedBlockingQueue<>(100);
+    private final Lock connectLock = new ReentrantLock();
+    private volatile boolean connected;
 
     public DBChangeNotification(String username, String password, String url) {
         this.username = username;
@@ -69,7 +73,13 @@ public class DBChangeNotification {
 
     public void start() throws SQLException {
         try {
+            connectLock.lock();
+            if (connected) {
+                logger.error("DBChangeNotification is already started");
+                return;
+            }
             conn = connect();
+            connected = true;
             statement = (OracleStatement) conn.createStatement();
             readTables();
 
@@ -112,6 +122,8 @@ public class DBChangeNotification {
             // to interrupt the thread otherwise it will be hanging around.
             close();
             throw ex;
+        } finally {
+            connectLock.unlock();
         }
     }
 
@@ -124,6 +136,7 @@ public class DBChangeNotification {
     }
 
     public void close() {
+        connected = false;
         if (null != worker && !worker.isInterrupted()) {
             worker.interrupt();
             worker = null;
@@ -323,7 +336,7 @@ public class DBChangeNotification {
 
         @Override
         public void run() {
-            while (!isInterrupted()) {
+            while (!isInterrupted() && connected) {
                 try {
                     // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
                     DCNEvent event = queue.take();