Browse Source

连接器配置下沉

AE86 3 năm trước cách đây
mục cha
commit
dbea599ab0

+ 6 - 0
dbsyncer-connector/pom.xml

@@ -24,6 +24,12 @@
             <artifactId>spring-jdbc</artifactId>
         </dependency>
 
+        <!-- druid数据源 -->
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+        </dependency>
+
         <!-- mysql-driver -->
         <dependency>
             <groupId>mysql</groupId>

+ 11 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorMapper.java

@@ -2,6 +2,15 @@ package org.dbsyncer.connector;
 
 import org.dbsyncer.connector.config.ConnectorConfig;
 
+/**
+ * 连接器实例,管理连接生命周期
+ *
+ * @param <K> 配置
+ * @param <V> 实例
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/20 23:00
+ */
 public interface ConnectorMapper<K, V> {
 
     default ConnectorConfig getOriginalConfig() {
@@ -11,4 +20,6 @@ public interface ConnectorMapper<K, V> {
     K getConfig();
 
     V getConnection();
+
+    void close();
 }

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -32,7 +32,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     @Override
     public ConnectorMapper connect(DatabaseConfig config) {
         try {
-            return new DatabaseConnectorMapper(config, DatabaseUtil.getConnection(config));
+            return new DatabaseConnectorMapper(config);
         } catch (Exception e) {
             logger.error("Failed to connect:{}, message:{}", config.getUrl(), e.getMessage());
         }
@@ -41,7 +41,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
     @Override
     public void disconnect(DatabaseConnectorMapper connectorMapper) {
-        DatabaseUtil.close(connectorMapper.getConnection());
+        connectorMapper.close();
     }
 
     @Override

+ 12 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseConnectorMapper.java

@@ -3,22 +3,24 @@ package org.dbsyncer.connector.database;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.util.DatabaseUtil;
 import org.springframework.dao.EmptyResultDataAccessException;
 
 import java.sql.Connection;
+import java.sql.SQLException;
 
 public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig, Connection> {
     protected DatabaseConfig config;
-    protected Connection connection;
+    protected DatabaseTemplate template;
 
-    public DatabaseConnectorMapper(DatabaseConfig config, Connection connection) {
+    public DatabaseConnectorMapper(DatabaseConfig config) throws SQLException {
         this.config = config;
-        this.connection = connection;
+        template = new DatabaseTemplate(DatabaseUtil.getConnection(config));
     }
 
     public <T> T execute(HandleCallback callback) {
         try {
-            return (T) callback.apply(new DatabaseTemplate(connection));
+            return (T) callback.apply(template);
         } catch (EmptyResultDataAccessException e) {
             throw e;
         } catch (Exception e) {
@@ -33,6 +35,11 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
 
     @Override
     public Connection getConnection() {
-        return connection;
+        return template.getConnection();
+    }
+
+    @Override
+    public void close() {
+        DatabaseUtil.close(getConnection());
     }
 }

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -60,12 +60,12 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
 
     @Override
     public ConnectorMapper connect(ESConfig config) {
-        return new ESConnectorMapper(config, ESUtil.getConnection(config));
+        return new ESConnectorMapper(config);
     }
 
     @Override
     public void disconnect(ESConnectorMapper connectorMapper) {
-        ESUtil.close(connectorMapper.getConnection());
+        connectorMapper.close();
     }
 
     @Override

+ 8 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnectorMapper.java

@@ -2,15 +2,16 @@ package org.dbsyncer.connector.es;
 
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ESConfig;
+import org.dbsyncer.connector.util.ESUtil;
 import org.elasticsearch.client.RestHighLevelClient;
 
 public final class ESConnectorMapper implements ConnectorMapper<ESConfig, RestHighLevelClient> {
     private ESConfig config;
     private RestHighLevelClient client;
 
-    public ESConnectorMapper(ESConfig config, RestHighLevelClient client) {
+    public ESConnectorMapper(ESConfig config) {
         this.config = config;
-        this.client = client;
+        this.client = ESUtil.getConnection(config);
     }
 
     @Override
@@ -22,4 +23,9 @@ public final class ESConnectorMapper implements ConnectorMapper<ESConfig, RestHi
     public RestHighLevelClient getConnection() {
         return client;
     }
+
+    @Override
+    public void close() {
+        ESUtil.close(client);
+    }
 }

+ 2 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -8,7 +8,6 @@ import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
-import org.dbsyncer.connector.util.KafkaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,7 +23,7 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
     @Override
     public ConnectorMapper connect(KafkaConfig config) {
         try {
-            return new KafkaConnectorMapper(config, KafkaUtil.getConnection(config));
+            return new KafkaConnectorMapper(config);
         } catch (Exception e) {
             throw new ConnectorException("无法连接, 请检查配置:" + e.getMessage());
         }
@@ -32,7 +31,7 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
 
     @Override
     public void disconnect(KafkaConnectorMapper connectorMapper) {
-        KafkaUtil.close(connectorMapper.getConnection());
+        connectorMapper.close();
     }
 
     @Override

+ 8 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnectorMapper.java

@@ -2,14 +2,15 @@ package org.dbsyncer.connector.kafka;
 
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.KafkaConfig;
+import org.dbsyncer.connector.util.KafkaUtil;
 
 public final class KafkaConnectorMapper implements ConnectorMapper<KafkaConfig, KafkaClient> {
     private KafkaConfig config;
     private KafkaClient client;
 
-    public KafkaConnectorMapper(KafkaConfig config, KafkaClient client) {
+    public KafkaConnectorMapper(KafkaConfig config) {
         this.config = config;
-        this.client = client;
+        this.client = KafkaUtil.getConnection(config);
     }
 
     @Override
@@ -21,4 +22,9 @@ public final class KafkaConnectorMapper implements ConnectorMapper<KafkaConfig,
     public KafkaClient getConnection() {
         return client;
     }
+
+    @Override
+    public void close() {
+        KafkaUtil.close(client);
+    }
 }

+ 1 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java

@@ -8,7 +8,6 @@ import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.sqlserver.SqlServerConnectorMapper;
-import org.dbsyncer.connector.util.DatabaseUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -22,7 +21,7 @@ public final class DQLSqlServerConnector extends AbstractDatabaseConnector {
     @Override
     public ConnectorMapper connect(DatabaseConfig config) {
         try {
-            return new SqlServerConnectorMapper(config, DatabaseUtil.getConnection(config));
+            return new SqlServerConnectorMapper(config);
         } catch (Exception e) {
             logger.error(e.getMessage());
             throw new ConnectorException(e.getMessage());

+ 1 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -7,7 +7,6 @@ import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
-import org.dbsyncer.connector.util.DatabaseUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -21,7 +20,7 @@ public final class SqlServerConnector extends AbstractDatabaseConnector implemen
     @Override
     public ConnectorMapper connect(DatabaseConfig config) {
         try {
-            return new SqlServerConnectorMapper(config, DatabaseUtil.getConnection(config));
+            return new SqlServerConnectorMapper(config);
         } catch (Exception e) {
             logger.error(e.getMessage());
             throw new ConnectorException(e.getMessage());

+ 5 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnectorMapper.java

@@ -4,12 +4,11 @@ import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.database.HandleCallback;
-import org.dbsyncer.connector.database.DatabaseTemplate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.dao.EmptyResultDataAccessException;
 
-import java.sql.Connection;
+import java.sql.SQLException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -19,8 +18,8 @@ public final class SqlServerConnectorMapper extends DatabaseConnectorMapper {
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Lock   lock   = new ReentrantLock(true);
 
-    public SqlServerConnectorMapper(DatabaseConfig config, Connection connection) {
-        super(config, connection);
+    public SqlServerConnectorMapper(DatabaseConfig config) throws SQLException {
+        super(config);
     }
 
     /**
@@ -29,6 +28,7 @@ public final class SqlServerConnectorMapper extends DatabaseConnectorMapper {
      * @param callback
      * @return
      */
+    @Override
     public <T> T execute(HandleCallback callback) {
         final Lock connectionLock = lock;
         boolean locked = false;
@@ -36,7 +36,7 @@ public final class SqlServerConnectorMapper extends DatabaseConnectorMapper {
         try {
             locked = connectionLock.tryLock(60, TimeUnit.SECONDS);
             if (locked) {
-                apply = callback.apply(new DatabaseTemplate(connection));
+                apply = callback.apply(template);
             }
         } catch (EmptyResultDataAccessException e) {
             throw e;

+ 1 - 2
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -12,7 +12,6 @@ import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.SetterEnum;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
-import org.dbsyncer.connector.util.DatabaseUtil;
 import org.dbsyncer.storage.AbstractStorageService;
 import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.constant.ConfigConstant;
@@ -179,7 +178,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
 
     @Override
     public void destroy() {
-        DatabaseUtil.close(connectorMapper.getConnection());
+        connectorMapper.close();
     }
 
     @Override

+ 8 - 0
pom.xml

@@ -41,6 +41,7 @@
         <commons-fileupload.version>1.4</commons-fileupload.version>
         <commons-io.version>2.5</commons-io.version>
         <lucene-analyzers-smartcn.version>7.7.0</lucene-analyzers-smartcn.version>
+        <druid.version>1.2.8</druid.version>
         <ojdbc6.version>11.2.0.4.0-atlassian-hosted</ojdbc6.version>
         <mysql.version>5.1.40</mysql.version>
         <mysql-binlog.version>0.21.0</mysql-binlog.version>
@@ -129,6 +130,13 @@
                 <version>${lucene-analyzers-smartcn.version}</version>
             </dependency>
 
+            <!-- druid数据源 -->
+            <dependency>
+                <groupId>com.alibaba</groupId>
+                <artifactId>druid</artifactId>
+                <version>${druid.version}</version>
+            </dependency>
+
             <!-- mysql-driver -->
             <dependency>
                 <groupId>mysql</groupId>