AE86 3 rokov pred
rodič
commit
1bbf20a010

+ 0 - 16
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/JDBCUtil.java

@@ -6,7 +6,6 @@ import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.sql.Statement;
 
 public abstract class JDBCUtil {
 
@@ -21,16 +20,6 @@ public abstract class JDBCUtil {
         return DriverManager.getConnection(url, username, password);
     }
 
-    public static void close(Statement statement) {
-        if (statement != null) {
-            try {
-                statement.close();
-            } catch (SQLException e) {
-                logger.error(e.getClass() + " >> " + e.getLocalizedMessage());
-            }
-        }
-    }
-
     public static void close(Connection conn) {
         if (null != conn) {
             try {
@@ -41,9 +30,4 @@ public abstract class JDBCUtil {
         }
     }
 
-    public static void close(Statement statement, Connection conn) {
-        close(statement);
-        close(conn);
-    }
-
 }

+ 21 - 27
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -18,7 +18,6 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.*;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -48,10 +47,8 @@ public class SqlServerExtractor extends AbstractExtractor {
     private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT * FROM cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
 
     private static final String LSN_POSITION = "position";
-    private static final long DEFAULT_POLL_INTERVAL_MILLIS = 360;
-    private static final int PREPARED_STATEMENT_CACHE_CAPACITY = 500;
+    private static final long DEFAULT_POLL_INTERVAL_MILLIS = 36000;
     private static final int OFFSET_COLUMNS = 4;
-    private final Map<String, PreparedStatement> preparedStatementCache = new ConcurrentHashMap<>(PREPARED_STATEMENT_CACHE_CAPACITY);
     private final Lock connectLock = new ReentrantLock();
     private volatile boolean connected;
     private static Set<String> tables;
@@ -69,8 +66,8 @@ public class SqlServerExtractor extends AbstractExtractor {
                 return;
             }
             connected = true;
-            connection = connect();
-            tables = readTables();
+            connect();
+            readTables();
             Assert.isTrue(!CollectionUtils.isEmpty(tables), "No tables available");
 
             boolean enabledServerAgent = queryAndMap(IS_SERVER_AGENT_RUNNING, rs -> "Running.".equals(rs.getString(1)));
@@ -78,8 +75,9 @@ public class SqlServerExtractor extends AbstractExtractor {
 
             enableDBCDC();
             enableTableCDC();
-            changeTables = readChangeTables();
+            readChangeTables();
             readLastLsn();
+            JDBCUtil.close(connection);
 
             worker = new Worker();
             worker.setName(new StringBuilder("cdc-parser-").append(getTrustedServerNameAE()).append("_").append(RandomUtils.nextInt(100)).toString());
@@ -104,8 +102,6 @@ public class SqlServerExtractor extends AbstractExtractor {
                     worker = null;
                 }
                 disableTableCDC();
-                preparedStatementCache.values().forEach(this::close);
-                preparedStatementCache.clear();
                 JDBCUtil.close(connection);
                 connected = false;
             } finally {
@@ -124,13 +120,13 @@ public class SqlServerExtractor extends AbstractExtractor {
         }
     }
 
-    private Connection connect() throws SQLException {
+    private void connect() throws SQLException {
         final DatabaseConfig config = (DatabaseConfig) connectorConfig;
         String driverClassName = config.getDriverClassName();
         String username = config.getUsername();
         String password = config.getPassword();
         String url = config.getUrl();
-        return JDBCUtil.getConnection(driverClassName, url, username, password);
+        connection = JDBCUtil.getConnection(driverClassName, url, username, password);
     }
 
     private String getTrustedServerNameAE() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
@@ -154,8 +150,8 @@ public class SqlServerExtractor extends AbstractExtractor {
         lastLsn = Lsn.valueOf(map.get(LSN_POSITION));
     }
 
-    private Set<String> readTables() {
-        return queryAndMapList(GET_TABLE_LIST, rs -> {
+    private void readTables() {
+        tables = queryAndMapList(GET_TABLE_LIST, rs -> {
             Set<String> table = new LinkedHashSet<>();
             while (rs.next()) {
                 if (filterTable.contains(rs.getString(1))) {
@@ -166,8 +162,8 @@ public class SqlServerExtractor extends AbstractExtractor {
         });
     }
 
-    private Set<SqlServerChangeTable> readChangeTables() {
-        return queryAndMapList(GET_TABLES_CDC_ENABLED, rs -> {
+    private void readChangeTables() {
+        changeTables = queryAndMapList(GET_TABLES_CDC_ENABLED, rs -> {
             final Set<SqlServerChangeTable> tables = new HashSet<>();
             while (rs.next()) {
                 SqlServerChangeTable changeTable = new SqlServerChangeTable(
@@ -319,10 +315,11 @@ public class SqlServerExtractor extends AbstractExtractor {
     }
 
     private <T> T query(String sql, StatementPreparer statementPreparer, ResultSetMapper<T> mapper) {
+        PreparedStatement ps = null;
         ResultSet rs = null;
         T apply = null;
         try {
-            final PreparedStatement ps = createPreparedStatement(sql);
+            ps = connection.prepareStatement(sql);
             if (null != statementPreparer) {
                 statementPreparer.accept(ps);
             }
@@ -333,22 +330,12 @@ public class SqlServerExtractor extends AbstractExtractor {
         } catch (Exception e) {
             logger.error(e.getMessage());
         } finally {
+            close(ps);
             close(rs);
         }
         return apply;
     }
 
-    private PreparedStatement createPreparedStatement(String preparedQueryString) {
-        return preparedStatementCache.computeIfAbsent(preparedQueryString, query -> {
-            try {
-                logger.info("Inserting prepared statement '{}' removed from the cache", query);
-                return connection.prepareStatement(query);
-            } catch (SQLException e) {
-                throw new ListenerException(e);
-            }
-        });
-    }
-
     enum TableOperation {
         /**
          * 插入
@@ -399,6 +386,11 @@ public class SqlServerExtractor extends AbstractExtractor {
         @Override
         public void run() {
             while (!isInterrupted() && connected) {
+                try {
+                    connect();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage());
+                }
                 try {
                     Lsn stopLsn = queryAndMap(GET_MAX_LSN, rs -> new Lsn(rs.getBytes(1)));
                     if (!stopLsn.isAvailable()) {
@@ -419,6 +411,8 @@ public class SqlServerExtractor extends AbstractExtractor {
                     break;
                 } catch (Exception e) {
                     logger.error(e.getMessage());
+                } finally {
+                    JDBCUtil.close(connection);
                 }
             }
         }