浏览代码

自定义ds

AE86 3 年之前
父节点
当前提交
a3fe9fa8c2

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorMapper.java

@@ -23,9 +23,9 @@ public interface ConnectorMapper<K, V> {
 
     K getConfig();
 
-    default V getConnection() {
+    default V getConnection() throws Exception {
         throw new ConnectorException("Unsupported method.");
     }
 
-    default void close(){}
+    default void close() {}
 }

+ 22 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseConnectorMapper.java

@@ -3,6 +3,8 @@ package org.dbsyncer.connector.database;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.database.ds.SimpleConnection;
+import org.dbsyncer.connector.database.ds.SimpleDataSource;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -11,17 +13,19 @@ import org.springframework.dao.EmptyResultDataAccessException;
 import java.sql.Connection;
 
 public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig, Connection> {
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-    private DatabaseConfig config;
+    private final Logger           logger = LoggerFactory.getLogger(getClass());
+    private       DatabaseConfig   config;
+    private       SimpleDataSource dataSource;
 
     public DatabaseConnectorMapper(DatabaseConfig config) {
         this.config = config;
+        this.dataSource = new SimpleDataSource(config.getUrl(), config.getUsername(), config.getPassword());
     }
 
     public <T> T execute(HandleCallback callback) {
         Connection connection = null;
         try {
-            connection = DatabaseUtil.getConnection(config);
+            connection = getConnection();
             return (T) callback.apply(new DatabaseTemplate(connection));
         } catch (EmptyResultDataAccessException e) {
             throw e;
@@ -29,7 +33,7 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
             logger.error(e.getMessage());
             throw new ConnectorException(e.getMessage());
         } finally {
-            DatabaseUtil.close(connection);
+            close(connection);
         }
     }
 
@@ -38,4 +42,18 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
         return config;
     }
 
+    @Override
+    public Connection getConnection() throws Exception {
+        return dataSource.getConnection();
+    }
+
+    @Override
+    public void close() {
+        dataSource.close();
+    }
+
+    protected void close(Connection connection) {
+        SimpleConnection conn = (SimpleConnection)connection;
+        conn.closeQuietly();
+    }
 }

+ 309 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleConnection.java

@@ -0,0 +1,309 @@
+package org.dbsyncer.connector.database.ds;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.time.Instant;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SimpleConnection implements Connection {
+    private final Logger                       logger     = LoggerFactory.getLogger(getClass());
+    private final AtomicBoolean                isClosed   = new AtomicBoolean(false);
+    private final Connection                   connection;
+    private final LinkedList<SimpleConnection> pool;
+    private       long                         activeTime = Instant.now().toEpochMilli();
+
+    public SimpleConnection(LinkedList<SimpleConnection> pool, Connection connection) {
+        this.pool = pool;
+        this.connection = connection;
+    }
+
+    @Override
+    public Statement createStatement() throws SQLException {
+        return connection.createStatement();
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql) throws SQLException {
+        return connection.prepareStatement(sql);
+    }
+
+    @Override
+    public CallableStatement prepareCall(String sql) throws SQLException {
+        return connection.prepareCall(sql);
+    }
+
+    @Override
+    public String nativeSQL(String sql) throws SQLException {
+        return connection.nativeSQL(sql);
+    }
+
+    @Override
+    public void setAutoCommit(boolean autoCommit) throws SQLException {
+        connection.setAutoCommit(autoCommit);
+    }
+
+    @Override
+    public boolean getAutoCommit() throws SQLException {
+        return connection.getAutoCommit();
+    }
+
+    @Override
+    public void commit() throws SQLException {
+        connection.commit();
+    }
+
+    @Override
+    public void rollback() throws SQLException {
+        connection.rollback();
+    }
+
+    @Override
+    public void close() {
+        // 回收连接
+        pool.addLast(this);
+    }
+
+    /**
+     * 断开连接
+     */
+    public void closeQuietly() {
+        try {
+            connection.close();
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Override
+    public boolean isClosed() throws SQLException {
+        return connection.isClosed();
+    }
+
+    @Override
+    public DatabaseMetaData getMetaData() throws SQLException {
+        return connection.getMetaData();
+    }
+
+    @Override
+    public void setReadOnly(boolean readOnly) throws SQLException {
+        connection.setReadOnly(readOnly);
+    }
+
+    @Override
+    public boolean isReadOnly() throws SQLException {
+        return connection.isReadOnly();
+    }
+
+    @Override
+    public void setCatalog(String catalog) throws SQLException {
+        connection.setCatalog(catalog);
+    }
+
+    @Override
+    public String getCatalog() throws SQLException {
+        return connection.getCatalog();
+    }
+
+    @Override
+    public void setTransactionIsolation(int level) throws SQLException {
+        connection.setTransactionIsolation(level);
+    }
+
+    @Override
+    public int getTransactionIsolation() throws SQLException {
+        return connection.getTransactionIsolation();
+    }
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        return connection.getWarnings();
+    }
+
+    @Override
+    public void clearWarnings() throws SQLException {
+        connection.clearWarnings();
+    }
+
+    @Override
+    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+        return connection.createStatement(resultSetType, resultSetConcurrency);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+        return connection.prepareStatement(sql, resultSetType, resultSetConcurrency);
+    }
+
+    @Override
+    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+        return connection.prepareCall(sql, resultSetType, resultSetConcurrency);
+    }
+
+    @Override
+    public Map<String, Class<?>> getTypeMap() throws SQLException {
+        return connection.getTypeMap();
+    }
+
+    @Override
+    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+        connection.setTypeMap(map);
+    }
+
+    @Override
+    public void setHoldability(int holdability) throws SQLException {
+        connection.setHoldability(holdability);
+    }
+
+    @Override
+    public int getHoldability() throws SQLException {
+        return connection.getHoldability();
+    }
+
+    @Override
+    public Savepoint setSavepoint() throws SQLException {
+        return connection.setSavepoint();
+    }
+
+    @Override
+    public Savepoint setSavepoint(String name) throws SQLException {
+        return connection.setSavepoint(name);
+    }
+
+    @Override
+    public void rollback(Savepoint savepoint) throws SQLException {
+        connection.rollback(savepoint);
+    }
+
+    @Override
+    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+        connection.releaseSavepoint(savepoint);
+    }
+
+    @Override
+    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+        return connection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+            throws SQLException {
+        return connection.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+    }
+
+    @Override
+    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+            throws SQLException {
+        return connection.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+        return connection.prepareStatement(sql, autoGeneratedKeys);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+        return connection.prepareStatement(sql, columnIndexes);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+        return connection.prepareStatement(sql, columnNames);
+    }
+
+    @Override
+    public Clob createClob() throws SQLException {
+        return connection.createClob();
+    }
+
+    @Override
+    public Blob createBlob() throws SQLException {
+        return connection.createBlob();
+    }
+
+    @Override
+    public NClob createNClob() throws SQLException {
+        return connection.createNClob();
+    }
+
+    @Override
+    public SQLXML createSQLXML() throws SQLException {
+        return connection.createSQLXML();
+    }
+
+    @Override
+    public boolean isValid(int timeout) throws SQLException {
+        return connection.isValid(timeout);
+    }
+
+    @Override
+    public void setClientInfo(String name, String value) throws SQLClientInfoException {
+        connection.setClientInfo(name, value);
+    }
+
+    @Override
+    public void setClientInfo(Properties properties) throws SQLClientInfoException {
+        connection.setClientInfo(properties);
+    }
+
+    @Override
+    public String getClientInfo(String name) throws SQLException {
+        return connection.getClientInfo(name);
+    }
+
+    @Override
+    public Properties getClientInfo() throws SQLException {
+        return connection.getClientInfo();
+    }
+
+    @Override
+    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+        return connection.createArrayOf(typeName, elements);
+    }
+
+    @Override
+    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+        return connection.createStruct(typeName, attributes);
+    }
+
+    @Override
+    public void setSchema(String schema) throws SQLException {
+        connection.setSchema(schema);
+    }
+
+    @Override
+    public String getSchema() throws SQLException {
+        return connection.getSchema();
+    }
+
+    @Override
+    public void abort(Executor executor) throws SQLException {
+        connection.abort(executor);
+    }
+
+    @Override
+    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+        connection.setNetworkTimeout(executor, milliseconds);
+    }
+
+    @Override
+    public int getNetworkTimeout() throws SQLException {
+        return connection.getNetworkTimeout();
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return false;
+    }
+}

+ 100 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleDataSource.java

@@ -0,0 +1,100 @@
+package org.dbsyncer.connector.database.ds;
+
+import org.dbsyncer.connector.ConnectorException;
+import org.dbsyncer.connector.util.DatabaseUtil;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.LinkedList;
+import java.util.logging.Logger;
+
+public class SimpleDataSource implements DataSource, AutoCloseable {
+
+    private final org.slf4j.Logger       logger = LoggerFactory.getLogger(getClass());
+    private       LinkedList<SimpleConnection> pool   = new LinkedList<>();
+    private       String                 url;
+    private       String                 username;
+    private       String                 password;
+    private       String                 threanPoolName = "SimpleDataSourcePool-";
+    private int minIdle = 10;
+    private long lifeTime = 30 * 1000;
+
+    public SimpleDataSource(String url, String username, String password) {
+        this.url = url;
+        this.username = username;
+        this.password = password;
+
+        try {
+            for (int i = 0; i < minIdle; i++) {
+                createConnection();
+            }
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+        }
+        // TODO 心跳检测过期连接
+    }
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        synchronized (pool) {
+            if (pool.isEmpty()) {
+                createConnection();
+            }
+            return pool.getFirst();
+        }
+    }
+
+    @Override
+    public Connection getConnection(String username, String password) throws SQLException {
+        throw new ConnectorException("Unsupported method.");
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public PrintWriter getLogWriter() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public void setLogWriter(PrintWriter out) throws SQLException {
+
+    }
+
+    @Override
+    public void setLoginTimeout(int seconds) throws SQLException {
+
+    }
+
+    @Override
+    public int getLoginTimeout() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        return null;
+    }
+
+    @Override
+    public void close() {
+        pool.forEach(c -> c.closeQuietly());
+    }
+
+    private void createConnection() throws SQLException {
+        pool.addLast(new SimpleConnection(pool, DatabaseUtil.getConnection(url, username, password)));
+    }
+
+}

+ 3 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnectorMapper.java

@@ -5,13 +5,11 @@ import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.database.DatabaseTemplate;
 import org.dbsyncer.connector.database.HandleCallback;
-import org.dbsyncer.connector.util.DatabaseUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.dao.EmptyResultDataAccessException;
 
 import java.sql.Connection;
-import java.sql.SQLException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -21,7 +19,7 @@ public final class SqlServerConnectorMapper extends DatabaseConnectorMapper {
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Lock   lock   = new ReentrantLock(true);
 
-    public SqlServerConnectorMapper(DatabaseConfig config) throws SQLException {
+    public SqlServerConnectorMapper(DatabaseConfig config) {
         super(config);
     }
 
@@ -40,7 +38,7 @@ public final class SqlServerConnectorMapper extends DatabaseConnectorMapper {
         try {
             locked = connectionLock.tryLock(60, TimeUnit.SECONDS);
             if (locked) {
-                connection = DatabaseUtil.getConnection(getConfig());
+                connection = getConnection();
                 apply = callback.apply(new DatabaseTemplate(connection));
             }
         } catch (EmptyResultDataAccessException e) {
@@ -50,7 +48,7 @@ public final class SqlServerConnectorMapper extends DatabaseConnectorMapper {
             throw new ConnectorException(e.getMessage());
         } finally {
             if (locked) {
-                DatabaseUtil.close(connection);
+                close(connection);
                 connectionLock.unlock();
             }
         }

+ 3 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java

@@ -1,8 +1,5 @@
 package org.dbsyncer.connector.util;
 
-import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.config.DatabaseConfig;
-
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -12,8 +9,8 @@ public abstract class DatabaseUtil {
     private DatabaseUtil() {
     }
 
-    public static Connection getConnection(DatabaseConfig config) throws SQLException {
-        return DriverManager.getConnection(config.getUrl(), config.getUsername(), config.getPassword());
+    public static Connection getConnection(String url, String username, String password) throws SQLException {
+        return DriverManager.getConnection(url, username, password);
     }
 
     public static void close(AutoCloseable rs) {
@@ -21,7 +18,7 @@ public abstract class DatabaseUtil {
             try {
                 rs.close();
             } catch (Exception e) {
-                throw new ConnectorException(e.getMessage());
+                e.printStackTrace();
             }
         }
     }