Explorar el Código

支持插件前置处理全量同步任务

AE86 hace 9 meses
padre
commit
3b9d027415

+ 3 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -27,6 +27,7 @@ import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
+import org.dbsyncer.plugin.enums.ProcessEnum;
 import org.dbsyncer.plugin.impl.IncrementPluginContext;
 import org.dbsyncer.sdk.config.DDLConfig;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
@@ -148,7 +149,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         final ConnectorInstance sConnectorInstance = connectorFactory.connect(getConnectorConfig(mapping.getSourceConnectorId()));
         final ConnectorInstance tConnectorInstance = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
         final IncrementPluginContext context = new IncrementPluginContext(sConnectorInstance, tConnectorInstance, sourceTableName, targetTableName, event, sourceDataList, targetDataList, group.getPluginExtInfo());
-        pluginFactory.convert(group.getPlugin(), context);
+        pluginFactory.process(group.getPlugin(), context, ProcessEnum.CONVERT);
 
         // 5、批量执行同步
         BatchWriter batchWriter = new BatchWriter(tConnectorInstance, group.getCommand(), targetTableName, event, picker.getTargetFields(), targetDataList, generalBufferConfig.getBufferWriterCount(), mapping.isForceUpdate());
@@ -163,7 +164,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         flushStrategy.flushIncrementData(mapping.getMetaId(), result, event);
 
         // 8、执行批量处理后的
-        pluginFactory.postProcessAfter(group.getPlugin(), context);
+        pluginFactory.process(group.getPlugin(), context, ProcessEnum.AFTER);
     }
 
     @Override

+ 14 - 11
dbsyncer-parser/src/main/java/org/dbsyncer/parser/impl/ParserComponentImpl.java

@@ -7,14 +7,6 @@ import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.base.ConnectorFactory;
-import org.dbsyncer.sdk.config.CommandConfig;
-import org.dbsyncer.sdk.config.ReaderConfig;
-import org.dbsyncer.sdk.config.WriterBatchConfig;
-import org.dbsyncer.sdk.constant.ConnectorConstant;
-import org.dbsyncer.sdk.model.Field;
-import org.dbsyncer.sdk.model.MetaInfo;
-import org.dbsyncer.sdk.model.Table;
-import org.dbsyncer.sdk.util.PrimaryKeyUtil;
 import org.dbsyncer.parser.ParserComponent;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.event.FullRefreshEvent;
@@ -29,10 +21,19 @@ import org.dbsyncer.parser.strategy.FlushStrategy;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
+import org.dbsyncer.plugin.enums.ProcessEnum;
+import org.dbsyncer.plugin.impl.FullPluginContext;
+import org.dbsyncer.sdk.config.CommandConfig;
+import org.dbsyncer.sdk.config.ReaderConfig;
+import org.dbsyncer.sdk.config.WriterBatchConfig;
 import org.dbsyncer.sdk.connector.ConnectorInstance;
+import org.dbsyncer.sdk.constant.ConnectorConstant;
 import org.dbsyncer.sdk.model.ConnectorConfig;
-import org.dbsyncer.plugin.impl.FullPluginContext;
+import org.dbsyncer.sdk.model.Field;
+import org.dbsyncer.sdk.model.MetaInfo;
+import org.dbsyncer.sdk.model.Table;
 import org.dbsyncer.sdk.plugin.PluginContext;
+import org.dbsyncer.sdk.util.PrimaryKeyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;
@@ -148,6 +149,8 @@ public class ParserComponentImpl implements ParserComponent {
         final ConnectorInstance tConnectorInstance = connectorFactory.connect(tConfig);
         final String event = ConnectorConstant.OPERTION_INSERT;
         final FullPluginContext context = new FullPluginContext(sConnectorInstance, tConnectorInstance, sTableName, tTableName, event, group.getPluginExtInfo());
+        // 0、插件前置处理
+        pluginFactory.process(group.getPlugin(), context, ProcessEnum.BEFORE);
 
         for (; ; ) {
             if (!task.isRunning()) {
@@ -173,7 +176,7 @@ public class ParserComponentImpl implements ParserComponent {
             // 4、插件转换
             context.setSourceList(source);
             context.setTargetList(target);
-            pluginFactory.convert(group.getPlugin(), context);
+            pluginFactory.process(group.getPlugin(), context, ProcessEnum.CONVERT);
 
             // 5、写入目标源
             BatchWriter batchWriter = new BatchWriter(tConnectorInstance, command, tTableName, event, picker.getTargetFields(), target, batchSize, mapping.isForceUpdate());
@@ -187,7 +190,7 @@ public class ParserComponentImpl implements ParserComponent {
             flush(task, result);
 
             // 7、同步完成后通知插件做后置处理
-            pluginFactory.postProcessAfter(group.getPlugin(), context);
+            pluginFactory.process(group.getPlugin(), context, ProcessEnum.AFTER);
 
             // 8、判断尾页
             if (source.size() < pageSize) {

+ 18 - 18
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/PluginFactory.java

@@ -1,7 +1,11 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
 package org.dbsyncer.plugin;
 
 import org.apache.commons.io.FileUtils;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.plugin.enums.ProcessEnum;
 import org.dbsyncer.plugin.model.Plugin;
 import org.dbsyncer.sdk.plugin.PluginContext;
 import org.dbsyncer.sdk.spi.PluginService;
@@ -105,32 +109,27 @@ public class PluginFactory implements DisposableBean {
     }
 
     /**
-     * 全量同步/增量同步
+     * 全量同步/增量同步处理
      *
      * @param plugin
      * @param context
      */
-    public void convert(Plugin plugin, PluginContext context) {
+    public void process(Plugin plugin, PluginContext context, ProcessEnum processEnum) {
         if (null != plugin) {
             String pluginId = createPluginId(plugin.getClassName(), plugin.getVersion());
             service.computeIfPresent(pluginId, (k, c) -> {
                 c.convert(context);
-                return c;
-            });
-        }
-    }
-
-    /**
-     * 全量同步/增量同步完成后执行处理
-     *
-     * @param plugin
-     * @param context
-     */
-    public void postProcessAfter(Plugin plugin, PluginContext context) {
-        if (null != plugin) {
-            String pluginId = createPluginId(plugin.getClassName(), plugin.getVersion());
-            service.computeIfPresent(pluginId, (k, c) -> {
-                c.postProcessAfter(context);
+                switch (processEnum) {
+                    case BEFORE:
+                        c.postProcessBefore(context);
+                        break;
+                    case CONVERT:
+                        c.convert(context);
+                        break;
+                    case AFTER:
+                        c.postProcessAfter(context);
+                        break;
+                }
                 return c;
             });
         }
@@ -189,4 +188,5 @@ public class PluginFactory implements DisposableBean {
         });
         service.clear();
     }
+
 }

+ 28 - 0
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/enums/ProcessEnum.java

@@ -0,0 +1,28 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.plugin.enums;
+
+/**
+ * @Author AE86
+ * @Version 1.0.0
+ * @Date 2024-08-13 00:27
+ */
+public enum ProcessEnum {
+
+    /**
+     * 全量同步前置处理
+     */
+    BEFORE,
+
+    /**
+     * 全量同步/增量同步转换
+     */
+    CONVERT,
+
+    /**
+     * 全量同步/增量同步后置处理
+     */
+    AFTER;
+
+}

+ 8 - 0
dbsyncer-plugin/src/main/java/org/dbsyncer/plugin/impl/DemoPluginServiceProvider.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
 package org.dbsyncer.plugin.impl;
 
 import org.dbsyncer.common.config.AppConfig;
@@ -17,6 +20,11 @@ public final class DemoPluginServiceProvider implements PluginService {
     @Resource
     private AppConfig appConfig;
 
+    @Override
+    public void postProcessBefore(PluginContext context) {
+        logger.info("插件正在处理全量同步,目标源表:{}", context.getTargetTableName());
+    }
+
     @Override
     public void convert(PluginContext context) {
         context.setTerminated(true);

+ 12 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/spi/PluginService.java

@@ -1,3 +1,6 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
 package org.dbsyncer.sdk.spi;
 
 import org.dbsyncer.sdk.plugin.PluginContext;
@@ -19,6 +22,14 @@ public interface PluginService {
 
     }
 
+    /**
+     * 全量同步前置处理
+     *
+     * @param pluginContext 上下文
+     */
+    default void postProcessBefore(PluginContext pluginContext) {
+    }
+
     /**
      * 全量同步/增量同步
      *
@@ -27,7 +38,7 @@ public interface PluginService {
     void convert(PluginContext pluginContext);
 
     /**
-     * 全量同步/增量同步完成后执行处理
+     * 全量同步/增量同步后置处理
      *
      * @param pluginContext 上下文
      */