Browse Source

fix connection pool

AE86 3 years ago
parent
commit
794d54ef26
32 changed files with 207 additions and 795 deletions
  1. 7 5
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/ConnectorChecker.java
  2. 2 2
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MappingServiceImpl.java
  3. 0 6
      dbsyncer-connector/pom.xml
  4. 3 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java
  5. 22 25
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java
  6. 9 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java
  7. 4 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterBatchConfig.java
  8. 9 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterConfig.java
  9. 4 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterSingleConfig.java
  10. 39 92
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  11. 0 6
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/Database.java
  12. 1 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/ConnectorEnum.java
  13. 0 34
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/Redis.java
  14. 0 155
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/RedisConnector.java
  15. 0 40
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/RedisTemplate.java
  16. 0 55
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/RedisTemplateCluster.java
  17. 0 61
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/RedisTemplateSingle.java
  18. 3 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLMysqlConnector.java
  19. 3 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java
  20. 3 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java
  21. 0 33
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/JDBCUtil.java
  22. 0 20
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PingUtil.java
  23. 0 177
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/RedisUtil.java
  24. 3 1
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java
  25. 9 16
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java
  26. 8 3
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java
  27. 14 3
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java
  28. 1 1
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/Monitor.java
  29. 2 3
      dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java
  30. 19 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java
  31. 29 16
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  32. 13 4
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

+ 7 - 5
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/ConnectorChecker.java

@@ -4,6 +4,7 @@ import org.dbsyncer.biz.BizException;
 import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.biz.checker.ConnectorConfigChecker;
 import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.manager.Manager;
@@ -106,13 +107,14 @@ public class ConnectorChecker extends AbstractChecker {
     }
 
     private void setTable(Connector connector) {
-        // 获取表信息
-        boolean alive = manager.alive(connector.getConfig());
-        if (!alive) {
+        boolean isAlive = manager.refreshConnectorConfig(connector.getConfig());
+        if (!isAlive) {
             logService.log(LogType.ConnectorLog.FAILED);
         }
-        Assert.isTrue(alive, "无法连接.");
-        List<String> table = manager.getTable(connector.getConfig());
+        Assert.isTrue(isAlive, "无法连接.");
+        // 获取表信息
+        ConnectorMapper connectorMapper = manager.connect(connector.getConfig());
+        List<String> table = manager.getTable(connectorMapper);
         connector.setTable(table);
     }
 

+ 2 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MappingServiceImpl.java

@@ -151,9 +151,9 @@ public class MappingServiceImpl extends BaseServiceImpl implements MappingServic
         Assert.notNull(mapping, "Mapping can not be null.");
         Connector s = manager.getConnector(mapping.getSourceConnectorId());
         Connector t = manager.getConnector(mapping.getTargetConnectorId());
-        ConnectorVo sConn = new ConnectorVo(monitor.alive(s.getId()));
+        ConnectorVo sConn = new ConnectorVo(monitor.isAlive(s.getId()));
         BeanUtils.copyProperties(s, sConn);
-        ConnectorVo tConn = new ConnectorVo(monitor.alive(t.getId()));
+        ConnectorVo tConn = new ConnectorVo(monitor.isAlive(t.getId()));
         BeanUtils.copyProperties(t, tConn);
 
         // 元信息

+ 0 - 6
dbsyncer-connector/pom.xml

@@ -53,12 +53,6 @@
             <artifactId>mssql-jdbc</artifactId>
         </dependency>
 
-        <!-- redis客户端连接工具 -->
-        <dependency>
-            <groupId>redis.clients</groupId>
-            <artifactId>jedis</artifactId>
-        </dependency>
-
     </dependencies>
 
 </project>

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java

@@ -52,7 +52,7 @@ public interface Connector {
      * @param config
      * @return
      */
-    List<String> getTable(ConnectorConfig config);
+    List<String> getTable(ConnectorMapper config);
 
     /**
      * 获取表元信息
@@ -61,7 +61,7 @@ public interface Connector {
      * @param tableName
      * @return
      */
-    MetaInfo getMetaInfo(ConnectorConfig config, String tableName);
+    MetaInfo getMetaInfo(ConnectorMapper config, String tableName);
 
     /**
      * 获取数据源同步参数
@@ -86,7 +86,7 @@ public interface Connector {
      * @param command
      * @return
      */
-    long getCount(ConnectorConfig config, Map<String, String> command);
+    long getCount(ConnectorMapper config, Map<String, String> command);
 
     /**
      * 分页获取数据源数据

+ 22 - 25
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -6,7 +6,6 @@ import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.util.Assert;
 
-import java.sql.Connection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -30,15 +29,11 @@ public class ConnectorFactory implements DisposableBean {
     }
 
     /**
-     * 建立连接
+     * 建立连接,返回缓存连接对象
      *
      * @param config
      */
-    public Connection connect(ConnectorConfig config) {
-        return connect(config, Connection.class);
-    }
-
-    public <T> T connect(ConnectorConfig config, Class<T> valueType) {
+    public synchronized ConnectorMapper connect(ConnectorConfig config) {
         Assert.notNull(config, "ConnectorConfig can not be null.");
         String type = config.getConnectorType();
         Connector connector = getConnector(type);
@@ -48,16 +43,16 @@ public class ConnectorFactory implements DisposableBean {
             mapper = connector.connect(config);
             connectorCache.putIfAbsent(mapper.getCacheKey(), mapper);
         }
-        return (T) mapper.getConnection();
+        return mapper;
     }
 
     /**
-     * TODO 更新连接配置
+     * 新连接配置
      *
      * @param config
      * @return
      */
-    public void updateConnectorConfig(ConnectorConfig config) {
+    public synchronized boolean refresh(ConnectorConfig config) {
         Assert.notNull(config, "ConnectorConfig can not be null.");
         String type = config.getConnectorType();
         Connector connector = getConnector(type);
@@ -68,6 +63,7 @@ public class ConnectorFactory implements DisposableBean {
             connectorCache.remove(cacheKey);
         }
         connect(config);
+        return isAlive(config);
     }
 
     /**
@@ -84,8 +80,7 @@ public class ConnectorFactory implements DisposableBean {
         if (!connectorCache.containsKey(cacheKey)) {
             connect(config);
         }
-        final ConnectorMapper connectorMapper = connectorCache.get(cacheKey);
-        return connector.isAlive(connectorMapper);
+        return connector.isAlive(connectorCache.get(cacheKey));
     }
 
     /**
@@ -93,9 +88,9 @@ public class ConnectorFactory implements DisposableBean {
      *
      * @return
      */
-    public List<String> getTable(ConnectorConfig config) {
-        Assert.notNull(config, "ConnectorConfig can not be null.");
-        String type = config.getConnectorType();
+    public List<String> getTable(ConnectorMapper config) {
+        Assert.notNull(config, "ConnectorMapper can not be null.");
+        String type = config.getConfig().getConnectorType();
         return getConnector(type).getTable(config);
     }
 
@@ -104,11 +99,10 @@ public class ConnectorFactory implements DisposableBean {
      *
      * @return
      */
-    public MetaInfo getMetaInfo(ConnectorConfig config, String tableName) {
-        Assert.notNull(config, "ConnectorConfig can not be null.");
+    public MetaInfo getMetaInfo(ConnectorMapper config, String tableName) {
+        Assert.notNull(config, "ConnectorMapper can not be null.");
         Assert.hasText(tableName, "tableName can not be empty.");
-        String type = config.getConnectorType();
-        return getConnector(type).getMetaInfo(config, tableName);
+        return getConnector(config).getMetaInfo(config, tableName);
     }
 
     /**
@@ -136,32 +130,35 @@ public class ConnectorFactory implements DisposableBean {
      * @param command
      * @return
      */
-    public long getCount(ConnectorConfig config, Map<String, String> command) {
-        Connector connector = getConnector(config.getConnectorType());
-        return connector.getCount(config, command);
+    public long getCount(ConnectorMapper config, Map<String, String> command) {
+        return getConnector(config).getCount(config, command);
     }
 
     public Result reader(ReaderConfig config) {
-        Connector connector = getConnector(config.getConfig().getConnectorType());
+        Connector connector = getConnector(config.getConnectorMapper());
         Result result = connector.reader(config);
         Assert.notNull(result, "Connector reader result can not null");
         return result;
     }
 
     public Result writer(WriterBatchConfig config) {
-        Connector connector = getConnector(config.getConfig().getConnectorType());
+        Connector connector = getConnector(config.getConnectorMapper());
         Result result = connector.writer(config);
         Assert.notNull(result, "Connector writer result can not null");
         return result;
     }
 
     public Result writer(WriterSingleConfig config) {
-        Connector connector = getConnector(config.getConfig().getConnectorType());
+        Connector connector = getConnector(config.getConnectorMapper());
         Result result = connector.writer(config);
         Assert.notNull(result, "Connector writer result can not null");
         return result;
     }
 
+    private Connector getConnector(ConnectorMapper connectorMapper) {
+        return getConnector(connectorMapper.getConfig().getConnectorType());
+    }
+
     /**
      * 获取连接器
      *

+ 9 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java

@@ -1,30 +1,32 @@
 package org.dbsyncer.connector.config;
 
+import org.dbsyncer.connector.ConnectorMapper;
+
 import java.util.List;
 import java.util.Map;
 
 public class ReaderConfig {
 
-    private ConnectorConfig config;
+    private ConnectorMapper connectorMapper;
     private Map<String, String> command;
     private List<Object> args;
     private int pageIndex;
     private int pageSize;
 
-    public ReaderConfig(ConnectorConfig config, Map<String,String> command, List<Object> args, int pageIndex, int pageSize) {
-        this.config = config;
+    public ReaderConfig(ConnectorMapper connectorMapper, Map<String,String> command, List<Object> args, int pageIndex, int pageSize) {
+        this.connectorMapper = connectorMapper;
         this.command = command;
         this.args = args;
         this.pageIndex = pageIndex;
         this.pageSize = pageSize;
     }
 
-    public ConnectorConfig getConfig() {
-        return config;
+    public ConnectorMapper getConnectorMapper() {
+        return connectorMapper;
     }
 
-    public ReaderConfig setConfig(ConnectorConfig config) {
-        this.config = config;
+    public ReaderConfig setConnectorMapper(ConnectorMapper connectorMapper) {
+        this.connectorMapper = connectorMapper;
         return this;
     }
 

+ 4 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterBatchConfig.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.connector.config;
 
+import org.dbsyncer.connector.ConnectorMapper;
+
 import java.util.List;
 import java.util.Map;
 
@@ -10,8 +12,8 @@ public class WriterBatchConfig extends WriterConfig {
      */
     private List<Map> data;
 
-    public WriterBatchConfig(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map> data) {
-        setConfig(config);
+    public WriterBatchConfig(ConnectorMapper connectorMapper, Map<String, String> command, List<Field> fields, List<Map> data) {
+        setConnectorMapper(connectorMapper);
         setCommand(command);
         setFields(fields);
         setData(data);

+ 9 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterConfig.java

@@ -1,14 +1,16 @@
 package org.dbsyncer.connector.config;
 
+import org.dbsyncer.connector.ConnectorMapper;
+
 import java.util.List;
 import java.util.Map;
 
 public class WriterConfig {
 
     /**
-     * 连接配置
+     * 连接配置
      */
-    private ConnectorConfig     config;
+    private ConnectorMapper connectorMapper;
     /**
      * 执行命令
      */
@@ -16,14 +18,14 @@ public class WriterConfig {
     /**
      * 字段信息
      */
-    private List<Field>         fields;
+    private List<Field> fields;
 
-    public ConnectorConfig getConfig() {
-        return config;
+    public ConnectorMapper getConnectorMapper() {
+        return connectorMapper;
     }
 
-    public WriterConfig setConfig(ConnectorConfig config) {
-        this.config = config;
+    public WriterConfig setConnectorMapper(ConnectorMapper connectorMapper) {
+        this.connectorMapper = connectorMapper;
         return this;
     }
 

+ 4 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterSingleConfig.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.connector.config;
 
+import org.dbsyncer.connector.ConnectorMapper;
+
 import java.util.List;
 import java.util.Map;
 
@@ -25,8 +27,8 @@ public class WriterSingleConfig extends WriterConfig {
      */
     private boolean retry;
 
-    public WriterSingleConfig(ConnectorConfig config, List<Field> fields, Map<String, String> command, String event, Map<String, Object> data, String table) {
-        setConfig(config);
+    public WriterSingleConfig(ConnectorMapper connectorMapper, List<Field> fields, Map<String, String> command, String event, Map<String, Object> data, String table) {
+        setConnectorMapper(connectorMapper);
         setCommand(command);
         setFields(fields);
         setData(data);

+ 39 - 92
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -11,14 +11,12 @@ import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.enums.SetterEnum;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
 import org.dbsyncer.connector.util.DatabaseUtil;
-import org.dbsyncer.connector.util.JDBCUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.core.BatchPreparedStatementSetter;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.util.Assert;
 
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.*;
@@ -35,7 +33,7 @@ public abstract class AbstractDatabaseConnector implements Database {
         DatabaseConfig cfg = (DatabaseConfig) config;
         try {
             String cacheKey = getConnectorMapperCacheKey(config);
-            return new ConnectorMapper(config, cacheKey, JDBCUtil.getConnection(cfg.getDriverClassName(), cfg.getUrl(), cfg.getUsername(), cfg.getPassword()));
+            return new ConnectorMapper(config, cacheKey, DatabaseUtil.getJdbcTemplate(cfg));
         } catch (Exception e) {
             logger.error("Failed to connect:{}, message:{}", cfg.getUrl(), e.getMessage());
         }
@@ -44,18 +42,21 @@ public abstract class AbstractDatabaseConnector implements Database {
 
     @Override
     public void disconnect(ConnectorMapper connectorMapper) {
-        JDBCUtil.close((Connection) connectorMapper.getConnection());
+        try {
+            DatabaseUtil.close((JdbcTemplate) connectorMapper.getConnection());
+        } catch (SQLException e) {
+            logger.error("Close jdbcTemplate failed: {}", e.getMessage());
+        }
     }
 
     @Override
     public boolean isAlive(ConnectorMapper connectorMapper) {
         try {
-            Connection connection = (Connection) connectorMapper.getConnection();
-            return null != connection && !connection.isClosed();
+            JdbcTemplate jdbcTemplate = (JdbcTemplate) connectorMapper.getConnection();
+            return null != jdbcTemplate && !jdbcTemplate.getDataSource().getConnection().isClosed();
         } catch (SQLException e) {
-            logger.error(e.getMessage());
+            throw new ConnectorException(e.getMessage(), e.getCause());
         }
-        return false;
     }
 
     @Override
@@ -65,40 +66,26 @@ public abstract class AbstractDatabaseConnector implements Database {
     }
 
     @Override
-    public List<String> getTable(ConnectorConfig config) {
-        List<String> tables = new ArrayList<>();
-        DatabaseConfig databaseConfig = (DatabaseConfig) config;
-        JdbcTemplate jdbcTemplate = null;
+    public List<String> getTable(ConnectorMapper config) {
         try {
-            jdbcTemplate = getJdbcTemplate(databaseConfig);
-            String sql = getTablesSql(databaseConfig);
-            tables = jdbcTemplate.queryForList(sql, String.class);
+            JdbcTemplate jdbcTemplate = (JdbcTemplate) config.getConnection();
+            String sql = getTablesSql((DatabaseConfig) config.getConfig());
+            return jdbcTemplate.queryForList(sql, String.class);
         } catch (Exception e) {
             throw new ConnectorException(e.getMessage(), e.getCause());
-        } finally {
-            // 释放连接
-            this.close(jdbcTemplate);
         }
-        return tables;
     }
 
     @Override
-    public MetaInfo getMetaInfo(ConnectorConfig config, String tableName) {
-        DatabaseConfig cfg = (DatabaseConfig) config;
-        JdbcTemplate jdbcTemplate = null;
-        MetaInfo metaInfo = null;
+    public MetaInfo getMetaInfo(ConnectorMapper config, String tableName) {
         try {
-            jdbcTemplate = getJdbcTemplate(cfg);
+            JdbcTemplate jdbcTemplate = (JdbcTemplate) config.getConnection();
             String quotation = buildSqlWithQuotation();
             StringBuilder queryMetaSql = new StringBuilder("SELECT * FROM ").append(quotation).append(tableName).append(quotation).append(" WHERE 1 != 1");
-            metaInfo = DatabaseUtil.getMetaInfo(jdbcTemplate, queryMetaSql.toString(), tableName);
+            return DatabaseUtil.getMetaInfo(jdbcTemplate, queryMetaSql.toString(), tableName);
         } catch (Exception e) {
-            logger.error(e.getMessage());
-        } finally {
-            // 释放连接
-            this.close(jdbcTemplate);
+            throw new ConnectorException(e.getMessage(), e.getCause());
         }
-        return metaInfo;
     }
 
     @Override
@@ -154,25 +141,20 @@ public abstract class AbstractDatabaseConnector implements Database {
     }
 
     @Override
-    public long getCount(ConnectorConfig config, Map<String, String> command) {
+    public long getCount(ConnectorMapper config, Map<String, String> command) {
         // 1、获取select SQL
         String queryCountSql = command.get(ConnectorConstant.OPERTION_QUERY_COUNT);
         Assert.hasText(queryCountSql, "查询总数语句不能为空.");
 
-        DatabaseConfig cfg = (DatabaseConfig) config;
-        JdbcTemplate jdbcTemplate = null;
         try {
             // 2、获取连接
-            jdbcTemplate = getJdbcTemplate(cfg);
+            JdbcTemplate jdbcTemplate = (JdbcTemplate) config.getConnection();
 
             // 3、返回结果集
             return jdbcTemplate.queryForObject(queryCountSql, Long.class);
         } catch (Exception e) {
             logger.error(e.getMessage());
             throw new ConnectorException(e.getMessage());
-        } finally {
-            // 释放连接
-            this.close(jdbcTemplate);
         }
     }
 
@@ -182,11 +164,10 @@ public abstract class AbstractDatabaseConnector implements Database {
         String querySql = config.getCommand().get(SqlBuilderEnum.QUERY.getName());
         Assert.hasText(querySql, "查询语句不能为空.");
 
-        DatabaseConfig cfg = (DatabaseConfig) config.getConfig();
-        JdbcTemplate jdbcTemplate = null;
         try {
             // 2、获取连接
-            jdbcTemplate = getJdbcTemplate(cfg);
+            ConnectorMapper connectorMapper = config.getConnectorMapper();
+            JdbcTemplate jdbcTemplate = (JdbcTemplate) connectorMapper.getConnection();
 
             // 3、设置参数
             Collections.addAll(config.getArgs(), getPageArgs(config.getPageIndex(), config.getPageSize()));
@@ -199,9 +180,6 @@ public abstract class AbstractDatabaseConnector implements Database {
         } catch (Exception e) {
             logger.error(e.getMessage());
             throw new ConnectorException(e.getMessage());
-        } finally {
-            // 释放连接
-            this.close(jdbcTemplate);
         }
     }
 
@@ -224,12 +202,11 @@ public abstract class AbstractDatabaseConnector implements Database {
         final int size = data.size();
         final int fSize = fields.size();
 
-        DatabaseConfig cfg = (DatabaseConfig) config.getConfig();
-        JdbcTemplate jdbcTemplate = null;
         Result result = new Result();
         try {
             // 2、获取连接
-            jdbcTemplate = getJdbcTemplate(cfg);
+            ConnectorMapper connectorMapper = config.getConnectorMapper();
+            JdbcTemplate jdbcTemplate = (JdbcTemplate) connectorMapper.getConnection();
 
             // 3、设置参数
             final JdbcTemplate template = jdbcTemplate;
@@ -251,9 +228,6 @@ public abstract class AbstractDatabaseConnector implements Database {
             result.getFail().set(size);
             result.getError().append(e.getMessage()).append(System.lineSeparator());
             logger.error(e.getMessage());
-        } finally {
-            // 释放连接
-            this.close(jdbcTemplate);
         }
         return result;
     }
@@ -282,20 +256,19 @@ public abstract class AbstractDatabaseConnector implements Database {
         }
 
         int size = fields.size();
-        DatabaseConfig cfg = (DatabaseConfig) config.getConfig();
-        JdbcTemplate jdbcTemplate = null;
         Result result = new Result();
+        // 2、获取连接
+        ConnectorMapper connectorMapper = config.getConnectorMapper();
+        JdbcTemplate jdbcTemplate = (JdbcTemplate) connectorMapper.getConnection();
+
         int update = 0;
         try {
-            // 2、获取连接
-            jdbcTemplate = getJdbcTemplate(cfg);
             // 3、设置参数
-            final JdbcTemplate template = jdbcTemplate;
             update = jdbcTemplate.update(sql, (ps) -> {
                 Field f = null;
                 for (int i = 0; i < size; i++) {
                     f = fields.get(i);
-                    SetterEnum.getSetter(f.getType()).set(template, ps, i + 1, f.getType(), data.get(f.getName()));
+                    SetterEnum.getSetter(f.getType()).set(jdbcTemplate, ps, i + 1, f.getType(), data.get(f.getName()));
                 }
             });
         } catch (Exception e) {
@@ -306,16 +279,13 @@ public abstract class AbstractDatabaseConnector implements Database {
                     .append("DATA:").append(data).append(System.lineSeparator())
                     .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
             logger.error("SQL:{}, DATA:{}, ERROR:{}", sql, data, e.getMessage());
-        } finally {
-            // 释放连接
-            this.close(jdbcTemplate);
         }
 
         // 更新失败尝试插入
         if (0 == update && isUpdate(event) && null != pkField && !config.isRetry()) {
             // 插入前检查有无数据
             String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
-            if (!existRow(cfg, queryCount, data.get(pkField.getName()))) {
+            if (!existRow(jdbcTemplate, queryCount, data.get(pkField.getName()))) {
                 fields.remove(fields.size() - 1);
                 config.setEvent(ConnectorConstant.OPERTION_INSERT);
                 config.setRetry(true);
@@ -326,30 +296,16 @@ public abstract class AbstractDatabaseConnector implements Database {
         return result;
     }
 
-    @Override
-    public JdbcTemplate getJdbcTemplate(DatabaseConfig config) {
-        return DatabaseUtil.getJdbcTemplate(config);
-    }
-
-    @Override
-    public void close(JdbcTemplate jdbcTemplate) {
-        try {
-            DatabaseUtil.close(jdbcTemplate);
-        } catch (SQLException e) {
-            logger.error("Close jdbcTemplate failed: {}", e.getMessage());
-        }
-    }
-
     /**
      * 获取DQL表信息
      *
      * @param config
      * @return
      */
-    protected List<String> getDqlTable(ConnectorConfig config) {
+    protected List<String> getDqlTable(ConnectorMapper config) {
         MetaInfo metaInfo = getDqlMetaInfo(config);
         Assert.notNull(metaInfo, "SQL解析异常.");
-        DatabaseConfig cfg = (DatabaseConfig) config;
+        DatabaseConfig cfg = (DatabaseConfig) config.getConfig();
         List<String> tables = new ArrayList<>();
         tables.add(cfg.getSql());
         return tables;
@@ -361,22 +317,17 @@ public abstract class AbstractDatabaseConnector implements Database {
      * @param config
      * @return
      */
-    protected MetaInfo getDqlMetaInfo(ConnectorConfig config) {
-        DatabaseConfig cfg = (DatabaseConfig) config;
-        JdbcTemplate jdbcTemplate = null;
-        MetaInfo metaInfo = null;
+    protected MetaInfo getDqlMetaInfo(ConnectorMapper config) {
         try {
-            jdbcTemplate = getJdbcTemplate(cfg);
+            DatabaseConfig cfg = (DatabaseConfig) config.getConfig();
+            JdbcTemplate jdbcTemplate = (JdbcTemplate) config.getConnection();
             String sql = cfg.getSql().toUpperCase();
             String queryMetaSql = StringUtils.contains(sql, " WHERE ") ? sql + " AND 1!=1 " : sql + " WHERE 1!=1 ";
-            metaInfo = DatabaseUtil.getMetaInfo(jdbcTemplate, queryMetaSql, cfg.getTable());
+            return DatabaseUtil.getMetaInfo(jdbcTemplate, queryMetaSql, cfg.getTable());
         } catch (Exception e) {
             logger.error(e.getMessage());
-        } finally {
-            // 释放连接
-            this.close(jdbcTemplate);
+            throw new ConnectorException(e.getMessage());
         }
-        return metaInfo;
     }
 
     /**
@@ -567,18 +518,14 @@ public abstract class AbstractDatabaseConnector implements Database {
         }
     }
 
-    private boolean existRow(DatabaseConfig config, String sql, Object value) {
-        JdbcTemplate jdbcTemplate = null;
-        Integer rowNum = null;
+    private boolean existRow(JdbcTemplate jdbcTemplate, String sql, Object value) {
+        int rowNum = 0;
         try {
-            jdbcTemplate = getJdbcTemplate(config);
             rowNum = jdbcTemplate.queryForObject(sql, new Object[]{value}, Integer.class);
         } catch (Exception e) {
             logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, value);
-        } finally {
-            this.close(jdbcTemplate);
         }
-        return null != rowNum && rowNum > 0;
+        return rowNum > 0;
     }
 
     private Field getPrimaryKeyField(List<Field> fields) {

+ 0 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/Database.java

@@ -1,16 +1,10 @@
 package org.dbsyncer.connector.database;
 
 import org.dbsyncer.connector.Connector;
-import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.PageSqlConfig;
-import org.springframework.jdbc.core.JdbcTemplate;
 
 public interface Database extends Connector {
 
-    JdbcTemplate getJdbcTemplate(DatabaseConfig config);
-
-    void close(JdbcTemplate jdbcTemplate);
-
     /**
      * 获取分页SQL
      *

+ 1 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/ConnectorEnum.java

@@ -4,10 +4,8 @@ import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.RedisConfig;
 import org.dbsyncer.connector.mysql.MysqlConnector;
 import org.dbsyncer.connector.oracle.OracleConnector;
-import org.dbsyncer.connector.redis.RedisConnector;
 import org.dbsyncer.connector.sql.DQLMysqlConnector;
 import org.dbsyncer.connector.sql.DQLOracleConnector;
 import org.dbsyncer.connector.sql.DQLSqlServerConnector;
@@ -45,11 +43,7 @@ public enum ConnectorEnum {
     /**
      * DqlSqlServer 连接器
      */
-    DQL_SQL_SERVER("DqlSqlServer", new DQLSqlServerConnector(), DatabaseConfig.class),
-    /**
-     * Redis 连接器
-     */
-    REDIS("Redis", new RedisConnector(), RedisConfig.class);
+    DQL_SQL_SERVER("DqlSqlServer", new DQLSqlServerConnector(), DatabaseConfig.class);
 
     // 连接器名称
     private String type;

+ 0 - 34
dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/Redis.java

@@ -1,34 +0,0 @@
-package org.dbsyncer.connector.redis;
-
-import org.dbsyncer.connector.Connector;
-import org.dbsyncer.connector.config.RedisConfig;
-
-public interface Redis extends Connector {
-
-    /**
-     * 获取redis连接实例
-     *
-     * @param config
-     * @return
-     */
-    RedisTemplate getRedisTemplate(RedisConfig config);
-
-    /**
-     * 获取redis连接实例
-     *
-     * @param config
-     * @param maxTotal 最大连接数
-     * @param maxIdle  在jedispool中最大的idle状态(空闲的)的jedis实例的个数
-     * @param minIdle  在jedispool中最小的idle状态(空闲的)的jedis实例的个数
-     * @return
-     */
-    RedisTemplate getRedisTemplate(RedisConfig config, Integer maxTotal, Integer maxIdle, Integer minIdle);
-
-    /**
-     * 关闭连接
-     *
-     * @param redisTemplate
-     */
-    void close(RedisTemplate redisTemplate);
-
-}

+ 0 - 155
dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/RedisConnector.java

@@ -1,155 +0,0 @@
-package org.dbsyncer.connector.redis;
-
-import org.apache.commons.lang.StringUtils;
-import org.dbsyncer.common.model.Result;
-import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.*;
-import org.dbsyncer.connector.util.RedisUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisPool;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public final class RedisConnector implements Redis {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    @Override
-    public ConnectorMapper connect(ConnectorConfig config) {
-        return null;
-    }
-
-    @Override
-    public void disconnect(ConnectorMapper connectorMapper) {
-
-    }
-
-    @Override
-    public boolean isAlive(ConnectorMapper connectorMapper) {
-        RedisConfig cfg = (RedisConfig) connectorMapper.getConfig();
-        RedisTemplate template = null;
-        boolean r = false;
-        try {
-            // 打开连接
-            template = this.getRedisTemplate(cfg, 1, 1, 1);
-            r = true;
-        } catch (Exception e) {
-            logger.error("Failed to connect:{}", cfg.getUrl(), e.getLocalizedMessage());
-        } finally {
-            // 释放连接
-            if (null != template) {
-                template.close();
-            }
-        }
-        return r;
-    }
-
-    @Override
-    public String getConnectorMapperCacheKey(ConnectorConfig config) {
-        return null;
-    }
-
-    @Override
-    public List<String> getTable(ConnectorConfig config) {
-        return null;
-    }
-
-    @Override
-    public MetaInfo getMetaInfo(ConnectorConfig config, String tableName) {
-        return null;
-    }
-
-    @Override
-    public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
-        return null;
-    }
-
-    @Override
-    public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
-        return null;
-    }
-
-    @Override
-    public long getCount(ConnectorConfig config, Map<String, String> command) {
-        return 0;
-    }
-
-    @Override
-    public Result reader(ReaderConfig config) {
-        return null;
-    }
-
-    @Override
-    public Result writer(WriterBatchConfig config) {
-        return null;
-    }
-
-    @Override
-    public Result writer(WriterSingleConfig config) {
-        return null;
-    }
-
-    @Override
-    public RedisTemplate getRedisTemplate(RedisConfig config) {
-        return this.getRedisTemplate(config, null, null, null);
-    }
-
-    @Override
-    public RedisTemplate getRedisTemplate(RedisConfig config, Integer maxTotal, Integer maxIdle, Integer minIdle) {
-        // 获取节点地址
-        List<String[]> servers = this.getServers(config.getUrl());
-        if (null == servers || servers.isEmpty()) {
-            logger.error("Url can not be null or invalid.");
-            throw new IllegalArgumentException("Url can not be null or invalid.");
-        }
-
-        String password = config.getPassword();
-        RedisTemplate redisTemplate = null;
-        // 判断集群/单机
-        int size = servers.size();
-        if (1 < size) {
-            JedisCluster cluster = RedisUtil.getJedisCluster(servers, password);
-            redisTemplate = new RedisTemplateCluster(cluster);
-        } else {
-            String[] server = servers.get(0);
-            String ip = server[0];
-            int port = Integer.parseInt(server[1]);
-            JedisPool pool = RedisUtil.getJedisPool(ip, port, password, maxTotal, maxIdle, minIdle);
-            redisTemplate = new RedisTemplateSingle(pool);
-        }
-        return redisTemplate;
-    }
-
-    @Override
-    public void close(RedisTemplate redisTemplate) {
-        redisTemplate.close();
-    }
-
-    /**
-     * 获取redis服务列表
-     *
-     * @param servers
-     * @return List<String服务列表
-     */
-    private List<String[]> getServers(String servers) {
-        List<String[]> list = null;
-        if (StringUtils.isNotEmpty(servers)) {
-            list = new ArrayList<String[]>();
-            // 服务列表以逗号分隔
-            String[] splits = servers.split(",");
-            for (String server : splits) {
-                String[] node = server.split(":");
-                if (node.length != 2) {
-                    continue;
-                }
-                list.add(node);
-            }
-        }
-        return list;
-    }
-
-}

+ 0 - 40
dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/RedisTemplate.java

@@ -1,40 +0,0 @@
-package org.dbsyncer.connector.redis;
-
-/**
- * 提供Redis单机/集群统一API接口
- * @author AE86
- * @date 2018年5月28日 上午9:48:17
- * @version 1.0.0
- */
-public interface RedisTemplate {
-
-    /**
-     * 插入key值
-     * @param key
-     * @param val
-     */
-    void set(byte[] key, byte[] val);
-
-    /**
-     * 获取key值
-     * @param key
-     * @return
-     */
-    byte[] get(byte[] key);
-
-    /**
-     * 删除指定key
-     * @param key
-     */
-    void del(byte[] key);
-
-    /**
-     * 清空所有数据
-     */
-    void flushAll();
-    
-    /**
-     * 释放连接
-     */
-    void close();
-}

+ 0 - 55
dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/RedisTemplateCluster.java

@@ -1,55 +0,0 @@
-package org.dbsyncer.connector.redis;
-
-import org.dbsyncer.connector.util.RedisUtil;
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisPool;
-
-import java.util.Map;
-
-/**
- * 提供Redis集群API接口
- * @author AE86
- * @date 2018年5月28日 上午10:08:45
- * @version 1.0.0
- */
-public class RedisTemplateCluster implements RedisTemplate {
-
-    /**
-     * 集群客户端
-     */
-    private JedisCluster cluster;
-
-    public RedisTemplateCluster(JedisCluster cluster) {
-        this.cluster = cluster;
-    }
-
-    @Override
-    public void set(byte[] key, byte[] val) {
-        cluster.set(key, val);
-    }
-
-    @Override
-    public byte[] get(byte[] key) {
-        return cluster.get(key);
-    }
-
-    @Override
-    public void del(byte[] key) {
-        cluster.del(key);
-    }
-
-    @Override
-    public void flushAll() {
-        Map<String, JedisPool> nodes = cluster.getClusterNodes();
-        for (Map.Entry<String, JedisPool> n : nodes.entrySet()) {
-            JedisPool pool = n.getValue();
-            pool.getResource().flushAll();
-        }
-    }
-
-    @Override
-    public void close() {
-        // 关闭集群客户端
-        RedisUtil.close(cluster);
-    }
-}

+ 0 - 61
dbsyncer-connector/src/main/java/org/dbsyncer/connector/redis/RedisTemplateSingle.java

@@ -1,61 +0,0 @@
-package org.dbsyncer.connector.redis;
-
-import org.dbsyncer.connector.util.RedisUtil;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-
-/**
- * 提供Redis单机API接口
- * @author AE86
- * @date 2018年5月28日 上午10:08:26
- * @version 1.0.0
- */
-public class RedisTemplateSingle implements RedisTemplate {
-
-    /**
-     * 连接池
-     */
-    private JedisPool pool;
-
-    public RedisTemplateSingle(JedisPool pool) {
-        this.pool = pool;
-    }
-
-    @Override
-    public void set(byte[] key, byte[] val) {
-        // 获取一个连接管道
-        Jedis jedis = pool.getResource();
-        jedis.set(key, val);
-        // 关闭连接管道
-        RedisUtil.close(jedis);
-    }
-
-    @Override
-    public byte[] get(byte[] key) {
-        Jedis jedis = pool.getResource();
-        byte[] bs = jedis.get(key);
-        RedisUtil.close(jedis);
-        return bs;
-    }
-
-    @Override
-    public void del(byte[] key) {
-        Jedis jedis = pool.getResource();
-        jedis.del(key);
-        RedisUtil.close(jedis);
-    }
-
-    @Override
-    public void flushAll() {
-        Jedis jedis = pool.getResource();
-        jedis.flushAll();
-        RedisUtil.close(jedis);
-    }
-
-    @Override
-    public void close() {
-        // 释放连接池
-        RedisUtil.close(pool);
-    }
-
-}

+ 3 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLMysqlConnector.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.sql;
 
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
@@ -25,12 +26,12 @@ public final class DQLMysqlConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public List<String> getTable(ConnectorConfig config) {
+    public List<String> getTable(ConnectorMapper config) {
         return super.getDqlTable(config);
     }
 
     @Override
-    public MetaInfo getMetaInfo(ConnectorConfig config, String tableName) {
+    public MetaInfo getMetaInfo(ConnectorMapper config, String tableName) {
         return super.getDqlMetaInfo(config);
     }
 

+ 3 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.sql;
 
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
@@ -25,12 +26,12 @@ public final class DQLOracleConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public List<String> getTable(ConnectorConfig config) {
+    public List<String> getTable(ConnectorMapper config) {
         return super.getDqlTable(config);
     }
 
     @Override
-    public MetaInfo getMetaInfo(ConnectorConfig config, String tableName) {
+    public MetaInfo getMetaInfo(ConnectorMapper config, String tableName) {
         return super.getDqlMetaInfo(config);
     }
 

+ 3 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java

@@ -2,6 +2,7 @@ package org.dbsyncer.connector.sql;
 
 import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.connector.ConnectorException;
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
@@ -35,12 +36,12 @@ public final class DQLSqlServerConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public List<String> getTable(ConnectorConfig config) {
+    public List<String> getTable(ConnectorMapper config) {
         return super.getDqlTable(config);
     }
 
     @Override
-    public MetaInfo getMetaInfo(ConnectorConfig config, String tableName) {
+    public MetaInfo getMetaInfo(ConnectorMapper config, String tableName) {
         return super.getDqlMetaInfo(config);
     }
 

+ 0 - 33
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/JDBCUtil.java

@@ -1,33 +0,0 @@
-package org.dbsyncer.connector.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-public abstract class JDBCUtil {
-
-    private final static Logger logger = LoggerFactory.getLogger(JDBCUtil.class);
-
-    public static Connection getConnection(String driver, String url, String username, String password) throws SQLException {
-        try {
-            Class.forName(driver);
-        } catch (ClassNotFoundException e) {
-            logger.error(e.getMessage());
-        }
-        return DriverManager.getConnection(url, username, password);
-    }
-
-    public static void close(Connection conn) {
-        if (null != conn) {
-            try {
-                conn.close();
-            } catch (SQLException e) {
-                logger.error(e.getClass() + " >> " + e.getLocalizedMessage());
-            }
-        }
-    }
-
-}

+ 0 - 20
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PingUtil.java

@@ -1,20 +0,0 @@
-package org.dbsyncer.connector.util;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-
-public abstract class PingUtil {
-
-    public static boolean ping(String ip, int port) {
-        try {
-            Socket s = new Socket();
-            s.connect(new InetSocketAddress(ip, port));
-            s.close();
-            return true;
-        } catch (IOException e) {
-        }
-        return false;
-    }
-    
-}

+ 0 - 177
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/RedisUtil.java

@@ -1,177 +0,0 @@
-package org.dbsyncer.connector.util;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.*;
-
-import java.io.*;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Redis连接工具类,支持单机集群连接
- * @author AE86
- * @date 2018年5月25日 上午10:42:03
- * @version 1.0.0
- */
-public abstract class RedisUtil {
-
-    private static final Logger logger = LoggerFactory.getLogger(RedisUtil.class);
-
-    private static JedisPoolConfig getJedisPoolConfig(Integer maxTotal, Integer maxIdle, Integer minIdle) {
-        JedisPoolConfig c = new JedisPoolConfig();
-        //最大连接数
-        c.setMaxTotal(null == maxTotal ? 100 : maxTotal);
-        //在jedispool中最大的idle状态(空闲的)的jedis实例的个数
-        c.setMaxIdle(null == maxIdle ? 50 : maxIdle);
-        //在jedispool中最小的idle状态(空闲的)的jedis实例的个数
-        c.setMinIdle(null == minIdle ? 20 : minIdle);
-        // 等待时间
-        c.setMaxWaitMillis(3000);
-        //在borrow一个jedis实例的时候,是否要进行验证操作,如果赋值true。则得到的jedis实例肯定是可以用的。
-        c.setTestOnBorrow(false);
-        //在return一个jedis实例的时候,是否要进行验证操作,如果赋值true。则放回jedispool的jedis实例肯定是可以用的。
-        c.setTestOnReturn(false);
-        //连接耗尽的时候,是否阻塞,false会抛出异常,true阻塞直到超时。默认为true。
-        c.setBlockWhenExhausted(false);
-        return c;
-    }
-
-    public static JedisPool getJedisPool(String ip, Integer port) {
-        return getJedisPool(ip, port, null);
-    }
-
-    public static JedisPool getJedisPool(String ip, Integer port, String password) {
-        return getJedisPool(ip, port, password, null, null, null);
-    }
-
-    public static JedisPool getJedisPool(String ip, Integer port, String password, Integer maxTotal, Integer maxIdle, Integer minIdle) {
-        if (StringUtils.isBlank(ip) || null == port) {
-            return null;
-        }
-        // 设置连接池配置
-        JedisPoolConfig c = getJedisPoolConfig(maxTotal, maxIdle, minIdle);
-
-        //jedis连接池
-        JedisPool jedisPool = new JedisPool(c, ip, port);
-        
-        // 验证密码有效性
-        if(StringUtils.isNotBlank(password)){
-            /**
-             * If the password is invalid, it will throw an JedisDataException as follows, otherwise return OK.
-             * redis.clients.jedis.exceptions.JedisDataException: ERR invalid password
-             */
-            Jedis jedis = jedisPool.getResource();
-            jedis.auth(password);
-            // 关闭管道
-            close(jedis);
-        }
-        return jedisPool;
-    }
-
-    public static JedisCluster getJedisCluster(List<String[]> servers, String password) {
-        return getJedisCluster(servers, null, null, null, password);
-    }
-
-    public static JedisCluster getJedisCluster(List<String[]> servers, Integer maxTotal, Integer maxIdle, Integer minIdle, String password) {
-        if (null == servers || servers.isEmpty()) {
-            return null;
-        }
-        // 设置连接池配置
-        JedisPoolConfig c = getJedisPoolConfig(maxTotal, maxIdle, minIdle);
-
-        // 根据服务列表明设置连接node
-        Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
-        String ip;
-        Integer port;
-        for (String[] server : servers) {
-            ip = server[0];
-            port = Integer.parseInt(server[1]);
-            // 如果不能ping
-            if(!PingUtil.ping(ip, port)){
-                String err = new StringBuilder("Can't connect ").append(ip).append(":").append(port).toString();
-                throw new IllegalArgumentException(err);
-            }
-            jedisClusterNodes.add(new HostAndPort(ip, port));
-        }
-        return new JedisCluster(jedisClusterNodes, 1000, 1000, 3, c);
-    }
-    
-    public static void close(JedisPool pool) {
-        if (null != pool) {
-            pool.destroy();
-        }
-    }
-
-    public static void close(Jedis jedis) {
-        if (jedis != null) {
-            jedis.close();
-        }
-    }
-
-    public static void close(JedisCluster cluster) {
-        if (null != cluster) {
-            try {
-                cluster.close();
-            } catch (IOException e) {
-                logger.error(e.getLocalizedMessage());
-            }
-        }
-    }
-
-    public static byte[] serializeObj(Object value) {
-        byte[] result = null;
-        if (value == null) {
-            logger.error("Can't serialize null");
-            return result;
-        }
-        ByteArrayOutputStream bos = null;
-        ObjectOutputStream os = null;
-        try {
-            bos = new ByteArrayOutputStream();
-            os = new ObjectOutputStream(bos);
-            os.writeObject(value);
-            result = bos.toByteArray();
-        } catch (IOException e) {
-            logger.error("Non-serializable object", e);
-        } finally {
-            close(os);
-            close(bos);
-        }
-        return result;
-    }
-
-    public static Object deserializeObj(byte[] in) {
-        Object result = null;
-        ByteArrayInputStream bis = null;
-        ObjectInputStream is = null;
-        try {
-            if (in != null) {
-                bis = new ByteArrayInputStream(in);
-                is = new ObjectInputStream(bis);
-                result = is.readObject();
-            }
-        } catch (IOException e) {
-            logger.error("Non-serializable object", e);
-
-        } catch (ClassNotFoundException e) {
-            logger.error("Non-serializable object", e);
-        } finally {
-            close(is);
-            close(bis);
-        }
-        return result;
-    }
-
-    private static void close(Closeable closeable) {
-        if (closeable != null) {
-            try {
-                closeable.close();
-            } catch (Exception e) {
-                throw new IllegalArgumentException("Non-serializable object", e);
-            }
-        }
-    }
-}

+ 3 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/QuartzExtractor.java

@@ -5,6 +5,7 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.UUIDUtil;
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
@@ -75,10 +76,11 @@ public class QuartzExtractor extends AbstractExtractor implements ScheduledTaskJ
 
     private void execute(Map<String, String> command, int index) {
         // 检查增量点
+        ConnectorMapper connectionMapper = connectorFactory.connect(connectorConfig);
         Point point = checkLastPoint(command, index);
         int pageIndex = 1;
         for (; ; ) {
-            Result reader = connectorFactory.reader(new ReaderConfig(connectorConfig, point.getCommand(), point.getArgs(), pageIndex++, readNum));
+            Result reader = connectorFactory.reader(new ReaderConfig(connectionMapper, point.getCommand(), point.getArgs(), pageIndex++, readNum));
             List<Map> data = reader.getData();
             if (CollectionUtils.isEmpty(data)) {
                 break;

+ 9 - 16
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -1,20 +1,19 @@
 package org.dbsyncer.listener.sqlserver;
 
-import com.microsoft.sqlserver.jdbc.SQLServerConnection;
 import com.microsoft.sqlserver.jdbc.SQLServerException;
 import org.apache.commons.lang.math.RandomUtils;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.util.Assert;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.sql.*;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -80,7 +79,6 @@ public class SqlServerExtractor extends AbstractExtractor {
             enableTableCDC();
             readChangeTables();
             readLastLsn();
-            getTrustedServerNameAE();
 
             worker = new Worker();
             worker.setName(new StringBuilder("cdc-parser-").append(serverName).append("_").append(RandomUtils.nextInt(100)).toString());
@@ -119,16 +117,12 @@ public class SqlServerExtractor extends AbstractExtractor {
         }
     }
 
-    private void connect() {
-        connection = connectorFactory.connect((DatabaseConfig) connectorConfig);
-    }
-
-    private void getTrustedServerNameAE() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-        SQLServerConnection conn = (SQLServerConnection) connection;
-        Class clazz = conn.getClass();
-        Method method = clazz.getDeclaredMethod("getTrustedServerNameAE");
-        method.setAccessible(true);
-        serverName = (String) method.invoke(conn, new Object[]{});
+    private void connect() throws SQLException {
+        DatabaseConfig cfg = (DatabaseConfig) connectorConfig;
+        final ConnectorMapper connectorMapper = connectorFactory.connect(cfg);
+        JdbcTemplate jdbcTemplate = (JdbcTemplate) connectorMapper.getConnection();
+        connection = jdbcTemplate.getDataSource().getConnection();
+        serverName = cfg.getUrl();
     }
 
     private void readLastLsn() {
@@ -398,11 +392,10 @@ public class SqlServerExtractor extends AbstractExtractor {
             while (!isInterrupted() && connected) {
                 try {
                     Lsn stopLsn = queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
-                    if (!stopLsn.isAvailable()) {
+                    if (null == stopLsn || !stopLsn.isAvailable()) {
                         TimeUnit.MICROSECONDS.sleep(DEFAULT_POLL_INTERVAL_MILLIS);
                         continue;
                     }
-
                     if (stopLsn.compareTo(lastLsn) <= 0) {
                         TimeUnit.MICROSECONDS.sleep(DEFAULT_POLL_INTERVAL_MILLIS);
                         continue;

+ 8 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.manager;
 
 import org.dbsyncer.common.model.Paging;
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.enums.ConnectorEnum;
@@ -25,13 +26,17 @@ import java.util.Map;
  */
 public interface Manager extends Executor {
 
-    boolean alive(ConnectorConfig config);
+    // Connector
+    ConnectorMapper connect(ConnectorConfig config);
+
+    boolean refreshConnectorConfig(ConnectorConfig config);
 
-    List<String> getTable(ConnectorConfig config);
+    boolean isAliveConnectorConfig(ConnectorConfig config);
+
+    List<String> getTable(ConnectorMapper config);
 
     MetaInfo getMetaInfo(String connectorId, String tableName);
 
-    // Connector
     String addConnector(ConfigModel model);
 
     String editConnector(ConfigModel model);

+ 14 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -2,6 +2,7 @@ package org.dbsyncer.manager;
 
 import org.dbsyncer.common.event.ClosedEvent;
 import org.dbsyncer.common.model.Paging;
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.enums.ConnectorEnum;
@@ -62,12 +63,22 @@ public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent>
     private Map<String, Puller> map;
 
     @Override
-    public boolean alive(ConnectorConfig config) {
-        return parser.alive(config);
+    public ConnectorMapper connect(ConnectorConfig config) {
+        return parser.connect(config);
     }
 
     @Override
-    public List<String> getTable(ConnectorConfig config) {
+    public boolean refreshConnectorConfig(ConnectorConfig config) {
+        return parser.refreshConnectorConfig(config);
+    }
+
+    @Override
+    public boolean isAliveConnectorConfig(ConnectorConfig config) {
+        return parser.isAliveConnectorConfig(config);
+    }
+
+    @Override
+    public List<String> getTable(ConnectorMapper config) {
         return parser.getTable(config);
     }
 

+ 1 - 1
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/Monitor.java

@@ -9,7 +9,7 @@ import java.util.Map;
  */
 public interface Monitor {
 
-    boolean alive(String id);
+    boolean isAlive(String id);
 
     Map getThreadInfo();
 }

+ 2 - 3
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.monitor;
 
-import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.model.Connector;
 import org.slf4j.Logger;
@@ -33,9 +32,9 @@ public class MonitorFactory implements Monitor {
 
     @Override
     @Cacheable(value = "connector", keyGenerator = "cacheKeyGenerator")
-    public boolean alive(String id) {
+    public boolean isAlive(String id) {
         Connector connector = manager.getConnector(id);
-        return null != connector ? manager.alive(connector.getConfig()) : false;
+        return null != connector ? manager.isAliveConnectorConfig(connector.getConfig()) : false;
     }
 
     @Override

+ 19 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -2,6 +2,7 @@ package org.dbsyncer.parser;
 
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Task;
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.enums.ConnectorEnum;
@@ -25,12 +26,27 @@ import java.util.Map;
 public interface Parser {
 
     /**
-     * 解析连接器配置是否可用
+     * 获取连接配置
      *
      * @param config
      * @return
      */
-    boolean alive(ConnectorConfig config);
+    ConnectorMapper connect(ConnectorConfig config);
+
+    /**
+     * 刷新连接配置
+     *
+     * @param config
+     */
+    boolean refreshConnectorConfig(ConnectorConfig config);
+
+    /**
+     * 连接配置是否可用
+     *
+     * @param config
+     * @return
+     */
+    boolean isAliveConnectorConfig(ConnectorConfig config);
 
     /**
      * 获取连接器表
@@ -38,7 +54,7 @@ public interface Parser {
      * @param config
      * @return
      */
-    List<String> getTable(ConnectorConfig config);
+    List<String> getTable(ConnectorMapper config);
 
     /**
      * 获取表元信息

+ 29 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -9,6 +9,7 @@ import org.dbsyncer.common.model.Task;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.ConnectorEnum;
@@ -66,19 +67,29 @@ public class ParserFactory implements Parser {
     private ApplicationContext applicationContext;
 
     @Override
-    public boolean alive(ConnectorConfig config) {
+    public ConnectorMapper connect(ConnectorConfig config) {
+        return connectorFactory.connect(config);
+    }
+
+    @Override
+    public boolean refreshConnectorConfig(ConnectorConfig config) {
+        return connectorFactory.refresh(config);
+    }
+
+    @Override
+    public boolean isAliveConnectorConfig(ConnectorConfig config) {
         return connectorFactory.isAlive(config);
     }
 
     @Override
-    public List<String> getTable(ConnectorConfig config) {
+    public List<String> getTable(ConnectorMapper config) {
         return connectorFactory.getTable(config);
     }
 
     @Override
     public MetaInfo getMetaInfo(String connectorId, String tableName) {
-        ConnectorConfig config = getConnectorConfig(connectorId);
-        return connectorFactory.getMetaInfo(config, tableName);
+        ConnectorMapper connectorMapper = connectorFactory.connect(getConnectorConfig(connectorId));
+        return connectorFactory.getMetaInfo(connectorMapper, tableName);
     }
 
     @Override
@@ -109,8 +120,8 @@ public class ParserFactory implements Parser {
 
     @Override
     public long getCount(String connectorId, Map<String, String> command) {
-        ConnectorConfig config = getConnectorConfig(connectorId);
-        return connectorFactory.getCount(config, command);
+        ConnectorMapper connectorMapper = connectorFactory.connect(getConnectorConfig(connectorId));
+        return connectorFactory.getCount(connectorMapper, command);
     }
 
     @Override
@@ -201,6 +212,8 @@ public class ParserFactory implements Parser {
         int pageSize = mapping.getReadNum();
         int threadSize = mapping.getThreadNum();
         int batchSize = mapping.getBatchNum();
+        ConnectorMapper sConnectionMapper = connectorFactory.connect(sConfig);
+        ConnectorMapper tConnectionMapper = connectorFactory.connect(tConfig);
 
         for (; ; ) {
             if (!task.isRunning()) {
@@ -210,7 +223,7 @@ public class ParserFactory implements Parser {
 
             // 1、获取数据源数据
             int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
-            Result reader = connectorFactory.reader(new ReaderConfig(sConfig, command, new ArrayList<>(), pageIndex, pageSize));
+            Result reader = connectorFactory.reader(new ReaderConfig(sConnectionMapper, command, new ArrayList<>(), pageIndex, pageSize));
             List<Map> data = reader.getData();
             if (CollectionUtils.isEmpty(data)) {
                 params.clear();
@@ -228,7 +241,7 @@ public class ParserFactory implements Parser {
             pluginFactory.convert(group.getPlugin(), data, target);
 
             // 5、写入目标源
-            Result writer = writeBatch(tConfig, command, picker.getTargetFields(), target, threadSize, batchSize);
+            Result writer = writeBatch(tConnectionMapper, command, picker.getTargetFields(), target, threadSize, batchSize);
 
             // 6、更新结果
             flush(task, writer, target);
@@ -244,7 +257,7 @@ public class ParserFactory implements Parser {
                 rowChangedEvent.getBefore(), rowChangedEvent.getAfter());
         final String metaId = mapping.getMetaId();
 
-        ConnectorConfig tConfig = getConnectorConfig(mapping.getTargetConnectorId());
+        ConnectorMapper tConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
         // 1、获取映射字段
         final String event = rowChangedEvent.getEvent();
         Map<String, Object> data = StringUtils.equals(ConnectorConstant.OPERTION_DELETE, event) ? rowChangedEvent.getBefore() : rowChangedEvent.getAfter();
@@ -258,7 +271,7 @@ public class ParserFactory implements Parser {
         pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
 
         // 4、写入目标源
-        Result writer = connectorFactory.writer(new WriterSingleConfig(tConfig, picker.getTargetFields(), tableGroup.getCommand(), event, target, rowChangedEvent.getTableName()));
+        Result writer = connectorFactory.writer(new WriterSingleConfig(tConnectorMapper, picker.getTargetFields(), tableGroup.getCommand(), event, target, rowChangedEvent.getTableName()));
 
         // 5、更新结果
         flush(metaId, writer, event, picker.getTargetMapList());
@@ -329,7 +342,7 @@ public class ParserFactory implements Parser {
     /**
      * 批量写入
      *
-     * @param config
+     * @param connectorMapper
      * @param command
      * @param fields
      * @param target
@@ -337,13 +350,13 @@ public class ParserFactory implements Parser {
      * @param batchSize
      * @return
      */
-    private Result writeBatch(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map> target,
+    private Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, List<Field> fields, List<Map> target,
                               int threadSize, int batchSize) {
         // 总数
         int total = target.size();
         // 单次任务
         if (total <= batchSize) {
-            return connectorFactory.writer(new WriterBatchConfig(config, command, fields, target));
+            return connectorFactory.writer(new WriterBatchConfig(connectorMapper, command, fields, target));
         }
 
         // 批量任务, 拆分
@@ -365,7 +378,7 @@ public class ParserFactory implements Parser {
             for (int i = 0; i < threadSize; i++) {
                 executor.execute(() -> {
                     try {
-                        Result w = parallelTask(batchSize, queue, config, command, fields);
+                        Result w = parallelTask(batchSize, queue, connectorMapper, command, fields);
                         // CAS
                         result.getFailData().addAll(w.getFailData());
                         result.getFail().getAndAdd(w.getFail().get());
@@ -390,7 +403,7 @@ public class ParserFactory implements Parser {
         return result;
     }
 
-    private Result parallelTask(int batchSize, Queue<Map> queue, ConnectorConfig config, Map<String, String> command,
+    private Result parallelTask(int batchSize, Queue<Map> queue, ConnectorMapper connectorMapper, Map<String, String> command,
                                 List<Field> fields) {
         List<Map> data = new ArrayList<>();
         for (int j = 0; j < batchSize; j++) {
@@ -400,7 +413,7 @@ public class ParserFactory implements Parser {
             }
             data.add(poll);
         }
-        return connectorFactory.writer(new WriterBatchConfig(config, command, fields, data));
+        return connectorFactory.writer(new WriterBatchConfig(connectorMapper, command, fields, data));
     }
 
     private ThreadPoolTaskExecutor getThreadPoolTaskExecutor(int threadSize, int queueCapacity) {

+ 13 - 4
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -4,6 +4,7 @@ import org.apache.commons.lang.StringUtils;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.Database;
@@ -12,7 +13,6 @@ import org.dbsyncer.connector.enums.SetterEnum;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
 import org.dbsyncer.connector.mysql.MysqlConnector;
 import org.dbsyncer.connector.util.DatabaseUtil;
-import org.dbsyncer.connector.util.JDBCUtil;
 import org.dbsyncer.storage.AbstractStorageService;
 import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.constant.ConfigConstant;
@@ -32,7 +32,11 @@ import org.springframework.util.Assert;
 import javax.annotation.PostConstruct;
 import java.io.*;
 import java.sql.Connection;
-import java.util.*;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -88,7 +92,11 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             logger.error("无法连接Mysql,URL:{}", config.getUrl());
             throw new StorageException(e.getMessage());
         } finally {
-            JDBCUtil.close(conn);
+            try {
+                DatabaseUtil.close(conn);
+            } catch (SQLException e) {
+                logger.error("关闭连接Mysql,URL:{}", config.getUrl());
+            }
         }
 
         // 初始化表
@@ -163,7 +171,8 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             Executor executor = getExecutor(type, table);
             Map<String, String> command = new HashMap<>();
             command.put(SqlBuilderEnum.INSERT.getName(), executor.getInsert());
-            connectorFactory.writer(new WriterBatchConfig(config, command, executor.getFields(), list));
+            ConnectorMapper connectorMapper = connectorFactory.connect(config);
+            connectorFactory.writer(new WriterBatchConfig(connectorMapper, command, executor.getFields(), list));
         }
 
     }