|
@@ -12,13 +12,13 @@ import org.dbsyncer.common.util.StringUtil;
|
|
|
import org.dbsyncer.connector.base.ConnectorFactory;
|
|
|
import org.dbsyncer.parser.ParserComponent;
|
|
|
import org.dbsyncer.parser.ProfileComponent;
|
|
|
+import org.dbsyncer.parser.TableGroupContext;
|
|
|
import org.dbsyncer.parser.ddl.DDLParser;
|
|
|
import org.dbsyncer.parser.event.RefreshOffsetEvent;
|
|
|
import org.dbsyncer.parser.flush.AbstractBufferActuator;
|
|
|
import org.dbsyncer.parser.model.*;
|
|
|
import org.dbsyncer.parser.strategy.FlushStrategy;
|
|
|
import org.dbsyncer.parser.util.ConvertUtil;
|
|
|
-import org.dbsyncer.parser.util.PickerUtil;
|
|
|
import org.dbsyncer.plugin.PluginFactory;
|
|
|
import org.dbsyncer.plugin.enums.ProcessEnum;
|
|
|
import org.dbsyncer.plugin.impl.IncrementPluginContext;
|
|
@@ -80,6 +80,9 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
@Resource
|
|
|
private DDLParser ddlParser;
|
|
|
|
|
|
+ @Resource
|
|
|
+ private TableGroupContext tableGroupContext;
|
|
|
+
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
setConfig(generalBufferConfig);
|
|
@@ -117,89 +120,81 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
|
|
|
@Override
|
|
|
public void pull(WriterResponse response) {
|
|
|
- // TODO add cache
|
|
|
- List<TableGroup> groupAll = profileComponent.getTableGroupAll(response.getChangedOffset().getMetaId());
|
|
|
- if (!CollectionUtils.isEmpty(groupAll)) {
|
|
|
- groupAll.forEach(tableGroup -> {
|
|
|
- if (StringUtil.equals(tableGroup.getSourceTable().getName(), response.getTableName())) {
|
|
|
- distributeTableGroup(response, tableGroup);
|
|
|
- }
|
|
|
- });
|
|
|
+ Meta meta = profileComponent.getMeta(response.getChangedOffset().getMetaId());
|
|
|
+ if (meta == null) {
|
|
|
+ return;
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- private void distributeTableGroup(WriterResponse response, TableGroup tableGroup) {
|
|
|
- // 0、获取配置信息
|
|
|
- final Mapping mapping = profileComponent.getMapping(tableGroup.getMappingId());
|
|
|
- final TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
|
|
|
+ final Mapping mapping = profileComponent.getMapping(meta.getMappingId());
|
|
|
+ List<TableGroup> tableGroups = tableGroupContext.getTableGroups(mapping, response.getTableName());
|
|
|
|
|
|
// 1、ddl解析
|
|
|
if (ChangedEventTypeEnum.isDDL(response.getTypeEnum())) {
|
|
|
- parseDDl(response, mapping, group);
|
|
|
+ tableGroups.forEach(tableGroup -> parseDDl(response, mapping, tableGroup));
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // 2、映射字段
|
|
|
- final Picker picker = new Picker(group.getFieldMapping());
|
|
|
+ // 2、dml解析
|
|
|
+ tableGroups.forEach(tableGroup -> distributeTableGroup(response, mapping, tableGroup));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void offerFailed(BlockingQueue<WriterRequest> queue, WriterRequest request) {
|
|
|
+ throw new QueueOverflowException("缓存队列已满");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void meter(TimeRegistry timeRegistry, long count) {
|
|
|
+ // 统计执行器同步效率TPS
|
|
|
+ timeRegistry.meter(TimeRegistry.GENERAL_BUFFER_ACTUATOR_TPS).add(count);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Executor getExecutor() {
|
|
|
+ return generalExecutor;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void distributeTableGroup(WriterResponse response, Mapping mapping, TableGroup tableGroup) {
|
|
|
+ // 1、映射字段
|
|
|
+ final Picker picker = new Picker(tableGroup.getFieldMapping());
|
|
|
List<Map> sourceDataList = new ArrayList<>();
|
|
|
List<Map> targetDataList = picker.pickTargetData(response.getDataList(), sourceDataList);
|
|
|
|
|
|
- // 3、参数转换
|
|
|
- ConvertUtil.convert(group.getConvert(), targetDataList);
|
|
|
+ // 2、参数转换
|
|
|
+ ConvertUtil.convert(tableGroup.getConvert(), targetDataList);
|
|
|
|
|
|
- // 4、插件转换
|
|
|
+ // 3、插件转换
|
|
|
final IncrementPluginContext context = new IncrementPluginContext();
|
|
|
context.setSourceConnectorInstance(connectorFactory.connect(getConnectorConfig(mapping.getSourceConnectorId())));
|
|
|
context.setTargetConnectorInstance(connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId())));
|
|
|
- context.setSourceTableName(group.getSourceTable().getName());
|
|
|
- context.setTargetTableName(group.getTargetTable().getName());
|
|
|
+ context.setSourceTableName(tableGroup.getSourceTable().getName());
|
|
|
+ context.setTargetTableName(tableGroup.getTargetTable().getName());
|
|
|
context.setEvent(response.getEvent());
|
|
|
context.setTargetFields(picker.getTargetFields());
|
|
|
- context.setCommand(group.getCommand());
|
|
|
+ context.setCommand(tableGroup.getCommand());
|
|
|
context.setBatchSize(generalBufferConfig.getBufferWriterCount());
|
|
|
context.setSourceList(sourceDataList);
|
|
|
context.setTargetList(targetDataList);
|
|
|
- context.setPluginExtInfo(group.getPluginExtInfo());
|
|
|
+ context.setPluginExtInfo(tableGroup.getPluginExtInfo());
|
|
|
context.setForceUpdate(mapping.isForceUpdate());
|
|
|
- pluginFactory.process(group.getPlugin(), context, ProcessEnum.CONVERT);
|
|
|
+ pluginFactory.process(tableGroup.getPlugin(), context, ProcessEnum.CONVERT);
|
|
|
|
|
|
- // 5、批量执行同步
|
|
|
+ // 4、批量执行同步
|
|
|
Result result = parserComponent.writeBatch(context, getExecutor());
|
|
|
|
|
|
- // 6.发布刷新增量点事件
|
|
|
+ // 5.发布刷新增量点事件
|
|
|
applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getChangedOffset()));
|
|
|
|
|
|
- // 7、持久化同步结果
|
|
|
+ // 6、持久化同步结果
|
|
|
result.setTableGroupId(tableGroup.getId());
|
|
|
result.setTargetTableGroupName(context.getTargetTableName());
|
|
|
flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
|
|
|
|
|
|
- // 8、执行批量处理后的
|
|
|
- pluginFactory.process(group.getPlugin(), context, ProcessEnum.AFTER);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected void offerFailed(BlockingQueue<WriterRequest> queue, WriterRequest request) {
|
|
|
- throw new QueueOverflowException("缓存队列已满");
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected void meter(TimeRegistry timeRegistry, long count) {
|
|
|
- // 统计执行器同步效率TPS
|
|
|
- timeRegistry.meter(TimeRegistry.GENERAL_BUFFER_ACTUATOR_TPS).add(count);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Executor getExecutor() {
|
|
|
- return generalExecutor;
|
|
|
+ // 7、执行批量处理后的
|
|
|
+ pluginFactory.process(tableGroup.getPlugin(), context, ProcessEnum.AFTER);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 解析DDL
|
|
|
- *
|
|
|
- * @param response
|
|
|
- * @param mapping
|
|
|
- * @param tableGroup
|
|
|
*/
|
|
|
private void parseDDl(WriterResponse response, Mapping mapping, TableGroup tableGroup) {
|
|
|
try {
|
|
@@ -248,9 +243,6 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
|
|
|
/**
|
|
|
* 获取连接器配置
|
|
|
- *
|
|
|
- * @param connectorId
|
|
|
- * @return
|
|
|
*/
|
|
|
private ConnectorConfig getConnectorConfig(String connectorId) {
|
|
|
Assert.hasText(connectorId, "Connector id can not be empty.");
|