Explorar o código

修复连接超时问题

AE86 hai 1 ano
pai
achega
52fe609616

+ 1 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseConnectorMapper.java

@@ -5,7 +5,6 @@ import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
 import org.dbsyncer.connector.database.ds.SimpleDataSource;
 import org.dbsyncer.connector.database.ds.SimpleDataSource;
-import org.dbsyncer.connector.util.DatabaseUtil;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.dao.EmptyResultDataAccessException;
 import org.springframework.dao.EmptyResultDataAccessException;
@@ -33,7 +32,7 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
             logger.error(e.getMessage());
             logger.error(e.getMessage());
             throw new ConnectorException(e.getMessage(), e.getCause());
             throw new ConnectorException(e.getMessage(), e.getCause());
         } finally {
         } finally {
-            DatabaseUtil.close(connection);
+            dataSource.close(connection);
         }
         }
     }
     }
 
 

+ 24 - 25
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleConnection.java

@@ -1,10 +1,24 @@
 package org.dbsyncer.connector.database.ds;
 package org.dbsyncer.connector.database.ds;
 
 
-import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.util.DatabaseUtil;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import java.sql.*;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
 import java.time.Instant;
 import java.time.Instant;
 import java.util.Map;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Properties;
@@ -13,14 +27,12 @@ import java.util.concurrent.Executor;
 public class SimpleConnection implements Connection {
 public class SimpleConnection implements Connection {
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Connection connection;
     private final Connection connection;
-    private final SimpleDataSource simpleDataSource;
-    private final long activeTime = Instant.now().toEpochMilli();
+    private final long ACTIVE_TIME = Instant.now().toEpochMilli();
     private boolean oracleDriver;
     private boolean oracleDriver;
 
 
-    public SimpleConnection(SimpleDataSource simpleDataSource, Connection connection) {
-        this.simpleDataSource = simpleDataSource;
+    public SimpleConnection(Connection connection, boolean oracleDriver) {
         this.connection = connection;
         this.connection = connection;
-        oracleDriver = StringUtil.equals(simpleDataSource.getDriverClassName(), "oracle.jdbc.OracleDriver");
+        this.oracleDriver = oracleDriver;
     }
     }
 
 
     @Override
     @Override
@@ -65,24 +77,7 @@ public class SimpleConnection implements Connection {
 
 
     @Override
     @Override
     public void close() {
     public void close() {
-        if(activeTime + simpleDataSource.getLifeTime() < Instant.now().toEpochMilli()){
-            closeQuietly();
-            return;
-        }
-
-        // 回收连接
-        simpleDataSource.getPool().offer(this);
-    }
-
-    /**
-     * 断开连接
-     */
-    public void closeQuietly() {
-        try {
-            connection.close();
-        } catch (SQLException e) {
-            logger.error(e.getMessage());
-        }
+        DatabaseUtil.close(connection);
     }
     }
 
 
     @Override
     @Override
@@ -319,6 +314,10 @@ public class SimpleConnection implements Connection {
         return connection;
         return connection;
     }
     }
 
 
+    public long getActiveTime() {
+        return ACTIVE_TIME;
+    }
+
     public boolean isOracleDriver() {
     public boolean isOracleDriver() {
         return oracleDriver;
         return oracleDriver;
     }
     }

+ 50 - 28
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleDataSource.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.database.ds;
 package org.dbsyncer.connector.database.ds;
 
 
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.dbsyncer.connector.util.DatabaseUtil;
 
 
@@ -8,6 +9,7 @@ import java.io.PrintWriter;
 import java.sql.Connection;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLFeatureNotSupportedException;
+import java.time.Instant;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Logger;
 import java.util.logging.Logger;
@@ -15,7 +17,14 @@ import java.util.logging.Logger;
 public class SimpleDataSource implements DataSource, AutoCloseable {
 public class SimpleDataSource implements DataSource, AutoCloseable {
 
 
     private final BlockingQueue<SimpleConnection> pool = new LinkedBlockingQueue<>(300);
     private final BlockingQueue<SimpleConnection> pool = new LinkedBlockingQueue<>(300);
-    private long lifeTime = 60 * 1000;
+    /**
+     * 有效期(毫秒),默认60s
+     */
+    private final long KEEP_ALIVE = 60000;
+    /**
+     * 有效检测时间(秒),默认10s
+     */
+    private final int VALID_TIMEOUT_SECONDS = 10;
     private String driverClassName;
     private String driverClassName;
     private String url;
     private String url;
     private String username;
     private String username;
@@ -30,21 +39,19 @@ public class SimpleDataSource implements DataSource, AutoCloseable {
 
 
     @Override
     @Override
     public Connection getConnection() throws SQLException {
     public Connection getConnection() throws SQLException {
-        SimpleConnection poll = null;
-        int i = 3;
-        do {
-            if (pool.isEmpty()) {
-                pool.offer(createConnection());
-            }
-            poll = pool.poll();
-            if (null != poll) {
-                break;
-            }
-            i--;
-        } while (i > 1);
-
+        SimpleConnection poll = pool.poll();
         if (null == poll) {
         if (null == poll) {
-            poll = createConnection();
+            return createConnection();
+        }
+
+        // 连接无效
+        if (!poll.isValid(VALID_TIMEOUT_SECONDS)) {
+            return createConnection();
+        }
+
+        // 连接过期
+        if (isExpired(poll)) {
+            return createConnection();
         }
         }
         return poll;
         return poll;
     }
     }
@@ -91,26 +98,41 @@ public class SimpleDataSource implements DataSource, AutoCloseable {
 
 
     @Override
     @Override
     public void close() {
     public void close() {
-        pool.forEach(c -> c.closeQuietly());
+        pool.forEach(c -> c.close());
     }
     }
 
 
-    private SimpleConnection createConnection() throws SQLException {
-        return new SimpleConnection(this, DatabaseUtil.getConnection(driverClassName, url, username, password));
-    }
+    public void close(Connection connection) {
+        if (connection != null && connection instanceof SimpleConnection) {
+            SimpleConnection simpleConnection = (SimpleConnection) connection;
+            // 连接过期
+            if (isExpired(simpleConnection)) {
+                simpleConnection.close();
+                return;
+            }
 
 
-    public String getDriverClassName() {
-        return driverClassName;
+            // 回收连接
+            pool.offer(simpleConnection);
+        }
     }
     }
 
 
-    public BlockingQueue<SimpleConnection> getPool() {
-        return pool;
+    /**
+     * 连接是否过期
+     *
+     * @param connection
+     * @return
+     */
+    private boolean isExpired(SimpleConnection connection) {
+        return connection.getActiveTime() + KEEP_ALIVE < Instant.now().toEpochMilli();
     }
     }
 
 
-    public long getLifeTime() {
-        return lifeTime;
+    /**
+     * 创建新连接
+     *
+     * @return
+     * @throws SQLException
+     */
+    private SimpleConnection createConnection() throws SQLException {
+        return new SimpleConnection(DatabaseUtil.getConnection(driverClassName, url, username, password), StringUtil.equals(driverClassName, "oracle.jdbc.OracleDriver"));
     }
     }
 
 
-    public void setLifeTime(long lifeTime) {
-        this.lifeTime = lifeTime;
-    }
 }
 }