AE86 hace 1 año
padre
commit
7fd023d952

+ 22 - 22
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -111,14 +111,14 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
     }
     }
 
 
     @Override
     @Override
-    public void disconnect(ESConnectorInstance connectorMapper) {
-        connectorMapper.close();
+    public void disconnect(ESConnectorInstance connectorInstance) {
+        connectorInstance.close();
     }
     }
 
 
     @Override
     @Override
-    public boolean isAlive(ESConnectorInstance connectorMapper) {
+    public boolean isAlive(ESConnectorInstance connectorInstance) {
         try {
         try {
-            RestHighLevelClient client = connectorMapper.getConnection();
+            RestHighLevelClient client = connectorInstance.getConnection();
             return client.ping(RequestOptions.DEFAULT);
             return client.ping(RequestOptions.DEFAULT);
         } catch (IOException e) {
         } catch (IOException e) {
             logger.error(e.getMessage());
             logger.error(e.getMessage());
@@ -132,15 +132,15 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
     }
     }
 
 
     @Override
     @Override
-    public List<Table> getTable(ESConnectorInstance connectorMapper) {
+    public List<Table> getTable(ESConnectorInstance connectorInstance) {
         try {
         try {
-            ESConfig config = connectorMapper.getConfig();
+            ESConfig config = connectorInstance.getConfig();
             GetIndexRequest request = new GetIndexRequest(config.getIndex());
             GetIndexRequest request = new GetIndexRequest(config.getIndex());
-            GetIndexResponse indexResponse = connectorMapper.getConnection().indices().get(request, RequestOptions.DEFAULT);
+            GetIndexResponse indexResponse = connectorInstance.getConnection().indices().get(request, RequestOptions.DEFAULT);
             MappingMetadata mappingMetaData = indexResponse.getMappings().get(config.getIndex());
             MappingMetadata mappingMetaData = indexResponse.getMappings().get(config.getIndex());
             List<Table> tables = new ArrayList<>();
             List<Table> tables = new ArrayList<>();
             // 6.x 版本
             // 6.x 版本
-            if (Version.V_7_0_0.after(connectorMapper.getVersion())) {
+            if (Version.V_7_0_0.after(connectorInstance.getVersion())) {
                 Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
                 Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
                 sourceMap.keySet().forEach(tableName -> tables.add(new Table(tableName)));
                 sourceMap.keySet().forEach(tableName -> tables.add(new Table(tableName)));
                 return tables;
                 return tables;
@@ -156,15 +156,15 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
     }
     }
 
 
     @Override
     @Override
-    public MetaInfo getMetaInfo(ESConnectorInstance connectorMapper, String tableName) {
+    public MetaInfo getMetaInfo(ESConnectorInstance connectorInstance, String tableName) {
         List<Field> fields = new ArrayList<>();
         List<Field> fields = new ArrayList<>();
         try {
         try {
-            ESConfig config = connectorMapper.getConfig();
+            ESConfig config = connectorInstance.getConfig();
             GetIndexRequest request = new GetIndexRequest(config.getIndex());
             GetIndexRequest request = new GetIndexRequest(config.getIndex());
-            GetIndexResponse indexResponse = connectorMapper.getConnection().indices().get(request, RequestOptions.DEFAULT);
+            GetIndexResponse indexResponse = connectorInstance.getConnection().indices().get(request, RequestOptions.DEFAULT);
             MappingMetadata mappingMetaData = indexResponse.getMappings().get(config.getIndex());
             MappingMetadata mappingMetaData = indexResponse.getMappings().get(config.getIndex());
             // 6.x 版本
             // 6.x 版本
-            if (Version.V_7_0_0.after(connectorMapper.getVersion())) {
+            if (Version.V_7_0_0.after(connectorInstance.getVersion())) {
                 Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
                 Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
                 parseProperties(fields, (Map) sourceMap.get(tableName));
                 parseProperties(fields, (Map) sourceMap.get(tableName));
                 return new MetaInfo().setColumn(fields);
                 return new MetaInfo().setColumn(fields);
@@ -180,19 +180,19 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
     }
     }
 
 
     @Override
     @Override
-    public long getCount(ESConnectorInstance connectorMapper, Map<String, String> command) {
+    public long getCount(ESConnectorInstance connectorInstance, Map<String, String> command) {
         try {
         try {
-            ESConfig config = connectorMapper.getConfig();
+            ESConfig config = connectorInstance.getConfig();
             SearchSourceBuilder builder = new SearchSourceBuilder();
             SearchSourceBuilder builder = new SearchSourceBuilder();
             genSearchSourceBuilder(builder, command);
             genSearchSourceBuilder(builder, command);
             // 7.x 版本以上
             // 7.x 版本以上
-            if (Version.V_7_0_0.onOrBefore(connectorMapper.getVersion())) {
+            if (Version.V_7_0_0.onOrBefore(connectorInstance.getVersion())) {
                 builder.trackTotalHits(true);
                 builder.trackTotalHits(true);
             }
             }
             builder.from(0);
             builder.from(0);
             builder.size(0);
             builder.size(0);
             SearchRequest request = new SearchRequest(new String[]{config.getIndex()}, builder);
             SearchRequest request = new SearchRequest(new String[]{config.getIndex()}, builder);
-            SearchResponse response = connectorMapper.getConnection().searchWithVersion(request, RequestOptions.DEFAULT);
+            SearchResponse response = connectorInstance.getConnection().searchWithVersion(request, RequestOptions.DEFAULT);
             return response.getHits().getTotalHits().value;
             return response.getHits().getTotalHits().value;
         } catch (IOException e) {
         } catch (IOException e) {
             logger.error(e.getMessage());
             logger.error(e.getMessage());
@@ -201,8 +201,8 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
     }
     }
 
 
     @Override
     @Override
-    public Result reader(ESConnectorInstance connectorMapper, ReaderConfig config) {
-        ESConfig cfg = connectorMapper.getConfig();
+    public Result reader(ESConnectorInstance connectorInstance, ReaderConfig config) {
+        ESConfig cfg = connectorInstance.getConfig();
         SearchSourceBuilder builder = new SearchSourceBuilder();
         SearchSourceBuilder builder = new SearchSourceBuilder();
         genSearchSourceBuilder(builder, config.getCommand());
         genSearchSourceBuilder(builder, config.getCommand());
         builder.from((config.getPageIndex() - 1) * config.getPageSize());
         builder.from((config.getPageIndex() - 1) * config.getPageSize());
@@ -220,7 +220,7 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
 
 
         try {
         try {
             SearchRequest rq = new SearchRequest(new String[]{cfg.getIndex()}, builder);
             SearchRequest rq = new SearchRequest(new String[]{cfg.getIndex()}, builder);
-            SearchResponse searchResponse = connectorMapper.getConnection().searchWithVersion(rq, RequestOptions.DEFAULT);
+            SearchResponse searchResponse = connectorInstance.getConnection().searchWithVersion(rq, RequestOptions.DEFAULT);
             SearchHits hits = searchResponse.getHits();
             SearchHits hits = searchResponse.getHits();
             SearchHit[] searchHits = hits.getHits();
             SearchHit[] searchHits = hits.getHits();
             List<Map<String, Object>> list = new ArrayList<>();
             List<Map<String, Object>> list = new ArrayList<>();
@@ -235,7 +235,7 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
     }
     }
 
 
     @Override
     @Override
-    public Result writer(ESConnectorInstance connectorMapper, WriterBatchConfig config) {
+    public Result writer(ESConnectorInstance connectorInstance, WriterBatchConfig config) {
         List<Map> data = config.getData();
         List<Map> data = config.getData();
         if (CollectionUtils.isEmpty(data) || CollectionUtils.isEmpty(config.getFields())) {
         if (CollectionUtils.isEmpty(data) || CollectionUtils.isEmpty(config.getFields())) {
             logger.error("writer data can not be empty.");
             logger.error("writer data can not be empty.");
@@ -243,7 +243,7 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
         }
         }
 
 
         final Result result = new Result();
         final Result result = new Result();
-        final ESConfig cfg = connectorMapper.getConfig();
+        final ESConfig cfg = connectorInstance.getConfig();
         final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
         final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
         try {
         try {
             BulkRequest request = new BulkRequest();
             BulkRequest request = new BulkRequest();
@@ -251,7 +251,7 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
             final String pk = pkFields.get(0).getName();
             final String pk = pkFields.get(0).getName();
             data.forEach(row -> addRequest(request, cfg.getIndex(), config.getTableName(), config.getEvent(), String.valueOf(row.get(pk)), row));
             data.forEach(row -> addRequest(request, cfg.getIndex(), config.getTableName(), config.getEvent(), String.valueOf(row.get(pk)), row));
 
 
-            BulkResponse response = connectorMapper.getConnection().bulkWithVersion(request, RequestOptions.DEFAULT);
+            BulkResponse response = connectorInstance.getConnection().bulkWithVersion(request, RequestOptions.DEFAULT);
             RestStatus restStatus = response.status();
             RestStatus restStatus = response.status();
             if (restStatus.getStatus() != RestStatus.OK.getStatus()) {
             if (restStatus.getStatus() != RestStatus.OK.getStatus()) {
                 throw new ConnectorException(String.format("error code:%s", restStatus.getStatus()));
                 throw new ConnectorException(String.format("error code:%s", restStatus.getStatus()));

+ 16 - 16
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnector.java

@@ -78,20 +78,20 @@ public final class FileConnector extends AbstractConnector implements ConnectorS
     }
     }
 
 
     @Override
     @Override
-    public void disconnect(FileConnectorInstance connectorMapper) {
+    public void disconnect(FileConnectorInstance connectorInstance) {
 
 
     }
     }
 
 
     @Override
     @Override
-    public boolean isAlive(FileConnectorInstance connectorMapper) {
-        String fileDir = connectorMapper.getConnection();
+    public boolean isAlive(FileConnectorInstance connectorInstance) {
+        String fileDir = connectorInstance.getConnection();
         boolean alive = new File(fileDir).exists();
         boolean alive = new File(fileDir).exists();
         if (!alive) {
         if (!alive) {
             logger.warn("can not find fileDir:{}", fileDir);
             logger.warn("can not find fileDir:{}", fileDir);
             return false;
             return false;
         }
         }
-        for (FileSchema fileSchema : connectorMapper.getFileSchemaList()) {
-            String filePath = connectorMapper.getFilePath(fileSchema.getFileName());
+        for (FileSchema fileSchema : connectorInstance.getFileSchemaList()) {
+            String filePath = connectorInstance.getFilePath(fileSchema.getFileName());
             if (!new File(filePath).exists()) {
             if (!new File(filePath).exists()) {
                 logger.warn("can not find file:{}", filePath);
                 logger.warn("can not find file:{}", filePath);
                 alive = false;
                 alive = false;
@@ -113,18 +113,18 @@ public final class FileConnector extends AbstractConnector implements ConnectorS
     }
     }
 
 
     @Override
     @Override
-    public List<Table> getTable(FileConnectorInstance connectorMapper) {
-        return connectorMapper.getFileSchemaList().stream().map(fileSchema -> new Table(fileSchema.getFileName())).collect(Collectors.toList());
+    public List<Table> getTable(FileConnectorInstance connectorInstance) {
+        return connectorInstance.getFileSchemaList().stream().map(fileSchema -> new Table(fileSchema.getFileName())).collect(Collectors.toList());
     }
     }
 
 
     @Override
     @Override
-    public MetaInfo getMetaInfo(FileConnectorInstance connectorMapper, String tableName) {
-        FileSchema fileSchema = connectorMapper.getFileSchema(tableName);
+    public MetaInfo getMetaInfo(FileConnectorInstance connectorInstance, String tableName) {
+        FileSchema fileSchema = connectorInstance.getFileSchema(tableName);
         return new MetaInfo().setColumn(fileSchema.getFields());
         return new MetaInfo().setColumn(fileSchema.getFields());
     }
     }
 
 
     @Override
     @Override
-    public long getCount(FileConnectorInstance connectorMapper, Map<String, String> command) {
+    public long getCount(FileConnectorInstance connectorInstance, Map<String, String> command) {
         AtomicLong count = new AtomicLong();
         AtomicLong count = new AtomicLong();
         FileReader reader = null;
         FileReader reader = null;
         try {
         try {
@@ -143,12 +143,12 @@ public final class FileConnector extends AbstractConnector implements ConnectorS
     }
     }
 
 
     @Override
     @Override
-    public Result reader(FileConnectorInstance connectorMapper, ReaderConfig config) {
+    public Result reader(FileConnectorInstance connectorInstance, ReaderConfig config) {
         List<Map<String, Object>> list = new ArrayList<>();
         List<Map<String, Object>> list = new ArrayList<>();
         FileReader reader = null;
         FileReader reader = null;
         try {
         try {
-            FileConfig fileConfig = connectorMapper.getConfig();
-            FileSchema fileSchema = connectorMapper.getFileSchema(config.getCommand().get(FILE_NAME));
+            FileConfig fileConfig = connectorInstance.getConfig();
+            FileSchema fileSchema = connectorInstance.getFileSchema(config.getCommand().get(FILE_NAME));
             final List<Field> fields = fileSchema.getFields();
             final List<Field> fields = fileSchema.getFields();
             Assert.notEmpty(fields, "The fields of file schema is empty.");
             Assert.notEmpty(fields, "The fields of file schema is empty.");
             final char separator = fileConfig.getSeparator();
             final char separator = fileConfig.getSeparator();
@@ -178,7 +178,7 @@ public final class FileConnector extends AbstractConnector implements ConnectorS
     }
     }
 
 
     @Override
     @Override
-    public Result writer(FileConnectorInstance connectorMapper, WriterBatchConfig config) {
+    public Result writer(FileConnectorInstance connectorInstance, WriterBatchConfig config) {
         List<Map> data = config.getData();
         List<Map> data = config.getData();
         if (CollectionUtils.isEmpty(data)) {
         if (CollectionUtils.isEmpty(data)) {
             logger.error("writer data can not be empty.");
             logger.error("writer data can not be empty.");
@@ -186,12 +186,12 @@ public final class FileConnector extends AbstractConnector implements ConnectorS
         }
         }
 
 
         final List<Field> fields = config.getFields();
         final List<Field> fields = config.getFields();
-        final String separator = new String(new char[]{connectorMapper.getConfig().getSeparator()});
+        final String separator = new String(new char[]{connectorInstance.getConfig().getSeparator()});
 
 
         Result result = new Result();
         Result result = new Result();
         OutputStream output = null;
         OutputStream output = null;
         try {
         try {
-            final String filePath = connectorMapper.getFilePath(config.getCommand().get(FILE_NAME));
+            final String filePath = connectorInstance.getFilePath(config.getCommand().get(FILE_NAME));
             output = new FileOutputStream(filePath, true);
             output = new FileOutputStream(filePath, true);
             List<String> lines = data.stream().map(row -> {
             List<String> lines = data.stream().map(row -> {
                 List<String> array = new ArrayList<>();
                 List<String> array = new ArrayList<>();

+ 13 - 13
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -58,13 +58,13 @@ public class KafkaConnector extends AbstractConnector implements ConnectorServic
     }
     }
 
 
     @Override
     @Override
-    public void disconnect(KafkaConnectorInstance connectorMapper) {
-        connectorMapper.close();
+    public void disconnect(KafkaConnectorInstance connectorInstance) {
+        connectorInstance.close();
     }
     }
 
 
     @Override
     @Override
-    public boolean isAlive(KafkaConnectorInstance connectorMapper) {
-        return connectorMapper.getConnection().ping();
+    public boolean isAlive(KafkaConnectorInstance connectorInstance) {
+        return connectorInstance.getConnection().ping();
     }
     }
 
 
     @Override
     @Override
@@ -73,31 +73,31 @@ public class KafkaConnector extends AbstractConnector implements ConnectorServic
     }
     }
 
 
     @Override
     @Override
-    public List<Table> getTable(KafkaConnectorInstance connectorMapper) {
+    public List<Table> getTable(KafkaConnectorInstance connectorInstance) {
         List<Table> topics = new ArrayList<>();
         List<Table> topics = new ArrayList<>();
-        topics.add(new Table(connectorMapper.getConfig().getTopic()));
+        topics.add(new Table(connectorInstance.getConfig().getTopic()));
         return topics;
         return topics;
     }
     }
 
 
     @Override
     @Override
-    public MetaInfo getMetaInfo(KafkaConnectorInstance connectorMapper, String tableName) {
-        KafkaConfig config = connectorMapper.getConfig();
+    public MetaInfo getMetaInfo(KafkaConnectorInstance connectorInstance, String tableName) {
+        KafkaConfig config = connectorInstance.getConfig();
         List<Field> fields = JsonUtil.jsonToArray(config.getFields(), Field.class);
         List<Field> fields = JsonUtil.jsonToArray(config.getFields(), Field.class);
         return new MetaInfo().setColumn(fields);
         return new MetaInfo().setColumn(fields);
     }
     }
 
 
     @Override
     @Override
-    public long getCount(KafkaConnectorInstance connectorMapper, Map<String, String> command) {
+    public long getCount(KafkaConnectorInstance connectorInstance, Map<String, String> command) {
         return 0;
         return 0;
     }
     }
 
 
     @Override
     @Override
-    public Result reader(KafkaConnectorInstance connectorMapper, ReaderConfig config) {
+    public Result reader(KafkaConnectorInstance connectorInstance, ReaderConfig config) {
         throw new ConnectorException("Full synchronization is not supported");
         throw new ConnectorException("Full synchronization is not supported");
     }
     }
 
 
     @Override
     @Override
-    public Result writer(KafkaConnectorInstance connectorMapper, WriterBatchConfig config) {
+    public Result writer(KafkaConnectorInstance connectorInstance, WriterBatchConfig config) {
         List<Map> data = config.getData();
         List<Map> data = config.getData();
         if (CollectionUtils.isEmpty(data) || CollectionUtils.isEmpty(config.getFields())) {
         if (CollectionUtils.isEmpty(data) || CollectionUtils.isEmpty(config.getFields())) {
             logger.error("writer data can not be empty.");
             logger.error("writer data can not be empty.");
@@ -105,13 +105,13 @@ public class KafkaConnector extends AbstractConnector implements ConnectorServic
         }
         }
 
 
         Result result = new Result();
         Result result = new Result();
-        final KafkaConfig cfg = connectorMapper.getConfig();
+        final KafkaConfig cfg = connectorInstance.getConfig();
         final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
         final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
         try {
         try {
             String topic = cfg.getTopic();
             String topic = cfg.getTopic();
             // 默认取第一个主键
             // 默认取第一个主键
             final String pk = pkFields.get(0).getName();
             final String pk = pkFields.get(0).getName();
-            data.forEach(row -> connectorMapper.getConnection().send(topic, String.valueOf(row.get(pk)), row));
+            data.forEach(row -> connectorInstance.getConnection().send(topic, String.valueOf(row.get(pk)), row));
             result.addSuccessData(data);
             result.addSuccessData(data);
         } catch (Exception e) {
         } catch (Exception e) {
             // 记录错误数据
             // 记录错误数据

+ 6 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -36,10 +36,10 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
     }
     }
 
 
     @Override
     @Override
-    public List<Table> getTable(DatabaseConnectorInstance connectorMapper) {
-        DatabaseConfig config = connectorMapper.getConfig();
-        List<Table> tables = getTables(connectorMapper, String.format(QUERY_TABLE, config.getSchema()), TableTypeEnum.TABLE);
-        tables.addAll(getTables(connectorMapper, QUERY_VIEW, TableTypeEnum.VIEW));
+    public List<Table> getTable(DatabaseConnectorInstance connectorInstance) {
+        DatabaseConfig config = connectorInstance.getConfig();
+        List<Table> tables = getTables(connectorInstance, String.format(QUERY_TABLE, config.getSchema()), TableTypeEnum.TABLE);
+        tables.addAll(getTables(connectorInstance, QUERY_VIEW, TableTypeEnum.VIEW));
         return tables;
         return tables;
     }
     }
 
 
@@ -101,8 +101,8 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
                 buildTableName(table.getName()));
                 buildTableName(table.getName()));
     }
     }
 
 
-    private List<Table> getTables(DatabaseConnectorInstance connectorMapper, String sql, TableTypeEnum type) {
-        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
+    private List<Table> getTables(DatabaseConnectorInstance connectorInstance, String sql, TableTypeEnum type) {
+        List<String> tableNames = connectorInstance.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
         if (!CollectionUtils.isEmpty(tableNames)) {
         if (!CollectionUtils.isEmpty(tableNames)) {
             return tableNames.stream().map(name -> new Table(name, type.getCode())).collect(Collectors.toList());
             return tableNames.stream().map(name -> new Table(name, type.getCode())).collect(Collectors.toList());
         }
         }

+ 19 - 19
dbsyncer-connector/src/main/test/ConnectionTest.java

@@ -32,10 +32,10 @@ public class ConnectionTest {
 
 
     @Test
     @Test
     public void testByte() {
     public void testByte() {
-        final DatabaseConnectorInstance connectorMapper = new DatabaseConnectorInstance(createOracleConfig());
+        final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createOracleConfig());
 
 
         String executeSql = "UPDATE \"my_user\" SET \"name\"=?,\"clo\"=? WHERE \"id\"=?";
         String executeSql = "UPDATE \"my_user\" SET \"name\"=?,\"clo\"=? WHERE \"id\"=?";
-        int[] execute = connectorMapper.execute(databaseTemplate ->
+        int[] execute = connectorInstance.execute(databaseTemplate ->
                 databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
                 databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
                     @Override
                     @Override
                     public void setValues(PreparedStatement ps, int i) {
                     public void setValues(PreparedStatement ps, int i) {
@@ -64,7 +64,7 @@ public class ConnectionTest {
 
 
     @Test
     @Test
     public void testConnection() throws InterruptedException {
     public void testConnection() throws InterruptedException {
-        final DatabaseConnectorInstance connectorMapper = new DatabaseConnectorInstance(createSqlServerConfig());
+        final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createSqlServerConfig());
 
 
         // 模拟并发
         // 模拟并发
         final int threadSize = 100;
         final int threadSize = 100;
@@ -80,7 +80,7 @@ public class ConnectionTest {
                     // 模拟操作
                     // 模拟操作
                     System.out.println(String.format("%s %s:%s", LocalDateTime.now(), Thread.currentThread().getName(), k));
                     System.out.println(String.format("%s %s:%s", LocalDateTime.now(), Thread.currentThread().getName(), k));
 
 
-                    Object execute = connectorMapper.execute(tem -> tem.queryForObject("select 1", Integer.class));
+                    Object execute = connectorInstance.execute(tem -> tem.queryForObject("select 1", Integer.class));
                     System.out.println(String.format("%s %s:%s execute=>%s", LocalDateTime.now(), Thread.currentThread().getName(), k, execute));
                     System.out.println(String.format("%s %s:%s execute=>%s", LocalDateTime.now(), Thread.currentThread().getName(), k, execute));
 
 
                 } catch (InterruptedException e) {
                 } catch (InterruptedException e) {
@@ -109,7 +109,7 @@ public class ConnectionTest {
 
 
     @Test
     @Test
     public void testBatchInsert() {
     public void testBatchInsert() {
-        final DatabaseConnectorInstance connectorMapper = new DatabaseConnectorInstance(createMysqlConfig());
+        final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
 
 
         long begin = Instant.now().toEpochMilli();
         long begin = Instant.now().toEpochMilli();
         final int threadSize = 10;
         final int threadSize = 10;
@@ -131,14 +131,14 @@ public class ConnectionTest {
 
 
             if (i % 10000 == 0) {
             if (i % 10000 == 0) {
                 System.out.println(i + "-----------------正在处理");
                 System.out.println(i + "-----------------正在处理");
-                batchUpdate(connectorMapper, pool, sql, dataList, 1000);
+                batchUpdate(connectorInstance, pool, sql, dataList, 1000);
                 dataList.clear();
                 dataList.clear();
             }
             }
         }
         }
 
 
         if(!CollectionUtils.isEmpty(dataList)){
         if(!CollectionUtils.isEmpty(dataList)){
             System.out.println("-----------------正在处理剩余数据");
             System.out.println("-----------------正在处理剩余数据");
-            batchUpdate(connectorMapper, pool, sql, dataList, 1000);
+            batchUpdate(connectorInstance, pool, sql, dataList, 1000);
         }
         }
 
 
         pool.shutdown();
         pool.shutdown();
@@ -147,7 +147,7 @@ public class ConnectionTest {
 
 
     @Test
     @Test
     public void testBatchUpdate() {
     public void testBatchUpdate() {
-        final DatabaseConnectorInstance connectorMapper = new DatabaseConnectorInstance(createMysqlConfig());
+        final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
 
 
         long begin = Instant.now().toEpochMilli();
         long begin = Instant.now().toEpochMilli();
         final int threadSize = 10;
         final int threadSize = 10;
@@ -167,14 +167,14 @@ public class ConnectionTest {
 
 
                 if (i % 10000 == 0) {
                 if (i % 10000 == 0) {
                     System.out.println(i + "-----------------正在处理");
                     System.out.println(i + "-----------------正在处理");
-                    batchUpdate(connectorMapper, pool, sql, dataList, 1000);
+                    batchUpdate(connectorInstance, pool, sql, dataList, 1000);
                     dataList.clear();
                     dataList.clear();
                 }
                 }
             }
             }
 
 
             if (!CollectionUtils.isEmpty(dataList)) {
             if (!CollectionUtils.isEmpty(dataList)) {
                 System.out.println("-----------------正在处理剩余数据");
                 System.out.println("-----------------正在处理剩余数据");
-                batchUpdate(connectorMapper, pool, sql, dataList, 1000);
+                batchUpdate(connectorInstance, pool, sql, dataList, 1000);
             }
             }
             k--;
             k--;
         }
         }
@@ -185,7 +185,7 @@ public class ConnectionTest {
 
 
     @Test
     @Test
     public void testBatchDelete() {
     public void testBatchDelete() {
-        final DatabaseConnectorInstance connectorMapper = new DatabaseConnectorInstance(createMysqlConfig());
+        final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
 
 
         long begin = Instant.now().toEpochMilli();
         long begin = Instant.now().toEpochMilli();
         final int threadSize = 10;
         final int threadSize = 10;
@@ -201,14 +201,14 @@ public class ConnectionTest {
 
 
             if (i % 10000 == 0) {
             if (i % 10000 == 0) {
                 System.out.println(i + "-----------------正在处理");
                 System.out.println(i + "-----------------正在处理");
-                batchUpdate(connectorMapper, pool, sql, dataList, 1000);
+                batchUpdate(connectorInstance, pool, sql, dataList, 1000);
                 dataList.clear();
                 dataList.clear();
             }
             }
         }
         }
 
 
         if (!CollectionUtils.isEmpty(dataList)) {
         if (!CollectionUtils.isEmpty(dataList)) {
             System.out.println("-----------------正在处理剩余数据");
             System.out.println("-----------------正在处理剩余数据");
-            batchUpdate(connectorMapper, pool, sql, dataList, 1000);
+            batchUpdate(connectorInstance, pool, sql, dataList, 1000);
         }
         }
 
 
         pool.shutdown();
         pool.shutdown();
@@ -226,7 +226,7 @@ public class ConnectionTest {
         return s.toString();
         return s.toString();
     }
     }
 
 
-    private void batchUpdate(DatabaseConnectorInstance connectorMapper, ExecutorService pool, String sql, List<Object[]> dataList, int batchSize) {
+    private void batchUpdate(DatabaseConnectorInstance connectorInstance, ExecutorService pool, String sql, List<Object[]> dataList, int batchSize) {
         int total = dataList.size();
         int total = dataList.size();
         int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
         int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
         final CountDownLatch latch = new CountDownLatch(taskSize);
         final CountDownLatch latch = new CountDownLatch(taskSize);
@@ -245,7 +245,7 @@ public class ConnectionTest {
 
 
             pool.submit(() -> {
             pool.submit(() -> {
                 try {
                 try {
-                    connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, data));
+                    connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, data));
                 } catch (Exception e) {
                 } catch (Exception e) {
                     logger.error(e.getMessage());
                     logger.error(e.getMessage());
                 } finally {
                 } finally {
@@ -279,8 +279,8 @@ public class ConnectionTest {
     public void testGetColumnsDetails() {
     public void testGetColumnsDetails() {
         final String schema = "root";
         final String schema = "root";
         final String tableNamePattern = "sw_test";
         final String tableNamePattern = "sw_test";
-        final DatabaseConnectorInstance connectorMapper = new DatabaseConnectorInstance(createMysqlConfig());
-        connectorMapper.execute(databaseTemplate -> {
+        final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
+        connectorInstance.execute(databaseTemplate -> {
             SimpleConnection connection = databaseTemplate.getSimpleConnection();
             SimpleConnection connection = databaseTemplate.getSimpleConnection();
             Connection conn = connection.getConnection();
             Connection conn = connection.getConnection();
             String databaseCatalog = conn.getCatalog();
             String databaseCatalog = conn.getCatalog();
@@ -299,9 +299,9 @@ public class ConnectionTest {
     }
     }
 
 
     private List<Table> getTables(DatabaseConfig config, final String catalog, final String schema, final String tableNamePattern) {
     private List<Table> getTables(DatabaseConfig config, final String catalog, final String schema, final String tableNamePattern) {
-        final DatabaseConnectorInstance connectorMapper = new DatabaseConnectorInstance(config);
+        final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(config);
         List<Table> tables = new ArrayList<>();
         List<Table> tables = new ArrayList<>();
-        connectorMapper.execute(databaseTemplate -> {
+        connectorInstance.execute(databaseTemplate -> {
             SimpleConnection connection = databaseTemplate.getSimpleConnection();
             SimpleConnection connection = databaseTemplate.getSimpleConnection();
             Connection conn = connection.getConnection();
             Connection conn = connection.getConnection();
             String databaseCatalog = null == catalog ? conn.getCatalog() : catalog;
             String databaseCatalog = null == catalog ? conn.getCatalog() : catalog;

+ 5 - 5
dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/PreloadTemplate.java

@@ -83,8 +83,8 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
         // Load plugins
         // Load plugins
         pluginFactory.loadPlugins();
         pluginFactory.loadPlugins();
 
 
-        // Load connectorMappers
-        loadConnectorMapper();
+        // Load connectorInstances
+        loadConnectorInstance();
 
 
         // Launch drivers
         // Launch drivers
         launch();
         launch();
@@ -111,8 +111,8 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
         Stream.of(CommandEnum.PRELOAD_SYSTEM, CommandEnum.PRELOAD_USER, CommandEnum.PRELOAD_CONNECTOR, CommandEnum.PRELOAD_MAPPING,
         Stream.of(CommandEnum.PRELOAD_SYSTEM, CommandEnum.PRELOAD_USER, CommandEnum.PRELOAD_CONNECTOR, CommandEnum.PRELOAD_MAPPING,
                 CommandEnum.PRELOAD_META, CommandEnum.PRELOAD_PROJECT_GROUP).forEach(commandEnum -> reload(map, commandEnum));
                 CommandEnum.PRELOAD_META, CommandEnum.PRELOAD_PROJECT_GROUP).forEach(commandEnum -> reload(map, commandEnum));
 
 
-        // Load connectorMappers
-        loadConnectorMapper();
+        // Load connectorInstances
+        loadConnectorInstance();
 
 
         // Launch drivers
         // Launch drivers
         launch();
         launch();
@@ -193,7 +193,7 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
         }
         }
     }
     }
 
 
-    private void loadConnectorMapper() {
+    private void loadConnectorInstance() {
         List<Connector> list = profileComponent.getConnectorAll();
         List<Connector> list = profileComponent.getConnectorAll();
         if (!CollectionUtils.isEmpty(list)) {
         if (!CollectionUtils.isEmpty(list)) {
             list.forEach(connector -> {
             list.forEach(connector -> {

+ 5 - 5
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDQLConnector.java

@@ -26,8 +26,8 @@ import java.util.Map;
 public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
 public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
 
 
     @Override
     @Override
-    public List<Table> getTable(DatabaseConnectorInstance connectorMapper) {
-        DatabaseConfig cfg = connectorMapper.getConfig();
+    public List<Table> getTable(DatabaseConnectorInstance connectorInstance) {
+        DatabaseConfig cfg = connectorInstance.getConfig();
         List<SqlTable> sqlTables = cfg.getSqlTables();
         List<SqlTable> sqlTables = cfg.getSqlTables();
         List<Table> tables = new ArrayList<>();
         List<Table> tables = new ArrayList<>();
         if (!CollectionUtils.isEmpty(sqlTables)) {
         if (!CollectionUtils.isEmpty(sqlTables)) {
@@ -44,8 +44,8 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
     }
     }
 
 
     @Override
     @Override
-    public MetaInfo getMetaInfo(DatabaseConnectorInstance connectorMapper, String sqlName) {
-        DatabaseConfig cfg = connectorMapper.getConfig();
+    public MetaInfo getMetaInfo(DatabaseConnectorInstance connectorInstance, String sqlName) {
+        DatabaseConfig cfg = connectorInstance.getConfig();
         List<SqlTable> sqlTables = cfg.getSqlTables();
         List<SqlTable> sqlTables = cfg.getSqlTables();
         for (SqlTable s : sqlTables) {
         for (SqlTable s : sqlTables) {
             if (StringUtil.equals(s.getSqlName(), sqlName)) {
             if (StringUtil.equals(s.getSqlName(), sqlName)) {
@@ -54,7 +54,7 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
                 sql = sql.replace("\r", " ");
                 sql = sql.replace("\r", " ");
                 sql = sql.replace("\n", " ");
                 sql = sql.replace("\n", " ");
                 String queryMetaSql = StringUtil.contains(sql, " WHERE ") ? s.getSql() + " AND 1!=1 " : s.getSql() + " WHERE 1!=1 ";
                 String queryMetaSql = StringUtil.contains(sql, " WHERE ") ? s.getSql() + " AND 1!=1 " : s.getSql() + " WHERE 1!=1 ";
-                return connectorMapper.execute(databaseTemplate -> super.getMetaInfo(databaseTemplate, queryMetaSql, getSchema(cfg), s.getTable()));
+                return connectorInstance.execute(databaseTemplate -> super.getMetaInfo(databaseTemplate, queryMetaSql, getSchema(cfg), s.getTable()));
             }
             }
         }
         }
         return null;
         return null;

+ 29 - 29
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDatabaseConnector.java

@@ -78,13 +78,13 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     }
     }
 
 
     @Override
     @Override
-    public void disconnect(DatabaseConnectorInstance connectorMapper) {
-        connectorMapper.close();
+    public void disconnect(DatabaseConnectorInstance connectorInstance) {
+        connectorInstance.close();
     }
     }
 
 
     @Override
     @Override
-    public boolean isAlive(DatabaseConnectorInstance connectorMapper) {
-        Integer count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(getValidationQuery(), Integer.class));
+    public boolean isAlive(DatabaseConnectorInstance connectorInstance) {
+        Integer count = connectorInstance.execute(databaseTemplate -> databaseTemplate.queryForObject(getValidationQuery(), Integer.class));
         return null != count && count > 0;
         return null != count && count > 0;
     }
     }
 
 
@@ -94,15 +94,15 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     }
     }
 
 
     @Override
     @Override
-    public List<Table> getTable(DatabaseConnectorInstance connectorMapper) {
-        return getTable(connectorMapper, null, getSchema(connectorMapper.getConfig()), null);
+    public List<Table> getTable(DatabaseConnectorInstance connectorInstance) {
+        return getTable(connectorInstance, null, getSchema(connectorInstance.getConfig()), null);
     }
     }
 
 
     @Override
     @Override
-    public MetaInfo getMetaInfo(DatabaseConnectorInstance connectorMapper, String tableNamePattern) {
+    public MetaInfo getMetaInfo(DatabaseConnectorInstance connectorInstance, String tableNamePattern) {
         List<Field> fields = new ArrayList<>();
         List<Field> fields = new ArrayList<>();
-        final String schema = getSchema(connectorMapper.getConfig());
-        connectorMapper.execute(databaseTemplate -> {
+        final String schema = getSchema(connectorInstance.getConfig());
+        connectorInstance.execute(databaseTemplate -> {
             SimpleConnection connection = databaseTemplate.getSimpleConnection();
             SimpleConnection connection = databaseTemplate.getSimpleConnection();
             Connection conn = connection.getConnection();
             Connection conn = connection.getConnection();
             String catalog = conn.getCatalog();
             String catalog = conn.getCatalog();
@@ -122,7 +122,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     }
     }
 
 
     @Override
     @Override
-    public long getCount(DatabaseConnectorInstance connectorMapper, Map<String, String> command) {
+    public long getCount(DatabaseConnectorInstance connectorInstance, Map<String, String> command) {
         if (CollectionUtils.isEmpty(command)) {
         if (CollectionUtils.isEmpty(command)) {
             return 0L;
             return 0L;
         }
         }
@@ -134,14 +134,14 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         }
         }
 
 
         // 2、返回结果集
         // 2、返回结果集
-        return connectorMapper.execute(databaseTemplate -> {
+        return connectorInstance.execute(databaseTemplate -> {
             Long count = databaseTemplate.queryForObject(queryCountSql, Long.class);
             Long count = databaseTemplate.queryForObject(queryCountSql, Long.class);
             return count == null ? 0 : count;
             return count == null ? 0 : count;
         });
         });
     }
     }
 
 
     @Override
     @Override
-    public Result reader(DatabaseConnectorInstance connectorMapper, ReaderConfig config) {
+    public Result reader(DatabaseConnectorInstance connectorInstance, ReaderConfig config) {
         // 1、获取select SQL
         // 1、获取select SQL
         boolean supportedCursor = enableCursor() && config.isSupportedCursor() && null != config.getCursors();
         boolean supportedCursor = enableCursor() && config.isSupportedCursor() && null != config.getCursors();
         String queryKey = supportedCursor ? ConnectorConstant.OPERTION_QUERY_CURSOR : ConnectorConstant.OPERTION_QUERY;
         String queryKey = supportedCursor ? ConnectorConstant.OPERTION_QUERY_CURSOR : ConnectorConstant.OPERTION_QUERY;
@@ -152,14 +152,14 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         Collections.addAll(config.getArgs(), supportedCursor ? getPageCursorArgs(config) : getPageArgs(config));
         Collections.addAll(config.getArgs(), supportedCursor ? getPageCursorArgs(config) : getPageArgs(config));
 
 
         // 3、执行SQL
         // 3、执行SQL
-        List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
+        List<Map<String, Object>> list = connectorInstance.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
 
 
         // 4、返回结果集
         // 4、返回结果集
         return new Result(list);
         return new Result(list);
     }
     }
 
 
     @Override
     @Override
-    public Result writer(DatabaseConnectorInstance connectorMapper, WriterBatchConfig config) {
+    public Result writer(DatabaseConnectorInstance connectorInstance, WriterBatchConfig config) {
         String event = config.getEvent();
         String event = config.getEvent();
         List<Map> data = config.getData();
         List<Map> data = config.getData();
 
 
@@ -190,9 +190,9 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         int[] execute = null;
         int[] execute = null;
         try {
         try {
             // 2、设置参数
             // 2、设置参数
-            execute = connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(executeSql, batchRows(fields, data)));
+            execute = connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(executeSql, batchRows(fields, data)));
         } catch (Exception e) {
         } catch (Exception e) {
-            data.forEach(row -> forceUpdate(result, connectorMapper, config, pkFields, row));
+            data.forEach(row -> forceUpdate(result, connectorInstance, config, pkFields, row));
         }
         }
 
 
         if (null != execute) {
         if (null != execute) {
@@ -202,7 +202,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
                     result.getSuccessData().add(data.get(i));
                     result.getSuccessData().add(data.get(i));
                     continue;
                     continue;
                 }
                 }
-                forceUpdate(result, connectorMapper, config, pkFields, data.get(i));
+                forceUpdate(result, connectorInstance, config, pkFields, data.get(i));
             }
             }
         }
         }
         return result;
         return result;
@@ -440,14 +440,14 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     /**
     /**
      * 获取表列表
      * 获取表列表
      *
      *
-     * @param connectorMapper
+     * @param connectorInstance
      * @param catalog
      * @param catalog
      * @param schema
      * @param schema
      * @param tableNamePattern
      * @param tableNamePattern
      * @return
      * @return
      */
      */
-    private List<Table> getTable(DatabaseConnectorInstance connectorMapper, String catalog, String schema, String tableNamePattern) {
-        return connectorMapper.execute(databaseTemplate -> {
+    private List<Table> getTable(DatabaseConnectorInstance connectorInstance, String catalog, String schema, String tableNamePattern) {
+        return connectorInstance.execute(databaseTemplate -> {
             List<Table> tables = new ArrayList<>();
             List<Table> tables = new ArrayList<>();
             SimpleConnection connection = databaseTemplate.getSimpleConnection();
             SimpleConnection connection = databaseTemplate.getSimpleConnection();
             Connection conn = connection.getConnection();
             Connection conn = connection.getConnection();
@@ -569,7 +569,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return args;
         return args;
     }
     }
 
 
-    private void forceUpdate(Result result, DatabaseConnectorInstance connectorMapper, WriterBatchConfig config, List<Field> pkFields,
+    private void forceUpdate(Result result, DatabaseConnectorInstance connectorInstance, WriterBatchConfig config, List<Field> pkFields,
                              Map row) {
                              Map row) {
         if (isUpdate(config.getEvent()) || isInsert(config.getEvent())) {
         if (isUpdate(config.getEvent()) || isInsert(config.getEvent())) {
             // 存在执行覆盖更新,否则写入
             // 存在执行覆盖更新,否则写入
@@ -579,14 +579,14 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             for (int i = 0; i < size; i++) {
             for (int i = 0; i < size; i++) {
                 args[i] = row.get(pkFields.get(i).getName());
                 args[i] = row.get(pkFields.get(i).getName());
             }
             }
-            final String event = existRow(connectorMapper, queryCount, args) ? ConnectorConstant.OPERTION_UPDATE
+            final String event = existRow(connectorInstance, queryCount, args) ? ConnectorConstant.OPERTION_UPDATE
                     : ConnectorConstant.OPERTION_INSERT;
                     : ConnectorConstant.OPERTION_INSERT;
             logger.warn("{}表执行{}失败, 重新执行{}, {}", config.getTableName(), config.getEvent(), event, row);
             logger.warn("{}表执行{}失败, 重新执行{}, {}", config.getTableName(), config.getEvent(), event, row);
-            writer(result, connectorMapper, config, pkFields, row, event);
+            writer(result, connectorInstance, config, pkFields, row, event);
         }
         }
     }
     }
 
 
-    private void writer(Result result, DatabaseConnectorInstance connectorMapper, WriterBatchConfig config, List<Field> pkFields, Map row,
+    private void writer(Result result, DatabaseConnectorInstance connectorInstance, WriterBatchConfig config, List<Field> pkFields, Map row,
                         String event) {
                         String event) {
         // 1、获取 SQL
         // 1、获取 SQL
         String sql = config.getCommand().get(event);
         String sql = config.getCommand().get(event);
@@ -604,7 +604,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
 
         try {
         try {
             // 2、设置参数
             // 2、设置参数
-            int execute = connectorMapper.execute(databaseTemplate -> databaseTemplate.update(sql, batchRow(fields, row)));
+            int execute = connectorInstance.execute(databaseTemplate -> databaseTemplate.update(sql, batchRow(fields, row)));
             if (execute == 0) {
             if (execute == 0) {
                 throw new SdkException(String.format("尝试执行[%s]失败", event));
                 throw new SdkException(String.format("尝试执行[%s]失败", event));
             }
             }
@@ -618,10 +618,10 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         }
         }
     }
     }
 
 
-    private boolean existRow(DatabaseConnectorInstance connectorMapper, String sql, Object[] args) {
+    private boolean existRow(DatabaseConnectorInstance connectorInstance, String sql, Object[] args) {
         int rowNum = 0;
         int rowNum = 0;
         try {
         try {
-            rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, Integer.class, args));
+            rowNum = connectorInstance.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, Integer.class, args));
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, args);
             logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, args);
         }
         }
@@ -651,11 +651,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     }
     }
 
 
     @Override
     @Override
-    public Result writerDDL(DatabaseConnectorInstance connectorMapper, DDLConfig config) {
+    public Result writerDDL(DatabaseConnectorInstance connectorInstance, DDLConfig config) {
         Result result = new Result();
         Result result = new Result();
         try {
         try {
             Assert.hasText(config.getSql(), "执行SQL语句不能为空.");
             Assert.hasText(config.getSql(), "执行SQL语句不能为空.");
-            connectorMapper.execute(databaseTemplate -> {
+            connectorInstance.execute(databaseTemplate -> {
                 databaseTemplate.execute(config.getSql());
                 databaseTemplate.execute(config.getSql());
                 return true;
                 return true;
             });
             });

+ 3 - 3
dbsyncer-web/src/main/resources/public/plugin/plugin.html

@@ -77,11 +77,11 @@ public class MyPlugin implements ConvertService {
         // 完成同步后调用该方法
         // 完成同步后调用该方法
        logger.info("插件正在处理同步成功的数据,目标源表:{},事件:{},条数:{}", context.getTargetTableName(), context.getEvent(), context.getTargetList().size());
        logger.info("插件正在处理同步成功的数据,目标源表:{},事件:{},条数:{}", context.getTargetTableName(), context.getEvent(), context.getTargetList().size());
 
 
-        ConnectorMapper connectorMapper = context.getSourceConnectorMapper();
+        ConnectorInstance connectorInstance = context.getSourceConnectorInstance();
 
 
         // 获取关系型数据库连接,实现自己的业务逻辑...
         // 获取关系型数据库连接,实现自己的业务逻辑...
-        if (connectorMapper instanceof DatabaseConnectorMapper) {
-            DatabaseConnectorMapper db = (DatabaseConnectorMapper) connectorMapper;
+        if (connectorInstance instanceof DatabaseConnectorInstance) {
+            DatabaseConnectorInstance db = (DatabaseConnectorInstance) connectorInstance;
             // 方式一(推荐):
             // 方式一(推荐):
             String query = "select * from my_user";
             String query = "select * from my_user";
             db.execute(databaseTemplate -> databaseTemplate.queryForList(query));
             db.execute(databaseTemplate -> databaseTemplate.queryForList(query));