AE86 3 lat temu
rodzic
commit
c2c5378ad6

+ 5 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/JDBCUtil.java

@@ -13,8 +13,11 @@ public abstract class JDBCUtil {
     private final static Logger logger = LoggerFactory.getLogger(JDBCUtil.class);
 
     public static Connection getConnection(String driver, String url, String username, String password) throws SQLException {
-        // com.mysql.jdbc.JDBC4Connection 
-        // 不需要显式调用 Class.forName(driver), DriverManager.getConnection会自动加载合适的驱动
+        try {
+            Class.forName(driver);
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+        }
         return DriverManager.getConnection(url, username, password);
     }
 

+ 25 - 10
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -7,6 +7,8 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.util.DatabaseUtil;
+import org.dbsyncer.connector.util.JDBCUtil;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.slf4j.Logger;
@@ -17,6 +19,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.*;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -47,7 +50,9 @@ public class SqlServerExtractor extends AbstractExtractor {
 
     private static final String LSN_POSITION = "position";
     private static final long DEFAULT_POLL_INTERVAL_MILLIS = 360;
+    private static final int PREPARED_STATEMENT_CACHE_CAPACITY = 500;
     private static final int OFFSET_COLUMNS = 4;
+    private final Map<String, PreparedStatement> preparedStatementCache = new ConcurrentHashMap<>(PREPARED_STATEMENT_CACHE_CAPACITY);
     private final Lock connectLock = new ReentrantLock();
     private volatile boolean connected;
     private static Set<String> tables;
@@ -64,7 +69,7 @@ public class SqlServerExtractor extends AbstractExtractor {
                 logger.error("SqlServerExtractor is already started");
                 return;
             }
-
+            connected = true;
             connection = connect();
             tables = readTables();
             Assert.isTrue(!CollectionUtils.isEmpty(tables), "No tables available");
@@ -81,8 +86,6 @@ public class SqlServerExtractor extends AbstractExtractor {
             worker.setName(new StringBuilder("cdc-parser-").append(getTrustedServerNameAE()).append("_").append(RandomUtils.nextInt(100)).toString());
             worker.setDaemon(false);
             worker.start();
-
-            connected = true;
         } catch (Exception e) {
             close();
             logger.error("启动失败:{}", e.getMessage());
@@ -102,10 +105,12 @@ public class SqlServerExtractor extends AbstractExtractor {
                     worker = null;
                 }
                 disableTableCDC();
-                if (null != connection) {
-                    close(connection);
-                }
+                preparedStatementCache.values().forEach(this::close);
+                preparedStatementCache.clear();
+                DatabaseUtil.close(connection);
                 connected = false;
+            } catch (SQLException e) {
+                logger.error(e.getMessage());
             } finally {
                 connectLock.unlock();
             }
@@ -124,10 +129,11 @@ public class SqlServerExtractor extends AbstractExtractor {
 
     private Connection connect() throws SQLException {
         final DatabaseConfig config = (DatabaseConfig) connectorConfig;
+        String driverClassName = config.getDriverClassName();
         String username = config.getUsername();
         String password = config.getPassword();
         String url = config.getUrl();
-        return DriverManager.getConnection(url, username, password);
+        return JDBCUtil.getConnection(driverClassName, url, username, password);
     }
 
     private String getTrustedServerNameAE() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
@@ -316,11 +322,10 @@ public class SqlServerExtractor extends AbstractExtractor {
     }
 
     private <T> T query(String sql, StatementPreparer statementPreparer, ResultSetMapper<T> mapper) {
-        PreparedStatement ps = null;
         ResultSet rs = null;
         T apply = null;
         try {
-            ps = connection.prepareStatement(sql);
+            final PreparedStatement ps = createPreparedStatement(sql);
             if (null != statementPreparer) {
                 statementPreparer.accept(ps);
             }
@@ -332,11 +337,21 @@ public class SqlServerExtractor extends AbstractExtractor {
             logger.error(e.getMessage());
         } finally {
             close(rs);
-            close(ps);
         }
         return apply;
     }
 
+    private PreparedStatement createPreparedStatement(String preparedQueryString) {
+        return preparedStatementCache.computeIfAbsent(preparedQueryString, query -> {
+            try {
+                logger.info("Inserting prepared statement '{}' removed from the cache", query);
+                return connection.prepareStatement(query);
+            } catch (SQLException e) {
+                throw new ListenerException(e);
+            }
+        });
+    }
+
     enum TableOperation {
         /**
          * 插入