|
@@ -12,13 +12,20 @@ import org.dbsyncer.common.util.StringUtil;
|
|
|
import org.dbsyncer.connector.base.ConnectorFactory;
|
|
|
import org.dbsyncer.parser.ParserComponent;
|
|
|
import org.dbsyncer.parser.ProfileComponent;
|
|
|
+import org.dbsyncer.parser.TableGroupContext;
|
|
|
import org.dbsyncer.parser.ddl.DDLParser;
|
|
|
import org.dbsyncer.parser.event.RefreshOffsetEvent;
|
|
|
import org.dbsyncer.parser.flush.AbstractBufferActuator;
|
|
|
-import org.dbsyncer.parser.model.*;
|
|
|
+import org.dbsyncer.parser.model.Connector;
|
|
|
+import org.dbsyncer.parser.model.FieldMapping;
|
|
|
+import org.dbsyncer.parser.model.Mapping;
|
|
|
+import org.dbsyncer.parser.model.Meta;
|
|
|
+import org.dbsyncer.parser.model.TableGroup;
|
|
|
+import org.dbsyncer.parser.model.TableGroupPicker;
|
|
|
+import org.dbsyncer.parser.model.WriterRequest;
|
|
|
+import org.dbsyncer.parser.model.WriterResponse;
|
|
|
import org.dbsyncer.parser.strategy.FlushStrategy;
|
|
|
import org.dbsyncer.parser.util.ConvertUtil;
|
|
|
-import org.dbsyncer.parser.util.PickerUtil;
|
|
|
import org.dbsyncer.plugin.PluginFactory;
|
|
|
import org.dbsyncer.plugin.enums.ProcessEnum;
|
|
|
import org.dbsyncer.plugin.impl.IncrementPluginContext;
|
|
@@ -26,7 +33,10 @@ import org.dbsyncer.sdk.config.DDLConfig;
|
|
|
import org.dbsyncer.sdk.connector.ConnectorInstance;
|
|
|
import org.dbsyncer.sdk.enums.ChangedEventTypeEnum;
|
|
|
import org.dbsyncer.sdk.model.ConnectorConfig;
|
|
|
+import org.dbsyncer.sdk.model.Field;
|
|
|
import org.dbsyncer.sdk.model.MetaInfo;
|
|
|
+import org.dbsyncer.sdk.schema.SchemaResolver;
|
|
|
+import org.dbsyncer.sdk.spi.ConnectorService;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.context.ApplicationContext;
|
|
@@ -35,10 +45,12 @@ import org.springframework.util.Assert;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
import javax.annotation.Resource;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.Executor;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* 通用执行器(单线程消费,多线程批量写,按序执行)
|
|
@@ -65,7 +77,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
private ParserComponent parserComponent;
|
|
|
|
|
|
@Resource
|
|
|
- protected ProfileComponent profileComponent;
|
|
|
+ private ProfileComponent profileComponent;
|
|
|
|
|
|
@Resource
|
|
|
private PluginFactory pluginFactory;
|
|
@@ -79,6 +91,9 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
@Resource
|
|
|
private DDLParser ddlParser;
|
|
|
|
|
|
+ @Resource
|
|
|
+ private TableGroupContext tableGroupContext;
|
|
|
+
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
setConfig(generalBufferConfig);
|
|
@@ -87,7 +102,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
|
|
|
@Override
|
|
|
protected String getPartitionKey(WriterRequest request) {
|
|
|
- return request.getTableGroupId();
|
|
|
+ return request.getTableName();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -96,10 +111,10 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
response.addData(request.getRow());
|
|
|
}
|
|
|
if (request.getChangedOffset() != null) {
|
|
|
- response.addChangedOffset(request.getChangedOffset());
|
|
|
+ response.setChangedOffset(request.getChangedOffset());
|
|
|
}
|
|
|
if (!response.isMerged()) {
|
|
|
- response.setTableGroupId(request.getTableGroupId());
|
|
|
+ response.setTableName(request.getTableName());
|
|
|
response.setEvent(request.getEvent());
|
|
|
response.setTypeEnum(request.getTypeEnum());
|
|
|
response.setSql(request.getSql());
|
|
@@ -116,54 +131,32 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
|
|
|
@Override
|
|
|
public void pull(WriterResponse response) {
|
|
|
- // 0、获取配置信息
|
|
|
- final TableGroup tableGroup = getTableGroup(response.getTableGroupId());
|
|
|
- final Mapping mapping = profileComponent.getMapping(tableGroup.getMappingId());
|
|
|
- final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
|
|
|
-
|
|
|
- // 1、ddl解析
|
|
|
- if (ChangedEventTypeEnum.isDDL(response.getTypeEnum())) {
|
|
|
- parseDDl(response, mapping, group);
|
|
|
+ Meta meta = profileComponent.getMeta(response.getChangedOffset().getMetaId());
|
|
|
+ if (meta == null) {
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
- final Picker picker = new Picker(group.getFieldMapping());
|
|
|
- final List<Map> sourceDataList = response.getDataList();
|
|
|
- // 2、映射字段
|
|
|
- List<Map> targetDataList = picker.pickTargetData(sourceDataList);
|
|
|
-
|
|
|
- // 3、参数转换
|
|
|
- ConvertUtil.convert(group.getConvert(), targetDataList);
|
|
|
-
|
|
|
- // 4、插件转换
|
|
|
- final IncrementPluginContext context = new IncrementPluginContext();
|
|
|
- context.setSourceConnectorInstance(connectorFactory.connect(getConnectorConfig(mapping.getSourceConnectorId())));
|
|
|
- context.setTargetConnectorInstance(connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId())));
|
|
|
- context.setSourceTableName(group.getSourceTable().getName());
|
|
|
- context.setTargetTableName(group.getTargetTable().getName());
|
|
|
- context.setEvent(response.getEvent());
|
|
|
- context.setTargetFields(picker.getTargetFields());
|
|
|
- context.setCommand(group.getCommand());
|
|
|
- context.setBatchSize(generalBufferConfig.getBufferWriterCount());
|
|
|
- context.setSourceList(sourceDataList);
|
|
|
- context.setTargetList(targetDataList);
|
|
|
- context.setPluginExtInfo(group.getPluginExtInfo());
|
|
|
- context.setForceUpdate(mapping.isForceUpdate());
|
|
|
- pluginFactory.process(group.getPlugin(), context, ProcessEnum.CONVERT);
|
|
|
-
|
|
|
- // 5、批量执行同步
|
|
|
- Result result = parserComponent.writeBatch(context, getExecutor());
|
|
|
-
|
|
|
- // 6.发布刷新增量点事件
|
|
|
- applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
|
|
|
-
|
|
|
- // 7、持久化同步结果
|
|
|
- result.setTableGroupId(tableGroup.getId());
|
|
|
- result.setTargetTableGroupName(context.getTargetTableName());
|
|
|
- flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
|
|
|
-
|
|
|
- // 8、执行批量处理后的
|
|
|
- pluginFactory.process(group.getPlugin(), context, ProcessEnum.AFTER);
|
|
|
+ final Mapping mapping = profileComponent.getMapping(meta.getMappingId());
|
|
|
+ List<TableGroupPicker> pickers = tableGroupContext.getTableGroupPickers(meta.getId(), response.getTableName());
|
|
|
+
|
|
|
+ switch (response.getTypeEnum()) {
|
|
|
+ case DDL:
|
|
|
+ tableGroupContext.update(mapping, pickers.stream().map(picker -> {
|
|
|
+ TableGroup tableGroup = profileComponent.getTableGroup(picker.getTableGroup().getId());
|
|
|
+ parseDDl(response, mapping, tableGroup);
|
|
|
+ return tableGroup;
|
|
|
+ }).collect(Collectors.toList()));
|
|
|
+ break;
|
|
|
+ case SCAN:
|
|
|
+ pickers.forEach(picker -> distributeTableGroup(response, mapping, picker, picker.getSourceFields(), false));
|
|
|
+ break;
|
|
|
+ case ROW:
|
|
|
+ pickers.forEach(picker -> distributeTableGroup(response, mapping, picker, picker.getTableGroup().getSourceTable().getColumn(), true));
|
|
|
+ // 发布刷新增量点事件
|
|
|
+ applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getChangedOffset()));
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -182,16 +175,54 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
return generalExecutor;
|
|
|
}
|
|
|
|
|
|
- public TableGroup getTableGroup(String tableGroupId) {
|
|
|
- return profileComponent.getTableGroup(tableGroupId);
|
|
|
+ private void distributeTableGroup(WriterResponse response, Mapping mapping, TableGroupPicker tableGroupPicker, List<Field> sourceFields, boolean enableFilter) {
|
|
|
+ // 1、映射字段
|
|
|
+ boolean enableSchemaResolver = profileComponent.getSystemConfig().isEnableSchemaResolver();
|
|
|
+ ConnectorConfig sourceConfig = getConnectorConfig(mapping.getSourceConnectorId());
|
|
|
+ ConnectorService sourceConnector = connectorFactory.getConnectorService(sourceConfig.getConnectorType());
|
|
|
+ List<Map> sourceDataList = new ArrayList<>();
|
|
|
+ List<Map> targetDataList = tableGroupPicker.getPicker()
|
|
|
+ .setSourceResolver(enableSchemaResolver ? sourceConnector.getSchemaResolver() : null)
|
|
|
+ .pickTargetData(sourceFields, enableFilter, response.getDataList(), sourceDataList);
|
|
|
+ if (CollectionUtils.isEmpty(targetDataList)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 2、参数转换
|
|
|
+ TableGroup tableGroup = tableGroupPicker.getTableGroup();
|
|
|
+ ConvertUtil.convert(tableGroup.getConvert(), targetDataList);
|
|
|
+
|
|
|
+ // 3、插件转换
|
|
|
+ final IncrementPluginContext context = new IncrementPluginContext();
|
|
|
+ context.setSourceConnectorInstance(connectorFactory.connect(sourceConfig));
|
|
|
+ context.setTargetConnectorInstance(connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId())));
|
|
|
+ context.setSourceTableName(tableGroup.getSourceTable().getName());
|
|
|
+ context.setTargetTableName(tableGroup.getTargetTable().getName());
|
|
|
+ context.setEvent(response.getEvent());
|
|
|
+ context.setTargetFields(tableGroupPicker.getTargetFields());
|
|
|
+ context.setCommand(tableGroup.getCommand());
|
|
|
+ context.setBatchSize(generalBufferConfig.getBufferWriterCount());
|
|
|
+ context.setSourceList(sourceDataList);
|
|
|
+ context.setTargetList(targetDataList);
|
|
|
+ context.setPluginExtInfo(tableGroup.getPluginExtInfo());
|
|
|
+ context.setForceUpdate(mapping.isForceUpdate());
|
|
|
+ context.setEnableSchemaResolver(enableSchemaResolver);
|
|
|
+ pluginFactory.process(tableGroup.getPlugin(), context, ProcessEnum.CONVERT);
|
|
|
+
|
|
|
+ // 4、批量执行同步
|
|
|
+ Result result = parserComponent.writeBatch(context, getExecutor());
|
|
|
+
|
|
|
+ // 5、持久化同步结果
|
|
|
+ result.setTableGroupId(tableGroup.getId());
|
|
|
+ result.setTargetTableGroupName(context.getTargetTableName());
|
|
|
+ flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
|
|
|
+
|
|
|
+ // 6、执行后置处理
|
|
|
+ pluginFactory.process(tableGroup.getPlugin(), context, ProcessEnum.AFTER);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 解析DDL
|
|
|
- *
|
|
|
- * @param response
|
|
|
- * @param mapping
|
|
|
- * @param tableGroup
|
|
|
*/
|
|
|
private void parseDDl(WriterResponse response, Mapping mapping, TableGroup tableGroup) {
|
|
|
try {
|
|
@@ -200,49 +231,44 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
String sConnType = sConnConfig.getConnectorType();
|
|
|
String tConnType = tConnConfig.getConnectorType();
|
|
|
// 0.生成目标表执行SQL(暂支持同源)
|
|
|
- if (StringUtil.equals(sConnType, tConnType)) {
|
|
|
- // 1.转换为目标SQL,执行到目标库
|
|
|
- String targetTableName = tableGroup.getTargetTable().getName();
|
|
|
- List<FieldMapping> originalFieldMappings = tableGroup.getFieldMapping();
|
|
|
- DDLConfig targetDDLConfig = ddlParser.parseDDlConfig(response.getSql(), tConnType, targetTableName, originalFieldMappings);
|
|
|
- final ConnectorInstance tConnectorInstance = connectorFactory.connect(tConnConfig);
|
|
|
- Result result = connectorFactory.writerDDL(tConnectorInstance, targetDDLConfig);
|
|
|
- result.setTableGroupId(tableGroup.getId());
|
|
|
- result.setTargetTableGroupName(targetTableName);
|
|
|
-
|
|
|
- // 2.获取目标表最新的属性字段
|
|
|
- MetaInfo targetMetaInfo = parserComponent.getMetaInfo(mapping.getTargetConnectorId(), targetTableName);
|
|
|
- MetaInfo originMetaInfo = parserComponent.getMetaInfo(mapping.getSourceConnectorId(), tableGroup.getSourceTable().getName());
|
|
|
-
|
|
|
- // 3.更新表字段映射(根据保留的更改的属性,进行更改)
|
|
|
- tableGroup.getSourceTable().setColumn(originMetaInfo.getColumn());
|
|
|
- tableGroup.getTargetTable().setColumn(targetMetaInfo.getColumn());
|
|
|
- tableGroup.setFieldMapping(ddlParser.refreshFiledMappings(originalFieldMappings, originMetaInfo, targetMetaInfo, targetDDLConfig));
|
|
|
-
|
|
|
- // 4.更新执行命令
|
|
|
- Map<String, String> commands = parserComponent.getCommand(mapping, tableGroup);
|
|
|
- tableGroup.setCommand(commands);
|
|
|
-
|
|
|
- // 5.持久化存储 & 更新缓存配置
|
|
|
- profileComponent.editTableGroup(tableGroup);
|
|
|
-
|
|
|
- // 6.发布更新事件,持久化增量数据
|
|
|
- applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
|
|
|
- flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
|
|
|
+ if (!StringUtil.equals(sConnType, tConnType)) {
|
|
|
+ logger.warn("暂只支持数据库同源并且是关系性解析DDL");
|
|
|
return;
|
|
|
}
|
|
|
+ // 1.转换为目标SQL,执行到目标库
|
|
|
+ String targetTableName = tableGroup.getTargetTable().getName();
|
|
|
+ ConnectorService connectorService = connectorFactory.getConnectorService(tConnType);
|
|
|
+ DDLConfig targetDDLConfig = ddlParser.parse(connectorService, tableGroup, response.getSql());
|
|
|
+ ConnectorInstance tConnectorInstance = connectorFactory.connect(tConnConfig);
|
|
|
+ Result result = connectorFactory.writerDDL(tConnectorInstance, targetDDLConfig);
|
|
|
+ result.setTableGroupId(tableGroup.getId());
|
|
|
+ result.setTargetTableGroupName(targetTableName);
|
|
|
+
|
|
|
+ // 2.获取目标表最新的属性字段
|
|
|
+ MetaInfo sourceMetaInfo = parserComponent.getMetaInfo(mapping.getSourceConnectorId(), tableGroup.getSourceTable().getName());
|
|
|
+ MetaInfo targetMetaInfo = parserComponent.getMetaInfo(mapping.getTargetConnectorId(), targetTableName);
|
|
|
+
|
|
|
+ // 3.更新表字段映射(根据保留的更改的属性,进行更改)
|
|
|
+ tableGroup.getSourceTable().setColumn(sourceMetaInfo.getColumn());
|
|
|
+ tableGroup.getTargetTable().setColumn(targetMetaInfo.getColumn());
|
|
|
+ ddlParser.refreshFiledMappings(tableGroup, targetDDLConfig);
|
|
|
+
|
|
|
+ // 4.更新执行命令
|
|
|
+ tableGroup.setCommand(parserComponent.getCommand(mapping, tableGroup));
|
|
|
+
|
|
|
+ // 5.持久化存储 & 更新缓存配置
|
|
|
+ profileComponent.editTableGroup(tableGroup);
|
|
|
+
|
|
|
+ // 6.发布更新事件,持久化增量数据
|
|
|
+ applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getChangedOffset()));
|
|
|
+ flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
|
|
|
} catch (Exception e) {
|
|
|
logger.error(e.getMessage(), e);
|
|
|
- return;
|
|
|
}
|
|
|
- logger.warn("暂只支持数据库同源并且是关系性解析DDL");
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取连接器配置
|
|
|
- *
|
|
|
- * @param connectorId
|
|
|
- * @return
|
|
|
*/
|
|
|
private ConnectorConfig getConnectorConfig(String connectorId) {
|
|
|
Assert.hasText(connectorId, "Connector id can not be empty.");
|