فهرست منبع

优化连接数,修复oracle丢失率

AE86 3 سال پیش
والد
کامیت
ecd3126474

+ 0 - 6
dbsyncer-connector/pom.xml

@@ -24,12 +24,6 @@
             <artifactId>spring-jdbc</artifactId>
         </dependency>
 
-        <!-- druid数据源 -->
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>druid</artifactId>
-        </dependency>
-
         <!-- mysql-driver -->
         <dependency>
             <groupId>mysql</groupId>

+ 4 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorMapper.java

@@ -23,7 +23,9 @@ public interface ConnectorMapper<K, V> {
 
     K getConfig();
 
-    V getConnection();
+    default V getConnection() {
+        throw new ConnectorException("Unsupported method.");
+    }
 
-    void close();
+    default void close(){}
 }

+ 101 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -12,14 +12,17 @@ import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.enums.SetterEnum;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
+import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.core.BatchPreparedStatementSetter;
+import org.springframework.jdbc.support.rowset.ResultSetWrappingSqlRowSet;
+import org.springframework.jdbc.support.rowset.SqlRowSet;
+import org.springframework.jdbc.support.rowset.SqlRowSetMetaData;
 import org.springframework.util.Assert;
 
-import java.sql.Connection;
-import java.sql.PreparedStatement;
+import java.sql.*;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -69,7 +72,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
         String quotation = buildSqlWithQuotation();
         StringBuilder queryMetaSql = new StringBuilder("SELECT * FROM ").append(quotation).append(tableName).append(quotation).append(" WHERE 1 != 1");
-        return connectorMapper.execute(databaseTemplate -> DatabaseUtil.getMetaInfo(databaseTemplate, queryMetaSql.toString(), tableName));
+        return connectorMapper.execute(databaseTemplate -> getMetaInfo(databaseTemplate, queryMetaSql.toString(), tableName));
     }
 
     @Override
@@ -230,7 +233,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
         // 获取查询总数SQL
         String quotation = buildSqlWithQuotation();
-        String pk = DatabaseUtil.findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
+        String pk = findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
         StringBuilder queryCount = new StringBuilder();
         queryCount.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(quotation).append(table.getName()).append(quotation);
         if (StringUtil.isNotBlank(queryFilterSql)) {
@@ -262,7 +265,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
         // 获取查询数据行是否存在
         String quotation = buildSqlWithQuotation();
-        String pk = DatabaseUtil.findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
+        String pk = findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
         StringBuilder queryCount = new StringBuilder().append("SELECT COUNT(1) FROM ").append(quotation).append(table.getName()).append(
                 quotation).append(" WHERE ").append(pk).append(" = ?");
         String queryCountExist = ConnectorConstant.OPERTION_QUERY_COUNT_EXIST;
@@ -295,7 +298,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         DatabaseConfig cfg = config.getConfig();
         String sql = cfg.getSql().toUpperCase();
         String queryMetaSql = StringUtil.contains(sql, " WHERE ") ? sql + " AND 1!=1 " : sql + " WHERE 1!=1 ";
-        return config.execute(databaseTemplate -> DatabaseUtil.getMetaInfo(databaseTemplate, queryMetaSql, cfg.getTable()));
+        return config.execute(databaseTemplate -> getMetaInfo(databaseTemplate, queryMetaSql, cfg.getTable()));
     }
 
     /**
@@ -320,7 +323,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             querySql += queryFilterSql;
         }
         String quotation = buildSqlWithQuotation();
-        String pk = DatabaseUtil.findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
+        String pk = findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
         map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(new PageSqlConfig(querySql, pk)));
 
         // 获取查询总数SQL
@@ -428,7 +431,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             throw new ConnectorException("Table name can not be empty.");
         }
         if (StringUtil.isBlank(pk)) {
-            pk = DatabaseUtil.findTablePrimaryKey(originalTable, "");
+            pk = findTablePrimaryKey(originalTable, "");
         }
 
         SqlBuilderConfig config = new SqlBuilderConfig(this, tableName, pk, fields, queryFilterSQL, buildSqlWithQuotation());
@@ -505,4 +508,94 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return rowNum > 0;
     }
 
+    /**
+     * 获取数据库表元数据信息
+     *
+     * @param databaseTemplate
+     * @param metaSql          查询元数据
+     * @param tableName        表名
+     * @return
+     */
+    private MetaInfo getMetaInfo(DatabaseTemplate databaseTemplate, String metaSql, String tableName) throws SQLException {
+        SqlRowSet sqlRowSet = databaseTemplate.queryForRowSet(metaSql);
+        ResultSetWrappingSqlRowSet rowSet = (ResultSetWrappingSqlRowSet) sqlRowSet;
+        SqlRowSetMetaData metaData = rowSet.getMetaData();
+
+        // 查询表字段信息
+        int columnCount = metaData.getColumnCount();
+        if (1 > columnCount) {
+            throw new ConnectorException("查询表字段不能为空.");
+        }
+        List<Field> fields = new ArrayList<>(columnCount);
+        Map<String, List<String>> tables = new HashMap<>();
+        try {
+            DatabaseMetaData md = databaseTemplate.getConnection().getMetaData();
+            String name = null;
+            String label = null;
+            String typeName = null;
+            String table = null;
+            int columnType;
+            boolean pk;
+            for (int i = 1; i <= columnCount; i++) {
+                table = StringUtil.isNotBlank(tableName) ? tableName : metaData.getTableName(i);
+                if (null == tables.get(table)) {
+                    tables.putIfAbsent(table, findTablePrimaryKeys(md, table));
+                }
+                name = metaData.getColumnName(i);
+                label = metaData.getColumnLabel(i);
+                typeName = metaData.getColumnTypeName(i);
+                columnType = metaData.getColumnType(i);
+                pk = isPk(tables, table, name);
+                fields.add(new Field(label, typeName, columnType, pk));
+            }
+        } finally {
+            tables.clear();
+        }
+        return new MetaInfo().setColumn(fields);
+    }
+
+    /**
+     * 返回主键名称
+     *
+     * @param table
+     * @param quotation
+     * @return
+     */
+    private String findTablePrimaryKey(Table table, String quotation) {
+        if (null != table) {
+            List<Field> column = table.getColumn();
+            if (!CollectionUtils.isEmpty(column)) {
+                for (Field c : column) {
+                    if (c.isPk()) {
+                        return new StringBuilder(quotation).append(c.getName()).append(quotation).toString();
+                    }
+                }
+            }
+        }
+        if(!TableTypeEnum.isView(table.getType())){
+            throw new ConnectorException("Table primary key can not be empty.");
+        }
+        return "";
+    }
+
+    private boolean isPk(Map<String, List<String>> tables, String tableName, String name) {
+        List<String> pk = tables.get(tableName);
+        return !CollectionUtils.isEmpty(pk) && pk.contains(name);
+    }
+
+    private List<String> findTablePrimaryKeys(DatabaseMetaData md, String tableName) throws SQLException {
+        //根据表名获得主键结果集
+        ResultSet rs = null;
+        List<String> primaryKeys = new ArrayList<>();
+        try {
+            rs = md.getPrimaryKeys(null, null, tableName);
+            while (rs.next()) {
+                primaryKeys.add(rs.getString("COLUMN_NAME"));
+            }
+        } finally {
+            DatabaseUtil.close(rs);
+        }
+        return primaryKeys;
+    }
+
 }

+ 11 - 15
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseConnectorMapper.java

@@ -4,27 +4,32 @@ import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.util.DatabaseUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.dao.EmptyResultDataAccessException;
 
 import java.sql.Connection;
-import java.sql.SQLException;
 
 public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig, Connection> {
-    protected DatabaseConfig config;
-    protected DatabaseTemplate template;
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+    private DatabaseConfig config;
 
-    public DatabaseConnectorMapper(DatabaseConfig config) throws SQLException {
+    public DatabaseConnectorMapper(DatabaseConfig config) {
         this.config = config;
-        template = new DatabaseTemplate(DatabaseUtil.getConnection(config));
     }
 
     public <T> T execute(HandleCallback callback) {
+        Connection connection = null;
         try {
-            return (T) callback.apply(template);
+            connection = DatabaseUtil.getConnection(config);
+            return (T) callback.apply(new DatabaseTemplate(connection));
         } catch (EmptyResultDataAccessException e) {
             throw e;
         } catch (Exception e) {
+            logger.error(e.getMessage());
             throw new ConnectorException(e.getMessage());
+        } finally {
+            DatabaseUtil.close(connection);
         }
     }
 
@@ -33,13 +38,4 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
         return config;
     }
 
-    @Override
-    public Connection getConnection() {
-        return template.getConnection();
-    }
-
-    @Override
-    public void close() {
-        DatabaseUtil.close(getConnection());
-    }
 }

+ 7 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnectorMapper.java

@@ -3,11 +3,14 @@ package org.dbsyncer.connector.sqlserver;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
+import org.dbsyncer.connector.database.DatabaseTemplate;
 import org.dbsyncer.connector.database.HandleCallback;
+import org.dbsyncer.connector.util.DatabaseUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.dao.EmptyResultDataAccessException;
 
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -33,10 +36,12 @@ public final class SqlServerConnectorMapper extends DatabaseConnectorMapper {
         final Lock connectionLock = lock;
         boolean locked = false;
         Object apply = null;
+        Connection connection = null;
         try {
             locked = connectionLock.tryLock(60, TimeUnit.SECONDS);
             if (locked) {
-                apply = callback.apply(template);
+                connection = DatabaseUtil.getConnection(getConfig());
+                apply = callback.apply(new DatabaseTemplate(connection));
             }
         } catch (EmptyResultDataAccessException e) {
             throw e;
@@ -45,6 +50,7 @@ public final class SqlServerConnectorMapper extends DatabaseConnectorMapper {
             throw new ConnectorException(e.getMessage());
         } finally {
             if (locked) {
+                DatabaseUtil.close(connection);
                 connectionLock.unlock();
             }
         }

+ 3 - 105
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java

@@ -1,23 +1,11 @@
 package org.dbsyncer.connector.util;
 
-import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.connector.config.MetaInfo;
-import org.dbsyncer.connector.config.Table;
-import org.dbsyncer.connector.database.DatabaseTemplate;
-import org.dbsyncer.connector.enums.TableTypeEnum;
-import org.springframework.jdbc.support.rowset.ResultSetWrappingSqlRowSet;
-import org.springframework.jdbc.support.rowset.SqlRowSet;
-import org.springframework.jdbc.support.rowset.SqlRowSetMetaData;
 
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
 
 public abstract class DatabaseUtil {
 
@@ -38,94 +26,4 @@ public abstract class DatabaseUtil {
         }
     }
 
-    /**
-     * 获取数据库表元数据信息
-     *
-     * @param databaseTemplate
-     * @param metaSql          查询元数据
-     * @param tableName        表名
-     * @return
-     */
-    public static MetaInfo getMetaInfo(DatabaseTemplate databaseTemplate, String metaSql, String tableName) throws SQLException {
-        SqlRowSet sqlRowSet = databaseTemplate.queryForRowSet(metaSql);
-        ResultSetWrappingSqlRowSet rowSet = (ResultSetWrappingSqlRowSet) sqlRowSet;
-        SqlRowSetMetaData metaData = rowSet.getMetaData();
-
-        // 查询表字段信息
-        int columnCount = metaData.getColumnCount();
-        if (1 > columnCount) {
-            throw new ConnectorException("查询表字段不能为空.");
-        }
-        List<Field> fields = new ArrayList<>(columnCount);
-        Map<String, List<String>> tables = new HashMap<>();
-        try {
-            DatabaseMetaData md = databaseTemplate.getConnection().getMetaData();
-            String name = null;
-            String label = null;
-            String typeName = null;
-            String table = null;
-            int columnType;
-            boolean pk;
-            for (int i = 1; i <= columnCount; i++) {
-                table = StringUtil.isNotBlank(tableName) ? tableName : metaData.getTableName(i);
-                if (null == tables.get(table)) {
-                    tables.putIfAbsent(table, findTablePrimaryKeys(md, table));
-                }
-                name = metaData.getColumnName(i);
-                label = metaData.getColumnLabel(i);
-                typeName = metaData.getColumnTypeName(i);
-                columnType = metaData.getColumnType(i);
-                pk = isPk(tables, table, name);
-                fields.add(new Field(label, typeName, columnType, pk));
-            }
-        } finally {
-            tables.clear();
-        }
-        return new MetaInfo().setColumn(fields);
-    }
-
-    /**
-     * 返回主键名称
-     *
-     * @param table
-     * @param quotation
-     * @return
-     */
-    public static String findTablePrimaryKey(Table table, String quotation) {
-        if (null != table) {
-            List<Field> column = table.getColumn();
-            if (!CollectionUtils.isEmpty(column)) {
-                for (Field c : column) {
-                    if (c.isPk()) {
-                        return new StringBuilder(quotation).append(c.getName()).append(quotation).toString();
-                    }
-                }
-            }
-        }
-        if(!TableTypeEnum.isView(table.getType())){
-            throw new ConnectorException("Table primary key can not be empty.");
-        }
-        return "";
-    }
-
-    private static boolean isPk(Map<String, List<String>> tables, String tableName, String name) {
-        List<String> pk = tables.get(tableName);
-        return !CollectionUtils.isEmpty(pk) && pk.contains(name);
-    }
-
-    private static List<String> findTablePrimaryKeys(DatabaseMetaData md, String tableName) throws SQLException {
-        //根据表名获得主键结果集
-        ResultSet rs = null;
-        List<String> primaryKeys = new ArrayList<>();
-        try {
-            rs = md.getPrimaryKeys(null, null, tableName);
-            while (rs.next()) {
-                primaryKeys.add(rs.getString("COLUMN_NAME"));
-            }
-        } finally {
-            close(rs);
-        }
-        return primaryKeys;
-    }
-
 }

+ 0 - 8
pom.xml

@@ -41,7 +41,6 @@
         <commons-fileupload.version>1.4</commons-fileupload.version>
         <commons-io.version>2.5</commons-io.version>
         <lucene-analyzers-smartcn.version>7.7.0</lucene-analyzers-smartcn.version>
-        <druid.version>1.2.8</druid.version>
         <ojdbc6.version>11.2.0.4.0-atlassian-hosted</ojdbc6.version>
         <mysql.version>5.1.40</mysql.version>
         <mysql-binlog.version>0.21.0</mysql-binlog.version>
@@ -130,13 +129,6 @@
                 <version>${lucene-analyzers-smartcn.version}</version>
             </dependency>
 
-            <!-- druid数据源 -->
-            <dependency>
-                <groupId>com.alibaba</groupId>
-                <artifactId>druid</artifactId>
-                <version>${druid.version}</version>
-            </dependency>
-
             <!-- mysql-driver -->
             <dependency>
                 <groupId>mysql</groupId>