Bläddra i källkod

修改超时断连

AE86 3 år sedan
förälder
incheckning
96ec05cb05

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BigintSetter.java

@@ -28,6 +28,11 @@ public class BigintSetter extends AbstractSetter<Long> {
             ps.setLong(i, bitInt.longValue());
             return;
         }
+        if (val instanceof Integer) {
+            Integer integer = (Integer) val;
+            ps.setLong(i, integer);
+            return;
+        }
         throw new ConnectorException(String.format("BigintSetter can not find type [%s], val [%s]", type, val));
     }
 }

+ 16 - 27
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -18,7 +18,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -48,12 +47,9 @@ public class SqlServerExtractor extends AbstractExtractor {
 
     private static final String LSN_POSITION = "position";
     private static final long DEFAULT_POLL_INTERVAL_MILLIS = 300;
-    private static final int PREPARED_STATEMENT_CACHE_CAPACITY = 500;
     private static final int OFFSET_COLUMNS = 4;
-    private final Map<String, PreparedStatement> preparedStatementCache = new ConcurrentHashMap<>(PREPARED_STATEMENT_CACHE_CAPACITY);
     private final Lock connectLock = new ReentrantLock();
     private volatile boolean connected;
-    private volatile boolean connectionClosed;
     private static Set<String> tables;
     private static Set<SqlServerChangeTable> changeTables;
     private DatabaseConnectorMapper connectorMapper;
@@ -103,8 +99,6 @@ public class SqlServerExtractor extends AbstractExtractor {
                 worker.interrupt();
                 worker = null;
             }
-            preparedStatementCache.values().forEach(this::close);
-            preparedStatementCache.clear();
             connected = false;
         }
     }
@@ -125,7 +119,6 @@ public class SqlServerExtractor extends AbstractExtractor {
             connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(cfg);
             serverName = cfg.getUrl();
             schema = cfg.getSchema();
-            connectionClosed = false;
         }
     }
 
@@ -296,35 +289,26 @@ public class SqlServerExtractor extends AbstractExtractor {
     }
 
     private <T> T query(String preparedQuerySql, StatementPreparer statementPreparer, ResultSetMapper<T> mapper) {
-        if (connectionClosed) {
-            connect();
-            return null;
-        }
         Object execute = connectorMapper.execute(databaseTemplate -> {
-            if (!preparedStatementCache.containsKey(preparedQuerySql)) {
-                preparedStatementCache.putIfAbsent(preparedQuerySql, databaseTemplate.getConnection().prepareStatement(preparedQuerySql));
-            }
-            PreparedStatement ps = preparedStatementCache.get(preparedQuerySql);
-            if (ps.getConnection().isClosed() || ps.isClosed()) {
-                preparedStatementCache.clear();
-                connectionClosed = true;
-                return null;
-            }
-            if (null != statementPreparer) {
-                statementPreparer.accept(ps);
-            }
+            PreparedStatement ps = null;
             ResultSet rs = null;
+            T apply = null;
             try {
+                ps = databaseTemplate.getConnection().prepareStatement(preparedQuerySql);
+                if (null != statementPreparer) {
+                    statementPreparer.accept(ps);
+                }
                 rs = ps.executeQuery();
-                return mapper.apply(rs);
+                apply = mapper.apply(rs);
             } catch (SQLServerException e) {
                 // 为过程或函数 cdc.fn_cdc_get_all_changes_ ...  提供的参数数目不足。
             } catch (Exception e) {
                 logger.error(e.getMessage());
             } finally {
                 close(rs);
+                close(ps);
             }
-            return null;
+            return apply;
         });
         return (T) execute;
     }
@@ -345,8 +329,13 @@ public class SqlServerExtractor extends AbstractExtractor {
 
                     lastLsn = stopLsn;
                     snapshot.put(LSN_POSITION, lastLsn.toString());
-                } catch (InterruptedException e) {
-                    break;
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                    try {
+                        TimeUnit.SECONDS.sleep(1);
+                    } catch (InterruptedException ex) {
+                        logger.error(ex.getMessage());
+                    }
                 }
             }
         }

+ 3 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -91,6 +91,7 @@ public class Shard {
     }
 
     public void close() throws IOException {
+        indexWriter.flush();
         indexWriter.commit();
         indexReader.close();
         indexWriter.close();
@@ -194,6 +195,8 @@ public class Shard {
     private void execute(Object value, Callback callback) throws IOException {
         if (null != value && indexWriter.isOpen()) {
             callback.execute();
+            indexWriter.flush();
+            indexWriter.commit();
             return;
         }
         logger.error(value.toString());