Răsfoiți Sursa

插件支持控制同步目标库逻辑 & 优化插件上下文参数

Signed-off-by: AE86 <836391306@qq.com>
AE86 2 ani în urmă
părinte
comite
5abbd5958e

+ 71 - 4
dbsyncer-common/src/main/java/org/dbsyncer/common/model/AbstractConvertContext.java

@@ -1,14 +1,24 @@
 package org.dbsyncer.common.model;
 
 import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.common.spi.ConvertContext;
 import org.dbsyncer.common.spi.ProxyApplicationContext;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * @author AE86
  * @version 1.0.0
  * @date 2022/6/30 16:00
  */
-public abstract class AbstractConvertContext {
+public abstract class AbstractConvertContext implements ConvertContext {
+
+    /**
+     * 是否终止任务
+     * <p>true:目标源不再接收同步数据,默认值false
+     */
+    private boolean terminated;
 
     /**
      * Spring上下文
@@ -21,19 +31,76 @@ public abstract class AbstractConvertContext {
     protected ConnectorMapper targetConnectorMapper;
 
     /**
-     * 目标表
+     * 数据源表
+     */
+    protected String sourceTableName;
+
+    /**
+     * 目标源表
      */
     protected String targetTableName;
 
+    /**
+     * 同步事件(INSERT/UPDATE/DELETE)
+     */
+    protected String event;
+
+    /**
+     * 数据源数据集合
+     */
+    protected List<Map> sourceList;
+
+    /**
+     * 目标源源数据集合
+     */
+    protected List<Map> targetList;
+
+    public void setContext(ProxyApplicationContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return terminated;
+    }
+
+    @Override
+    public void setTerminated(boolean terminated) {
+        this.terminated = terminated;
+    }
+
+    @Override
     public ProxyApplicationContext getContext() {
         return context;
     }
 
+    @Override
+    public ConnectorMapper getTargetConnectorMapper() {
+        return targetConnectorMapper;
+    }
+
+    @Override
+    public String getSourceTableName() {
+        return sourceTableName;
+    }
+
+    @Override
     public String getTargetTableName() {
         return targetTableName;
     }
 
-    public ConnectorMapper getTargetConnectorMapper() {
-        return targetConnectorMapper;
+    @Override
+    public String getEvent() {
+        return event;
+    }
+
+    @Override
+    public List<Map> getSourceList() {
+        return sourceList;
+    }
+
+    @Override
+    public List<Map> getTargetList() {
+        return targetList;
     }
 }

+ 5 - 21
dbsyncer-common/src/main/java/org/dbsyncer/common/model/FullConvertContext.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.common.model;
 
 import org.dbsyncer.common.spi.ConnectorMapper;
-import org.dbsyncer.common.spi.ProxyApplicationContext;
 
 import java.util.List;
 import java.util.Map;
@@ -11,31 +10,16 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2022/6/30 16:04
  */
-public class FullConvertContext extends AbstractConvertContext {
+public final class FullConvertContext extends AbstractConvertContext {
 
-    /**
-     * 全量同步,数据源数据集合
-     */
-    private List<Map> sourceList;
-
-    /**
-     * 全量同步,目标源源数据集合
-     */
-    private List<Map> targetList;
-
-    public FullConvertContext(ProxyApplicationContext context, ConnectorMapper targetConnectorMapper, String targetTableName, List<Map> sourceList, List<Map> targetList) {
-        this.context = context;
+    public FullConvertContext(ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event,
+                              List<Map> sourceList, List<Map> targetList) {
         this.targetConnectorMapper = targetConnectorMapper;
+        this.sourceTableName = sourceTableName;
         this.targetTableName = targetTableName;
+        this.event = event;
         this.sourceList = sourceList;
         this.targetList = targetList;
     }
 
-    public List<Map> getSourceList() {
-        return sourceList;
-    }
-
-    public List<Map> getTargetList() {
-        return targetList;
-    }
 }

+ 7 - 33
dbsyncer-common/src/main/java/org/dbsyncer/common/model/IncrementConvertContext.java

@@ -1,8 +1,8 @@
 package org.dbsyncer.common.model;
 
 import org.dbsyncer.common.spi.ConnectorMapper;
-import org.dbsyncer.common.spi.ProxyApplicationContext;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -10,41 +10,15 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2022/6/30 16:06
  */
-public class IncrementConvertContext extends AbstractConvertContext {
+public final class IncrementConvertContext extends AbstractConvertContext {
 
-    /**
-     * 增量同步,事件(INSERT/UPDATE/DELETE)
-     */
-    private String event;
-
-    /**
-     * 增量同步,数据源数据
-     */
-    private Map source;
-
-    /**
-     * 增量同步,目标源数据
-     */
-    private Map target;
-
-    public IncrementConvertContext(ProxyApplicationContext context, ConnectorMapper targetConnectorMapper, String targetTableName, String event, Map source, Map target) {
-        this.context = context;
+    public IncrementConvertContext(ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event,
+                                   List<Map> sourceList, List<Map> targetList) {
         this.targetConnectorMapper = targetConnectorMapper;
+        this.sourceTableName = sourceTableName;
         this.targetTableName = targetTableName;
         this.event = event;
-        this.source = source;
-        this.target = target;
-    }
-
-    public String getEvent() {
-        return event;
-    }
-
-    public Map getSource() {
-        return source;
-    }
-
-    public Map getTarget() {
-        return target;
+        this.sourceList = sourceList;
+        this.targetList = targetList;
     }
 }

+ 65 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/spi/ConvertContext.java

@@ -0,0 +1,65 @@
+package org.dbsyncer.common.spi;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 插件转换上下文
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/10/28 20:26
+ */
+public interface ConvertContext {
+
+    /**
+     * 是否终止同步数据到目标源库
+     *
+     * @return
+     */
+    boolean isTerminated();
+
+    /**
+     * 是否终止同步数据到目标源库
+     * <p>true: 终止,默认值false
+     *
+     * @param terminated
+     */
+    void setTerminated(boolean terminated);
+
+    /**
+     * Spring上下文
+     */
+    ProxyApplicationContext getContext();
+
+    /**
+     * 目标源连接实例
+     */
+    ConnectorMapper getTargetConnectorMapper();
+
+    /**
+     * 数据源表
+     */
+    String getSourceTableName();
+
+    /**
+     * 目标源表
+     */
+    String getTargetTableName();
+
+    /**
+     * 增量同步,事件(INSERT/UPDATE/DELETE)
+     */
+    String getEvent();
+
+    /**
+     * 数据源数据集合
+     */
+    List<Map> getSourceList();
+
+    /**
+     * 目标源源数据集合
+     */
+    List<Map> getTargetList();
+
+}

+ 4 - 16
dbsyncer-common/src/main/java/org/dbsyncer/common/spi/ConvertService.java

@@ -1,8 +1,5 @@
 package org.dbsyncer.common.spi;
 
-import org.dbsyncer.common.model.FullConvertContext;
-import org.dbsyncer.common.model.IncrementConvertContext;
-
 /**
  * 插件扩展服务接口
  * <p>全量同步/增量同步,扩展转换</p>
@@ -14,27 +11,18 @@ import org.dbsyncer.common.model.IncrementConvertContext;
 public interface ConvertService {
 
     /**
-     * 全量同步
-     *
-     * @param context 上下文
-     */
-    void convert(FullConvertContext context);
-
-    /**
-     * 增量同步
+     * 全量同步/增量同步
      *
      * @param context 上下文
      */
-    void convert(IncrementConvertContext context);
+    void convert(ConvertContext context);
 
     /**
-     * 数据插入后处理接口
+     * 全量同步/增量同步完成后执行处理
      *
      * @param context 上下文
-     * @author wangxiri
-     * @date 2022/10/25
      */
-    default void postProcessAfter(IncrementConvertContext context) {
+    default void postProcessAfter(ConvertContext context) {
     }
 
     /**

+ 3 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -4,6 +4,7 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.common.spi.ConvertContext;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
@@ -165,9 +166,10 @@ public interface Parser {
     /**
      * 批执行
      *
+     * @param context
      * @param batchWriter
      * @return
      */
-    Result writeBatch(BatchWriter batchWriter);
+    Result writeBatch(ConvertContext context, BatchWriter batchWriter);
 
 }

+ 28 - 12
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -5,8 +5,10 @@ import com.alibaba.fastjson.JSONObject;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
+import org.dbsyncer.common.model.FullConvertContext;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.common.spi.ConvertContext;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.connector.ConnectorFactory;
@@ -25,13 +27,18 @@ import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.event.FullRefreshEvent;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.BatchWriter;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.FieldMapping;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Picker;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.Task;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.strategy.ParserStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
-import org.dbsyncer.plugin.config.Plugin;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -252,6 +259,7 @@ public class ParserFactory implements Parser {
         int batchSize = mapping.getBatchNum();
         ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
         ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
+        final String event = ConnectorConstant.OPERTION_INSERT;
 
         for (; ; ) {
             if (!task.isRunning()) {
@@ -274,15 +282,15 @@ public class ParserFactory implements Parser {
             ConvertUtil.convert(group.getConvert(), target);
 
             // 4、插件转换
-            Plugin plugin = group.getPlugin();
-            pluginFactory.convert(tConnectorMapper, plugin, tTableName, data, target);
+            final FullConvertContext context = new FullConvertContext(tConnectorMapper, sTableName, tTableName, event, data, target);
+            pluginFactory.convert(group.getPlugin(), context);
 
             // 5、写入目标源
-            BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, tTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
-            Result writer = writeBatch(batchWriter, executorService);
+            BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, tTableName, event, picker.getTargetFields(), target, batchSize);
+            Result writer = writeBatch(context, batchWriter, executorService);
 
-            // 6、执行批量处理后的
-            pluginFactory.postProcessAfter(tConnectorMapper, plugin, tTableName, ConnectorConstant.OPERTION_INSERT, data, target);
+            // 6、同步完成后通知插件做后置处理
+            pluginFactory.postProcessAfter(group.getPlugin(), context);
 
             // 7、更新结果
             task.setPageIndex(task.getPageIndex() + 1);
@@ -306,22 +314,31 @@ public class ParserFactory implements Parser {
     /**
      * 批量写入
      *
+     * @param context
      * @param batchWriter
      * @return
      */
     @Override
-    public Result writeBatch(BatchWriter batchWriter) {
-        return writeBatch(batchWriter, taskExecutor);
+    public Result writeBatch(ConvertContext context, BatchWriter batchWriter) {
+        return writeBatch(context, batchWriter, taskExecutor);
     }
 
     /**
      * 批量写入
      *
+     * @param context
      * @param batchWriter
      * @param taskExecutor
      * @return
      */
-    private Result writeBatch(BatchWriter batchWriter, Executor taskExecutor) {
+    private Result writeBatch(ConvertContext context, BatchWriter batchWriter, Executor taskExecutor) {
+        final Result result = new Result();
+        // 终止同步数据到目标源库
+        if(context.isTerminated()){
+            result.getSuccessData().addAll(batchWriter.getDataList());
+            return result;
+        }
+
         List<Map> dataList = batchWriter.getDataList();
         int batchSize = batchWriter.getBatchSize();
         String tableName = batchWriter.getTableName();
@@ -338,7 +355,6 @@ public class ParserFactory implements Parser {
         // 批量任务, 拆分
         int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
 
-        final Result result = new Result();
         final CountDownLatch latch = new CountDownLatch(taskSize);
         int fromIndex = 0;
         int toIndex = batchSize;

+ 19 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -3,18 +3,25 @@ package org.dbsyncer.parser.flush.impl;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.config.BufferActuatorConfig;
 import org.dbsyncer.common.model.AbstractConnectorConfig;
+import org.dbsyncer.common.model.IncrementConvertContext;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.spi.ConnectorMapper;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
-import org.dbsyncer.parser.model.*;
+import org.dbsyncer.parser.model.BatchWriter;
+import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.Mapping;
+import org.dbsyncer.parser.model.Picker;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.WriterRequest;
+import org.dbsyncer.parser.model.WriterResponse;
 import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.strategy.ParserStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
+import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
-import org.dbsyncer.plugin.config.Plugin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
@@ -75,31 +82,33 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         // 1、获取配置信息
         final TableGroup tableGroup = cacheService.get(response.getTableGroupId(), TableGroup.class);
         final Mapping mapping = cacheService.get(tableGroup.getMappingId(), Mapping.class);
-        final String targetTableName = tableGroup.getTargetTable().getName();
+        final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
+        final String sourceTableName = group.getSourceTable().getName();
+        final String targetTableName = group.getTargetTable().getName();
         final String event = response.getEvent();
         final List<Map> sourceDataList = response.getDataList();
 
         // 2、映射字段
-        final Picker picker = new Picker(tableGroup.getFieldMapping());
+        final Picker picker = new Picker(group.getFieldMapping());
         List<Map> targetDataList = picker.pickData(sourceDataList);
 
         // 3、参数转换
-        ConvertUtil.convert(tableGroup.getConvert(), targetDataList);
+        ConvertUtil.convert(group.getConvert(), targetDataList);
 
         // 4、插件转换
         ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
-        Plugin plugin = tableGroup.getPlugin();
-        pluginFactory.convert(targetConnectorMapper, plugin, targetTableName, event, sourceDataList, targetDataList);
+        final IncrementConvertContext context = new IncrementConvertContext(targetConnectorMapper, sourceTableName, targetTableName, event, sourceDataList, targetDataList);
+        pluginFactory.convert(group.getPlugin(), context);
 
         // 5、批量执行同步
-        BatchWriter batchWriter = new BatchWriter(targetConnectorMapper, tableGroup.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount());
-        Result result = parserFactory.writeBatch(batchWriter);
+        Result result = parserFactory.writeBatch(context, new BatchWriter(targetConnectorMapper, group.getCommand(), targetTableName, event,
+                picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount()));
 
         // 6、持久化同步结果
         flushStrategy.flushIncrementData(mapping.getMetaId(), result, event);
 
         // 7、执行批量处理后的
-        pluginFactory.postProcessAfter(targetConnectorMapper, plugin, targetTableName, event, sourceDataList, targetDataList);
+        pluginFactory.postProcessAfter(group.getPlugin(), context);
 
         // 8、完成处理
         parserStrategy.complete(response.getMessageIds());

+ 26 - 34
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java

@@ -1,9 +1,8 @@
 package org.dbsyncer.plugin;
 
 import org.apache.commons.io.FileUtils;
-import org.dbsyncer.common.model.FullConvertContext;
-import org.dbsyncer.common.model.IncrementConvertContext;
-import org.dbsyncer.common.spi.ConnectorMapper;
+import org.dbsyncer.common.model.AbstractConvertContext;
+import org.dbsyncer.common.spi.ConvertContext;
 import org.dbsyncer.common.spi.ConvertService;
 import org.dbsyncer.common.spi.ProxyApplicationContext;
 import org.dbsyncer.common.util.CollectionUtils;
@@ -19,7 +18,13 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
 import java.util.stream.Collectors;
 
 /**
@@ -78,7 +83,7 @@ public class PluginFactory {
         } catch (IOException e) {
             logger.error(e.getMessage());
         }
-        Collection<File> files = FileUtils.listFiles(new File(PLUGIN_PATH), new String[]{"jar"}, true);
+        Collection<File> files = FileUtils.listFiles(new File(PLUGIN_PATH), new String[] {"jar"}, true);
         if (!CollectionUtils.isEmpty(files)) {
             files.forEach(f -> loadPlugin(f));
         }
@@ -97,43 +102,31 @@ public class PluginFactory {
         return Collections.unmodifiableList(plugins);
     }
 
-    public void convert(ConnectorMapper targetConnectorMapper, Plugin plugin, String targetTableName, List<Map> sourceList, List<Map> targetList) {
-        if (null != plugin && service.containsKey(plugin.getClassName())) {
-            service.get(plugin.getClassName()).convert(new FullConvertContext(applicationContextProxy, targetConnectorMapper, targetTableName, sourceList, targetList));
-        }
-    }
-
-    public void convert(ConnectorMapper targetConnectorMapper, Plugin plugin,String targetTableName, String event, List<Map> sourceList, List<Map> targetList) {
+    /**
+     * 全量同步/增量同步
+     *
+     * @param plugin
+     * @param context
+     */
+    public void convert(Plugin plugin, ConvertContext context) {
         if (null != plugin && service.containsKey(plugin.getClassName())) {
-            ConvertService convertService = service.get(plugin.getClassName());
-            int size = sourceList.size();
-            if (size == targetList.size()) {
-                for (int i = 0; i < size; i++) {
-                    convertService.convert(new IncrementConvertContext(applicationContextProxy, targetConnectorMapper, targetTableName, event, sourceList.get(i), targetList.get(i)));
-                }
+            if (context instanceof AbstractConvertContext) {
+                AbstractConvertContext ctx = (AbstractConvertContext) context;
+                ctx.setContext(applicationContextProxy);
             }
+            service.get(plugin.getClassName()).convert(context);
         }
     }
 
     /**
-     * 完成同步后执行处理
+     * 全量同步/增量同步完成后执行处理
      *
-     * @param targetConnectorMapper
      * @param plugin
-     * @param targetTableName
-     * @param event
-     * @param sourceList
-     * @param targetList
+     * @param context
      */
-    public void postProcessAfter(ConnectorMapper targetConnectorMapper, Plugin plugin, String targetTableName, String event, List<Map> sourceList, List<Map> targetList) {
+    public void postProcessAfter(Plugin plugin, ConvertContext context) {
         if (null != plugin && service.containsKey(plugin.getClassName())) {
-            ConvertService convertService = service.get(plugin.getClassName());
-            int size = sourceList.size();
-            if (size == targetList.size()) {
-                for (int i = 0; i < size; i++) {
-                    convertService.postProcessAfter(new IncrementConvertContext(applicationContextProxy, targetConnectorMapper, targetTableName, event, sourceList.get(i), targetList.get(i)));
-                }
-            }
+            service.get(plugin.getClassName()).postProcessAfter(context);
         }
     }
 
@@ -146,7 +139,7 @@ public class PluginFactory {
         try {
             String fileName = jar.getName();
             URL url = jar.toURI().toURL();
-            URLClassLoader loader = new URLClassLoader(new URL[]{url}, Thread.currentThread().getContextClassLoader());
+            URLClassLoader loader = new URLClassLoader(new URL[] {url}, Thread.currentThread().getContextClassLoader());
             ServiceLoader<ConvertService> services = ServiceLoader.load(ConvertService.class, loader);
             for (ConvertService s : services) {
                 String className = s.getClass().getName();
@@ -159,5 +152,4 @@ public class PluginFactory {
         }
 
     }
-
 }

+ 8 - 6
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/service/DemoConvertServiceImpl.java

@@ -1,8 +1,7 @@
 package org.dbsyncer.plugin.service;
 
 import org.dbsyncer.common.config.AppConfig;
-import org.dbsyncer.common.model.FullConvertContext;
-import org.dbsyncer.common.model.IncrementConvertContext;
+import org.dbsyncer.common.spi.ConvertContext;
 import org.dbsyncer.common.spi.ConvertService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,12 +17,15 @@ public class DemoConvertServiceImpl implements ConvertService {
     private AppConfig appConfig;
 
     @Override
-    public void convert(FullConvertContext context) {
+    public void convert(ConvertContext context) {
+        context.setTerminated(true);
+        logger.info("插件正在处理同步数据,数据源表:{},目标源表:{},事件:{},条数:{}", context.getSourceTableName(), context.getTargetTableName(),
+                context.getEvent(), context.getTargetList().size());
     }
 
     @Override
-    public void convert(IncrementConvertContext context) {
-        logger.info("插件正在处理同步数据,事件:{},数据:{}", context.getEvent(), context.getSource());
+    public void postProcessAfter(ConvertContext context) {
+        logger.info("插件正在处理同步成功的数据,目标源表:{},事件:{},条数:{}", context.getTargetTableName(), context.getEvent(), context.getTargetList().size());
     }
 
     @Override
@@ -34,4 +36,4 @@ public class DemoConvertServiceImpl implements ConvertService {
     public String getName() {
         return "Demo";
     }
-}
+}