浏览代码

支持通过插件上下文获取数据源连接实例

AE86 2 年之前
父节点
当前提交
eb35d9be2e

+ 29 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/model/AbstractConvertContext.java

@@ -25,6 +25,11 @@ public abstract class AbstractConvertContext implements ConvertContext {
      */
     protected ProxyApplicationContext context;
 
+    /**
+     * 数据源连接实例
+     */
+    protected ConnectorMapper sourceConnectorMapper;
+
     /**
      * 目标源连接实例
      */
@@ -55,6 +60,17 @@ public abstract class AbstractConvertContext implements ConvertContext {
      */
     protected List<Map> targetList;
 
+    public void init(ConnectorMapper sourceConnectorMapper, ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event,
+                              List<Map> sourceList, List<Map> targetList) {
+        this.sourceConnectorMapper = sourceConnectorMapper;
+        this.targetConnectorMapper = targetConnectorMapper;
+        this.sourceTableName = sourceTableName;
+        this.targetTableName = targetTableName;
+        this.event = event;
+        this.sourceList = sourceList;
+        this.targetList = targetList;
+    }
+
     public void setContext(ProxyApplicationContext context) {
         this.context = context;
     }
@@ -74,6 +90,11 @@ public abstract class AbstractConvertContext implements ConvertContext {
         return context;
     }
 
+    @Override
+    public ConnectorMapper getSourceConnectorMapper() {
+        return sourceConnectorMapper;
+    }
+
     @Override
     public ConnectorMapper getTargetConnectorMapper() {
         return targetConnectorMapper;
@@ -99,8 +120,16 @@ public abstract class AbstractConvertContext implements ConvertContext {
         return sourceList;
     }
 
+    public void setSourceList(List<Map> sourceList) {
+        this.sourceList = sourceList;
+    }
+
     @Override
     public List<Map> getTargetList() {
         return targetList;
     }
+
+    public void setTargetList(List<Map> targetList) {
+        this.targetList = targetList;
+    }
 }

+ 2 - 12
dbsyncer-common/src/main/java/org/dbsyncer/common/model/FullConvertContext.java

@@ -2,9 +2,6 @@ package org.dbsyncer.common.model;
 
 import org.dbsyncer.common.spi.ConnectorMapper;
 
-import java.util.List;
-import java.util.Map;
-
 /**
  * @author AE86
  * @version 1.0.0
@@ -12,14 +9,7 @@ import java.util.Map;
  */
 public final class FullConvertContext extends AbstractConvertContext {
 
-    public FullConvertContext(ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event,
-                              List<Map> sourceList, List<Map> targetList) {
-        this.targetConnectorMapper = targetConnectorMapper;
-        this.sourceTableName = sourceTableName;
-        this.targetTableName = targetTableName;
-        this.event = event;
-        this.sourceList = sourceList;
-        this.targetList = targetList;
+    public FullConvertContext(ConnectorMapper sourceConnectorMapper, ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event) {
+        super.init(sourceConnectorMapper, targetConnectorMapper, sourceTableName, targetTableName, event, null, null);
     }
-
 }

+ 2 - 8
dbsyncer-common/src/main/java/org/dbsyncer/common/model/IncrementConvertContext.java

@@ -12,13 +12,7 @@ import java.util.Map;
  */
 public final class IncrementConvertContext extends AbstractConvertContext {
 
-    public IncrementConvertContext(ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event,
-                                   List<Map> sourceList, List<Map> targetList) {
-        this.targetConnectorMapper = targetConnectorMapper;
-        this.sourceTableName = sourceTableName;
-        this.targetTableName = targetTableName;
-        this.event = event;
-        this.sourceList = sourceList;
-        this.targetList = targetList;
+    public IncrementConvertContext(ConnectorMapper sourceConnectorMapper, ConnectorMapper targetConnectorMapper, String sourceTableName, String targetTableName, String event, List<Map> sourceList, List<Map> targetList) {
+        super.init(sourceConnectorMapper, targetConnectorMapper, sourceTableName, targetTableName, event, sourceList, targetList);
     }
 }

+ 5 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/spi/ConvertContext.java

@@ -32,6 +32,11 @@ public interface ConvertContext {
      */
     ProxyApplicationContext getContext();
 
+    /**
+     * 数据源连接实例
+     */
+    ConnectorMapper getSourceConnectorMapper();
+
     /**
      * 目标源连接实例
      */

+ 14 - 12
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -252,9 +252,10 @@ public class ParserFactory implements Parser {
 
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();
-        ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
-        ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
+        final ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
+        final ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
         final String event = ConnectorConstant.OPERTION_INSERT;
+        final FullConvertContext context = new FullConvertContext(sConnectorMapper, tConnectorMapper, sTableName, tTableName, event);
 
         for (; ; ) {
             if (!task.isRunning()) {
@@ -264,20 +265,21 @@ public class ParserFactory implements Parser {
 
             // 1、获取数据源数据
             Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), task.getCursor(), task.getPageIndex(), pageSize));
-            List<Map> data = reader.getSuccessData();
-            if (CollectionUtils.isEmpty(data)) {
+            List<Map> source = reader.getSuccessData();
+            if (CollectionUtils.isEmpty(source)) {
                 logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                 break;
             }
 
             // 2、映射字段
-            List<Map> target = picker.pickData(data);
+            List<Map> target = picker.pickData(source);
 
             // 3、参数转换
             ConvertUtil.convert(group.getConvert(), target);
 
             // 4、插件转换
-            final FullConvertContext context = new FullConvertContext(tConnectorMapper, sTableName, tTableName, event, data, target);
+            context.setSourceList(source);
+            context.setTargetList(target);
             pluginFactory.convert(group.getPlugin(), context);
 
             // 5、写入目标源
@@ -289,13 +291,13 @@ public class ParserFactory implements Parser {
 
             // 7、更新结果
             task.setPageIndex(task.getPageIndex() + 1);
-            task.setCursor(getLastCursor(data, pk));
+            task.setCursor(getLastCursor(source, pk));
             result.setTableGroupId(tableGroup.getId());
             result.setTargetTableGroupName(tTableName);
             flush(task, result);
 
             // 8、判断尾页
-            if (data.size() < pageSize) {
+            if (source.size() < pageSize) {
                 logger.info("完成全量:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                 break;
             }
@@ -331,7 +333,7 @@ public class ParserFactory implements Parser {
     private Result writeBatch(ConvertContext context, BatchWriter batchWriter, Executor taskExecutor) {
         final Result result = new Result();
         // 终止同步数据到目标源库
-        if(context.isTerminated()){
+        if (context.isTerminated()) {
             result.getSuccessData().addAll(batchWriter.getDataList());
             return result;
         }
@@ -427,12 +429,12 @@ public class ParserFactory implements Parser {
     /**
      * 获取最新游标值
      *
-     * @param data
+     * @param source
      * @param pk
      * @return
      */
-    private Object getLastCursor(List<Map> data, String pk) {
-        return CollectionUtils.isEmpty(data) ? null : data.get(data.size() - 1).get(pk);
+    private Object getLastCursor(List<Map> source, String pk) {
+        return CollectionUtils.isEmpty(source) ? null : source.get(source.size() - 1).get(pk);
     }
 
 }

+ 4 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -96,12 +96,13 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
         ConvertUtil.convert(group.getConvert(), targetDataList);
 
         // 4、插件转换
-        ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
-        final IncrementConvertContext context = new IncrementConvertContext(targetConnectorMapper, sourceTableName, targetTableName, event, sourceDataList, targetDataList);
+        final ConnectorMapper sConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getSourceConnectorId()));
+        final ConnectorMapper tConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
+        final IncrementConvertContext context = new IncrementConvertContext(sConnectorMapper, tConnectorMapper, sourceTableName, targetTableName, event, sourceDataList, targetDataList);
         pluginFactory.convert(group.getPlugin(), context);
 
         // 5、批量执行同步
-        Result result = parserFactory.writeBatch(context, new BatchWriter(targetConnectorMapper, group.getCommand(), targetTableName, event,
+        Result result = parserFactory.writeBatch(context, new BatchWriter(tConnectorMapper, group.getCommand(), targetTableName, event,
                 picker.getTargetFields(), targetDataList, bufferActuatorConfig.getWriterBatchCount()));
 
         // 6、持久化同步结果