Kaynağa Gözat

!117 merge
Merge pull request !117 from AE86/V_1.0.0_RC

AE86 2 yıl önce
ebeveyn
işleme
0e4bbc8f59
20 değiştirilmiş dosya ile 1602 ekleme ve 331 silme
  1. 6 20
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/AbstractChecker.java
  2. 16 14
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java
  3. 8 2
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConnectorServiceImpl.java
  4. 2 2
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java
  5. 3 3
      dbsyncer-common/pom.xml
  6. 10 8
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/JsonUtil.java
  7. 12 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  8. 260 80
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseTemplate.java
  9. 10 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java
  10. 17 8
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleOtherValueMapper.java
  11. 4 4
      dbsyncer-listener/src/main/test/ESClientTest.java
  12. 7 8
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/PreloadTemplate.java
  13. 17 18
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  14. 125 12
      dbsyncer-parser/src/main/resources/Connector.json
  15. 346 59
      dbsyncer-parser/src/main/resources/Mapping.json
  16. 720 66
      dbsyncer-parser/src/main/resources/TableGroup.json
  17. 9 12
      dbsyncer-parser/src/main/test/ConnectorParserTest.java
  18. 21 0
      dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/proxy/ProxyApplicationContextImpl.java
  19. 1 1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java
  20. 8 7
      pom.xml

+ 6 - 20
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/AbstractChecker.java

@@ -1,7 +1,5 @@
 package org.dbsyncer.biz.checker;
 
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
 import org.dbsyncer.biz.enums.SafeInfoEnum;
 import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.CollectionUtils;
@@ -20,7 +18,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.Assert;
 
 import java.time.Instant;
-import java.util.*;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * @author AE86
@@ -75,14 +76,14 @@ public abstract class AbstractChecker implements Checker {
         // 过滤条件
         String filterJson = params.get("filter");
         if (StringUtil.isNotBlank(filterJson)) {
-            List<Filter> list = jsonToList(filterJson, Filter.class);
+            List<Filter> list = JsonUtil.jsonToArray(filterJson, Filter.class);
             model.setFilter(list);
         }
 
         // 转换配置
         String convertJson = params.get("convert");
         if (StringUtil.isNotBlank(convertJson)) {
-            List<Convert> convert = jsonToList(convertJson, Convert.class);
+            List<Convert> convert = JsonUtil.jsonToArray(convertJson, Convert.class);
             model.setConvert(convert);
         }
 
@@ -118,19 +119,4 @@ public abstract class AbstractChecker implements Checker {
         logger.info("params:{}", checkParams);
     }
 
-    private <T> List<T> jsonToList(String json, Class<T> valueType) {
-        JSONArray array = JsonUtil.parseArray(json);
-        if (null != array) {
-            List<T> list = new ArrayList<>();
-            int length = array.size();
-            for (int i = 0; i < length; i++) {
-                JSONObject obj = array.getJSONObject(i);
-                T t = JsonUtil.jsonToObj(obj.toString(), valueType);
-                list.add(t);
-            }
-            return list;
-        }
-        return Collections.EMPTY_LIST;
-    }
-
 }

+ 16 - 14
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -1,7 +1,5 @@
 package org.dbsyncer.biz.checker.impl.tablegroup;
 
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
 import org.dbsyncer.biz.BizException;
 import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.biz.checker.ConnectorConfigChecker;
@@ -25,7 +23,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * @author AE86
@@ -134,9 +136,9 @@ public class TableGroupChecker extends AbstractChecker {
         MetaInfo metaInfo = manager.getMetaInfo(connectorId, tableName);
         Assert.notNull(metaInfo, "无法获取连接器表信息.");
         // 自定义主键
-        if(StringUtil.isNotBlank(primaryKey) && !CollectionUtils.isEmpty(metaInfo.getColumn())){
-            for(Field field : metaInfo.getColumn()){
-                if(StringUtil.equals(field.getName(), primaryKey)){
+        if (StringUtil.isNotBlank(primaryKey) && !CollectionUtils.isEmpty(metaInfo.getColumn())) {
+            for (Field field : metaInfo.getColumn()) {
+                if (StringUtil.equals(field.getName(), primaryKey)) {
                     field.setPk(true);
                     break;
                 }
@@ -199,28 +201,28 @@ public class TableGroupChecker extends AbstractChecker {
      * @return
      */
     private void setFieldMapping(TableGroup tableGroup, String json) {
-        JSONArray mapping = JsonUtil.parseArray(json);
-        if (null == mapping) {
+        List<Map> mappings = JsonUtil.parseList(json);
+        if (null == mappings) {
             throw new BizException("映射关系不能为空");
         }
 
         final Map<String, Field> sMap = PickerUtil.convert2Map(tableGroup.getSourceTable().getColumn());
         final Map<String, Field> tMap = PickerUtil.convert2Map(tableGroup.getTargetTable().getColumn());
-        int length = mapping.size();
+        int length = mappings.size();
         List<FieldMapping> list = new ArrayList<>();
-        JSONObject row = null;
+        Map row = null;
         Field s = null;
         Field t = null;
         for (int i = 0; i < length; i++) {
-            row = mapping.getJSONObject(i);
-            s = sMap.get(row.getString("source"));
-            t = tMap.get(row.getString("target"));
+            row = mappings.get(i);
+            s = sMap.get(row.get("source"));
+            t = tMap.get(row.get("target"));
             if (null == s && null == t) {
                 continue;
             }
 
             if (null != t) {
-                t.setPk(row.getBoolean("pk"));
+                t.setPk((Boolean) row.get("pk"));
             }
             list.add(new FieldMapping(s, t));
         }

+ 8 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConnectorServiceImpl.java

@@ -18,7 +18,13 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -52,7 +58,7 @@ public class ConnectorServiceImpl extends BaseServiceImpl implements ConnectorSe
         Connector connector = getConnector(id);
         Assert.notNull(connector, "The connector id is invalid.");
 
-        Map params = JsonUtil.parseObject(JsonUtil.objToJson(connector.getConfig())).getInnerMap();
+        Map params = JsonUtil.parseMap(connector.getConfig());
         params.put(ConfigConstant.CONFIG_MODEL_NAME, connector.getName() + "(复制)");
         ConfigModel model = connectorChecker.checkAddConfigModel(params);
         log(LogType.ConnectorLog.COPY, model);

+ 2 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -90,7 +90,7 @@ public class DataSyncServiceImpl implements DataSyncService {
         if (null == bytes) {
             if (prettyBytes) {
                 String json = (String) row.get(ConfigConstant.CONFIG_MODEL_JSON);
-                return JsonUtil.parseObject(json).toJavaObject(Map.class);
+                return JsonUtil.parseMap(json);
             }
             return Collections.EMPTY_MAP;
         }
@@ -143,7 +143,7 @@ public class DataSyncServiceImpl implements DataSyncService {
             // 有修改同步值
             String retryDataParams = params.get("retryDataParams");
             if (StringUtil.isNotBlank(retryDataParams)) {
-                JsonUtil.parseObject(retryDataParams).getInnerMap().forEach((k, v) -> binlogData.put(k, convertValue(binlogData.get(k), (String) v)));
+                JsonUtil.parseMap(retryDataParams).forEach((k, v) -> binlogData.put(k, convertValue(binlogData.get(k), (String) v)));
             }
             writerBufferActuator.offer(new WriterRequest(tableGroupId, event, binlogData));
             monitor.removeData(metaId, messageId);

+ 3 - 3
dbsyncer-common/pom.xml

@@ -51,10 +51,10 @@
             <artifactId>commons-io</artifactId>
         </dependency>
 
-        <!-- fastjson -->
+        <!-- fastjson2 -->
         <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>fastjson</artifactId>
+            <groupId>com.alibaba.fastjson2</groupId>
+            <artifactId>fastjson2</artifactId>
         </dependency>
 
     </dependencies>

+ 10 - 8
dbsyncer-common/src/main/java/org/dbsyncer/common/util/JsonUtil.java

@@ -1,16 +1,14 @@
 package org.dbsyncer.common.util;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.fastjson2.JSON;
 
 import java.util.List;
+import java.util.Map;
 
 public abstract class JsonUtil {
 
     public static String objToJson(Object obj) {
-        return JSON.toJSONString(obj, SerializerFeature.DisableCircularReferenceDetect);
+        return JSON.toJSONString(obj);
     }
 
     public static <T> T jsonToObj(String json, Class<T> valueType) {
@@ -21,11 +19,15 @@ public abstract class JsonUtil {
         return JSON.parseArray(json, valueType);
     }
 
-    public static JSONObject parseObject(String json) {
+    public static Map parseMap(Object obj) {
+        return parseMap(objToJson(obj));
+    }
+
+    public static Map parseMap(String json) {
         return JSON.parseObject(json);
     }
 
-    public static JSONArray parseArray(String json) {
-        return JSON.parseArray(json);
+    public static List parseList(String json) {
+        return JSON.parseArray(json).toList(Map.class);
     }
 }

+ 12 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -7,7 +7,11 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.CommandConfig;
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.ReaderConfig;
+import org.dbsyncer.connector.config.SqlBuilderConfig;
+import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
 import org.dbsyncer.connector.enums.OperationEnum;
@@ -30,7 +34,13 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public abstract class AbstractDatabaseConnector extends AbstractConnector implements Connector<DatabaseConnectorMapper, DatabaseConfig>, Database {

+ 260 - 80
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseTemplate.java

@@ -5,17 +5,49 @@ import org.slf4j.LoggerFactory;
 import org.springframework.dao.DataAccessException;
 import org.springframework.dao.InvalidDataAccessApiUsageException;
 import org.springframework.dao.support.DataAccessUtils;
+import org.springframework.jdbc.InvalidResultSetAccessException;
 import org.springframework.jdbc.SQLWarningException;
 import org.springframework.jdbc.UncategorizedSQLException;
-import org.springframework.jdbc.core.*;
+import org.springframework.jdbc.core.ArgumentPreparedStatementSetter;
+import org.springframework.jdbc.core.ArgumentTypePreparedStatementSetter;
+import org.springframework.jdbc.core.BatchPreparedStatementSetter;
+import org.springframework.jdbc.core.CallableStatementCallback;
+import org.springframework.jdbc.core.CallableStatementCreator;
+import org.springframework.jdbc.core.ColumnMapRowMapper;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.InterruptibleBatchPreparedStatementSetter;
+import org.springframework.jdbc.core.JdbcOperations;
+import org.springframework.jdbc.core.ParameterDisposer;
+import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter;
+import org.springframework.jdbc.core.PreparedStatementCallback;
+import org.springframework.jdbc.core.PreparedStatementCreator;
+import org.springframework.jdbc.core.PreparedStatementSetter;
+import org.springframework.jdbc.core.ResultSetExtractor;
+import org.springframework.jdbc.core.ResultSetSupportingSqlParameter;
+import org.springframework.jdbc.core.RowCallbackHandler;
+import org.springframework.jdbc.core.RowMapper;
+import org.springframework.jdbc.core.RowMapperResultSetExtractor;
+import org.springframework.jdbc.core.SingleColumnRowMapper;
+import org.springframework.jdbc.core.SqlOutParameter;
+import org.springframework.jdbc.core.SqlParameter;
+import org.springframework.jdbc.core.SqlParameterValue;
+import org.springframework.jdbc.core.SqlProvider;
+import org.springframework.jdbc.core.SqlReturnResultSet;
+import org.springframework.jdbc.core.SqlReturnType;
+import org.springframework.jdbc.core.SqlReturnUpdateCount;
+import org.springframework.jdbc.core.SqlRowSetResultSetExtractor;
+import org.springframework.jdbc.core.SqlTypeValue;
+import org.springframework.jdbc.core.StatementCallback;
+import org.springframework.jdbc.core.StatementCreatorUtils;
 import org.springframework.jdbc.datasource.ConnectionProxy;
-import org.springframework.jdbc.datasource.DataSourceUtils;
-import org.springframework.jdbc.support.JdbcAccessor;
 import org.springframework.jdbc.support.JdbcUtils;
 import org.springframework.jdbc.support.KeyHolder;
+import org.springframework.jdbc.support.SQLExceptionTranslator;
+import org.springframework.jdbc.support.SQLStateSQLExceptionTranslator;
 import org.springframework.jdbc.support.rowset.SqlRowSet;
 import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
 import org.springframework.util.LinkedCaseInsensitiveMap;
 import org.springframework.util.StringUtils;
 
@@ -23,13 +55,41 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
-import java.sql.*;
-import java.util.*;
-
-public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
+import java.sql.BatchUpdateException;
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public class DatabaseTemplate implements JdbcOperations {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private Connection connection;
+
+    public DatabaseTemplate(Connection connection) {
+        this.connection = connection;
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    private final SQLExceptionTranslator exceptionTranslator = new SQLStateSQLExceptionTranslator();
+
     private static final String RETURN_RESULT_SET_PREFIX = "#result-set-";
 
     private static final String RETURN_UPDATE_COUNT_PREFIX = "#update-count-";
@@ -78,16 +138,10 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
      */
     private boolean resultsMapCaseInsensitive = false;
 
-    private Connection connection;
-
-    public DatabaseTemplate(Connection connection) {
-        this.connection = connection;
-    }
-
     /**
      * Set whether or not we want to ignore SQLWarnings.
      * <p>Default is "true", swallowing and logging all warnings. Switch this flag
-     * to "false" to make the JdbcTemplate throw a SQLWarningException instead.
+     * to "false" to make the JdbcTemplate throw an SQLWarningException instead.
      *
      * @see java.sql.SQLWarning
      * @see org.springframework.jdbc.SQLWarningException
@@ -219,10 +273,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return this.resultsMapCaseInsensitive;
     }
 
-    public Connection getConnection() {
-        return connection;
-    }
-//-------------------------------------------------------------------------
+    //-------------------------------------------------------------------------
     // Methods dealing with a plain java.sql.Connection
     //-------------------------------------------------------------------------
 
@@ -231,16 +282,12 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
     public <T> T execute(ConnectionCallback<T> action) throws DataAccessException {
         Assert.notNull(action, "Callback object must not be null");
 
-        Connection con = connection;
         try {
-            // Create close-suppressing Connection proxy, also preparing returned Statements.
-            Connection conToUse = createConnectionProxy(con);
-            return action.doInConnection(conToUse);
+            return action.doInConnection(connection);
         } catch (SQLException ex) {
             // Release Connection early, to avoid potential connection pool deadlock
             // in the case when the exception translator hasn't been initialized yet.
             String sql = getSql(action);
-            con = null;
             throw translateException("ConnectionCallback", sql, ex);
         }
     }
@@ -264,19 +311,18 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
                 new DatabaseTemplate.CloseSuppressingInvocationHandler(con));
     }
 
+
     //-------------------------------------------------------------------------
     // Methods dealing with static SQL (java.sql.Statement)
     //-------------------------------------------------------------------------
 
-    @Override
     @Nullable
-    public <T> T execute(StatementCallback<T> action) throws DataAccessException {
+    private <T> T execute(StatementCallback<T> action, boolean closeResources) throws DataAccessException {
         Assert.notNull(action, "Callback object must not be null");
 
-        Connection con = connection;
         Statement stmt = null;
         try {
-            stmt = con.createStatement();
+            stmt = connection.createStatement();
             applyStatementSettings(stmt);
             T result = action.doInStatement(stmt);
             handleWarnings(stmt);
@@ -289,10 +335,18 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             stmt = null;
             throw translateException("StatementCallback", sql, ex);
         } finally {
-            JdbcUtils.closeStatement(stmt);
+            if (closeResources) {
+                JdbcUtils.closeStatement(stmt);
+            }
         }
     }
 
+    @Override
+    @Nullable
+    public <T> T execute(StatementCallback<T> action) throws DataAccessException {
+        return execute(action, true);
+    }
+
     @Override
     public void execute(final String sql) throws DataAccessException {
         if (logger.isDebugEnabled()) {
@@ -316,7 +370,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             }
         }
 
-        execute(new ExecuteStatementCallback());
+        execute(new ExecuteStatementCallback(), true);
     }
 
     @Override
@@ -350,7 +404,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             }
         }
 
-        return execute(new QueryStatementCallback());
+        return execute(new QueryStatementCallback(), true);
     }
 
     @Override
@@ -363,6 +417,27 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return result(query(sql, new RowMapperResultSetExtractor<>(rowMapper)));
     }
 
+    @Override
+    public <T> Stream<T> queryForStream(String sql, RowMapper<T> rowMapper) throws DataAccessException {
+        class StreamStatementCallback implements StatementCallback<Stream<T>>, SqlProvider {
+            @Override
+            public Stream<T> doInStatement(Statement stmt) throws SQLException {
+                ResultSet rs = stmt.executeQuery(sql);
+                return new DatabaseTemplate.ResultSetSpliterator<>(rs, rowMapper).stream().onClose(() -> {
+                    JdbcUtils.closeResultSet(rs);
+                    JdbcUtils.closeStatement(stmt);
+                });
+            }
+
+            @Override
+            public String getSql() {
+                return sql;
+            }
+        }
+
+        return result(execute(new StreamStatementCallback(), false));
+    }
+
     @Override
     public Map<String, Object> queryForMap(String sql) throws DataAccessException {
         return result(queryForObject(sql, getColumnMapRowMapper()));
@@ -422,7 +497,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             }
         }
 
-        return updateCount(execute(new UpdateStatementCallback()));
+        return updateCount(execute(new UpdateStatementCallback(), true));
     }
 
     @Override
@@ -486,7 +561,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             }
         }
 
-        int[] result = execute(new BatchUpdateStatementCallback());
+        int[] result = execute(new BatchUpdateStatementCallback(), true);
         Assert.state(result != null, "No update counts");
         return result;
     }
@@ -496,9 +571,8 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
     // Methods dealing with prepared statements
     //-------------------------------------------------------------------------
 
-    @Override
     @Nullable
-    public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
+    private <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action, boolean closeResources)
             throws DataAccessException {
 
         Assert.notNull(psc, "PreparedStatementCreator must not be null");
@@ -508,10 +582,9 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
         }
 
-        Connection con = connection;
         PreparedStatement ps = null;
         try {
-            ps = psc.createPreparedStatement(con);
+            ps = psc.createPreparedStatement(connection);
             applyStatementSettings(ps);
             T result = action.doInPreparedStatement(ps);
             handleWarnings(ps);
@@ -526,20 +599,29 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             psc = null;
             JdbcUtils.closeStatement(ps);
             ps = null;
-            con = null;
             throw translateException("PreparedStatementCallback", sql, ex);
         } finally {
-            if (psc instanceof ParameterDisposer) {
-                ((ParameterDisposer) psc).cleanupParameters();
+            if (closeResources) {
+                if (psc instanceof ParameterDisposer) {
+                    ((ParameterDisposer) psc).cleanupParameters();
+                }
+                JdbcUtils.closeStatement(ps);
             }
-            JdbcUtils.closeStatement(ps);
         }
     }
 
+    @Override
+    @Nullable
+    public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
+            throws DataAccessException {
+
+        return execute(psc, action, true);
+    }
+
     @Override
     @Nullable
     public <T> T execute(String sql, PreparedStatementCallback<T> action) throws DataAccessException {
-        return execute(new DatabaseTemplate.SimplePreparedStatementCreator(sql), action);
+        return execute(new DatabaseTemplate.SimplePreparedStatementCreator(sql), action, true);
     }
 
     /**
@@ -580,7 +662,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
                     }
                 }
             }
-        });
+        }, true);
     }
 
     @Override
@@ -601,6 +683,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return query(sql, newArgTypePreparedStatementSetter(args, argTypes), rse);
     }
 
+    @Deprecated
     @Override
     @Nullable
     public <T> T query(String sql, @Nullable Object[] args, ResultSetExtractor<T> rse) throws DataAccessException {
@@ -628,8 +711,9 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         query(sql, newArgTypePreparedStatementSetter(args, argTypes), rch);
     }
 
+    @Deprecated
     @Override
-    public void query(String sql, Object[] args, RowCallbackHandler rch) throws DataAccessException {
+    public void query(String sql, @Nullable Object[] args, RowCallbackHandler rch) throws DataAccessException {
         query(sql, newArgPreparedStatementSetter(args), rch);
     }
 
@@ -653,6 +737,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return result(query(sql, args, argTypes, new RowMapperResultSetExtractor<>(rowMapper)));
     }
 
+    @Deprecated
     @Override
     public <T> List<T> query(String sql, @Nullable Object[] args, RowMapper<T> rowMapper) throws DataAccessException {
         return result(query(sql, args, new RowMapperResultSetExtractor<>(rowMapper)));
@@ -663,6 +748,53 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return result(query(sql, args, new RowMapperResultSetExtractor<>(rowMapper)));
     }
 
+    /**
+     * Query using a prepared statement, allowing for a PreparedStatementCreator
+     * and a PreparedStatementSetter. Most other query methods use this method,
+     * but application code will always work with either a creator or a setter.
+     *
+     * @param psc       a callback that creates a PreparedStatement given a Connection
+     * @param pss       a callback that knows how to set values on the prepared statement.
+     *                  If this is {@code null}, the SQL will be assumed to contain no bind parameters.
+     * @param rowMapper a callback that will map one object per row
+     * @return the result Stream, containing mapped objects, needing to be
+     * closed once fully processed (e.g. through a try-with-resources clause)
+     * @throws DataAccessException if the query fails
+     * @since 5.3
+     */
+    public <T> Stream<T> queryForStream(PreparedStatementCreator psc, @Nullable PreparedStatementSetter pss,
+                                        RowMapper<T> rowMapper) throws DataAccessException {
+
+        return result(execute(psc, ps -> {
+            if (pss != null) {
+                pss.setValues(ps);
+            }
+            ResultSet rs = ps.executeQuery();
+            return new DatabaseTemplate.ResultSetSpliterator<>(rs, rowMapper).stream().onClose(() -> {
+                JdbcUtils.closeResultSet(rs);
+                if (pss instanceof ParameterDisposer) {
+                    ((ParameterDisposer) pss).cleanupParameters();
+                }
+                JdbcUtils.closeStatement(ps);
+            });
+        }, false));
+    }
+
+    @Override
+    public <T> Stream<T> queryForStream(PreparedStatementCreator psc, RowMapper<T> rowMapper) throws DataAccessException {
+        return queryForStream(psc, null, rowMapper);
+    }
+
+    @Override
+    public <T> Stream<T> queryForStream(String sql, @Nullable PreparedStatementSetter pss, RowMapper<T> rowMapper) throws DataAccessException {
+        return queryForStream(new DatabaseTemplate.SimplePreparedStatementCreator(sql), pss, rowMapper);
+    }
+
+    @Override
+    public <T> Stream<T> queryForStream(String sql, RowMapper<T> rowMapper, @Nullable Object... args) throws DataAccessException {
+        return queryForStream(new DatabaseTemplate.SimplePreparedStatementCreator(sql), newArgPreparedStatementSetter(args), rowMapper);
+    }
+
     @Override
     @Nullable
     public <T> T queryForObject(String sql, Object[] args, int[] argTypes, RowMapper<T> rowMapper)
@@ -672,6 +804,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return DataAccessUtils.nullableSingleResult(results);
     }
 
+    @Deprecated
     @Override
     @Nullable
     public <T> T queryForObject(String sql, @Nullable Object[] args, RowMapper<T> rowMapper) throws DataAccessException {
@@ -694,8 +827,9 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return queryForObject(sql, args, argTypes, getSingleColumnRowMapper(requiredType));
     }
 
+    @Deprecated
     @Override
-    public <T> T queryForObject(String sql, Object[] args, Class<T> requiredType) throws DataAccessException {
+    public <T> T queryForObject(String sql, @Nullable Object[] args, Class<T> requiredType) throws DataAccessException {
         return queryForObject(sql, args, getSingleColumnRowMapper(requiredType));
     }
 
@@ -719,8 +853,9 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         return query(sql, args, argTypes, getSingleColumnRowMapper(elementType));
     }
 
+    @Deprecated
     @Override
-    public <T> List<T> queryForList(String sql, Object[] args, Class<T> elementType) throws DataAccessException {
+    public <T> List<T> queryForList(String sql, @Nullable Object[] args, Class<T> elementType) throws DataAccessException {
         return query(sql, args, getSingleColumnRowMapper(elementType));
     }
 
@@ -769,7 +904,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
                     ((ParameterDisposer) pss).cleanupParameters();
                 }
             }
-        }));
+        }, true));
     }
 
     @Override
@@ -802,7 +937,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
                 logger.trace("SQL update affected " + rows + " rows and returned " + generatedKeys.size() + " keys");
             }
             return rows;
-        }));
+        }, true));
     }
 
     @Override
@@ -972,10 +1107,9 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             logger.debug("Calling stored procedure" + (sql != null ? " [" + sql + "]" : ""));
         }
 
-        Connection con = connection;
         CallableStatement cs = null;
         try {
-            cs = csc.createCallableStatement(con);
+            cs = csc.createCallableStatement(connection);
             applyStatementSettings(cs);
             T result = action.doInCallableStatement(cs);
             handleWarnings(cs);
@@ -990,7 +1124,6 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
             csc = null;
             JdbcUtils.closeStatement(cs);
             cs = null;
-            con = null;
             throw translateException("CallableStatementCallback", sql, ex);
         } finally {
             if (csc instanceof ParameterDisposer) {
@@ -1117,7 +1250,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
     protected Map<String, Object> extractOutputParameters(CallableStatement cs, List<SqlParameter> parameters)
             throws SQLException {
 
-        Map<String, Object> results = new LinkedHashMap<>(parameters.size());
+        Map<String, Object> results = CollectionUtils.newLinkedHashMap(parameters.size());
         int sqlColIndex = 1;
         for (SqlParameter param : parameters) {
             if (param instanceof SqlOutParameter) {
@@ -1248,7 +1381,6 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         if (maxRows != -1) {
             stmt.setMaxRows(maxRows);
         }
-        DataSourceUtils.applyTimeout(stmt, getDataSource(), getQueryTimeout());
     }
 
     /**
@@ -1277,7 +1409,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
     }
 
     /**
-     * Throw a SQLWarningException if we're not ignoring warnings,
+     * Throw an SQLWarningException if we're not ignoring warnings,
      * otherwise log the warnings at debug level.
      *
      * @param stmt the current JDBC statement
@@ -1300,7 +1432,7 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
     }
 
     /**
-     * Throw a SQLWarningException if encountering an actual warning.
+     * Throw an SQLWarningException if encountering an actual warning.
      *
      * @param warning the warnings object from the current statement.
      *                May be {@code null}, in which case this method does nothing.
@@ -1319,19 +1451,17 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
      * @param sql  the SQL query or update that caused the problem (may be {@code null})
      * @param ex   the offending {@code SQLException}
      * @return a DataAccessException wrapping the {@code SQLException} (never {@code null})
-     * @see #getExceptionTranslator()
      * @since 5.0
      */
     protected DataAccessException translateException(String task, @Nullable String sql, SQLException ex) {
-        DataAccessException dae = getExceptionTranslator().translate(task, sql, ex);
+        DataAccessException dae = this.exceptionTranslator.translate(task, sql, ex);
         return (dae != null ? dae : new UncategorizedSQLException(task, sql, ex));
     }
 
-
     /**
      * Determine SQL from potential provider object.
      *
-     * @param sqlProvider object which is potentially a SqlProvider
+     * @param sqlProvider object which is potentially an SqlProvider
      * @return the SQL string, or {@code null} if not known
      * @see SqlProvider
      */
@@ -1374,28 +1504,25 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
             // Invocation on ConnectionProxy interface coming in...
 
-            if (method.getName().equals("equals")) {
-                // Only consider equal when proxies are identical.
-                return (proxy == args[0]);
-            } else if (method.getName().equals("hashCode")) {
-                // Use hashCode of PersistenceManager proxy.
-                return System.identityHashCode(proxy);
-            } else if (method.getName().equals("unwrap")) {
-                if (((Class<?>) args[0]).isInstance(proxy)) {
-                    return proxy;
-                }
-            } else if (method.getName().equals("isWrapperFor")) {
-                if (((Class<?>) args[0]).isInstance(proxy)) {
-                    return true;
-                }
-            } else if (method.getName().equals("close")) {
-                // Handle close method: suppress, not valid.
-                return null;
-            } else if (method.getName().equals("isClosed")) {
-                return false;
-            } else if (method.getName().equals("getTargetConnection")) {
-                // Handle getTargetConnection method: return underlying Connection.
-                return this.target;
+            switch (method.getName()) {
+                case "equals":
+                    // Only consider equal when proxies are identical.
+                    return (proxy == args[0]);
+                case "hashCode":
+                    // Use hashCode of PersistenceManager proxy.
+                    return System.identityHashCode(proxy);
+                case "close":
+                    // Handle close method: suppress, not valid.
+                    return null;
+                case "isClosed":
+                    return false;
+                case "getTargetConnection":
+                    // Handle getTargetConnection method: return underlying Connection.
+                    return this.target;
+                case "unwrap":
+                    return (((Class<?>) args[0]).isInstance(proxy) ? proxy : this.target.unwrap((Class<?>) args[0]));
+                case "isWrapperFor":
+                    return (((Class<?>) args[0]).isInstance(proxy) || this.target.isWrapperFor((Class<?>) args[0]));
             }
 
             // Invoke method on target Connection.
@@ -1487,4 +1614,57 @@ public class DatabaseTemplate extends JdbcAccessor implements JdbcOperations {
         }
     }
 
+
+    /**
+     * Spliterator for queryForStream adaptation of a ResultSet to a Stream.
+     *
+     * @since 5.3
+     */
+    private static class ResultSetSpliterator<T> implements Spliterator<T> {
+
+        private final ResultSet rs;
+
+        private final RowMapper<T> rowMapper;
+
+        private int rowNum = 0;
+
+        public ResultSetSpliterator(ResultSet rs, RowMapper<T> rowMapper) {
+            this.rs = rs;
+            this.rowMapper = rowMapper;
+        }
+
+        @Override
+        public boolean tryAdvance(Consumer<? super T> action) {
+            try {
+                if (this.rs.next()) {
+                    action.accept(this.rowMapper.mapRow(this.rs, this.rowNum++));
+                    return true;
+                }
+                return false;
+            } catch (SQLException ex) {
+                throw new InvalidResultSetAccessException(ex);
+            }
+        }
+
+        @Override
+        @Nullable
+        public Spliterator<T> trySplit() {
+            return null;
+        }
+
+        @Override
+        public long estimateSize() {
+            return Long.MAX_VALUE;
+        }
+
+        @Override
+        public int characteristics() {
+            return Spliterator.ORDERED;
+        }
+
+        public Stream<T> stream() {
+            return StreamSupport.stream(this, false);
+        }
+    }
+
 }

+ 10 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -32,7 +32,7 @@ import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.client.indices.GetIndexResponse;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.BoolQueryBuilder;
@@ -45,7 +45,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public final class ESConnector extends AbstractConnector implements Connector<ESConnectorMapper, ESConfig> {
@@ -96,7 +101,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             ESConfig config = connectorMapper.getConfig();
             GetIndexRequest request = new GetIndexRequest(config.getIndex());
             GetIndexResponse indexResponse = connectorMapper.getConnection().indices().get(request, RequestOptions.DEFAULT);
-            MappingMetaData mappingMetaData = indexResponse.getMappings().get(config.getIndex());
+            MappingMetadata mappingMetaData = indexResponse.getMappings().get(config.getIndex());
             List<Table> tables = new ArrayList<>();
             tables.add(new Table(mappingMetaData.type()));
             return tables;
@@ -113,7 +118,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         try {
             GetIndexRequest request = new GetIndexRequest(config.getIndex());
             GetIndexResponse indexResponse = connectorMapper.getConnection().indices().get(request, RequestOptions.DEFAULT);
-            MappingMetaData mappingMetaData = indexResponse.getMappings().get(config.getIndex());
+            MappingMetadata mappingMetaData = indexResponse.getMappings().get(config.getIndex());
             Map<String, Object> propertiesMap = mappingMetaData.getSourceAsMap();
             Map<String, Map> properties = (Map<String, Map>) propertiesMap.get(ESUtil.PROPERTIES);
             if (CollectionUtils.isEmpty(properties)) {
@@ -141,7 +146,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             builder.size(0);
             SearchRequest request = new SearchRequest(new String[] {config.getIndex()}, builder);
             SearchResponse response = connectorMapper.getConnection().search(request, RequestOptions.DEFAULT);
-            return response.getHits().getTotalHits();
+            return response.getHits().getTotalHits().value;
         } catch (IOException e) {
             logger.error(e.getMessage());
             throw new ConnectorException(e);

+ 17 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleOtherValueMapper.java

@@ -7,6 +7,7 @@ import org.dbsyncer.common.spi.ConnectorMapper;
 import org.dbsyncer.connector.AbstractValueMapper;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
+import org.dbsyncer.connector.util.DatabaseUtil;
 
 import java.sql.Struct;
 
@@ -32,14 +33,22 @@ public final class OracleOtherValueMapper extends AbstractValueMapper<Struct> {
     protected Struct convert(ConnectorMapper connectorMapper, Object val) throws Exception {
         // SqlServer Geometry
         if (val instanceof byte[]) {
-            SimpleConnection connection = (SimpleConnection) connectorMapper.getConnection();
-            OracleConnection conn = connection.unwrap(OracleConnection.class);
-            // TODO 兼容Oracle STRUCT 字节数组
-            Geometry geometry = Geometry.deserialize((byte[]) val);
-            Double x = geometry.getX();
-            Double y = geometry.getY();
-            JGeometry jGeometry = new JGeometry(x, y, 0);
-            return JGeometry.store(jGeometry, conn);
+            Object conn = connectorMapper.getConnection();
+            if (conn instanceof SimpleConnection) {
+                SimpleConnection connection = null;
+                try {
+                    connection = (SimpleConnection) conn;
+                    OracleConnection oracleConnection = connection.unwrap(OracleConnection.class);
+                    // TODO 兼容Oracle STRUCT 字节数组
+                    Geometry geometry = Geometry.deserialize((byte[]) val);
+                    Double x = geometry.getX();
+                    Double y = geometry.getY();
+                    JGeometry jGeometry = new JGeometry(x, y, 0);
+                    return JGeometry.store(jGeometry, oracleConnection);
+                } finally {
+                    DatabaseUtil.close(connection);
+                }
+            }
         }
         throw new ConnectorException(String.format("%s can not find type [%s], val [%s]", getClass().getSimpleName(), val.getClass(), val));
     }

+ 4 - 4
dbsyncer-listener/src/main/test/ESClientTest.java

@@ -17,7 +17,7 @@ import org.elasticsearch.client.indices.CreateIndexRequest;
 import org.elasticsearch.client.indices.CreateIndexResponse;
 import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.client.indices.GetIndexResponse;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -147,7 +147,7 @@ public class ESClientTest {
         }
 
         // 字段信息
-        MappingMetaData mappingMetaData = indexResponse.getMappings().get(indexName);
+        MappingMetadata mappingMetaData = indexResponse.getMappings().get(indexName);
         Map<String, Object> propertiesMap = mappingMetaData.getSourceAsMap();
         Map<String, Map> properties = (Map<String, Map>) propertiesMap.get(ESUtil.PROPERTIES);
         logger.info(properties.toString());
@@ -240,7 +240,7 @@ public class ESClientTest {
         SearchRequest rq = new SearchRequest(new String[]{indexName}, sourceBuilder);
         SearchResponse searchResponse = client.search(rq, RequestOptions.DEFAULT);
         SearchHits hits = searchResponse.getHits();
-        long totalHits = hits.getTotalHits();
+        long totalHits = hits.getTotalHits().value;
         logger.info("result:{}", totalHits);
         SearchHit[] searchHits = hits.getHits();
         for (SearchHit hit : searchHits) {
@@ -256,7 +256,7 @@ public class ESClientTest {
         SearchRequest request = new SearchRequest(new String[]{indexName}, sourceBuilder);
         SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
         SearchHits hits = searchResponse.getHits();
-        long totalHits = hits.getTotalHits();
+        long totalHits = hits.getTotalHits().value;
         logger.info("result:{}", totalHits);
     }
 }

+ 7 - 8
dbsyncer-manager/src/main/java/org/dbsyncer/manager/template/PreloadTemplate.java

@@ -1,6 +1,5 @@
 package org.dbsyncer.manager.template;
 
-import com.alibaba.fastjson.JSONObject;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
@@ -85,7 +84,7 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
     }
 
     public void reload(String json) {
-        Map<String, JSONObject> map = JsonUtil.jsonToObj(json, Map.class);
+        Map<String, Map> map = JsonUtil.jsonToObj(json, Map.class);
         if (CollectionUtils.isEmpty(map)) {
             return;
         }
@@ -105,16 +104,16 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
         launch();
     }
 
-    private void reload(Map<String, JSONObject> map, CommandEnum commandEnum) {
+    private void reload(Map<String, Map> map, CommandEnum commandEnum) {
         reload(map, commandEnum, commandEnum.getModelType());
     }
 
-    private void reload(Map<String, JSONObject> map, CommandEnum commandEnum, String groupId) {
-        JSONObject config = map.get(groupId);
+    private void reload(Map<String, Map> map, CommandEnum commandEnum, String groupId) {
+        Map config = map.get(groupId);
         if (null == config) {
             return;
         }
-        Group group = JsonUtil.jsonToObj(config.toJSONString(), Group.class);
+        Group group = JsonUtil.jsonToObj(config.toString(), Group.class);
         if (null == group) {
             return;
         }
@@ -125,8 +124,8 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
         }
 
         for (String e : index) {
-            JSONObject m = map.get(e);
-            ConfigModel model = (ConfigModel) commandEnum.getCommandExecutor().execute(new PreloadCommand(parser, m.toJSONString()));
+            Map m = map.get(e);
+            ConfigModel model = (ConfigModel) commandEnum.getCommandExecutor().execute(new PreloadCommand(parser, m.toString()));
             operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_ADD, commandEnum.getGroupStrategyEnum()));
             // Load tableGroups
             if (CommandEnum.PRELOAD_MAPPING == commandEnum) {

+ 17 - 18
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -1,7 +1,5 @@
 package org.dbsyncer.parser;
 
-import com.alibaba.fastjson.JSONException;
-import com.alibaba.fastjson.JSONObject;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
@@ -28,7 +26,13 @@ import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.event.FullRefreshEvent;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.BatchWriter;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Picker;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.Task;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.strategy.ParserStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
@@ -176,21 +180,16 @@ public class ParserFactory implements Parser {
 
     @Override
     public Connector parseConnector(String json) {
-        try {
-            JSONObject conn = JsonUtil.parseObject(json);
-            JSONObject config = (JSONObject) conn.remove("config");
-            Connector connector = JsonUtil.jsonToObj(conn.toString(), Connector.class);
-            Assert.notNull(connector, "Connector can not be null.");
-            String connectorType = config.getString("connectorType");
-            Class<?> configClass = ConnectorEnum.getConfigClass(connectorType);
-            AbstractConnectorConfig obj = (AbstractConnectorConfig) JsonUtil.jsonToObj(config.toString(), configClass);
-            connector.setConfig(obj);
-
-            return connector;
-        } catch (JSONException e) {
-            logger.error(e.getMessage());
-            throw new ParserException(e.getMessage());
-        }
+        Map conn = JsonUtil.parseMap(json);
+        Map config = (Map) conn.remove("config");
+        Connector connector = JsonUtil.jsonToObj(conn.toString(), Connector.class);
+        Assert.notNull(connector, "Connector can not be null.");
+        String connectorType = (String) config.get("connectorType");
+        Class<?> configClass = ConnectorEnum.getConfigClass(connectorType);
+        AbstractConnectorConfig obj = (AbstractConnectorConfig) JsonUtil.jsonToObj(config.toString(), configClass);
+        connector.setConfig(obj);
+
+        return connector;
     }
 
     @Override

+ 125 - 12
dbsyncer-parser/src/main/resources/Connector.json

@@ -1,15 +1,128 @@
 {
-  "id": "100",
+  "config":{
+    "connectorType":"Mysql",
+    "password":"123",
+    "properties":{
+
+    },
+    "url":"jdbc:mysql://127.0.0.1:3305/test?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false&autoReconnect=true&failOverReadOnly=false",
+    "username":"root"
+  },
+  "createTime":1670169249093,
+  "id":"1049111399101501440",
+  "name":"Mysql",
+  "table":[
+    {
+      "count":0,
+      "name":"aqi_stations",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"dbs_test03",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"ds1",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"ds1_copy1",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"mdt_partnerrole",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"my_big_table",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"my_big_table_copy1",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"my_file",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"my_mark",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"my_org",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"my_user",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"my_visit",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"nc_customer",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"product",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"product_copy",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"shop_goods",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"shop_goods_class",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"shop_goods_spec",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"tb_jy_visit",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"tb_jy_visit_copy1",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"vote_records",
+      "type":"TABLE"
+    },
+    {
+      "count":0,
+      "name":"my_view",
+      "type":"VIEW"
+    }
+  ],
   "type":"connector",
-  "name": "人力资源系统",
-  "createTime": "2019-10-08 21:32:00",
-  "updateTime": "2019-10-08 21:35:00",
-  "table":["MY_USER", "T_MY_USER", "table_999"],
-  "config": {
-    "connectorType": "Mysql",
-    "driverClassName": "com.mysql.jdbc.Driver",
-    "url": "jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true&seUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false",
-    "username": "root",
-    "password": "123"
-  }
+  "updateTime":1672237644641
 }

+ 346 - 59
dbsyncer-parser/src/main/resources/Mapping.json

@@ -1,73 +1,360 @@
 {
-  "id": "11111",
-  "type":"mapping",
-  "name": "人力资源系统_审计系统",
-  "createTime": "2019-10-08 21:32:00",
-  "updateTime": "2019-10-08 21:35:00",
-  "sourceConnectorId": "100",
-  "targetConnectorId": "666",
-  "sourceColumn":[
-    {
-      "name": "ID",
-      "typeName": "VARCHAR",
-      "type": 12,
-      "pk":true
-    }
+  "batchNum":1000,
+  "convert":[
+
   ],
-  "targetColumn":[
-    {
-      "name": "ID",
-      "typeName": "VARCHAR",
-      "type": 12,
-      "pk":true
-    }
+  "createTime":1671809807457,
+  "filter":[
+
   ],
+  "id":"1055992399609860096",
+  "listener":{
+    "banDelete":false,
+    "banInsert":false,
+    "banUpdate":false,
+    "cron":"*/30 * * * * ?",
+    "delete":"D",
+    "eventFieldName":"",
+    "insert":"I",
+    "listenerType":"log",
+    "update":"U"
+  },
+  "metaId":"1055992399609860097",
+  "model":"increment",
+  "name":"同步测试",
   "params":{
-    "ORACLE_ROW_ID":"RID"
+
   },
-  "filter": [
+  "readNum":10000,
+  "sourceColumn":[
+    {
+      "name":"id",
+      "pk":true,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"visit_cust_id",
+      "pk":false,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"visit_type",
+      "pk":false,
+      "type":1,
+      "typeName":"CHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"visit_user_id",
+      "pk":false,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"visit_day",
+      "pk":false,
+      "type":1,
+      "typeName":"CHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"start_time",
+      "pk":false,
+      "type":93,
+      "typeName":"DATETIME",
+      "unmodifiabled":false
+    },
+    {
+      "name":"start_lat",
+      "pk":false,
+      "type":8,
+      "typeName":"DOUBLE",
+      "unmodifiabled":false
+    },
+    {
+      "name":"start_lng",
+      "pk":false,
+      "type":8,
+      "typeName":"DOUBLE",
+      "unmodifiabled":false
+    },
     {
-      "name": "AGE",
-      "operation": "and",
-      "filter": ">",
-      "value": "0"
+      "name":"start_distance",
+      "pk":false,
+      "type":8,
+      "typeName":"DOUBLE",
+      "unmodifiabled":false
     },
     {
-      "name": "NAME",
-      "operation": "or",
-      "filter": "=",
-      "value": "hello"
+      "name":"end_time",
+      "pk":false,
+      "type":93,
+      "typeName":"DATETIME",
+      "unmodifiabled":false
+    },
+    {
+      "name":"end_lat",
+      "pk":false,
+      "type":8,
+      "typeName":"DOUBLE",
+      "unmodifiabled":false
+    },
+    {
+      "name":"end_lng",
+      "pk":false,
+      "type":8,
+      "typeName":"DOUBLE",
+      "unmodifiabled":false
+    },
+    {
+      "name":"end_distance",
+      "pk":false,
+      "type":8,
+      "typeName":"DOUBLE",
+      "unmodifiabled":false
+    },
+    {
+      "name":"visit_status",
+      "pk":false,
+      "type":1,
+      "typeName":"CHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"pre_end_time",
+      "pk":false,
+      "type":93,
+      "typeName":"DATETIME",
+      "unmodifiabled":false
+    },
+    {
+      "name":"create_by",
+      "pk":false,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"create_by_name",
+      "pk":false,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"create_time",
+      "pk":false,
+      "type":93,
+      "typeName":"DATETIME",
+      "unmodifiabled":false
+    },
+    {
+      "name":"update_by",
+      "pk":false,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"update_by_name",
+      "pk":false,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"update_time",
+      "pk":false,
+      "type":93,
+      "typeName":"DATETIME",
+      "unmodifiabled":false
+    },
+    {
+      "name":"version",
+      "pk":false,
+      "type":4,
+      "typeName":"INT",
+      "unmodifiabled":false
+    },
+    {
+      "name":"is_del",
+      "pk":false,
+      "type":4,
+      "typeName":"INT",
+      "unmodifiabled":false
     }
   ],
-  "convert": [
+  "sourceConnectorId":"1049111399101501440",
+  "targetColumn":[
+    {
+      "name":"id",
+      "pk":true,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
     {
-      "name":"NAME",
-      "convertName":"替换",
-      "convertCode":"REPLACE",
-      "args":"A,B"
+      "name":"visit_cust_id",
+      "pk":false,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"visit_type",
+      "pk":false,
+      "type":1,
+      "typeName":"CHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"visit_user_id",
+      "pk":false,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"visit_day",
+      "pk":false,
+      "type":1,
+      "typeName":"CHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"start_time",
+      "pk":false,
+      "type":93,
+      "typeName":"DATETIME",
+      "unmodifiabled":false
+    },
+    {
+      "name":"start_lat",
+      "pk":false,
+      "type":8,
+      "typeName":"DOUBLE",
+      "unmodifiabled":false
+    },
+    {
+      "name":"start_lng",
+      "pk":false,
+      "type":8,
+      "typeName":"DOUBLE",
+      "unmodifiabled":false
+    },
+    {
+      "name":"start_distance",
+      "pk":false,
+      "type":8,
+      "typeName":"DOUBLE",
+      "unmodifiabled":false
+    },
+    {
+      "name":"end_time",
+      "pk":false,
+      "type":93,
+      "typeName":"DATETIME",
+      "unmodifiabled":false
+    },
+    {
+      "name":"end_lat",
+      "pk":false,
+      "type":8,
+      "typeName":"DOUBLE",
+      "unmodifiabled":false
+    },
+    {
+      "name":"end_lng",
+      "pk":false,
+      "type":8,
+      "typeName":"DOUBLE",
+      "unmodifiabled":false
+    },
+    {
+      "name":"end_distance",
+      "pk":false,
+      "type":8,
+      "typeName":"DOUBLE",
+      "unmodifiabled":false
+    },
+    {
+      "name":"visit_status",
+      "pk":false,
+      "type":1,
+      "typeName":"CHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"pre_end_time",
+      "pk":false,
+      "type":93,
+      "typeName":"DATETIME",
+      "unmodifiabled":false
+    },
+    {
+      "name":"create_by",
+      "pk":false,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"create_by_name",
+      "pk":false,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"create_time",
+      "pk":false,
+      "type":93,
+      "typeName":"DATETIME",
+      "unmodifiabled":false
+    },
+    {
+      "name":"update_by",
+      "pk":false,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"update_by_name",
+      "pk":false,
+      "type":12,
+      "typeName":"VARCHAR",
+      "unmodifiabled":false
+    },
+    {
+      "name":"update_time",
+      "pk":false,
+      "type":93,
+      "typeName":"DATETIME",
+      "unmodifiabled":false
+    },
+    {
+      "name":"version",
+      "pk":false,
+      "type":4,
+      "typeName":"INT",
+      "unmodifiabled":false
+    },
+    {
+      "name":"is_del",
+      "pk":false,
+      "type":4,
+      "typeName":"INT",
+      "unmodifiabled":false
     }
   ],
-  "plugin": {
-    "name":"用户信息转换插件",
-    "className": "com.xx.hr.convert.MyImpl"
-  },
-  "model":"full",
-  "listener": {
-    "listenerType": "Polling",
-    "period": 30,
-    "eventFieldName": "MyEvent",
-    "update": [
-      "U",
-      "M"
-    ],
-    "insert": [
-      "I"
-    ],
-    "delete": [
-      "D"
-    ]
-  },
-  "metaId":"1",
-  "readNum": 10000,
-  "batchNum": 200
+  "targetConnectorId":"1049111399101501440",
+  "threadNum":32,
+  "type":"mapping",
+  "updateTime":1673452598774
 }

+ 720 - 66
dbsyncer-parser/src/main/resources/TableGroup.json

@@ -1,81 +1,735 @@
 {
-  "id":"tableGroupId_0",
-  "type":"tableGroup",
-  "name": "tableGroup",
-  "createTime": "2019-10-08 21:32:00",
-  "updateTime": "2019-10-08 21:35:00",
-  "mappingId": "11111",
-  "sourceTable":{
-    "name":"MY_USER",
-    "column":[
-      {
-        "name": "ID",
-        "typeName": "VARCHAR",
-        "type": 12,
-        "pk":true
-      }
-    ]
-  },
-  "targetTable":{
-    "name":"T_MY_USER",
-    "column":[
-      {
-        "name": "ID",
-        "typeName": "VARCHAR",
-        "type": 12,
-        "pk":true
-      }
-    ]
+  "command":{
+    "QUERY_CURSOR":"SELECT `id`, `visit_cust_id`, `visit_type`, `visit_user_id`, `visit_day`, `start_time`, `start_lat`, `start_lng`, `start_distance`, `end_time`, `end_lat`, `end_lng`, `end_distance`, `visit_status`, `pre_end_time`, `create_by`, `create_by_name`, `create_time`, `update_by`, `update_by_name`, `update_time`, `version`, `is_del` FROM `tb_jy_visit` ORDER BY `id` LIMIT ?",
+    "DELETE":"DELETE FROM `tb_jy_visit_copy1` WHERE `id`=?",
+    "QUERY":"SELECT `id`, `visit_cust_id`, `visit_type`, `visit_user_id`, `visit_day`, `start_time`, `start_lat`, `start_lng`, `start_distance`, `end_time`, `end_lat`, `end_lng`, `end_distance`, `visit_status`, `pre_end_time`, `create_by`, `create_by_name`, `create_time`, `update_by`, `update_by_name`, `update_time`, `version`, `is_del` FROM `tb_jy_visit` WHERE `id` > ? ORDER BY `id` LIMIT ?",
+    "QUERY_COUNT":"SELECT COUNT(1) FROM (SELECT 1 FROM `tb_jy_visit` GROUP BY `id`) DBSYNCER_T",
+    "INSERT":"INSERT INTO `tb_jy_visit_copy1`(`id`, `visit_cust_id`, `visit_type`, `visit_user_id`, `visit_day`, `start_time`, `start_lat`, `start_lng`, `start_distance`, `end_time`, `end_lat`, `end_lng`, `end_distance`, `visit_status`, `pre_end_time`, `create_by`, `create_by_name`, `create_time`, `update_by`, `update_by_name`, `update_time`, `version`, `is_del`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+    "UPDATE":"UPDATE `tb_jy_visit_copy1` SET `visit_cust_id`=?,`visit_type`=?,`visit_user_id`=?,`visit_day`=?,`start_time`=?,`start_lat`=?,`start_lng`=?,`start_distance`=?,`end_time`=?,`end_lat`=?,`end_lng`=?,`end_distance`=?,`visit_status`=?,`pre_end_time`=?,`create_by`=?,`create_by_name`=?,`create_time`=?,`update_by`=?,`update_by_name`=?,`update_time`=?,`version`=?,`is_del`=? WHERE `id`=?",
+    "QUERY_COUNT_EXIST":"SELECT COUNT(1) FROM `tb_jy_visit_copy1` WHERE `id` = ?"
   },
+  "convert":[
+
+  ],
+  "createTime":1672238741208,
   "fieldMapping":[
     {
       "source":{
-        "name": "ID",
-        "typeName": "VARCHAR",
-        "type": 12,
-        "pk":true
+        "name":"id",
+        "pk":true,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
       },
       "target":{
-        "name": "ID",
-        "typeName": "VARCHAR",
-        "type": 12,
-        "pk":true
+        "name":"id",
+        "pk":true,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
       }
-    }
-  ],
-  "command":{
-    "QUERY":"SELECT * FROM MY_USER where name = ?",
-    "INSERT":"INSERT INTO MY_USER(id,name)values(?,?)",
-    "UPDATE":"UPDATE MY_USER set name=? where MY_USER.id=?",
-    "DELETE":"DELETE MY_USER WHERE MY_USER.id=?"
-  },
-  "params":{
-    "ORACLE_ROW_ID":"RID"
-  },
-  "filter": [
+    },
     {
-      "name": "AGE",
-      "operation": "and",
-      "filter": ">",
-      "value": "0"
+      "source":{
+        "name":"visit_cust_id",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"visit_cust_id",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      }
     },
     {
-      "name": "NAME",
-      "operation": "or",
-      "filter": "=",
-      "value": "hello"
-    }
-  ],
-  "convert": [
+      "source":{
+        "name":"visit_type",
+        "pk":false,
+        "type":1,
+        "typeName":"CHAR",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"visit_type",
+        "pk":false,
+        "type":1,
+        "typeName":"CHAR",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"visit_user_id",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"visit_user_id",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"visit_day",
+        "pk":false,
+        "type":1,
+        "typeName":"CHAR",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"visit_day",
+        "pk":false,
+        "type":1,
+        "typeName":"CHAR",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"start_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"start_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"start_lat",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"start_lat",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"start_lng",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"start_lng",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"start_distance",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"start_distance",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"end_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"end_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"end_lat",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"end_lat",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"end_lng",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"end_lng",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"end_distance",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"end_distance",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"visit_status",
+        "pk":false,
+        "type":1,
+        "typeName":"CHAR",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"visit_status",
+        "pk":false,
+        "type":1,
+        "typeName":"CHAR",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"pre_end_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"pre_end_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"create_by",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"create_by",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"create_by_name",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"create_by_name",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"create_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"create_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"update_by",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"update_by",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"update_by_name",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"update_by_name",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"update_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"update_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      }
+    },
+    {
+      "source":{
+        "name":"version",
+        "pk":false,
+        "type":4,
+        "typeName":"INT",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"version",
+        "pk":false,
+        "type":4,
+        "typeName":"INT",
+        "unmodifiabled":false
+      }
+    },
     {
-      "name":"NAME",
-      "convertName":"替换",
-      "convertCode":"REPLACE",
-      "args":"A,B"
+      "source":{
+        "name":"is_del",
+        "pk":false,
+        "type":4,
+        "typeName":"INT",
+        "unmodifiabled":false
+      },
+      "target":{
+        "name":"is_del",
+        "pk":false,
+        "type":4,
+        "typeName":"INT",
+        "unmodifiabled":false
+      }
     }
   ],
-  "plugin": {
-    "name":"用户信息转换插件",
-    "className": "com.xx.hr.convert.MyImpl"
-  }
+  "filter":[
+
+  ],
+  "id":"1057791478157414400",
+  "index":1,
+  "mappingId":"1055992399609860096",
+  "name":"tableGroup",
+  "params":{
+
+  },
+  "sourceTable":{
+    "column":[
+      {
+        "name":"id",
+        "pk":true,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"visit_cust_id",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"visit_type",
+        "pk":false,
+        "type":1,
+        "typeName":"CHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"visit_user_id",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"visit_day",
+        "pk":false,
+        "type":1,
+        "typeName":"CHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"start_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      {
+        "name":"start_lat",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      {
+        "name":"start_lng",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      {
+        "name":"start_distance",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      {
+        "name":"end_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      {
+        "name":"end_lat",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      {
+        "name":"end_lng",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      {
+        "name":"end_distance",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      {
+        "name":"visit_status",
+        "pk":false,
+        "type":1,
+        "typeName":"CHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"pre_end_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      {
+        "name":"create_by",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"create_by_name",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"create_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      {
+        "name":"update_by",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"update_by_name",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"update_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      {
+        "name":"version",
+        "pk":false,
+        "type":4,
+        "typeName":"INT",
+        "unmodifiabled":false
+      },
+      {
+        "name":"is_del",
+        "pk":false,
+        "type":4,
+        "typeName":"INT",
+        "unmodifiabled":false
+      }
+    ],
+    "count":0,
+    "name":"tb_jy_visit",
+    "primaryKey":"",
+    "type":"TABLE"
+  },
+  "targetTable":{
+    "column":[
+      {
+        "name":"id",
+        "pk":true,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"visit_cust_id",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"visit_type",
+        "pk":false,
+        "type":1,
+        "typeName":"CHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"visit_user_id",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"visit_day",
+        "pk":false,
+        "type":1,
+        "typeName":"CHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"start_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      {
+        "name":"start_lat",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      {
+        "name":"start_lng",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      {
+        "name":"start_distance",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      {
+        "name":"end_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      {
+        "name":"end_lat",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      {
+        "name":"end_lng",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      {
+        "name":"end_distance",
+        "pk":false,
+        "type":8,
+        "typeName":"DOUBLE",
+        "unmodifiabled":false
+      },
+      {
+        "name":"visit_status",
+        "pk":false,
+        "type":1,
+        "typeName":"CHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"pre_end_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      {
+        "name":"create_by",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"create_by_name",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"create_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      {
+        "name":"update_by",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"update_by_name",
+        "pk":false,
+        "type":12,
+        "typeName":"VARCHAR",
+        "unmodifiabled":false
+      },
+      {
+        "name":"update_time",
+        "pk":false,
+        "type":93,
+        "typeName":"DATETIME",
+        "unmodifiabled":false
+      },
+      {
+        "name":"version",
+        "pk":false,
+        "type":4,
+        "typeName":"INT",
+        "unmodifiabled":false
+      },
+      {
+        "name":"is_del",
+        "pk":false,
+        "type":4,
+        "typeName":"INT",
+        "unmodifiabled":false
+      }
+    ],
+    "count":0,
+    "name":"tb_jy_visit_copy1",
+    "primaryKey":"",
+    "type":"TABLE"
+  },
+  "type":"tableGroup",
+  "updateTime":1673452597645
 }

+ 9 - 12
dbsyncer-parser/src/main/test/ConnectorParserTest.java

@@ -1,5 +1,3 @@
-import com.alibaba.fastjson.JSONException;
-import com.alibaba.fastjson.JSONObject;
 import org.apache.commons.io.FileUtils;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.util.JsonUtil;
@@ -12,6 +10,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.util.Map;
 
 /**
  * @author AE86
@@ -21,17 +20,17 @@ import java.net.URL;
 public class ConnectorParserTest {
 
     @Test
-    public void testConnector() throws IOException, JSONException {
+    public void testConnector() throws IOException {
         String json = readJson("Connector.json");
         System.out.println(json);
 
         // 解析基本信息
-        JSONObject conn = JsonUtil.parseObject(json);
-        JSONObject config = (JSONObject) conn.remove("config");
+        Map conn = JsonUtil.parseMap(json);
+        Map config = (Map) conn.remove("config");
         Connector connector = JsonUtil.jsonToObj(conn.toString(), Connector.class);
 
         // 解析配置
-        String connectorType = config.getString("connectorType");
+        String connectorType = (String) config.get("connectorType");
         Class<?> configClass = ConnectorEnum.getConfigClass(connectorType);
         Object obj = JsonUtil.jsonToObj(config.toString(), configClass);
         connector.setConfig((AbstractConnectorConfig) obj);
@@ -39,23 +38,21 @@ public class ConnectorParserTest {
     }
 
     @Test
-    public void testMapping() throws IOException, JSONException {
+    public void testMapping() throws IOException {
         String json = readJson("Mapping.json");
         System.out.println(json);
 
         // 解析基本信息
-        JSONObject map = JsonUtil.parseObject(json);
-        Mapping mapping = JsonUtil.jsonToObj(map.toString(), Mapping.class);
+        Mapping mapping = JsonUtil.jsonToObj(json, Mapping.class);
         System.out.println(mapping);
     }
 
     @Test
-    public void testTableGroup() throws IOException, JSONException {
+    public void testTableGroup() throws IOException {
         String json = readJson("TableGroup.json");
         System.out.println(json);
         // 解析基本信息
-        JSONObject group = JsonUtil.parseObject(json);
-        TableGroup tableGroup = JsonUtil.jsonToObj(group.toString(), TableGroup.class);
+        TableGroup tableGroup = JsonUtil.jsonToObj(json, TableGroup.class);
         System.out.println(tableGroup);
     }
 

+ 21 - 0
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/proxy/ProxyApplicationContextImpl.java

@@ -9,6 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
 import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationEvent;
 import org.springframework.context.MessageSourceResolvable;
 import org.springframework.context.NoSuchMessageException;
 import org.springframework.core.ResolvableType;
@@ -88,6 +89,16 @@ public class ProxyApplicationContextImpl implements ProxyApplicationContext {
         return applicationContext.getBeanDefinitionNames();
     }
 
+    @Override
+    public <T> ObjectProvider<T> getBeanProvider(Class<T> aClass, boolean b) {
+        return applicationContext.getBeanProvider(aClass, b);
+    }
+
+    @Override
+    public <T> ObjectProvider<T> getBeanProvider(ResolvableType resolvableType, boolean b) {
+        return applicationContext.getBeanProvider(resolvableType, b);
+    }
+
     @Override
     public String[] getBeanNamesForType(ResolvableType type) {
         return applicationContext.getBeanNamesForType(type);
@@ -133,6 +144,11 @@ public class ProxyApplicationContextImpl implements ProxyApplicationContext {
         return applicationContext.findAnnotationOnBean(beanName, annotationType);
     }
 
+    @Override
+    public <A extends Annotation> A findAnnotationOnBean(String s, Class<A> aClass, boolean b) throws NoSuchBeanDefinitionException {
+        return applicationContext.findAnnotationOnBean(s, aClass, b);
+    }
+
     @Override
     public Object getBean(String name) throws BeansException {
         return applicationContext.getBean(name);
@@ -247,4 +263,9 @@ public class ProxyApplicationContextImpl implements ProxyApplicationContext {
     public ClassLoader getClassLoader() {
         return applicationContext.getClassLoader();
     }
+
+    @Override
+    public void publishEvent(ApplicationEvent event) {
+        applicationContext.publishEvent(event);
+    }
 }

+ 1 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -129,7 +129,7 @@ public class Shard {
         final IndexSearcher searcher = getSearcher();
         final TopDocs topDocs = getTopDocs(searcher, option.getQuery(), MAX_SIZE, sort);
         Paging paging = new Paging(pageNum, pageSize);
-        paging.setTotal(topDocs.totalHits);
+        paging.setTotal(topDocs.totalHits.value);
         if (option.isQueryTotal()) {
             return paging;
         }

+ 8 - 7
pom.xml

@@ -37,10 +37,10 @@
         <!-- maven 编译插件版本 -->
         <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
         <java.version>1.8</java.version>
-        <spring-boot.version>2.2.0.RELEASE</spring-boot.version>
+        <spring-boot.version>2.5.14</spring-boot.version>
         <commons-fileupload.version>1.4</commons-fileupload.version>
         <commons-io.version>2.5</commons-io.version>
-        <lucene-analyzers-smartcn.version>7.7.0</lucene-analyzers-smartcn.version>
+        <lucene-analyzers-smartcn.version>8.8.0</lucene-analyzers-smartcn.version>
         <ojdbc6.version>11.2.0.4.0-atlassian-hosted</ojdbc6.version>
         <sdoapi.version>11.2.0</sdoapi.version>
         <!--<mysql.version>5.1.40</mysql.version>-->
@@ -51,7 +51,7 @@
         <postgresql.version>42.3.3</postgresql.version>
         <postgis-jdbc.version>2.5.1</postgis-jdbc.version>
         <kafka.version>0.9.0.0</kafka.version>
-        <fastjson.version>1.2.75</fastjson.version>
+        <fastjson2.version>2.0.22</fastjson2.version>
         <protobuf.version>3.21.1</protobuf.version>
         <log4j2.version>2.17.1</log4j2.version>
         <junit.version>4.12</junit.version>
@@ -116,11 +116,11 @@
                 <version>${commons-io.version}</version>
             </dependency>
 
-            <!-- fastjson -->
+            <!-- fastjson2 -->
             <dependency>
-                <groupId>com.alibaba</groupId>
-                <artifactId>fastjson</artifactId>
-                <version>${fastjson.version}</version>
+                <groupId>com.alibaba.fastjson2</groupId>
+                <artifactId>fastjson2</artifactId>
+                <version>${fastjson2.version}</version>
             </dependency>
 
             <!-- protobuf -->
@@ -151,6 +151,7 @@
                 <version>${ojdbc6.version}</version>
             </dependency>
 
+            <!-- 如出现无法下载,请切换maven仓库地址:http://www.datanucleus.org/downloads/maven2/ -->
             <dependency>
                 <groupId>oracle</groupId>
                 <artifactId>sdoapi</artifactId>