Quellcode durchsuchen

升级springboot版本(spring,spring-security)& 修复连接未回收

AE86 vor 2 Jahren
Ursprung
Commit
d84cb7d402

+ 12 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -7,7 +7,11 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.CommandConfig;
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.ReaderConfig;
+import org.dbsyncer.connector.config.SqlBuilderConfig;
+import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
 import org.dbsyncer.connector.enums.OperationEnum;
@@ -30,7 +34,13 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public abstract class AbstractDatabaseConnector extends AbstractConnector implements Connector<DatabaseConnectorMapper, DatabaseConfig>, Database {

+ 260 - 80
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseTemplate.java

@@ -5,17 +5,49 @@ import org.slf4j.LoggerFactory;
 import org.springframework.dao.DataAccessException;
 import org.springframework.dao.InvalidDataAccessApiUsageException;
 import org.springframework.dao.support.DataAccessUtils;
+import org.springframework.jdbc.InvalidResultSetAccessException;
 import org.springframework.jdbc.SQLWarningException;
 import org.springframework.jdbc.UncategorizedSQLException;
-import org.springframework.jdbc.core.*;
+import org.springframework.jdbc.core.ArgumentPreparedStatementSetter;
+import org.springframework.jdbc.core.ArgumentTypePreparedStatementSetter;
+import org.springframework.jdbc.core.BatchPreparedStatementSetter;
+import org.springframework.jdbc.core.CallableStatementCallback;
+import org.springframework.jdbc.core.CallableStatementCreator;
+import org.springframework.jdbc.core.ColumnMapRowMapper;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.InterruptibleBatchPreparedStatementSetter;
+import org.springframework.jdbc.core.JdbcOperations;
+import org.springframework.jdbc.core.ParameterDisposer;
+import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter;
+import org.springframework.jdbc.core.PreparedStatementCallback;
+import org.springframework.jdbc.core.PreparedStatementCreator;
+import org.springframework.jdbc.core.PreparedStatementSetter;
+import org.springframework.jdbc.core.ResultSetExtractor;
+import org.springframework.jdbc.core.ResultSetSupportingSqlParameter;
+import org.springframework.jdbc.core.RowCallbackHandler;
+import org.springframework.jdbc.core.RowMapper;
+import org.springframework.jdbc.core.RowMapperResultSetExtractor;
+import org.springframework.jdbc.core.SingleColumnRowMapper;
+import org.springframework.jdbc.core.SqlOutParameter;
+import org.springframework.jdbc.core.SqlParameter;
+import org.springframework.jdbc.core.SqlParameterValue;
+import org.springframework.jdbc.core.SqlProvider;
+import org.springframework.jdbc.core.SqlReturnResultSet;
+import org.springframework.jdbc.core.SqlReturnType;
+import org.springframework.jdbc.core.SqlReturnUpdateCount;
+import org.springframework.jdbc.core.SqlRowSetResultSetExtractor;
+import org.springframework.jdbc.core.SqlTypeValue;
+import org.springframework.jdbc.core.StatementCallback;
+import org.springframework.jdbc.core.StatementCreatorUtils;
 import org.springframework.jdbc.datasource.ConnectionProxy;
-import org.springframework.jdbc.datasource.DataSourceUtils;
-import org.springframework.jdbc.support.JdbcAccessor;
 import org.springframework.jdbc.support.JdbcUtils;
 import org.springframework.jdbc.support.KeyHolder;
+import org.springframework.jdbc.support.SQLExceptionTranslator;
+import org.springframework.jdbc.support.SQLStateSQLExceptionTranslator;
 import org.springframework.jdbc.support.rowset.SqlRowSet;
 import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
 import org.springframework.util.LinkedCaseInsensitiveMap;
 import org.springframework.util.StringUtils;
 
@@ -23,13 +55,41 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
-import java.sql.*;
-import java.util.*;
-
-public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
+import java.sql.BatchUpdateException;
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public class DatabaseTemplate implements JdbcOperations {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private Connection connection;
+
+    public DatabaseTemplate(Connection connection) {
+        this.connection = connection;
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    private final SQLExceptionTranslator exceptionTranslator = new SQLStateSQLExceptionTranslator();
+
     private static final String RETURN_RESULT_SET_PREFIX = "#result-set-";
 
     private static final String RETURN_UPDATE_COUNT_PREFIX = "#update-count-";
@@ -78,16 +138,10 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
      */
     private boolean resultsMapCaseInsensitive = false;
 
-    private Connection connection;
-
-    public DatabaseTemplate(Connection connection) {
-        this.connection = connection;
-    }
-
     /**
      * Set whether or not we want to ignore SQLWarnings.
      * <p>Default is "true", swallowing and logging all warnings. Switch this flag
-     * to "false" to make the JdbcTemplate throw a SQLWarningException instead.
+     * to "false" to make the JdbcTemplate throw an SQLWarningException instead.
      *
      * @see java.sql.SQLWarning
      * @see org.springframework.jdbc.SQLWarningException
@@ -219,10 +273,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return this.resultsMapCaseInsensitive;
     }
 
-    public Connection getConnection() {
-        return connection;
-    }
-//-------------------------------------------------------------------------
+    //-------------------------------------------------------------------------
     // Methods dealing with a plain java.sql.Connection
     //-------------------------------------------------------------------------
 
@@ -231,16 +282,12 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
     public <T> T execute(ConnectionCallback<T> action) throws DataAccessException {
         Assert.notNull(action, "Callback object must not be null");
 
-        Connection con = connection;
         try {
-            // Create close-suppressing Connection proxy, also preparing returned Statements.
-            Connection conToUse = createConnectionProxy(con);
-            return action.doInConnection(conToUse);
+            return action.doInConnection(connection);
         } catch (SQLException ex) {
             // Release Connection early, to avoid potential connection pool deadlock
             // in the case when the exception translator hasn't been initialized yet.
             String sql = getSql(action);
-            con = null;
             throw translateException("ConnectionCallback", sql, ex);
         }
     }
@@ -264,19 +311,18 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
                 new DatabaseTemplate.CloseSuppressingInvocationHandler(con));
     }
 
+
     //-------------------------------------------------------------------------
     // Methods dealing with static SQL (java.sql.Statement)
     //-------------------------------------------------------------------------
 
-    @Override
     @Nullable
-    public <T> T execute(StatementCallback<T> action) throws DataAccessException {
+    private <T> T execute(StatementCallback<T> action, boolean closeResources) throws DataAccessException {
         Assert.notNull(action, "Callback object must not be null");
 
-        Connection con = connection;
         Statement stmt = null;
         try {
-            stmt = con.createStatement();
+            stmt = connection.createStatement();
             applyStatementSettings(stmt);
             T result = action.doInStatement(stmt);
             handleWarnings(stmt);
@@ -289,10 +335,18 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             stmt = null;
             throw translateException("StatementCallback", sql, ex);
         } finally {
-            JdbcUtils.closeStatement(stmt);
+            if (closeResources) {
+                JdbcUtils.closeStatement(stmt);
+            }
         }
     }
 
+    @Override
+    @Nullable
+    public <T> T execute(StatementCallback<T> action) throws DataAccessException {
+        return execute(action, true);
+    }
+
     @Override
     public void execute(final String sql) throws DataAccessException {
         if (logger.isDebugEnabled()) {
@@ -316,7 +370,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             }
         }
 
-        execute(new ExecuteStatementCallback());
+        execute(new ExecuteStatementCallback(), true);
     }
 
     @Override
@@ -350,7 +404,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             }
         }
 
-        return execute(new QueryStatementCallback());
+        return execute(new QueryStatementCallback(), true);
     }
 
     @Override
@@ -363,6 +417,27 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return result(query(sql, new RowMapperResultSetExtractor<>(rowMapper)));
     }
 
+    @Override
+    public <T> Stream<T> queryForStream(String sql, RowMapper<T> rowMapper) throws DataAccessException {
+        class StreamStatementCallback implements StatementCallback<Stream<T>>, SqlProvider {
+            @Override
+            public Stream<T> doInStatement(Statement stmt) throws SQLException {
+                ResultSet rs = stmt.executeQuery(sql);
+                return new DatabaseTemplate.ResultSetSpliterator<>(rs, rowMapper).stream().onClose(() -> {
+                    JdbcUtils.closeResultSet(rs);
+                    JdbcUtils.closeStatement(stmt);
+                });
+            }
+
+            @Override
+            public String getSql() {
+                return sql;
+            }
+        }
+
+        return result(execute(new StreamStatementCallback(), false));
+    }
+
     @Override
     public Map<String, Object> queryForMap(String sql) throws DataAccessException {
         return result(queryForObject(sql, getColumnMapRowMapper()));
@@ -422,7 +497,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             }
         }
 
-        return updateCount(execute(new UpdateStatementCallback()));
+        return updateCount(execute(new UpdateStatementCallback(), true));
     }
 
     @Override
@@ -486,7 +561,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             }
         }
 
-        int[] result = execute(new BatchUpdateStatementCallback());
+        int[] result = execute(new BatchUpdateStatementCallback(), true);
         Assert.state(result != null, "No update counts");
         return result;
     }
@@ -496,9 +571,8 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
     // Methods dealing with prepared statements
     //-------------------------------------------------------------------------
 
-    @Override
     @Nullable
-    public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
+    private <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action, boolean closeResources)
             throws DataAccessException {
 
         Assert.notNull(psc, "PreparedStatementCreator must not be null");
@@ -508,10 +582,9 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
         }
 
-        Connection con = connection;
         PreparedStatement ps = null;
         try {
-            ps = psc.createPreparedStatement(con);
+            ps = psc.createPreparedStatement(connection);
             applyStatementSettings(ps);
             T result = action.doInPreparedStatement(ps);
             handleWarnings(ps);
@@ -526,20 +599,29 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             psc = null;
             JdbcUtils.closeStatement(ps);
             ps = null;
-            con = null;
             throw translateException("PreparedStatementCallback", sql, ex);
         } finally {
-            if (psc instanceof ParameterDisposer) {
-                ((ParameterDisposer) psc).cleanupParameters();
+            if (closeResources) {
+                if (psc instanceof ParameterDisposer) {
+                    ((ParameterDisposer) psc).cleanupParameters();
+                }
+                JdbcUtils.closeStatement(ps);
             }
-            JdbcUtils.closeStatement(ps);
         }
     }
 
+    @Override
+    @Nullable
+    public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
+            throws DataAccessException {
+
+        return execute(psc, action, true);
+    }
+
     @Override
     @Nullable
     public <T> T execute(String sql, PreparedStatementCallback<T> action) throws DataAccessException {
-        return execute(new DatabaseTemplate.SimplePreparedStatementCreator(sql), action);
+        return execute(new DatabaseTemplate.SimplePreparedStatementCreator(sql), action, true);
     }
 
     /**
@@ -580,7 +662,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
                     }
                 }
             }
-        });
+        }, true);
     }
 
     @Override
@@ -601,6 +683,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return query(sql, newArgTypePreparedStatementSetter(args, argTypes), rse);
     }
 
+    @Deprecated
     @Override
     @Nullable
     public <T> T query(String sql, @Nullable Object[] args, ResultSetExtractor<T> rse) throws DataAccessException {
@@ -628,8 +711,9 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         query(sql, newArgTypePreparedStatementSetter(args, argTypes), rch);
     }
 
+    @Deprecated
     @Override
-    public void query(String sql, Object[] args, RowCallbackHandler rch) throws DataAccessException {
+    public void query(String sql, @Nullable Object[] args, RowCallbackHandler rch) throws DataAccessException {
         query(sql, newArgPreparedStatementSetter(args), rch);
     }
 
@@ -653,6 +737,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return result(query(sql, args, argTypes, new RowMapperResultSetExtractor<>(rowMapper)));
     }
 
+    @Deprecated
     @Override
     public <T> List<T> query(String sql, @Nullable Object[] args, RowMapper<T> rowMapper) throws DataAccessException {
         return result(query(sql, args, new RowMapperResultSetExtractor<>(rowMapper)));
@@ -663,6 +748,53 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return result(query(sql, args, new RowMapperResultSetExtractor<>(rowMapper)));
     }
 
+    /**
+     * Query using a prepared statement, allowing for a PreparedStatementCreator
+     * and a PreparedStatementSetter. Most other query methods use this method,
+     * but application code will always work with either a creator or a setter.
+     *
+     * @param psc       a callback that creates a PreparedStatement given a Connection
+     * @param pss       a callback that knows how to set values on the prepared statement.
+     *                  If this is {@code null}, the SQL will be assumed to contain no bind parameters.
+     * @param rowMapper a callback that will map one object per row
+     * @return the result Stream, containing mapped objects, needing to be
+     * closed once fully processed (e.g. through a try-with-resources clause)
+     * @throws DataAccessException if the query fails
+     * @since 5.3
+     */
+    public <T> Stream<T> queryForStream(PreparedStatementCreator psc, @Nullable PreparedStatementSetter pss,
+                                        RowMapper<T> rowMapper) throws DataAccessException {
+
+        return result(execute(psc, ps -> {
+            if (pss != null) {
+                pss.setValues(ps);
+            }
+            ResultSet rs = ps.executeQuery();
+            return new DatabaseTemplate.ResultSetSpliterator<>(rs, rowMapper).stream().onClose(() -> {
+                JdbcUtils.closeResultSet(rs);
+                if (pss instanceof ParameterDisposer) {
+                    ((ParameterDisposer) pss).cleanupParameters();
+                }
+                JdbcUtils.closeStatement(ps);
+            });
+        }, false));
+    }
+
+    @Override
+    public <T> Stream<T> queryForStream(PreparedStatementCreator psc, RowMapper<T> rowMapper) throws DataAccessException {
+        return queryForStream(psc, null, rowMapper);
+    }
+
+    @Override
+    public <T> Stream<T> queryForStream(String sql, @Nullable PreparedStatementSetter pss, RowMapper<T> rowMapper) throws DataAccessException {
+        return queryForStream(new DatabaseTemplate.SimplePreparedStatementCreator(sql), pss, rowMapper);
+    }
+
+    @Override
+    public <T> Stream<T> queryForStream(String sql, RowMapper<T> rowMapper, @Nullable Object... args) throws DataAccessException {
+        return queryForStream(new DatabaseTemplate.SimplePreparedStatementCreator(sql), newArgPreparedStatementSetter(args), rowMapper);
+    }
+
     @Override
     @Nullable
     public <T> T queryForObject(String sql, Object[] args, int[] argTypes, RowMapper<T> rowMapper)
@@ -672,6 +804,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return DataAccessUtils.nullableSingleResult(results);
     }
 
+    @Deprecated
     @Override
     @Nullable
     public <T> T queryForObject(String sql, @Nullable Object[] args, RowMapper<T> rowMapper) throws DataAccessException {
@@ -694,8 +827,9 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return queryForObject(sql, args, argTypes, getSingleColumnRowMapper(requiredType));
     }
 
+    @Deprecated
     @Override
-    public <T> T queryForObject(String sql, Object[] args, Class<T> requiredType) throws DataAccessException {
+    public <T> T queryForObject(String sql, @Nullable Object[] args, Class<T> requiredType) throws DataAccessException {
         return queryForObject(sql, args, getSingleColumnRowMapper(requiredType));
     }
 
@@ -719,8 +853,9 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return query(sql, args, argTypes, getSingleColumnRowMapper(elementType));
     }
 
+    @Deprecated
     @Override
-    public <T> List<T> queryForList(String sql, Object[] args, Class<T> elementType) throws DataAccessException {
+    public <T> List<T> queryForList(String sql, @Nullable Object[] args, Class<T> elementType) throws DataAccessException {
         return query(sql, args, getSingleColumnRowMapper(elementType));
     }
 
@@ -769,7 +904,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
                     ((ParameterDisposer) pss).cleanupParameters();
                 }
             }
-        }));
+        }, true));
     }
 
     @Override
@@ -802,7 +937,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
                 logger.trace("SQL update affected " + rows + " rows and returned " + generatedKeys.size() + " keys");
             }
             return rows;
-        }));
+        }, true));
     }
 
     @Override
@@ -972,10 +1107,9 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             logger.debug("Calling stored procedure" + (sql != null ? " [" + sql + "]" : ""));
         }
 
-        Connection con = connection;
         CallableStatement cs = null;
         try {
-            cs = csc.createCallableStatement(con);
+            cs = csc.createCallableStatement(connection);
             applyStatementSettings(cs);
             T result = action.doInCallableStatement(cs);
             handleWarnings(cs);
@@ -990,7 +1124,6 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             csc = null;
             JdbcUtils.closeStatement(cs);
             cs = null;
-            con = null;
             throw translateException("CallableStatementCallback", sql, ex);
         } finally {
             if (csc instanceof ParameterDisposer) {
@@ -1117,7 +1250,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
     protected Map<String, Object> extractOutputParameters(CallableStatement cs, List<SqlParameter> parameters)
             throws SQLException {
 
-        Map<String, Object> results = new LinkedHashMap<>(parameters.size());
+        Map<String, Object> results = CollectionUtils.newLinkedHashMap(parameters.size());
         int sqlColIndex = 1;
         for (SqlParameter param : parameters) {
             if (param instanceof SqlOutParameter) {
@@ -1248,7 +1381,6 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         if (maxRows != -1) {
             stmt.setMaxRows(maxRows);
         }
-        DataSourceUtils.applyTimeout(stmt, getDataSource(), getQueryTimeout());
     }
 
     /**
@@ -1277,7 +1409,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
     }
 
     /**
-     * Throw a SQLWarningException if we're not ignoring warnings,
+     * Throw an SQLWarningException if we're not ignoring warnings,
      * otherwise log the warnings at debug level.
      *
      * @param stmt the current JDBC statement
@@ -1300,7 +1432,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
     }
 
     /**
-     * Throw a SQLWarningException if encountering an actual warning.
+     * Throw an SQLWarningException if encountering an actual warning.
      *
      * @param warning the warnings object from the current statement.
      *                May be {@code null}, in which case this method does nothing.
@@ -1319,19 +1451,17 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
      * @param sql  the SQL query or update that caused the problem (may be {@code null})
      * @param ex   the offending {@code SQLException}
      * @return a DataAccessException wrapping the {@code SQLException} (never {@code null})
-     * @see #getExceptionTranslator()
      * @since 5.0
      */
     protected DataAccessException translateException(String task, @Nullable String sql, SQLException ex) {
-        DataAccessException dae = getExceptionTranslator().translate(task, sql, ex);
+        DataAccessException dae = this.exceptionTranslator.translate(task, sql, ex);
         return (dae != null ? dae : new UncategorizedSQLException(task, sql, ex));
     }
 
-
     /**
      * Determine SQL from potential provider object.
      *
-     * @param sqlProvider object which is potentially a SqlProvider
+     * @param sqlProvider object which is potentially an SqlProvider
      * @return the SQL string, or {@code null} if not known
      * @see SqlProvider
      */
@@ -1374,28 +1504,25 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
             // Invocation on ConnectionProxy interface coming in...
 
-            if (method.getName().equals("equals")) {
-                // Only consider equal when proxies are identical.
-                return (proxy == args[0]);
-            } else if (method.getName().equals("hashCode")) {
-                // Use hashCode of PersistenceManager proxy.
-                return System.identityHashCode(proxy);
-            } else if (method.getName().equals("unwrap")) {
-                if (((Class<?>) args[0]).isInstance(proxy)) {
-                    return proxy;
-                }
-            } else if (method.getName().equals("isWrapperFor")) {
-                if (((Class<?>) args[0]).isInstance(proxy)) {
-                    return true;
-                }
-            } else if (method.getName().equals("close")) {
-                // Handle close method: suppress, not valid.
-                return null;
-            } else if (method.getName().equals("isClosed")) {
-                return false;
-            } else if (method.getName().equals("getTargetConnection")) {
-                // Handle getTargetConnection method: return underlying Connection.
-                return this.target;
+            switch (method.getName()) {
+                case "equals":
+                    // Only consider equal when proxies are identical.
+                    return (proxy == args[0]);
+                case "hashCode":
+                    // Use hashCode of PersistenceManager proxy.
+                    return System.identityHashCode(proxy);
+                case "close":
+                    // Handle close method: suppress, not valid.
+                    return null;
+                case "isClosed":
+                    return false;
+                case "getTargetConnection":
+                    // Handle getTargetConnection method: return underlying Connection.
+                    return this.target;
+                case "unwrap":
+                    return (((Class<?>) args[0]).isInstance(proxy) ? proxy : this.target.unwrap((Class<?>) args[0]));
+                case "isWrapperFor":
+                    return (((Class<?>) args[0]).isInstance(proxy) || this.target.isWrapperFor((Class<?>) args[0]));
             }
 
             // Invoke method on target Connection.
@@ -1487,4 +1614,57 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         }
     }
 
+
+    /**
+     * Spliterator for queryForStream adaptation of a ResultSet to a Stream.
+     *
+     * @since 5.3
+     */
+    private static class ResultSetSpliterator<T> implements Spliterator<T> {
+
+        private final ResultSet rs;
+
+        private final RowMapper<T> rowMapper;
+
+        private int rowNum = 0;
+
+        public ResultSetSpliterator(ResultSet rs, RowMapper<T> rowMapper) {
+            this.rs = rs;
+            this.rowMapper = rowMapper;
+        }
+
+        @Override
+        public boolean tryAdvance(Consumer<? super T> action) {
+            try {
+                if (this.rs.next()) {
+                    action.accept(this.rowMapper.mapRow(this.rs, this.rowNum++));
+                    return true;
+                }
+                return false;
+            } catch (SQLException ex) {
+                throw new InvalidResultSetAccessException(ex);
+            }
+        }
+
+        @Override
+        @Nullable
+        public Spliterator<T> trySplit() {
+            return null;
+        }
+
+        @Override
+        public long estimateSize() {
+            return Long.MAX_VALUE;
+        }
+
+        @Override
+        public int characteristics() {
+            return Spliterator.ORDERED;
+        }
+
+        public Stream<T> stream() {
+            return StreamSupport.stream(this, false);
+        }
+    }
+
 }

+ 10 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -32,7 +32,7 @@ import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.client.indices.GetIndexResponse;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.BoolQueryBuilder;
@@ -45,7 +45,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public final class ESConnector extends AbstractConnector implements Connector<ESConnectorMapper, ESConfig> {
@@ -96,7 +101,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             ESConfig config = connectorMapper.getConfig();
             GetIndexRequest request = new GetIndexRequest(config.getIndex());
             GetIndexResponse indexResponse = connectorMapper.getConnection().indices().get(request, RequestOptions.DEFAULT);
-            MappingMetaData mappingMetaData = indexResponse.getMappings().get(config.getIndex());
+            MappingMetadata mappingMetaData = indexResponse.getMappings().get(config.getIndex());
             List<Table> tables = new ArrayList<>();
             tables.add(new Table(mappingMetaData.type()));
             return tables;
@@ -113,7 +118,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         try {
             GetIndexRequest request = new GetIndexRequest(config.getIndex());
             GetIndexResponse indexResponse = connectorMapper.getConnection().indices().get(request, RequestOptions.DEFAULT);
-            MappingMetaData mappingMetaData = indexResponse.getMappings().get(config.getIndex());
+            MappingMetadata mappingMetaData = indexResponse.getMappings().get(config.getIndex());
             Map<String, Object> propertiesMap = mappingMetaData.getSourceAsMap();
             Map<String, Map> properties = (Map<String, Map>) propertiesMap.get(ESUtil.PROPERTIES);
             if (CollectionUtils.isEmpty(properties)) {
@@ -141,7 +146,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             builder.size(0);
             SearchRequest request = new SearchRequest(new String[] {config.getIndex()}, builder);
             SearchResponse response = connectorMapper.getConnection().search(request, RequestOptions.DEFAULT);
-            return response.getHits().getTotalHits();
+            return response.getHits().getTotalHits().value;
         } catch (IOException e) {
             logger.error(e.getMessage());
             throw new ConnectorException(e);

+ 17 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleOtherValueMapper.java

@@ -7,6 +7,7 @@ import org.dbsyncer.common.spi.ConnectorMapper;
 import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
+import org.dbsyncer.connector.util.DatabaseUtil;
 
 import java.sql.Struct;
 
@@ -32,14 +33,22 @@ public final class OracleOtherValueMapper extends AbstractValueMapper<Struct> {
     protected Struct convert(ConnectorMapper connectorMapper, Object val) throws Exception {
         // SqlServer Geometry
         if (val instanceof byte[]) {
-            SimpleConnection connection = (SimpleConnection) connectorMapper.getConnection();
-            OracleConnection conn = connection.unwrap(OracleConnection.class);
-            // TODO 兼容Oracle STRUCT 字节数组
-            Geometry geometry = Geometry.deserialize((byte[]) val);
-            Double x = geometry.getX();
-            Double y = geometry.getY();
-            JGeometry jGeometry = new JGeometry(x, y, 0);
-            return JGeometry.store(jGeometry, conn);
+            Object conn = connectorMapper.getConnection();
+            if (conn instanceof SimpleConnection) {
+                SimpleConnection connection = null;
+                try {
+                    connection = (SimpleConnection) conn;
+                    OracleConnection oracleConnection = connection.unwrap(OracleConnection.class);
+                    // TODO 兼容Oracle STRUCT 字节数组
+                    Geometry geometry = Geometry.deserialize((byte[]) val);
+                    Double x = geometry.getX();
+                    Double y = geometry.getY();
+                    JGeometry jGeometry = new JGeometry(x, y, 0);
+                    return JGeometry.store(jGeometry, oracleConnection);
+                } finally {
+                    DatabaseUtil.close(connection);
+                }
+            }
         }
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }

+ 4 - 4
dbsyncer-listener/src/main/test/ESClientTest.java

@@ -17,7 +17,7 @@ import org.elasticsearch.client.indices.CreateIndexRequest;
 import org.elasticsearch.client.indices.CreateIndexResponse;
 import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.client.indices.GetIndexResponse;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -147,7 +147,7 @@ public class ESClientTest {
         }
 
         // 字段信息
-        MappingMetaData mappingMetaData = indexResponse.getMappings().get(indexName);
+        MappingMetadata mappingMetaData = indexResponse.getMappings().get(indexName);
         Map<String, Object> propertiesMap = mappingMetaData.getSourceAsMap();
         Map<String, Map> properties = (Map<String, Map>) propertiesMap.get(ESUtil.PROPERTIES);
         logger.info(properties.toString());
@@ -240,7 +240,7 @@ public class ESClientTest {
         SearchRequest rq = new SearchRequest(new String[]{indexName}, sourceBuilder);
         SearchResponse searchResponse = client.search(rq, RequestOptions.DEFAULT);
         SearchHits hits = searchResponse.getHits();
-        long totalHits = hits.getTotalHits();
+        long totalHits = hits.getTotalHits().value;
         logger.info("result:{}", totalHits);
         SearchHit[] searchHits = hits.getHits();
         for (SearchHit hit : searchHits) {
@@ -256,7 +256,7 @@ public class ESClientTest {
         SearchRequest request = new SearchRequest(new String[]{indexName}, sourceBuilder);
         SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
         SearchHits hits = searchResponse.getHits();
-        long totalHits = hits.getTotalHits();
+        long totalHits = hits.getTotalHits().value;
         logger.info("result:{}", totalHits);
     }
 }

+ 21 - 0
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/proxy/ProxyApplicationContextImpl.java

@@ -9,6 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
 import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationEvent;
 import org.springframework.context.MessageSourceResolvable;
 import org.springframework.context.NoSuchMessageException;
 import org.springframework.core.ResolvableType;
@@ -88,6 +89,16 @@ public class ProxyApplicationContextImpl implements ProxyApplicationContext {
         return applicationContext.getBeanDefinitionNames();
     }
 
+    @Override
+    public <T> ObjectProvider<T> getBeanProvider(Class<T> aClass, boolean b) {
+        return applicationContext.getBeanProvider(aClass, b);
+    }
+
+    @Override
+    public <T> ObjectProvider<T> getBeanProvider(ResolvableType resolvableType, boolean b) {
+        return applicationContext.getBeanProvider(resolvableType, b);
+    }
+
     @Override
     public String[] getBeanNamesForType(ResolvableType type) {
         return applicationContext.getBeanNamesForType(type);
@@ -133,6 +144,11 @@ public class ProxyApplicationContextImpl implements ProxyApplicationContext {
         return applicationContext.findAnnotationOnBean(beanName, annotationType);
     }
 
+    @Override
+    public <A extends Annotation> A findAnnotationOnBean(String s, Class<A> aClass, boolean b) throws NoSuchBeanDefinitionException {
+        return applicationContext.findAnnotationOnBean(s, aClass, b);
+    }
+
     @Override
     public Object getBean(String name) throws BeansException {
         return applicationContext.getBean(name);
@@ -247,4 +263,9 @@ public class ProxyApplicationContextImpl implements ProxyApplicationContext {
     public ClassLoader getClassLoader() {
         return applicationContext.getClassLoader();
     }
+
+    @Override
+    public void publishEvent(ApplicationEvent event) {
+        applicationContext.publishEvent(event);
+    }
 }

+ 1 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -129,7 +129,7 @@ public class Shard {
         final IndexSearcher searcher = getSearcher();
         final TopDocs topDocs = getTopDocs(searcher, option.getQuery(), MAX_SIZE, sort);
         Paging paging = new Paging(pageNum, pageSize);
-        paging.setTotal(topDocs.totalHits);
+        paging.setTotal(topDocs.totalHits.value);
         if (option.isQueryTotal()) {
             return paging;
         }

+ 3 - 2
pom.xml

@@ -37,10 +37,10 @@
         <!-- maven 编译插件版本 -->
         <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
         <java.version>1.8</java.version>
-        <spring-boot.version>2.2.0.RELEASE</spring-boot.version>
+        <spring-boot.version>2.5.14</spring-boot.version>
         <commons-fileupload.version>1.4</commons-fileupload.version>
         <commons-io.version>2.5</commons-io.version>
-        <lucene-analyzers-smartcn.version>7.7.0</lucene-analyzers-smartcn.version>
+        <lucene-analyzers-smartcn.version>8.8.0</lucene-analyzers-smartcn.version>
         <ojdbc6.version>11.2.0.4.0-atlassian-hosted</ojdbc6.version>
         <sdoapi.version>11.2.0</sdoapi.version>
         <!--<mysql.version>5.1.40</mysql.version>-->
@@ -151,6 +151,7 @@
                 <version>${ojdbc6.version}</version>
             </dependency>
 
+            <!-- 如出现无法下载,请切换maven仓库地址:http://www.datanucleus.org/downloads/maven2/ -->
             <dependency>
                 <groupId>oracle</groupId>
                 <artifactId>sdoapi</artifactId>