
!95 merge
Merge pull request !95 from AE86/V_1.0.0_RC

AE86 2 years ago
parent commit 4c3b7b242b
26 changed files with 532 additions and 280 deletions
  1. +71 -4
      dbsyncer-common/src/main/java/org/dbsyncer/common/model/AbstractConvertContext.java
  2. +5 -21
      dbsyncer-common/src/main/java/org/dbsyncer/common/model/FullConvertContext.java
  3. +7 -33
      dbsyncer-common/src/main/java/org/dbsyncer/common/model/IncrementConvertContext.java
  4. +26 -0
      dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java
  5. +65 -0
      dbsyncer-common/src/main/java/org/dbsyncer/common/spi/ConvertContext.java
  6. +4 -16
      dbsyncer-common/src/main/java/org/dbsyncer/common/spi/ConvertService.java
  7. +18 -4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  8. +12 -0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java
  9. +3 -1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java
  10. +34 -16
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  11. +2 -2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java
  12. +3 -1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java
  13. +3 -1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  14. +21 -10
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java
  15. +1 -1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableFullFlushStrategy.java
  16. +26 -34
      dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java
  17. +8 -6
      dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/service/DemoConvertServiceImpl.java
  18. +3 -1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java
  19. +109 -87
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java
  20. +56 -0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/UnderlineToCamelUtils.java
  21. +2 -0
      dbsyncer-storage/src/main/resources/dbsyncer_data.sql
  22. +1 -0
      dbsyncer-storage/src/main/resources/dbsyncer_upgrade.sql
  23. +2 -2
      dbsyncer-web/src/main/java/org/dbsyncer/web/controller/plugin/PluginController.java
  24. +50 -40
      dbsyncer-web/src/main/resources/public/plugin/plugin.html
  25. BIN
      dbsyncer-web/src/main/resources/static/img/plugin/jar.png
  26. BIN
      dbsyncer-web/src/main/resources/static/img/plugin/spi.png

+ 71 - 4
dbsyncer-common/src/main/java/org/dbsyncer/common/model/AbstractConvertContext.java

@@ -1,14 +1,24 @@
 package org.dbsyncer.common.model;
 
 import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.common.spi.ConvertContext;
 import org.dbsyncer.common.spi.ProxyApplicationContext;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * @author AE86
  * @version 1.0.0
  * @date 2022/6/30 16:00
  */
-public abstract class AbstractConvertContext {
+public abstract class AbstractConvertContext implements ConvertContext {
+
+    /**
+     * Whether the task is terminated
+     * <p>true: the target no longer receives synced data; defaults to false
+     */
+    private boolean terminated;
 
     /**
      * Spring context
@@ -21,19 +31,76 @@ public abstract class AbstractConvertContext {
     protected ConnectorMapper targetConnectorMapper;
 
     /**
-     * Target table
+     * Source table
+     */
+    protected String sourceTableName;
+
+    /**
+     * Target table
      */
     protected String targetTableName;
 
+    /**
+     * Sync event (INSERT/UPDATE/DELETE)
+     */
+    protected String event;
+
+    /**
+     * Source data list
+     */
+    protected List<Map> sourceList;
+
+    /**
+     * Target data list
+     */
+    protected List<Map> targetList;
+
+    public void setContext(ProxyApplicationContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return terminated;
+    }
+
+    @Override
+    public void setTerminated(boolean terminated) {
+        this.terminated = terminated;
+    }
+
+    @Override
     public ProxyApplicationContext getContext() {
         return context;
     }
 
+    @Override
+    public ConnectorMapper getTargetConnectorMapper() {
+        return targetConnectorMapper;
+    }
+
+    @Override
+    public String getSourceTableName() {
+        return sourceTableName;
+    }
+
+    @Override
     public String getTargetTableName() {
         return targetTableName;
     }
 
-    public ConnectorMapper getTargetConnectorMapper() {
-        return targetConnectorMapper;
+    @Override
+    public String getEvent() {
+        return event;
+    }
+
+    @Override
+    public List<Map> getSourceList() {
+        return sourceList;
+    }
+
+    @Override
+    public List<Map> getTargetList() {
+        return targetList;
     }
 }

+ 5 - 21
dbsyncer-common/src/main/java/org/dbsyncer/common/model/FullConvertContext.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.common.model;
 
 import org.dbsyncer.common.spi.ConnectorMapper;
-import org.dbsyncer.common.spi.ProxyApplicationContext;
 
 import java.util.List;
 import java.util.Map;
@@ -11,31 +10,16 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2022/6/30 16:04
  */
-public class FullConvertContext extends AbstractConvertContext {
+public final class FullConvertContext extends AbstractConvertContext {
 
-    /**
-     * Full sync: source data list
-     */
-    private List<Map> sourceList;
-
-    /**
-     * Full sync: target data list
-     */
-    private List<Map> targetList;
-
-    public FullConvertContext(ProxyApplicationContext context, ConnectorMapper targetConnectorMapper, String targetTableName, List<Map> sourceList, List<Map> targetList) {
-        this.context = context;
+    public FullConvertContext(ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event,
+                              List<Map> sourceList, List<Map> targetList) {
         this.targetConnectorMapper = targetConnectorMapper;
+        this.sourceTableName = sourceTableName;
         this.targetTableName = targetTableName;
+        this.event = event;
         this.sourceList = sourceList;
         this.targetList = targetList;
     }
 
-    public List<Map> getSourceList() {
-        return sourceList;
-    }
-
-    public List<Map> getTargetList() {
-        return targetList;
-    }
 }

+ 7 - 33
dbsyncer-common/src/main/java/org/dbsyncer/common/model/IncrementConvertContext.java

@@ -1,8 +1,8 @@
 package org.dbsyncer.common.model;
 
 import org.dbsyncer.common.spi.ConnectorMapper;
-import org.dbsyncer.common.spi.ProxyApplicationContext;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -10,41 +10,15 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2022/6/30 16:06
  */
-public class IncrementConvertContext extends AbstractConvertContext {
+public final class IncrementConvertContext extends AbstractConvertContext {
 
-    /**
-     * Incremental sync: event (INSERT/UPDATE/DELETE)
-     */
-    private String event;
-
-    /**
-     * Incremental sync: source data
-     */
-    private Map source;
-
-    /**
-     * Incremental sync: target data
-     */
-    private Map target;
-
-    public IncrementConvertContext(ProxyApplicationContext context, ConnectorMapper targetConnectorMapper, String targetTableName, String event, Map source, Map target) {
-        this.context = context;
+    public IncrementConvertContext(ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event,
+                                   List<Map> sourceList, List<Map> targetList) {
         this.targetConnectorMapper = targetConnectorMapper;
+        this.sourceTableName = sourceTableName;
         this.targetTableName = targetTableName;
         this.event = event;
-        this.source = source;
-        this.target = target;
-    }
-
-    public String getEvent() {
-        return event;
-    }
-
-    public Map getSource() {
-        return source;
-    }
-
-    public Map getTarget() {
-        return target;
+        this.sourceList = sourceList;
+        this.targetList = targetList;
     }
 }

+ 26 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java

@@ -20,6 +20,16 @@ public class Result<T> {
      */
     private StringBuffer error = new StringBuffer();
 
+    /**
+     * Table group (mapping relation) ID
+     */
+    private String tableGroupId;
+
+    /**
+     * Target table name
+     */
+    private String targetTableGroupName;
+
     private final Object LOCK = new Object();
 
     public Result() {
@@ -62,4 +72,20 @@ public class Result<T> {
             this.successData.addAll(successData);
         }
     }
+
+    public String getTableGroupId() {
+        return tableGroupId;
+    }
+
+    public void setTableGroupId(String tableGroupId) {
+        this.tableGroupId = tableGroupId;
+    }
+
+    public String getTargetTableGroupName() {
+        return targetTableGroupName;
+    }
+
+    public void setTargetTableGroupName(String targetTableGroupName) {
+        this.targetTableGroupName = targetTableGroupName;
+    }
 }

+ 65 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/spi/ConvertContext.java

@@ -0,0 +1,65 @@
+package org.dbsyncer.common.spi;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Plugin convert context
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/10/28 20:26
+ */
+public interface ConvertContext {
+
+    /**
+     * Whether syncing data to the target database has been terminated
+     *
+     * @return
+     */
+    boolean isTerminated();
+
+    /**
+     * Terminate syncing data to the target database
+     * <p>true: terminate; defaults to false
+     *
+     * @param terminated
+     */
+    void setTerminated(boolean terminated);
+
+    /**
+     * Spring context
+     */
+    ProxyApplicationContext getContext();
+
+    /**
+     * Target connector instance
+     */
+    ConnectorMapper getTargetConnectorMapper();
+
+    /**
+     * Source table
+     */
+    String getSourceTableName();
+
+    /**
+     * Target table
+     */
+    String getTargetTableName();
+
+    /**
+     * Sync event (INSERT/UPDATE/DELETE)
+     */
+    String getEvent();
+
+    /**
+     * Source data list
+     */
+    List<Map> getSourceList();
+
+    /**
+     * Target data list
+     */
+    List<Map> getTargetList();
+
+}

+ 4 - 16
dbsyncer-common/src/main/java/org/dbsyncer/common/spi/ConvertService.java

@@ -1,8 +1,5 @@
 package org.dbsyncer.common.spi;
 
-import org.dbsyncer.common.model.FullConvertContext;
-import org.dbsyncer.common.model.IncrementConvertContext;
-
 /**
  * Plugin extension service interface
  * <p>Extended conversion for full sync / incremental sync</p>
@@ -14,27 +11,18 @@ import org.dbsyncer.common.model.IncrementConvertContext;
 public interface ConvertService {
 
     /**
-     * Full sync
-     *
-     * @param context the context
-     */
-    void convert(FullConvertContext context);
-
-    /**
-     * Incremental sync
+     * Full sync / incremental sync
      *
      * @param context the context
      */
-    void convert(IncrementConvertContext context);
+    void convert(ConvertContext context);
 
     /**
-     * Post-insert processing hook
+     * Runs after full/incremental sync completes
      *
      * @param context the context
-     * @author wangxiri
-     * @date 2022/10/25
      */
-    default void postProcessAfter(IncrementConvertContext context) {
+    default void postProcessAfter(ConvertContext context) {
     }
 
     /**

+ 18 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -165,11 +165,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         if (null != execute) {
             int batchSize = execute.length;
             for (int i = 0; i < batchSize; i++) {
-                if (execute[i] == 0 || execute[i] == -2) {
-                    forceUpdate(result, connectorMapper, config, pkField, data.get(i));
+                if (execute[i] == 1 || execute[i] == -2) {
+                    result.getSuccessData().add(data.get(i));
                     continue;
                 }
-                result.getSuccessData().add(data.get(i));
+                forceUpdate(result, connectorMapper, config, pkField, data.get(i));
             }
         }
         return result;
@@ -246,6 +246,16 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return "";
     }
 
+    /**
+     * Get the quotation mark for a filter value
+     *
+     * @param value
+     * @return
+     */
+    protected String buildSqlFilterWithQuotation(String value) {
+        return "'";
+    }
+
     /**
      * Get the schema name
      *
@@ -440,7 +450,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         for (int i = 0; i < size; i++) {
             c = list.get(i);
             // "USER" = 'zhangsan'
-            sql.append(quotation).append(c.getName()).append(quotation).append(" ").append(c.getFilter()).append(" ").append("'").append(c.getValue()).append("'");
+            sql.append(quotation).append(c.getName()).append(quotation);
+            sql.append(" ").append(c.getFilter()).append(" ");
+            // Skip the quotes when the value is a function call
+            String filterValueQuotation = buildSqlFilterWithQuotation(c.getValue());
+            sql.append(filterValueQuotation).append(c.getValue()).append(filterValueQuotation);
             if (i < end) {
                 sql.append(" ").append(queryOperator).append(" ");
             }
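
Note on the first hunk above: JDBC executeBatch returns one update count per statement, where 1 means a single row was affected and Statement.SUCCESS_NO_INFO (-2) means the statement succeeded without a known count. The old check routed -2 through forceUpdate as if it had failed; the new check accepts both 1 and -2 as success and only falls back to forceUpdate for everything else (including 0, i.e. no row matched).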

+ 12 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -29,6 +29,18 @@ public final class OracleConnector extends AbstractDatabaseConnector {
         return "\"";
     }
 
+    @Override
+    protected String buildSqlFilterWithQuotation(String value) {
+        if (StringUtil.isNotBlank(value)) {
+            // Support Oracle built-in functions (to_char/to_date/to_timestamp/to_number)
+            String val = value.toLowerCase();
+            if (StringUtil.startsWith(val, "to_") && StringUtil.endsWith(val, ")")) {
+                return "";
+            }
+        }
+        return super.buildSqlFilterWithQuotation(value);
+    }
+
     @Override
     protected String getValidationQuery() {
         return "select 1 from dual";

+ 3 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -4,6 +4,7 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.common.spi.ConvertContext;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
@@ -165,9 +166,10 @@ public interface Parser {
     /**
      * Batch write
      *
+     * @param context
      * @param batchWriter
      * @return
      */
-    Result writeBatch(BatchWriter batchWriter);
+    Result writeBatch(ConvertContext context, BatchWriter batchWriter);
 
 }

+ 34 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -5,8 +5,10 @@ import com.alibaba.fastjson.JSONObject;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
+import org.dbsyncer.common.model.FullConvertContext;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.common.spi.ConvertContext;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.connector.ConnectorFactory;
@@ -25,13 +27,18 @@ import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.event.FullRefreshEvent;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.BatchWriter;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Picker;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.Task;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.strategy.ParserStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
-import org.dbsyncer.plugin.config.Plugin;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -252,6 +259,7 @@ public class ParserFactory implements Parser {
         int batchSize = mapping.getBatchNum();
         ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
         ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
+        final String event = ConnectorConstant.OPERTION_INSERT;
 
         for (; ; ) {
             if (!task.isRunning()) {
@@ -274,20 +282,22 @@ public class ParserFactory implements Parser {
             ConvertUtil.convert(group.getConvert(), target);
 
             // 4. Plugin conversion
-            Plugin plugin = group.getPlugin();
-            pluginFactory.convert(tConnectorMapper, plugin, tTableName, data, target);
+            final FullConvertContext context = new FullConvertContext(tConnectorMapper, sTableName, tTableName, event, data, target);
+            pluginFactory.convert(group.getPlugin(), context);
 
             // 5. Write to the target
-            BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, tTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
-            Result writer = writeBatch(batchWriter, executorService);
+            BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, tTableName, event, picker.getTargetFields(), target, batchSize);
+            Result result = writeBatch(context, batchWriter, executorService);
 
-            // 6. Post-processing after the batch run
-            pluginFactory.postProcessAfter(tConnectorMapper, plugin, tTableName, ConnectorConstant.OPERTION_INSERT, data, target);
+            // 6. Notify the plugin for post-processing once the sync completes
+            pluginFactory.postProcessAfter(group.getPlugin(), context);
 
             // 7. Update the result
             task.setPageIndex(task.getPageIndex() + 1);
             task.setCursor(getLastCursor(data, pk));
-            flush(task, writer);
+            result.setTableGroupId(tableGroup.getId());
+            result.setTargetTableGroupName(tTableName);
+            flush(task, result);
 
             // 8. Check for the last page
             if (data.size() < pageSize) {
@@ -306,22 +316,31 @@ public class ParserFactory implements Parser {
     /**
      * Batch write
      *
+     * @param context
      * @param batchWriter
      * @return
      */
     @Override
-    public Result writeBatch(BatchWriter batchWriter) {
-        return writeBatch(batchWriter, taskExecutor);
+    public Result writeBatch(ConvertContext context, BatchWriter batchWriter) {
+        return writeBatch(context, batchWriter, taskExecutor);
     }
 
     /**
      * Batch write
      *
+     * @param context
      * @param batchWriter
      * @param taskExecutor
      * @return
      */
-    private Result writeBatch(BatchWriter batchWriter, Executor taskExecutor) {
+    private Result writeBatch(ConvertContext context, BatchWriter batchWriter, Executor taskExecutor) {
+        final Result result = new Result();
+        // Syncing to the target database has been terminated
+        if (context.isTerminated()) {
+            result.getSuccessData().addAll(batchWriter.getDataList());
+            return result;
+        }
+
         List<Map> dataList = batchWriter.getDataList();
         int batchSize = batchWriter.getBatchSize();
         String tableName = batchWriter.getTableName();
@@ -338,7 +357,6 @@ public class ParserFactory implements Parser {
         // Split into batch tasks
         int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
 
-        final Result result = new Result();
         final CountDownLatch latch = new CountDownLatch(taskSize);
         int fromIndex = 0;
         int toIndex = batchSize;
@@ -378,10 +396,10 @@ public class ParserFactory implements Parser {
      * Update the cache
      *
      * @param task
-     * @param writer
+     * @param result
      */
-    private void flush(Task task, Result writer) {
-        flushStrategy.flushFullData(task.getId(), writer, ConnectorConstant.OPERTION_INSERT);
+    private void flush(Task task, Result result) {
+        flushStrategy.flushFullData(task.getId(), result, ConnectorConstant.OPERTION_INSERT);
 
         // Publish a refresh event to FullExtractor
         task.setEndTime(Instant.now().toEpochMilli());
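
A minimal sketch of the new termination contract from a plugin's point of view (class name hypothetical; only convert carries logic, the rest mirrors the demo further down):

    import org.dbsyncer.common.spi.ConvertContext;
    import org.dbsyncer.common.spi.ConvertService;

    public class SkipTargetPlugin implements ConvertService {

        @Override
        public void convert(ConvertContext context) {
            // With terminated = true, ParserFactory#writeBatch short-circuits:
            // every row is added to successData and nothing is written to the
            // target, but the result is still flushed to the data log.
            context.setTerminated(true);
        }

        @Override
        public String getVersion() {
            return "1.0.0";
        }

        @Override
        public String getName() {
            return "SkipTarget";
        }
    }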

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -41,12 +41,12 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
 
         if (flushDataConfig.isWriterFail() && !CollectionUtils.isEmpty(result.getFailData())) {
             final String error = StringUtil.substring(result.getError().toString(), 0, flushDataConfig.getMaxErrorLength());
-            flushService.write(metaId, event, false, result.getFailData(), error);
+            flushService.write(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, false, result.getFailData(), error);
         }
 
         // Whether to persist incremental data
         if (flushDataConfig.isWriterSuccess() && !CollectionUtils.isEmpty(result.getSuccessData())) {
-            flushService.write(metaId, event, true, result.getSuccessData(), "");
+            flushService.write(metaId, result.getTableGroupId(), result.getTargetTableGroupName(), event, true, result.getSuccessData(), "");
         }
     }
 

+ 3 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushService.java

@@ -20,9 +20,11 @@ public interface FlushService {
      * Record data
      *
      * @param metaId
+     * @param tableGroupId
+     * @param targetTableGroupName
      * @param event
      * @param success
      * @param data
      */
-    void write(String metaId, String event, boolean success, List<Map> data, String error);
+    void write(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error);
 }

+ 3 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -59,12 +59,14 @@ public class FlushServiceImpl implements FlushService {
     }
 
     @Override
-    public void write(String metaId, String event, boolean success, List<Map> data, String error) {
+    public void write(String metaId, String tableGroupId, String targetTableGroupName, String event, boolean success, List<Map> data, String error) {
         long now = Instant.now().toEpochMilli();
         data.forEach(r -> {
             Map<String, Object> row = new HashMap();
             row.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
             row.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
+            row.put(ConfigConstant.DATA_TABLE_GROUP_ID, tableGroupId);
+            row.put(ConfigConstant.DATA_TARGET_TABLE_NAME, targetTableGroupName);
             row.put(ConfigConstant.DATA_EVENT, event);
             row.put(ConfigConstant.DATA_ERROR, substring(error));
             try {

+ 21 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -3,18 +3,25 @@ package org.dbsyncer.parser.flush.impl;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.config.BufferActuatorConfig;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
+import org.dbsyncer.common.model.IncrementConvertContext;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.BatchWriter;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Picker;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.WriterRequest;
+import org.dbsyncer.parser.model.WriterResponse;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.strategy.ParserStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
+import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
-import org.dbsyncer.plugin.config.Plugin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
@@ -75,31 +82,35 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         // 1. Load configuration
         final TableGroup tableGroup = cacheService.get(response.getTableGroupId(), TableGroup.class);
         final Mapping mapping = cacheService.get(tableGroup.getMappingId(), Mapping.class);
-        final String targetTableName = tableGroup.getTargetTable().getName();
+        final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
+        final String sourceTableName = group.getSourceTable().getName();
+        final String targetTableName = group.getTargetTable().getName();
         final String event = response.getEvent();
         final List<Map> sourceDataList = response.getDataList();
 
         // 2. Map fields
-        final Picker picker = new Picker(tableGroup.getFieldMapping());
+        final Picker picker = new Picker(group.getFieldMapping());
         List<Map> targetDataList = picker.pickData(sourceDataList);
 
         // 3. Convert parameters
-        ConvertUtil.convert(tableGroup.getConvert(), targetDataList);
+        ConvertUtil.convert(group.getConvert(), targetDataList);
 
         // 4. Plugin conversion
         ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
-        Plugin plugin = tableGroup.getPlugin();
-        pluginFactory.convert(targetConnectorMapper, plugin, targetTableName, event, sourceDataList, targetDataList);
+        final IncrementConvertContext context = new IncrementConvertContext(targetConnectorMapper, sourceTableName, targetTableName, event, sourceDataList, targetDataList);
+        pluginFactory.convert(group.getPlugin(), context);
 
         // 5. Execute the sync in batches
-        BatchWriter batchWriter = new BatchWriter(targetConnectorMapper, tableGroup.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount());
-        Result result = parserFactory.writeBatch(batchWriter);
+        Result result = parserFactory.writeBatch(context, new BatchWriter(targetConnectorMapper, group.getCommand(), targetTableName, event,
+                picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount()));
 
         // 6. Persist the sync result
+        result.setTableGroupId(tableGroup.getId());
+        result.setTargetTableGroupName(targetTableName);
         flushStrategy.flushIncrementData(mapping.getMetaId(), result, event);
 
         // 7. Post-processing after the batch run
-        pluginFactory.postProcessAfter(targetConnectorMapper, plugin, targetTableName, event, sourceDataList, targetDataList);
+        pluginFactory.postProcessAfter(group.getPlugin(), context);
 
         // 8. Complete
         parserStrategy.complete(response.getMessageIds());

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableFullFlushStrategy.java

@@ -31,7 +31,7 @@ public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
         if (!CollectionUtils.isEmpty(result.getFailData())) {
             logger.error(result.getError().toString());
             LogType logType = LogType.TableGroupLog.FULL_FAILED;
-            logService.log(logType, "%s:%s", logType.getMessage(), result.getError().toString());
+            logService.log(logType, "%s:%s:%s", result.getTargetTableGroupName(), logType.getMessage(), result.getError().toString());
         }
     }
 

+ 26 - 34
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java

@@ -1,9 +1,8 @@
 package org.dbsyncer.plugin;
 
 import org.apache.commons.io.FileUtils;
-import org.dbsyncer.common.model.FullConvertContext;
-import org.dbsyncer.common.model.IncrementConvertContext;
-import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.common.model.AbstractConvertContext;
+import org.dbsyncer.common.spi.ConvertContext;
 import org.dbsyncer.common.spi.ConvertService;
 import org.dbsyncer.common.spi.ProxyApplicationContext;
 import org.dbsyncer.common.util.CollectionUtils;
@@ -19,7 +18,13 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
 import java.util.stream.Collectors;
 
 /**
@@ -78,7 +83,7 @@ public class PluginFactory {
         } catch (IOException e) {
             logger.error(e.getMessage());
         }
-        Collection<File> files = FileUtils.listFiles(new File(PLUGIN_PATH), new String[]{"jar"}, true);
+        Collection<File> files = FileUtils.listFiles(new File(PLUGIN_PATH), new String[] {"jar"}, true);
         if (!CollectionUtils.isEmpty(files)) {
             files.forEach(f -> loadPlugin(f));
         }
@@ -97,43 +102,31 @@ public class PluginFactory {
         return Collections.unmodifiableList(plugins);
     }
 
-    public void convert(ConnectorMapper targetConnectorMapper, Plugin plugin, String targetTableName, List<Map> sourceList, List<Map> targetList) {
-        if (null != plugin && service.containsKey(plugin.getClassName())) {
-            service.get(plugin.getClassName()).convert(new FullConvertContext(applicationContextProxy, targetConnectorMapper, targetTableName, sourceList, targetList));
-        }
-    }
-
-    public void convert(ConnectorMapper targetConnectorMapper, Plugin plugin,String targetTableName, String event, List<Map> sourceList, List<Map> targetList) {
+    /**
+     * Full sync / incremental sync
+     *
+     * @param plugin
+     * @param context
+     */
+    public void convert(Plugin plugin, ConvertContext context) {
         if (null != plugin && service.containsKey(plugin.getClassName())) {
-            ConvertService convertService = service.get(plugin.getClassName());
-            int size = sourceList.size();
-            if (size == targetList.size()) {
-                for (int i = 0; i < size; i++) {
-                    convertService.convert(new IncrementConvertContext(applicationContextProxy, targetConnectorMapper, targetTableName, event, sourceList.get(i), targetList.get(i)));
-                }
+            if (context instanceof AbstractConvertContext) {
+                AbstractConvertContext ctx = (AbstractConvertContext) context;
+                ctx.setContext(applicationContextProxy);
             }
+            service.get(plugin.getClassName()).convert(context);
         }
     }
 
     /**
-     * Runs after the sync completes
+     * Runs after full/incremental sync completes
      *
-     * @param targetConnectorMapper
      * @param plugin
-     * @param targetTableName
-     * @param event
-     * @param sourceList
-     * @param targetList
+     * @param context
      */
-    public void postProcessAfter(ConnectorMapper targetConnectorMapper, Plugin plugin, String targetTableName, String event, List<Map> sourceList, List<Map> targetList) {
+    public void postProcessAfter(Plugin plugin, ConvertContext context) {
         if (null != plugin && service.containsKey(plugin.getClassName())) {
-            ConvertService convertService = service.get(plugin.getClassName());
-            int size = sourceList.size();
-            if (size == targetList.size()) {
-                for (int i = 0; i < size; i++) {
-                    convertService.postProcessAfter(new IncrementConvertContext(applicationContextProxy, targetConnectorMapper, targetTableName, event, sourceList.get(i), targetList.get(i)));
-                }
-            }
+            service.get(plugin.getClassName()).postProcessAfter(context);
         }
     }
 
@@ -146,7 +139,7 @@ public class PluginFactory {
         try {
             String fileName = jar.getName();
             URL url = jar.toURI().toURL();
-            URLClassLoader loader = new URLClassLoader(new URL[]{url}, Thread.currentThread().getContextClassLoader());
+            URLClassLoader loader = new URLClassLoader(new URL[] {url}, Thread.currentThread().getContextClassLoader());
             ServiceLoader<ConvertService> services = ServiceLoader.load(ConvertService.class, loader);
             for (ConvertService s : services) {
                 String className = s.getClass().getName();
@@ -159,5 +152,4 @@ public class PluginFactory {
         }
 
     }
-
 }
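
Since loadPlugin discovers implementations through java.util.ServiceLoader, a plugin jar is only picked up if it ships a provider-configuration file naming its implementation (class name hypothetical):

    # META-INF/services/org.dbsyncer.common.spi.ConvertService
    org.test.MyPlugin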

+ 8 - 6
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/service/DemoConvertServiceImpl.java

@@ -1,8 +1,7 @@
 package org.dbsyncer.plugin.service;
 
 import org.dbsyncer.common.config.AppConfig;
-import org.dbsyncer.common.model.FullConvertContext;
-import org.dbsyncer.common.model.IncrementConvertContext;
+import org.dbsyncer.common.spi.ConvertContext;
 import org.dbsyncer.common.spi.ConvertService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,12 +17,15 @@ public class DemoConvertServiceImpl implements ConvertService {
     private AppConfig appConfig;
 
     @Override
-    public void convert(FullConvertContext context) {
+    public void convert(ConvertContext context) {
+        context.setTerminated(true);
+        logger.info("插件正在处理同步数据,数据源表:{},目标源表:{},事件:{},条数:{}", context.getSourceTableName(), context.getTargetTableName(),
+                context.getEvent(), context.getTargetList().size());
     }
 
     @Override
-    public void convert(IncrementConvertContext context) {
-        logger.info("插件正在处理同步数据,事件:{},数据:{}", context.getEvent(), context.getSource());
+    public void postProcessAfter(ConvertContext context) {
+        logger.info("插件正在处理同步成功的数据,目标源表:{},事件:{},条数:{}", context.getTargetTableName(), context.getEvent(), context.getTargetList().size());
     }
 
     @Override
@@ -34,4 +36,4 @@ public class DemoConvertServiceImpl implements ConvertService {
     public String getName() {
         return "Demo";
     }
-}
+}
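
Note that the demo now calls setTerminated(true) unconditionally, so while it is attached nothing is written to the target; the rows are only recorded as success in the data log.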

+ 3 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java

@@ -31,7 +31,9 @@ public class ConfigConstant {
      * 数据
      */
     public static final String DATA_SUCCESS = "success";
+    public static final String DATA_TABLE_GROUP_ID = "tableGroupId";
+    public static final String DATA_TARGET_TABLE_NAME = "targetTableName";
     public static final String DATA_EVENT = "event";
     public static final String DATA_ERROR = "error";
 
-}
+}

+ 109 - 87
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -1,14 +1,11 @@
 package org.dbsyncer.storage.support;
 
 import org.dbsyncer.common.model.Paging;
-import org.dbsyncer.common.spi.ConnectorMapper;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
-import org.dbsyncer.connector.config.WriterBatchConfig;
-import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.Database;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
@@ -22,6 +19,7 @@ import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Param;
 import org.dbsyncer.storage.query.Query;
+import org.dbsyncer.storage.util.UnderlineToCamelUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -33,9 +31,9 @@ import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
 import java.io.*;
+import java.sql.SQLSyntaxErrorException;
 import java.sql.Types;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -60,10 +58,10 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
 
     private static final String PREFIX_TABLE = "dbsyncer_";
     private static final String SHOW_TABLE = "show tables where Tables_in_%s = \"%s\"";
+    private static final String SHOW_DATA_TABLE = "show tables where Tables_in_%s like \"%s\"";
     private static final String DROP_TABLE = "DROP TABLE %s";
     private static final String TRUNCATE_TABLE = "TRUNCATE TABLE %s";
-    private static final String TABLE_CREATE_TIME = "create_time";
-    private static final String TABLE_UPDATE_TIME = "update_time";
+    private static final String UPGRADE_SQL = "upgrade";
 
     @Autowired
     private ConnectorFactory connectorFactory;
@@ -86,6 +84,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         connector = (Database) connectorFactory.getConnector(connectorMapper);
         database = DatabaseUtil.getDatabaseName(config.getUrl());
 
+        // Run upgrade scripts
+        initUpgradeSql();
+
         // Initialize tables
         initTable();
     }
@@ -120,8 +121,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     public void update(StorageEnum type, String table, Map params) {
         Executor executor = getExecutor(type, table);
         String sql = executor.getUpdate();
-        List<Object> args = getParams(executor, params);
-        args.add(params.get(ConfigConstant.CONFIG_MODEL_ID));
+        List<Object> args = getUpdateArgs(executor, params);
         int update = connectorMapper.execute(databaseTemplate -> databaseTemplate.update(sql, args.toArray()));
         Assert.isTrue(update > 0, "update failed");
     }
@@ -159,12 +159,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     public void insertData(StorageEnum type, String table, List<Map> list) {
         if (!CollectionUtils.isEmpty(list)) {
             Executor executor = getExecutor(type, table);
-            Map<String, String> command = new HashMap<>();
-            command.put(SqlBuilderEnum.INSERT.getName(), executor.getInsert());
-            ConnectorMapper connectorMapper = connectorFactory.connect(config);
-            connectorFactory.writer(connectorMapper, new WriterBatchConfig(table, ConnectorConstant.OPERTION_INSERT, command, executor.getFields(), list));
+            List<Object[]> args = list.stream().map(row -> getArgs(executor, row).toArray()).collect(Collectors.toList());
+            connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(executor.getInsert(), args));
         }
-
     }
 
     @Override
@@ -180,7 +177,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     private int executeInsert(StorageEnum type, String table, Map params) {
         Executor executor = getExecutor(type, table);
         String sql = executor.getInsert();
-        List<Object> args = getParams(executor, params);
+        List<Object> args = getArgs(executor, params);
         int insert = connectorMapper.execute(databaseTemplate -> databaseTemplate.update(sql, args.toArray()));
         if (insert < 1) {
             logger.error("table:{}, params:{}");
@@ -189,8 +186,24 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         return insert;
     }
 
-    private List<Object> getParams(Executor executor, Map params) {
-        return executor.getFieldPairs().stream().map(p -> params.get(p.labelName)).collect(Collectors.toList());
+    private List<Object> getArgs(Executor executor, Map params) {
+        return executor.getFields().stream().map(f -> params.get(f.getLabelName())).collect(Collectors.toList());
+    }
+
+    private List<Object> getUpdateArgs(Executor executor, Map params) {
+        List<Object> args = new ArrayList<>();
+        Object pk = null;
+        for (Field f : executor.getFields()) {
+            if (f.isPk()) {
+                pk = params.get(f.getLabelName());
+                continue;
+            }
+            args.add(params.get(f.getLabelName()));
+        }
+
+        Assert.notNull(pk, "The primaryKey is null.");
+        args.add(pk);
+        return args;
     }
 
     private Executor getExecutor(StorageEnum type, String table) {
@@ -207,7 +220,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
                 return tables.get(table);
             }
             // Not present yet
-            Executor newExecutor = new Executor(executor.getGroup(), executor.getFieldPairs(), executor.getFields(), executor.isDynamicTableName(), executor.isSystemType(), executor.isOrderByUpdateTime());
+            Executor newExecutor = new Executor(executor.getGroup(), executor.getFields(), executor.isDynamicTableName(), executor.isSystemType(), executor.isOrderByUpdateTime());
             createTableIfNotExist(table, newExecutor);
 
             tables.putIfAbsent(table, newExecutor);
@@ -231,9 +244,13 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     }
 
     private String buildQueryCountSql(Query query, Executor executor, List<Object> args) {
-        StringBuilder sql = new StringBuilder("SELECT COUNT(1) FROM ").append(executor.getTable());
+        StringBuilder queryCount = new StringBuilder();
+        queryCount.append("SELECT COUNT(1) FROM (");
+        StringBuilder sql = new StringBuilder("SELECT 1 FROM `").append(executor.getTable()).append("`");
         buildQuerySqlWithParams(query, args, sql);
-        return sql.toString();
+        queryCount.append(sql);
+        queryCount.append(" GROUP BY `ID`) DBSYNCER_T");
+        return queryCount.toString();
     }
 
     private void buildQuerySqlWithParams(Query query, List<Object> args, StringBuilder sql) {
@@ -253,26 +270,53 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         }
     }
 
+    private void initUpgradeSql() {
+        // show tables where Tables_in_dbsyncer like "dbsyncer_data%"
+        String sql = String.format(SHOW_DATA_TABLE, database, PREFIX_TABLE.concat(StorageEnum.DATA.getType()).concat("%"));
+        Map<String, String> tables = null;
+        try {
+            tables = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForMap(sql));
+        } catch (EmptyResultDataAccessException e) {
+            // No tables to upgrade
+        }
+        if (CollectionUtils.isEmpty(tables)) {
+            return;
+        }
+        tables.values().forEach(table -> {
+            try {
+                String ddl = readSql(UPGRADE_SQL, true, table);
+                executeSql(ddl);
+                logger.info(ddl);
+            } catch (Exception e) {
+                if (e.getCause() instanceof SQLSyntaxErrorException) {
+                    SQLSyntaxErrorException ex = (SQLSyntaxErrorException) e.getCause();
+                    if (ex.getSQLState().equals("42S21")) {
+                        // ignore
+                        return;
+                    }
+                }
+                logger.error(e.getMessage());
+            }
+        });
+    }
+
     private void initTable() throws InterruptedException {
         // Config
         FieldBuilder builder = new FieldBuilder();
         builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_NAME, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_UPDATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
-        List<FieldPair> configFields = builder.getFieldPairs();
-        List<Field> cfields = builder.getFields();
+        List<Field> configFields = builder.getFields();
 
         // Logs
         builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
-        List<FieldPair> logFields = builder.getFieldPairs();
-        List<Field> lfields = builder.getFields();
+        List<Field> logFields = builder.getFields();
 
         // Data
-        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
-        List<FieldPair> dataFields = builder.getFieldPairs();
-        List<Field> dfields = builder.getFields();
+        builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.DATA_SUCCESS, ConfigConstant.DATA_TABLE_GROUP_ID, ConfigConstant.DATA_TARGET_TABLE_NAME, ConfigConstant.DATA_EVENT, ConfigConstant.DATA_ERROR, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
+        List<Field> dataFields = builder.getFields();
 
-        tables.putIfAbsent(StorageEnum.CONFIG.getType(), new Executor(StorageEnum.CONFIG, configFields, cfields, false, true, true));
-        tables.putIfAbsent(StorageEnum.LOG.getType(), new Executor(StorageEnum.LOG, logFields, lfields, false, true, false));
-        tables.putIfAbsent(StorageEnum.DATA.getType(), new Executor(StorageEnum.DATA, dataFields, dfields, true, false, false));
+        tables.putIfAbsent(StorageEnum.CONFIG.getType(), new Executor(StorageEnum.CONFIG, configFields, false, true, true));
+        tables.putIfAbsent(StorageEnum.LOG.getType(), new Executor(StorageEnum.LOG, logFields, false, true, false));
+        tables.putIfAbsent(StorageEnum.DATA.getType(), new Executor(StorageEnum.DATA, dataFields, true, false, false));
         // Create tables
         tables.forEach((tableName, e) -> {
             if (!e.isDynamicTableName()) {
@@ -292,16 +336,12 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForMap(sql));
         } catch (EmptyResultDataAccessException e) {
             // Table does not exist
-            String type = executor.getGroup().getType();
-            String template = PREFIX_TABLE.concat(type);
-            String ddl = readSql("/".concat(template).concat(".sql"));
-            // Dynamically replace the table name
-            ddl = executor.isDynamicTableName() ? StringUtil.replaceOnce(ddl, template, table) : ddl;
+            String ddl = readSql(executor.getGroup().getType(), executor.isDynamicTableName(), table);
             logger.info(ddl);
             executeSql(ddl);
         }
 
-        List<Field> fields = executor.getFieldPairs().stream().map(p -> new Field(p.columnName, p.labelName)).collect(Collectors.toList());
+        List<Field> fields = executor.getFields();
         final SqlBuilderConfig config = new SqlBuilderConfig(connector, "", table, ConfigConstant.CONFIG_MODEL_ID, fields, "", "");
 
         String query = SqlBuilderEnum.QUERY.getSqlBuilder().buildQuerySql(config);
@@ -311,7 +351,10 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         executor.setTable(table).setQuery(query).setInsert(insert).setUpdate(update).setDelete(delete);
     }
 
-    private String readSql(String filePath) {
+    private String readSql(String type, boolean dynamicTableName, String table) {
+        String template = PREFIX_TABLE.concat(type);
+        String filePath = "/".concat(template).concat(".sql");
+
         StringBuilder res = new StringBuilder();
         InputStream in = null;
         InputStreamReader isr = null;
@@ -331,6 +374,11 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             close(isr);
             close(in);
         }
+
+        // Dynamically replace the table name
+        if (dynamicTableName) {
+            return StringUtil.replaceOnce(res.toString(), template, table);
+        }
         return res.toString();
     }
 
@@ -369,51 +417,30 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         this.config = config;
     }
 
-    final class FieldPair {
-        String columnName;
-        String labelName;
-
-        public FieldPair(String columnName) {
-            this.columnName = columnName;
-            this.labelName = columnName;
-        }
-
-        public FieldPair(String columnName, String labelName) {
-            this.columnName = columnName;
-            this.labelName = labelName;
-        }
-    }
-
     final class FieldBuilder {
-        Map<String, FieldPair> fieldPairMap = new ConcurrentHashMap<>();
-        Map<String, Field> fieldMap = new ConcurrentHashMap<>();
-        List<FieldPair> fieldPairs;
+        Map<String, Field> fieldMap;
         List<Field> fields;
 
         public FieldBuilder() {
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_ID, new FieldPair(ConfigConstant.CONFIG_MODEL_ID));
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_NAME, new FieldPair(ConfigConstant.CONFIG_MODEL_NAME));
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_TYPE, new FieldPair(ConfigConstant.CONFIG_MODEL_TYPE));
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_CREATE_TIME, new FieldPair(TABLE_CREATE_TIME, ConfigConstant.CONFIG_MODEL_CREATE_TIME));
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, new FieldPair(TABLE_UPDATE_TIME, ConfigConstant.CONFIG_MODEL_UPDATE_TIME));
-            fieldPairMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_JSON, new FieldPair(ConfigConstant.CONFIG_MODEL_JSON));
-            fieldPairMap.putIfAbsent(ConfigConstant.DATA_SUCCESS, new FieldPair(ConfigConstant.DATA_SUCCESS));
-            fieldPairMap.putIfAbsent(ConfigConstant.DATA_EVENT, new FieldPair(ConfigConstant.DATA_EVENT));
-            fieldPairMap.putIfAbsent(ConfigConstant.DATA_ERROR, new FieldPair(ConfigConstant.DATA_ERROR));
-
-            fieldMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_ID, new Field(ConfigConstant.CONFIG_MODEL_ID, "VARCHAR", Types.VARCHAR, true));
-            fieldMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_NAME, new Field(ConfigConstant.CONFIG_MODEL_NAME, "VARCHAR", Types.VARCHAR));
-            fieldMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_TYPE, new Field(ConfigConstant.CONFIG_MODEL_TYPE, "VARCHAR", Types.VARCHAR));
-            fieldMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_CREATE_TIME, new Field(ConfigConstant.CONFIG_MODEL_CREATE_TIME, "BIGINT", Types.BIGINT));
-            fieldMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, new Field(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, "BIGINT", Types.BIGINT));
-            fieldMap.putIfAbsent(ConfigConstant.CONFIG_MODEL_JSON, new Field(ConfigConstant.CONFIG_MODEL_JSON, "LONGVARCHAR", Types.LONGVARCHAR));
-            fieldMap.putIfAbsent(ConfigConstant.DATA_SUCCESS, new Field(ConfigConstant.DATA_SUCCESS, "INTEGER", Types.INTEGER));
-            fieldMap.putIfAbsent(ConfigConstant.DATA_EVENT, new Field(ConfigConstant.DATA_EVENT, "VARCHAR", Types.VARCHAR));
-            fieldMap.putIfAbsent(ConfigConstant.DATA_ERROR, new Field(ConfigConstant.DATA_ERROR, "LONGVARCHAR", Types.LONGVARCHAR));
-        }
-
-        public List<FieldPair> getFieldPairs() {
-            return fieldPairs;
+            fieldMap = Stream.of(
+                    new Field(ConfigConstant.CONFIG_MODEL_ID, "VARCHAR", Types.VARCHAR, true),
+                    new Field(ConfigConstant.CONFIG_MODEL_NAME, "VARCHAR", Types.VARCHAR),
+                    new Field(ConfigConstant.CONFIG_MODEL_TYPE, "VARCHAR", Types.VARCHAR),
+                    new Field(ConfigConstant.CONFIG_MODEL_CREATE_TIME, "BIGINT", Types.BIGINT),
+                    new Field(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, "BIGINT", Types.BIGINT),
+                    new Field(ConfigConstant.CONFIG_MODEL_JSON, "LONGVARCHAR", Types.LONGVARCHAR),
+                    new Field(ConfigConstant.DATA_SUCCESS, "INTEGER", Types.INTEGER),
+                    new Field(ConfigConstant.DATA_TABLE_GROUP_ID, "VARCHAR", Types.VARCHAR),
+                    new Field(ConfigConstant.DATA_TARGET_TABLE_NAME, "VARCHAR", Types.VARCHAR),
+                    new Field(ConfigConstant.DATA_EVENT, "VARCHAR", Types.VARCHAR),
+                    new Field(ConfigConstant.DATA_ERROR, "LONGVARCHAR", Types.LONGVARCHAR)
+            ).map(field -> {
+                field.setLabelName(field.getName());
+                // Convert the column name to underscore style
+                String labelName = UnderlineToCamelUtils.camelToUnderline(field.getName());
+                field.setName(labelName);
+                return field;
+            }).collect(Collectors.toMap(Field::getLabelName, field -> field));
         }
 
         public List<Field> getFields() {
@@ -421,11 +448,12 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         }
 
         public void build(String... fieldNames) {
-            fieldPairs = new ArrayList<>(fieldNames.length);
             fields = new ArrayList<>(fieldNames.length);
             Stream.of(fieldNames).parallel().forEach(k -> {
-                fieldPairs.add(fieldPairMap.get(k));
-                fields.add(fieldMap.get(k));
+                if (fieldMap.containsKey(k)) {
+                    Field field = fieldMap.get(k);
+                    fields.add(field);
+                }
             });
         }
     }
@@ -437,15 +465,13 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         private String update;
         private String delete;
         private StorageEnum group;
-        private List<FieldPair> fieldPairs;
         private List<Field> fields;
         private boolean dynamicTableName;
         private boolean systemType;
         private boolean orderByUpdateTime;
 
-        public Executor(StorageEnum group, List<FieldPair> fieldPairs, List<Field> fields, boolean dynamicTableName, boolean systemType, boolean orderByUpdateTime) {
+        public Executor(StorageEnum group, List<Field> fields, boolean dynamicTableName, boolean systemType, boolean orderByUpdateTime) {
             this.group = group;
-            this.fieldPairs = fieldPairs;
             this.fields = fields;
             this.dynamicTableName = dynamicTableName;
             this.systemType = systemType;
@@ -505,10 +531,6 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             return fields;
         }
 
-        public List<FieldPair> getFieldPairs() {
-            return fieldPairs;
-        }
-
         public boolean isDynamicTableName() {
             return dynamicTableName;
         }

+ 56 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/UnderlineToCamelUtils.java

@@ -0,0 +1,56 @@
+package org.dbsyncer.storage.util;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public abstract class UnderlineToCamelUtils {
+
+    /**
+     * Underscore to camel case
+     *
+     * @param line       source string
+     * @param smallCamel whether to produce lower camel case
+     * @return the converted string
+     */
+    public static String underlineToCamel(String line, boolean smallCamel) {
+        if (line == null || "".equals(line)) {
+            return "";
+        }
+        StringBuffer sb = new StringBuffer();
+        Pattern pattern = Pattern.compile("([A-Za-z\\d]+)(_)?");
+        Matcher matcher = pattern.matcher(line);
+        while (matcher.find()) {
+            String word = matcher.group();
+            sb.append(smallCamel && matcher.start() == 0 ? Character.toLowerCase(word.charAt(0)) : Character.toUpperCase(word.charAt(0)));
+            int index = word.lastIndexOf('_');
+            if (index > 0) {
+                sb.append(word.substring(1, index).toLowerCase());
+            } else {
+                sb.append(word.substring(1).toLowerCase());
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Camel case to underscore
+     *
+     * @param line source string
+     * @return the converted string
+     */
+    public static String camelToUnderline(String line) {
+        if (line == null || "".equals(line)) {
+            return "";
+        }
+        line = String.valueOf(line.charAt(0)).toUpperCase().concat(line.substring(1));
+        StringBuffer sb = new StringBuffer();
+        Pattern pattern = Pattern.compile("[A-Z]([a-z\\d]+)?");
+        Matcher matcher = pattern.matcher(line);
+        while (matcher.find()) {
+            String word = matcher.group();
+            sb.append(word.toUpperCase());
+            sb.append(matcher.end() == line.length() ? "" : "_");
+        }
+        return sb.toString();
+    }
+}
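
A quick usage sketch of the new utility (expected results shown as comments, traced against the regexes above):

    UnderlineToCamelUtils.camelToUnderline("tableGroupId");          // "TABLE_GROUP_ID"
    UnderlineToCamelUtils.underlineToCamel("TABLE_GROUP_ID", true);  // "tableGroupId"
    UnderlineToCamelUtils.underlineToCamel("TABLE_GROUP_ID", false); // "TableGroupId"

This is what FieldBuilder relies on above: property names such as tableGroupId map to the TABLE_GROUP_ID columns created by dbsyncer_data.sql.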

+ 2 - 0
dbsyncer-storage/src/main/resources/dbsyncer_data.sql

@@ -1,6 +1,8 @@
 CREATE TABLE `dbsyncer_data` (
   `ID` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT 'Unique ID',
   `SUCCESS` int(1) NOT NULL COMMENT '1 success / 0 failure',
+  `TABLE_GROUP_ID` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT 'Table group (mapping relation) ID',
+  `TARGET_TABLE_NAME` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT 'Target table name',
   `EVENT` varchar(10) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT 'Event',
   `ERROR` mediumtext CHARACTER SET utf8 COLLATE utf8_bin NULL COMMENT 'Error message',
   `CREATE_TIME` bigint(0) NOT NULL COMMENT 'Creation time',

+ 1 - 0
dbsyncer-storage/src/main/resources/dbsyncer_upgrade.sql

@@ -0,0 +1 @@
+ALTER TABLE dbsyncer_upgrade ADD COLUMN TABLE_GROUP_ID VARCHAR(64) DEFAULT '' COMMENT 'Table group (mapping relation) ID' AFTER SUCCESS, ADD COLUMN TARGET_TABLE_NAME VARCHAR(100) DEFAULT '' COMMENT 'Target table name' AFTER TABLE_GROUP_ID;
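
Note: dbsyncer_upgrade is only a template name here. initUpgradeSql runs this statement once per existing dbsyncer_data* table, with readSql replacing dbsyncer_upgrade by the concrete table name, and duplicate-column errors (SQLSTATE 42S21) are ignored so the upgrade stays idempotent.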

+ 2 - 2
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/plugin/PluginController.java

@@ -67,8 +67,8 @@ public class PluginController {
     }
 
     @GetMapping("/download")
-    public void download(HttpServletResponse response) {
-        String fileName = String.format("dbsyncer-common-%s.jar", appConfig.getVersion());
+    public void download(HttpServletResponse response, String name) {
+        String fileName = String.format("dbsyncer-%s-%s.jar", name, appConfig.getVersion());
         File file = new File(pluginService.getLibraryPath() + fileName);
         if (!file.exists()) {
             write(response, RestResult.restFail("Could not find file", 404));
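
With the new name parameter, /plugin/download?name=common and /plugin/download?name=connector resolve to dbsyncer-common-<version>.jar and dbsyncer-connector-<version>.jar, matching the two download links added to plugin.html below.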

+ 50 - 40
dbsyncer-web/src/main/resources/public/plugin/plugin.html

@@ -18,9 +18,12 @@
                                                target='_blank'>sample project</a></li>
                     <li>Import the development packages:
                         <ul>
-                            <li>Option 1: import the jar <a onClick="downLoad()" href="javascript:;" title="Download the development package">dbsyncer-common-[[${version}]].jar</a></li>
+                            <li>Option 1: import the jars
+                                <p><a onClick="downLoad('common')" href="javascript:;" title="Download the development package">dbsyncer-common-[[${version}]].jar</a></p>
+                                <p><a onClick="downLoad('connector')" href="javascript:;" title="Download the development package">dbsyncer-connector-[[${version}]].jar</a> (optional)</p>
+                            </li>
                             <li>Option 2: add the pom dependencies (requires installing them to the local repository)
-                                <pre>&lt;dependency&gt;<br/>&nbsp;&nbsp;&lt;groupId>org.ghi&lt;/groupId&gt;<br/>&nbsp;&nbsp;&lt;artifactId>dbsyncer-common&lt;/artifactId&gt;<br/>&nbsp;&nbsp;&lt;version>[[${version}]]&lt;/version&gt;<br/>&lt;/dependency&gt;</pre>
+                                <pre>&lt;dependency&gt;<br/>&nbsp;&nbsp;&lt;groupId>org.ghi&lt;/groupId&gt;<br/>&nbsp;&nbsp;&lt;artifactId>dbsyncer-common&lt;/artifactId&gt;<br/>&nbsp;&nbsp;&lt;version>[[${version}]]&lt;/version&gt;<br/>&lt;/dependency&gt;<br/>&lt;dependency&gt;<br/>&nbsp;&nbsp;&lt;groupId>org.ghi&lt;/groupId&gt;<br/>&nbsp;&nbsp;&lt;artifactId>dbsyncer-connector&lt;/artifactId&gt;<br/>&nbsp;&nbsp;&lt;version>[[${version}]]&lt;/version&gt;<br/>&lt;/dependency&gt;</pre>
                             </li>
                         </ul>
                     </li>
@@ -28,63 +31,70 @@
                         <simple>Create a class, e.g. MyPlugin, that implements the ConvertService interface</simple>
 <pre>package org.test;
 
-import org.dbsyncer.common.model.FullConvertContext;
-import org.dbsyncer.common.model.IncrementConvertContext;
+import org.dbsyncer.common.spi.ConvertContext;
 import org.dbsyncer.common.spi.ConvertService;
 
-import java.util.List;
-import java.util.Map;
-
-public class MyPlugin implements ConvertService{
+public class MyPlugin implements ConvertService {
 
     /**
-    * Full sync
-    *
-    * @param context
-    */
+     * Full sync / incremental sync
+     *
+     * @param convertContext
+     */
     @Override
-    public void convert(FullConvertContext context) {
-        // Source data
-        List<Map> sourceList = context.getSourceList();
-        // Target data
-        List<Map> targetList = context.getTargetList();
+    public void convert(ConvertContext convertContext) {
         // TODO consume or process the data
+        System.out.println("Plugin consuming data...");
+
+        // Switch to terminate syncing to the target database; defaults to false
+        convertContext.setTerminated(false);
+
+        // Get the Spring context; beans registered by dbsyncer can also be obtained from it
+        convertContext.getContext();
+
+        // Source and target table names
+        convertContext.getSourceTableName();
+        convertContext.getTargetTableName();
+
+        // Captured event (INSERT/UPDATE/DELETE)
+        convertContext.getEvent();
+
+        // Full or incremental data of the source and target tables
+        convertContext.getSourceList();
+        convertContext.getTargetList();
+
+        // Get the target connector instance (requires dbsyncer-connector-[[${version}]].jar on the classpath)
+        convertContext.getTargetConnectorMapper();
     }
 
     /**
-    * Incremental sync
-    *
-    * @param context
-    */
+     * Runs after full/incremental sync completes
+     *
+     * @param context
+     */
     @Override
-    public void convert(IncrementConvertContext context) {
-        // Event (INSERT/UPDATE/DELETE)
-        String event = context.getEvent();
-        // Source data
-        Map source = context.getSource();
-        // Target data
-        Map target = context.getTarget();
-        // TODO consume or process the data
+    public void postProcessAfter(ConvertContext context) {
+        // Called after the sync completes
     }
 
     /**
-    * Override: set the version
-    *
-    * @return
-    */
+     * Override: set the version
+     *
+     * @return
+     */
     @Override
     public String getVersion() {
         return "1.0.0";
     }
 
     /**
-    * Override: set the plugin name
-    *
-    * @return
-    */
+     * Override: set the plugin name
+     *
+     * @return
+     */
     @Override
     public String getName() {
-        return "MyPlugin";
+        return "我的插件";
     }
 }</pre>
                     </li>
@@ -162,8 +172,8 @@ public class MyPlugin implements ConvertService{
         doLoader("/plugin");
     });
 
-    function downLoad(){
-        window.open($basePath + "/plugin/download");
+    function downLoad(fileName){
+        window.open($basePath + "/plugin/download?name=" + fileName);
     }
 </script>
 </html>

BIN
dbsyncer-web/src/main/resources/static/img/plugin/jar.png


BIN
dbsyncer-web/src/main/resources/static/img/plugin/spi.png