穿云 committed 3 months ago
parent
commit 8c663e6120
15 changed files with 142 additions and 173 deletions
  1. +16 -9   dbsyncer-connector/dbsyncer-connector-base/src/main/java/org/dbsyncer/connector/base/ConnectorFactory.java
  2. +15 -9   dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/ElasticsearchConnector.java
  3. +10 -7   dbsyncer-connector/dbsyncer-connector-file/src/main/java/org/dbsyncer/connector/file/FileConnector.java
  4. +5 -5    dbsyncer-connector/dbsyncer-connector-kafka/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java
  5. +1 -2    dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/BufferActuatorRouter.java
  6. +1 -0    dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java
  7. +27 -22  dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/ParserComponentImpl.java
  8. +13 -0   dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/SystemConfig.java
  9. +0 -76   dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/WriterBatchConfig.java
  10. +12 -12 dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/AbstractConnector.java
  11. +21 -19 dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDatabaseConnector.java
  12. +6 -1   dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/plugin/AbstractPluginContext.java
  13. +9 -0   dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/plugin/PluginContext.java
  14. +3 -3   dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/spi/ConnectorService.java
  15. +3 -8   dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/util/PrimaryKeyUtil.java

+ 16 - 9
dbsyncer-connector/dbsyncer-connector-base/src/main/java/org/dbsyncer/connector/base/ConnectorFactory.java

@@ -7,13 +7,13 @@ import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.sdk.config.CommandConfig;
 import org.dbsyncer.sdk.config.DDLConfig;
-import org.dbsyncer.sdk.config.WriterBatchConfig;
 import org.dbsyncer.sdk.connector.AbstractConnector;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
 import org.dbsyncer.sdk.listener.Listener;
 import org.dbsyncer.sdk.model.ConnectorConfig;
 import org.dbsyncer.sdk.model.MetaInfo;
 import org.dbsyncer.sdk.model.Table;
+import org.dbsyncer.sdk.plugin.PluginContext;
 import org.dbsyncer.sdk.plugin.ReaderContext;
 import org.dbsyncer.sdk.schema.SchemaResolver;
 import org.dbsyncer.sdk.spi.ConnectorService;
@@ -22,7 +22,14 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
-import java.util.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -175,28 +182,28 @@ public class ConnectorFactory implements DisposableBean {
         return result;
     }
 
-    public Result writer(ConnectorInstance connectorInstance, WriterBatchConfig config) {
+    public Result writer(PluginContext context) {
+        ConnectorInstance connectorInstance = context.getTargetConnectorInstance();
         Assert.notNull(connectorInstance, "ConnectorInstance can not null");
-        Assert.notNull(config, "WriterBatchConfig can not null");
         ConnectorService connector = getConnectorService(connectorInstance.getConfig());
         if (connector instanceof AbstractConnector) {
             AbstractConnector conn = (AbstractConnector) connector;
             try {
                 SchemaResolver schemaResolver = connector.getSchemaResolver();
-                if (config.isEnableSchemaResolver() && schemaResolver != null) {
-                    conn.convertProcessBeforeWriter(schemaResolver, config);
+                if (context.isEnableSchemaResolver() && schemaResolver != null) {
+                    conn.convertProcessBeforeWriter(schemaResolver, context);
                 } else {
-                    conn.convertProcessBeforeWriter(connectorInstance, config);
+                    conn.convertProcessBeforeWriter(connectorInstance, context);
                 }
             } catch (Exception e) {
                 Result result = new Result();
                 result.getError().append(e.getMessage());
-                result.addFailData(config.getData());
+                result.addFailData(context.getTargetList());
                 return result;
             }
         }
 
-        Result result = connector.writer(connectorInstance, config);
+        Result result = connector.writer(connectorInstance, context);
         Assert.notNull(result, "Connector writer batch result can not null");
         return result;
     }
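
For callers, the batch parameters now ride on the context itself instead of a throwaway config object. A minimal sketch of the new call pattern, assuming a concrete context such as FullPluginContext; the setTargetConnectorInstance setter is an assumption, since only its getter appears in this diff:

    // Hedged sketch: setters are named per the hunks in this commit;
    // setTargetConnectorInstance is assumed, only the getter is shown above.
    PluginContext context = new FullPluginContext();
    context.setTargetConnectorInstance(targetInstance); // assumed setter
    context.setTargetFields(targetFields);
    context.setTargetList(rows);
    context.setForceUpdate(true);
    context.setEnableSchemaResolver(profileComponent.getSystemConfig().isEnableSchemaResolver());

    // Old: connectorFactory.writer(targetInstance, new WriterBatchConfig(...));
    Result result = connectorFactory.writer(context);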

+ 15 - 9
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/ElasticsearchConnector.java

@@ -18,7 +18,6 @@ import org.dbsyncer.connector.elasticsearch.schema.ESOtherValueMapper;
 import org.dbsyncer.connector.elasticsearch.util.ESUtil;
 import org.dbsyncer.connector.elasticsearch.validator.ESConfigValidator;
 import org.dbsyncer.sdk.config.CommandConfig;
-import org.dbsyncer.sdk.config.WriterBatchConfig;
 import org.dbsyncer.sdk.connector.AbstractConnector;
 import org.dbsyncer.sdk.connector.ConfigValidator;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
@@ -31,6 +30,7 @@ import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.model.Filter;
 import org.dbsyncer.sdk.model.MetaInfo;
 import org.dbsyncer.sdk.model.Table;
+import org.dbsyncer.sdk.plugin.PluginContext;
 import org.dbsyncer.sdk.plugin.ReaderContext;
 import org.dbsyncer.sdk.spi.ConnectorService;
 import org.dbsyncer.sdk.util.PrimaryKeyUtil;
@@ -65,7 +65,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Types;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -248,21 +254,21 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
     }
 
     @Override
-    public Result writer(ESConnectorInstance connectorInstance, WriterBatchConfig config) {
-        List<Map> data = config.getData();
-        if (CollectionUtils.isEmpty(data) || CollectionUtils.isEmpty(config.getFields())) {
+    public Result writer(ESConnectorInstance connectorInstance, PluginContext context) {
+        List<Map> data = context.getTargetList();
+        if (CollectionUtils.isEmpty(data)) {
             logger.error("writer data can not be empty.");
             throw new ElasticsearchException("writer data can not be empty.");
         }
 
         final Result result = new Result();
-        final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
+        final List<Field> pkFields = PrimaryKeyUtil.findExistPrimaryKeyFields(context.getTargetFields());
         try {
             final BulkRequest request = new BulkRequest();
             final String pk = pkFields.get(0).getName();
-            final String indexName = config.getCommand().get(_TARGET_INDEX);
-            final String type = config.getCommand().get(_TYPE);
-            data.forEach(row -> addRequest(request, indexName, type, config.getEvent(), String.valueOf(row.get(pk)), row));
+            final String indexName = context.getCommand().get(_TARGET_INDEX);
+            final String type = context.getCommand().get(_TYPE);
+            data.forEach(row -> addRequest(request, indexName, type, context.getEvent(), String.valueOf(row.get(pk)), row));
 
             BulkResponse response = connectorInstance.getConnection().bulkWithVersion(request, RequestOptions.DEFAULT);
             RestStatus restStatus = response.status();

+ 10 - 7
dbsyncer-connector/dbsyncer-connector-file/src/main/java/org/dbsyncer/connector/file/FileConnector.java

@@ -14,7 +14,6 @@ import org.dbsyncer.connector.file.model.FileResolver;
 import org.dbsyncer.connector.file.model.FileSchema;
 import org.dbsyncer.connector.file.validator.FileConfigValidator;
 import org.dbsyncer.sdk.config.CommandConfig;
-import org.dbsyncer.sdk.config.WriterBatchConfig;
 import org.dbsyncer.sdk.connector.AbstractConnector;
 import org.dbsyncer.sdk.connector.ConfigValidator;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
@@ -23,13 +22,18 @@ import org.dbsyncer.sdk.listener.Listener;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.model.MetaInfo;
 import org.dbsyncer.sdk.model.Table;
+import org.dbsyncer.sdk.plugin.PluginContext;
 import org.dbsyncer.sdk.plugin.ReaderContext;
 import org.dbsyncer.sdk.spi.ConnectorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.sql.Types;
@@ -181,24 +185,23 @@ public final class FileConnector extends AbstractConnector implements ConnectorS
     }
 
     @Override
-    public Result writer(FileConnectorInstance connectorInstance, WriterBatchConfig config) {
-        List<Map> data = config.getData();
+    public Result writer(FileConnectorInstance connectorInstance, PluginContext context) {
+        List<Map> data = context.getTargetList();
         if (CollectionUtils.isEmpty(data)) {
             logger.error("writer data can not be empty.");
             throw new FileException("writer data can not be empty.");
         }
 
-        final List<Field> fields = config.getFields();
         final String separator = new String(new char[]{connectorInstance.getConfig().getSeparator()});
 
         Result result = new Result();
         OutputStream output = null;
         try {
-            final String filePath = connectorInstance.getFilePath(config.getCommand().get(FILE_NAME));
+            final String filePath = connectorInstance.getFilePath(context.getCommand().get(FILE_NAME));
             output = new FileOutputStream(filePath, true);
             List<String> lines = data.stream().map(row -> {
                 List<String> array = new ArrayList<>();
-                fields.forEach(field -> {
+                context.getTargetFields().forEach(field -> {
                     Object o = row.get(field.getName());
                     array.add(null != o ? String.valueOf(o) : "");
                 });

+ 5 - 5
dbsyncer-connector/dbsyncer-connector-kafka/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -10,7 +10,6 @@ import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.connector.kafka.config.KafkaConfig;
 import org.dbsyncer.connector.kafka.validator.KafkaConfigValidator;
 import org.dbsyncer.sdk.config.CommandConfig;
-import org.dbsyncer.sdk.config.WriterBatchConfig;
 import org.dbsyncer.sdk.connector.AbstractConnector;
 import org.dbsyncer.sdk.connector.ConfigValidator;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
@@ -18,6 +17,7 @@ import org.dbsyncer.sdk.listener.Listener;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.model.MetaInfo;
 import org.dbsyncer.sdk.model.Table;
+import org.dbsyncer.sdk.plugin.PluginContext;
 import org.dbsyncer.sdk.plugin.ReaderContext;
 import org.dbsyncer.sdk.spi.ConnectorService;
 import org.dbsyncer.sdk.util.PrimaryKeyUtil;
@@ -102,16 +102,16 @@ public class KafkaConnector extends AbstractConnector implements ConnectorServic
     }
 
     @Override
-    public Result writer(KafkaConnectorInstance connectorInstance, WriterBatchConfig config) {
-        List<Map> data = config.getData();
-        if (CollectionUtils.isEmpty(data) || CollectionUtils.isEmpty(config.getFields())) {
+    public Result writer(KafkaConnectorInstance connectorInstance, PluginContext context) {
+        List<Map> data = context.getTargetList();
+        if (CollectionUtils.isEmpty(data)) {
             logger.error("writer data can not be empty.");
             throw new KafkaException("writer data can not be empty.");
         }
 
         Result result = new Result();
         final KafkaConfig cfg = connectorInstance.getConfig();
-        final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
+        final List<Field> pkFields = PrimaryKeyUtil.findExistPrimaryKeyFields(context.getTargetFields());
         try {
             String topic = cfg.getTopic();
            // Use the first primary key by default
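
Every ConnectorService implementation now receives the context directly, so third-party connectors follow the same shape as the built-in ones in this commit. A hedged sketch for a custom connector, where MyConnectorInstance and MyException are placeholders rather than types from this commit:

    @Override
    public Result writer(MyConnectorInstance connectorInstance, PluginContext context) {
        List<Map> data = context.getTargetList(); // was config.getData()
        if (CollectionUtils.isEmpty(data)) {
            throw new MyException("writer data can not be empty.");
        }
        Result result = new Result();
        // Primary keys come straight from the target fields now
        List<Field> pkFields = PrimaryKeyUtil.findExistPrimaryKeyFields(context.getTargetFields());
        // ... write rows using context.getCommand() and context.getEvent() ...
        result.addSuccessData(data);
        return result;
    }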

+ 1 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/BufferActuatorRouter.java

@@ -90,9 +90,8 @@ public final class BufferActuatorRouter implements DisposableBean {
     public void unbind(String metaId) {
         router.computeIfPresent(metaId, (k, processor) -> {
             processor.values().forEach(TableGroupBufferActuator::stop);
-            return processor;
+            return null;
         });
-        router.remove(metaId);
     }
 
     private void offer(AbstractBufferActuator actuator, ChangedEvent event) {
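
The unbind rewrite is behavioral, not cosmetic: returning null from computeIfPresent removes the mapping atomically, closing the window between stopping the actuators and the old standalone router.remove(metaId) call. A self-contained illustration of that ConcurrentHashMap contract:

    import java.util.concurrent.ConcurrentHashMap;

    public class ComputeIfPresentDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, String> router = new ConcurrentHashMap<>();
            router.put("metaId", "processor");
            // A null result from the remapping function removes the entry,
            // atomically with respect to other operations on the same key.
            router.computeIfPresent("metaId", (k, v) -> null);
            System.out.println(router.containsKey("metaId")); // false
        }
    }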

+ 1 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -190,6 +190,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         context.setTargetList(targetDataList);
         context.setPluginExtInfo(tableGroup.getPluginExtInfo());
         context.setForceUpdate(mapping.isForceUpdate());
+        context.setEnableSchemaResolver(profileComponent.getSystemConfig().isEnableSchemaResolver());
         pluginFactory.process(tableGroup.getPlugin(), context, ProcessEnum.CONVERT);
 
         // 4. Run the synchronization in batches

+ 27 - 22
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/ParserComponentImpl.java

@@ -10,7 +10,12 @@ import org.dbsyncer.connector.base.ConnectorFactory;
 import org.dbsyncer.parser.ParserComponent;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.event.FullRefreshEvent;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Picker;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.Task;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
@@ -18,11 +23,9 @@ import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.plugin.enums.ProcessEnum;
 import org.dbsyncer.plugin.impl.FullPluginContext;
 import org.dbsyncer.sdk.config.CommandConfig;
-import org.dbsyncer.sdk.config.WriterBatchConfig;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.model.ConnectorConfig;
-import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.model.MetaInfo;
 import org.dbsyncer.sdk.model.Table;
 import org.dbsyncer.sdk.plugin.PluginContext;
@@ -149,6 +152,7 @@ public class ParserComponentImpl implements ParserComponent {
         context.setTargetFields(picker.getTargetFields());
         context.setSupportedCursor(StringUtil.isNotBlank(command.get(ConnectorConstant.OPERTION_QUERY_CURSOR)));
         context.setPageSize(mapping.getReadNum());
+        context.setEnableSchemaResolver(profileComponent.getSystemConfig().isEnableSchemaResolver());
         // 0. Plugin pre-processing
         pluginFactory.process(group.getPlugin(), context, ProcessEnum.BEFORE);
 
@@ -212,16 +216,11 @@ public class ParserComponentImpl implements ParserComponent {
 
         List<Map> dataList = context.getTargetList();
         int batchSize = context.getBatchSize();
-        String tableName = context.getTargetTableName();
-        String event = context.getEvent();
-        Map<String, String> command = context.getCommand();
-        List<Field> fields = context.getTargetFields();
         // Total row count
         int total = dataList.size();
         // Single task (fits in one batch)
         if (total <= batchSize) {
-            WriterBatchConfig batchConfig = new WriterBatchConfig(tableName, event, command, fields, dataList, context.isForceUpdate(), context.isEnableSchemaResolver());
-            return connectorFactory.writer(context.getTargetConnectorInstance(), batchConfig);
+            return connectorFactory.writer(context);
         }
 
         // Batch task: split into chunks
@@ -241,19 +240,25 @@ public class ParserComponentImpl implements ParserComponent {
                 toIndex += batchSize;
             }
 
-            executor.execute(() -> {
-                try {
-                    WriterBatchConfig batchConfig = new WriterBatchConfig(tableName, event, command, fields, data, context.isForceUpdate(), context.isEnableSchemaResolver());
-                    Result w = connectorFactory.writer(context.getTargetConnectorInstance(), batchConfig);
-                    result.addSuccessData(w.getSuccessData());
-                    result.addFailData(w.getFailData());
-                    result.getError().append(w.getError());
-                } catch (Exception e) {
-                    logger.error(e.getMessage());
-                } finally {
-                    latch.countDown();
-                }
-            });
+            try {
+                PluginContext tmpContext = (PluginContext) context.clone();
+                tmpContext.setTargetList(data);
+                executor.execute(() -> {
+                    try {
+                        Result w = connectorFactory.writer(tmpContext);
+                        result.addSuccessData(w.getSuccessData());
+                        result.addFailData(w.getFailData());
+                        result.getError().append(w.getError());
+                    } catch (Exception e) {
+                        logger.error(e.getMessage());
+                    } finally {
+                        latch.countDown();
+                    }
+                });
+            } catch (CloneNotSupportedException e) {
+                logger.error(e.getMessage(), e);
+                latch.countDown();
+            }
         }
         try {
             latch.await();
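
The split path now shallow-clones the context once per slice instead of materializing a WriterBatchConfig per slice; only the targetList reference differs between copies, so worker threads share command, targetFields, and the connector instance. A condensed sketch of the loop, with local names assumed from the surrounding hunk and error handling trimmed:

    // Assumes the enclosing code handles CloneNotSupportedException and
    // InterruptedException, as the full loop above does with try/catch.
    for (int fromIndex = 0; fromIndex < total; fromIndex += batchSize) {
        List<Map> slice = dataList.subList(fromIndex, Math.min(fromIndex + batchSize, total));
        PluginContext tmpContext = (PluginContext) context.clone(); // shallow copy
        tmpContext.setTargetList(slice); // the only per-slice state
        executor.execute(() -> {
            try {
                Result w = connectorFactory.writer(tmpContext);
                result.addSuccessData(w.getSuccessData());
                result.addFailData(w.getFailData());
            } finally {
                latch.countDown();
            }
        });
    }
    latch.await();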

+ 13 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/SystemConfig.java

@@ -69,6 +69,11 @@ public class SystemConfig extends ConfigModel {
      */
     private String watermark;
 
+    /**
+     * Whether to enable the schema resolver
+     */
+    private boolean enableSchemaResolver;
+
     public int getExpireDataDays() {
         return expireDataDays;
     }
@@ -148,4 +153,12 @@ public class SystemConfig extends ConfigModel {
     public void setWatermark(String watermark) {
         this.watermark = watermark;
     }
+
+    public boolean isEnableSchemaResolver() {
+        return enableSchemaResolver;
+    }
+
+    public void setEnableSchemaResolver(boolean enableSchemaResolver) {
+        this.enableSchemaResolver = enableSchemaResolver;
+    }
 }

+ 0 - 76
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/config/WriterBatchConfig.java

@@ -1,76 +0,0 @@
-package org.dbsyncer.sdk.config;
-
-import org.dbsyncer.sdk.model.Field;
-
-import java.util.List;
-import java.util.Map;
-
-public class WriterBatchConfig {
-
-    /**
-     * Table name
-     */
-    private final String tableName;
-    /**
-     * Event type
-     */
-    private final String event;
-    /**
-     * Execution commands
-     */
-    private final Map<String, String> command;
-    /**
-     * Field metadata
-     */
-    private final List<Field> fields;
-    /**
-     * Row data
-     */
-    private final List<Map> data;
-    /**
-     * Force update (overwrite)
-     */
-    private final boolean forceUpdate;
-    /**
-     * Whether to enable the schema resolver
-     */
-    private final boolean enableSchemaResolver;
-
-    public WriterBatchConfig(String tableName, String event, Map<String, String> command, List<Field> fields, List<Map> data, boolean forceUpdate, boolean enableSchemaResolver) {
-        this.tableName = tableName;
-        this.event = event;
-        this.command = command;
-        this.fields = fields;
-        this.data = data;
-        this.forceUpdate = forceUpdate;
-        this.enableSchemaResolver = enableSchemaResolver;
-    }
-
-    public String getTableName() {
-        return tableName;
-    }
-
-    public String getEvent() {
-        return event;
-    }
-
-    public Map<String, String> getCommand() {
-        return command;
-    }
-
-    public List<Field> getFields() {
-        return fields;
-    }
-
-    public List<Map> getData() {
-        return data;
-    }
-
-    public boolean isForceUpdate() {
-        return forceUpdate;
-    }
-
-    public boolean isEnableSchemaResolver() {
-        return enableSchemaResolver;
-    }
-}
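
Every accessor on the deleted class has a direct PluginContext counterpart, so porting plugin or connector code is mechanical. The pairings, as they appear across the hunks in this commit:

    // WriterBatchConfig (removed)                        // PluginContext replacement
    String table = context.getTargetTableName();          // config.getTableName()
    String event = context.getEvent();                    // config.getEvent()
    Map<String, String> command = context.getCommand();   // config.getCommand()
    List<Field> fields = context.getTargetFields();       // config.getFields()
    List<Map> rows = context.getTargetList();             // config.getData()
    boolean forceUpdate = context.isForceUpdate();        // config.isForceUpdate()
    boolean resolver = context.isEnableSchemaResolver();  // config.isEnableSchemaResolver()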

+ 12 - 12
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/AbstractConnector.java

@@ -6,7 +6,6 @@ package org.dbsyncer.sdk.connector;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.SdkException;
-import org.dbsyncer.sdk.config.WriterBatchConfig;
 import org.dbsyncer.sdk.connector.schema.BigintValueMapper;
 import org.dbsyncer.sdk.connector.schema.BinaryValueMapper;
 import org.dbsyncer.sdk.connector.schema.BitValueMapper;
@@ -35,6 +34,7 @@ import org.dbsyncer.sdk.connector.schema.VarBinaryValueMapper;
 import org.dbsyncer.sdk.connector.schema.VarcharValueMapper;
 import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.model.Field;
+import org.dbsyncer.sdk.plugin.PluginContext;
 import org.dbsyncer.sdk.schema.SchemaResolver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,17 +87,17 @@ public abstract class AbstractConnector {
      * Convert field values
      *
      * @param connectorInstance
-     * @param config
+     * @param context
      */
-    public void convertProcessBeforeWriter(ConnectorInstance connectorInstance, WriterBatchConfig config) {
-        if (CollectionUtils.isEmpty(config.getFields()) || CollectionUtils.isEmpty(config.getData())) {
+    public void convertProcessBeforeWriter(ConnectorInstance connectorInstance, PluginContext context) {
+        if (CollectionUtils.isEmpty(context.getTargetFields()) || CollectionUtils.isEmpty(context.getTargetList())) {
             return;
         }
 
         // 获取字段映射规则
-        for (Map row : config.getData()) {
+        for (Map row : context.getTargetList()) {
             // 根据目标字段类型转换值
-            for (Field f : config.getFields()) {
+            for (Field f : context.getTargetFields()) {
                 if (null == f) {
                     continue;
                 }
@@ -108,7 +108,7 @@ public abstract class AbstractConnector {
                     try {
                         row.put(f.getName(), valueMapper.convertValue(connectorInstance, row.get(f.getName())));
                     } catch (Exception e) {
-                        logger.error("convert value error: ({}, {})", f.getName(), row.get(f.getName()));
+                        logger.error("convert value error: ({}, {}, {})", context.getTargetTableName(), f.getName(), row.get(f.getName()));
                         throw new SdkException(e);
                     }
                 }
@@ -116,13 +116,13 @@ public abstract class AbstractConnector {
         }
     }
 
-    public void convertProcessBeforeWriter(SchemaResolver resolver, WriterBatchConfig config) {
-        if (CollectionUtils.isEmpty(config.getFields()) || CollectionUtils.isEmpty(config.getData())) {
+    public void convertProcessBeforeWriter(SchemaResolver resolver, PluginContext context) {
+        if (CollectionUtils.isEmpty(context.getTargetFields()) || CollectionUtils.isEmpty(context.getTargetList())) {
             return;
         }
 
-        for (Map row : config.getData()) {
-            for (Field f : config.getFields()) {
+        for (Map row : context.getTargetList()) {
+            for (Field f : context.getTargetFields()) {
                 if (null == f) {
                     continue;
                 }
@@ -131,7 +131,7 @@ public abstract class AbstractConnector {
                     Object o = resolver.merge(row.get(f.getName()), f);
                     row.put(f.getName(), resolver.convert(o, f));
                 } catch (Exception e) {
-                    logger.error(String.format("convert value error: (%s, %s, %s)", config.getTableName(), f.getName(), row.get(f.getName())), e);
+                    logger.error(String.format("convert value error: (%s, %s, %s)", context.getTargetTableName(), f.getName(), row.get(f.getName())), e);
                     throw new SdkException(e);
                 }
             }

+ 21 - 19
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDatabaseConnector.java

@@ -18,6 +18,7 @@ import org.dbsyncer.sdk.enums.QuartzFilterEnum;
 import org.dbsyncer.sdk.enums.SqlBuilderEnum;
 import org.dbsyncer.sdk.enums.TableTypeEnum;
 import org.dbsyncer.sdk.model.*;
+import org.dbsyncer.sdk.plugin.PluginContext;
 import org.dbsyncer.sdk.plugin.ReaderContext;
 import org.dbsyncer.sdk.spi.ConnectorService;
 import org.dbsyncer.sdk.util.DatabaseUtil;
@@ -148,14 +149,15 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     }
 
     @Override
-    public Result writer(DatabaseConnectorInstance connectorInstance, WriterBatchConfig config) {
-        String event = config.getEvent();
-        List<Map> data = config.getData();
+    public Result writer(DatabaseConnectorInstance connectorInstance, PluginContext context) {
+        String event = context.getEvent();
+        List<Map> data = context.getTargetList();
+        List<Field> targetFields = context.getTargetFields();
 
         // 1. Get the SQL statement
-        String executeSql = config.getCommand().get(event);
+        String executeSql = context.getCommand().get(event);
         Assert.hasText(executeSql, "执行SQL语句不能为空.");
-        if (CollectionUtils.isEmpty(config.getFields())) {
+        if (CollectionUtils.isEmpty(targetFields)) {
             logger.error("writer fields can not be empty.");
             throw new SdkException("writer fields can not be empty.");
         }
@@ -163,8 +165,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             logger.error("writer data can not be empty.");
             throw new SdkException("writer data can not be empty.");
         }
-        List<Field> fields = new ArrayList<>(config.getFields());
-        List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
+        List<Field> fields = new ArrayList<>(targetFields);
+        List<Field> pkFields = PrimaryKeyUtil.findExistPrimaryKeyFields(targetFields);
         // Update / Delete
         if (!isInsert(event)) {
             if (isDelete(event)) {
@@ -181,8 +183,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             // 2. Bind the parameters
             execute = connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(executeSql, batchRows(fields, data)));
         } catch (Exception e) {
-            if (config.isForceUpdate()) {
-                data.forEach(row -> forceUpdate(result, connectorInstance, config, pkFields, row));
+            if (context.isForceUpdate()) {
+                data.forEach(row -> forceUpdate(result, connectorInstance, context, pkFields, row));
             }
         }
 
@@ -193,8 +195,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
                     result.getSuccessData().add(data.get(i));
                     continue;
                 }
-                if (config.isForceUpdate()) {
-                    forceUpdate(result, connectorInstance, config, pkFields, data.get(i));
+                if (context.isForceUpdate()) {
+                    forceUpdate(result, connectorInstance, context, pkFields, data.get(i));
                 }
             }
         }
@@ -567,11 +569,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return args;
     }
 
-    private void forceUpdate(Result result, DatabaseConnectorInstance connectorInstance, WriterBatchConfig config, List<Field> pkFields,
+    private void forceUpdate(Result result, DatabaseConnectorInstance connectorInstance, PluginContext context, List<Field> pkFields,
                              Map row) {
-        if (isUpdate(config.getEvent()) || isInsert(config.getEvent())) {
+        if (isUpdate(context.getEvent()) || isInsert(context.getEvent())) {
             // Overwrite-update if the row exists, otherwise insert
-            final String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_EXIST);
+            final String queryCount = context.getCommand().get(ConnectorConstant.OPERTION_QUERY_EXIST);
             int size = pkFields.size();
             Object[] args = new Object[size];
             for (int i = 0; i < size; i++) {
@@ -579,17 +581,17 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             }
             final String event = existRow(connectorInstance, queryCount, args) ? ConnectorConstant.OPERTION_UPDATE
                     : ConnectorConstant.OPERTION_INSERT;
-            logger.warn("{}表执行{}失败, 重新执行{}, {}", config.getTableName(), config.getEvent(), event, row);
-            writer(result, connectorInstance, config, pkFields, row, event);
+            logger.warn("{}表执行{}失败, 重新执行{}, {}", context.getTargetTableName(), context.getEvent(), event, row);
+            writer(result, connectorInstance, context, pkFields, row, event);
         }
     }
 
-    private void writer(Result result, DatabaseConnectorInstance connectorInstance, WriterBatchConfig config, List<Field> pkFields, Map row,
+    private void writer(Result result, DatabaseConnectorInstance connectorInstance, PluginContext context, List<Field> pkFields, Map row,
                         String event) {
         // 1. Get the SQL
-        String sql = config.getCommand().get(event);
+        String sql = context.getCommand().get(event);
 
-        List<Field> fields = new ArrayList<>(config.getFields());
+        List<Field> fields = new ArrayList<>(context.getTargetFields());
         // Update / Delete
         if (!isInsert(event)) {
             if (isDelete(event)) {

+ 6 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/plugin/AbstractPluginContext.java

@@ -11,7 +11,7 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2022/6/30 16:00
  */
-public abstract class AbstractPluginContext implements PluginContext {
+public abstract class AbstractPluginContext implements PluginContext, Cloneable {
 
     /**
      * 是否终止任务
@@ -212,4 +212,9 @@ public abstract class AbstractPluginContext implements PluginContext {
     public void setPluginExtInfo(String pluginExtInfo) {
         this.pluginExtInfo = pluginExtInfo;
     }
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
 }
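
super.clone() here is Object's field-by-field shallow copy: the copy and the original share every mutable reference until a setter replaces one, which is exactly why ParserComponentImpl only swaps targetList on each clone. An illustrative caveat, not code from this commit:

    AbstractPluginContext copy = (AbstractPluginContext) context.clone();
    copy.setTargetList(new ArrayList<>()); // isolated: replaces the reference on the copy only
    copy.getCommand().put("key", "value"); // NOT isolated: the command map is still shared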

+ 9 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/plugin/PluginContext.java

@@ -95,6 +95,8 @@ public interface PluginContext extends BaseContext {
      */
     List<Map> getTargetList();
 
+    void setTargetList(List<Map> targetList);
+
     /**
      * 获取插件参数
      *
@@ -102,4 +104,11 @@ public interface PluginContext extends BaseContext {
      */
     String getPluginExtInfo();
 
+    /**
+     * Shallow copy
+     *
+     * @return
+     * @throws CloneNotSupportedException
+     */
+    Object clone() throws CloneNotSupportedException;
 }

+ 3 - 3
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/spi/ConnectorService.java

@@ -7,7 +7,6 @@ import org.dbsyncer.common.model.Result;
 import org.dbsyncer.sdk.SdkException;
 import org.dbsyncer.sdk.config.CommandConfig;
 import org.dbsyncer.sdk.config.DDLConfig;
-import org.dbsyncer.sdk.config.WriterBatchConfig;
 import org.dbsyncer.sdk.connector.ConfigValidator;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
 import org.dbsyncer.sdk.enums.ListenerTypeEnum;
@@ -15,6 +14,7 @@ import org.dbsyncer.sdk.listener.Listener;
 import org.dbsyncer.sdk.model.ConnectorConfig;
 import org.dbsyncer.sdk.model.MetaInfo;
 import org.dbsyncer.sdk.model.Table;
+import org.dbsyncer.sdk.plugin.PluginContext;
 import org.dbsyncer.sdk.plugin.ReaderContext;
 import org.dbsyncer.sdk.schema.SchemaResolver;
 import org.dbsyncer.sdk.storage.StorageService;
@@ -122,10 +122,10 @@ public interface ConnectorService<I extends ConnectorInstance, C extends Connect
      * Batch-write data to the target source
      *
      * @param connectorInstance
-     * @param connectorConfig
+     * @param context
      * @return
      */
-    Result writer(I connectorInstance, WriterBatchConfig connectorConfig);
+    Result writer(I connectorInstance, PluginContext context);
 
     /**
      * Execute a DDL command

+ 3 - 8
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/util/PrimaryKeyUtil.java

@@ -7,7 +7,6 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.SdkException;
-import org.dbsyncer.sdk.config.WriterBatchConfig;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.model.Table;
 
@@ -49,15 +48,11 @@ public abstract class PrimaryKeyUtil {
     /**
      * Return the primary-key fields
      *
-     * @param config
+     * @param fields
      * @return
      */
-    public static List<Field> findConfigPrimaryKeyFields(WriterBatchConfig config) {
-        if (null == config) {
-            throw new SdkException("The config is null.");
-        }
-
-        List<Field> list = findPrimaryKeyFields(config.getFields());
+    public static List<Field> findExistPrimaryKeyFields(List<Field> fields) {
+        List<Field> list = findPrimaryKeyFields(fields);
         if (CollectionUtils.isEmpty(list)) {
             throw new SdkException("主键为空");
         }
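
At the call sites, the wrapper argument disappears and the field list is passed directly, as in the Elasticsearch, Kafka, and database connectors above:

    // Old: PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
    List<Field> pkFields = PrimaryKeyUtil.findExistPrimaryKeyFields(context.getTargetFields());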