|
@@ -1,7 +1,5 @@
|
|
|
package org.dbsyncer.parser.flush.impl;
|
|
|
|
|
|
-import java.util.Date;
|
|
|
-import java.util.LinkedList;
|
|
|
import org.dbsyncer.cache.CacheService;
|
|
|
import org.dbsyncer.common.config.GeneralBufferConfig;
|
|
|
import org.dbsyncer.common.event.RefreshOffsetEvent;
|
|
@@ -15,11 +13,8 @@ import org.dbsyncer.connector.ConnectorFactory;
|
|
|
import org.dbsyncer.connector.config.DDLConfig;
|
|
|
import org.dbsyncer.connector.constant.ConnectorConstant;
|
|
|
import org.dbsyncer.connector.enums.ConnectorEnum;
|
|
|
-import org.dbsyncer.connector.enums.DDLOperationEnum;
|
|
|
-import org.dbsyncer.connector.model.Field;
|
|
|
import org.dbsyncer.connector.model.MetaInfo;
|
|
|
import org.dbsyncer.parser.Parser;
|
|
|
-import org.dbsyncer.parser.ParserFactory;
|
|
|
import org.dbsyncer.parser.ddl.DDLParser;
|
|
|
import org.dbsyncer.parser.flush.AbstractBufferActuator;
|
|
|
import org.dbsyncer.parser.model.BatchWriter;
|
|
@@ -40,13 +35,13 @@ import org.dbsyncer.storage.StorageService;
|
|
|
import org.dbsyncer.storage.enums.StorageEnum;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.context.ApplicationContext;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.Assert;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
import javax.annotation.Resource;
|
|
|
+import java.util.Date;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.Executor;
|
|
@@ -84,7 +79,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
@Resource
|
|
|
private CacheService cacheService;
|
|
|
|
|
|
- @Autowired
|
|
|
+ @Resource
|
|
|
private StorageService storageService;
|
|
|
|
|
|
@Resource
|
|
@@ -196,7 +191,8 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
// 0.生成目标表执行SQL(暂支持MySQL) fixme AE86 暂内测MySQL作为试运行版本
|
|
|
if (StringUtil.equals(sConnType, tConnType) && StringUtil.equals(ConnectorEnum.MYSQL.getType(), tConnType)) {
|
|
|
String targetTableName = tableGroup.getTargetTable().getName();
|
|
|
- DDLConfig targetDDLConfig = ddlParser.parseDDlConfig(response.getSql(), tConnType, targetTableName,tableGroup.getFieldMapping());
|
|
|
+ List<FieldMapping> originalFieldMappings = tableGroup.getFieldMapping();
|
|
|
+ DDLConfig targetDDLConfig = ddlParser.parseDDlConfig(response.getSql(), tConnType, targetTableName, originalFieldMappings);
|
|
|
final ConnectorMapper tConnectorMapper = connectorFactory.connect(tConnConfig);
|
|
|
Result result = connectorFactory.writerDDL(tConnectorMapper, targetDDLConfig);
|
|
|
result.setTableGroupId(tableGroup.getId());
|
|
@@ -205,7 +201,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
// TODO life
|
|
|
// 1.获取目标表最新的属性字段
|
|
|
MetaInfo targetMetaInfo = parser.getMetaInfo(mapping.getTargetConnectorId(), targetTableName);
|
|
|
- MetaInfo originMetaInfo = parser.getMetaInfo(mapping.getSourceConnectorId(),tableGroup.getSourceTable().getName());
|
|
|
+ MetaInfo originMetaInfo = parser.getMetaInfo(mapping.getSourceConnectorId(), tableGroup.getSourceTable().getName());
|
|
|
// 1.1 参考 org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.refreshTableFields
|
|
|
//上面已经是刷新了
|
|
|
|
|
@@ -217,14 +213,10 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
tableGroup.getTargetTable().setColumn(targetMetaInfo.getColumn());
|
|
|
|
|
|
// 3.更新表字段映射(根据保留的更改的属性,进行更改)
|
|
|
- List<FieldMapping> fieldMappingList = tableGroup.getFieldMapping(); // 获得刚开始filedMapping
|
|
|
- List<FieldMapping> targetMappingList = new LinkedList<>();//替换完成后的filedMapping
|
|
|
- refreshFiledMapping(targetMappingList,fieldMappingList,originMetaInfo,targetMetaInfo,targetDDLConfig);
|
|
|
-
|
|
|
- tableGroup.setFieldMapping(targetMappingList);
|
|
|
+ tableGroup.setFieldMapping(ddlParser.refreshFiledMappings(originalFieldMappings, originMetaInfo, targetMetaInfo, targetDDLConfig));
|
|
|
// 4.合并驱动配置 & 更新TableGroup.command 合并驱动应该不需要了,我只是把该替换的地方替换掉了,原来的还是保持一致,应该需要更新TableGroup.command
|
|
|
// 4.1 参考 org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.mergeConfig
|
|
|
- Map<String, String> commands = parser.getCommand(mapping,tableGroup);
|
|
|
+ Map<String, String> commands = parser.getCommand(mapping, tableGroup);
|
|
|
tableGroup.setCommand(commands);
|
|
|
// 5.持久化存储 & 更新缓存
|
|
|
// 5.1 参考 org.dbsyncer.manager.ManagerFactory.editConfigModel
|
|
@@ -232,65 +224,12 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
flushCache(tableGroup);
|
|
|
applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
|
|
|
flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
|
|
|
+ return;
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
logger.error(e.getMessage(), e);
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- //根据原来的映射关系和更改的字段进行新关系的映射组合
|
|
|
- private void refreshFiledMapping(List<FieldMapping> targetMappingList, List<FieldMapping> fieldMappingList,MetaInfo originMetaInfo,
|
|
|
- MetaInfo targetMetaInfo,DDLConfig targetDDLConfig) {
|
|
|
- //处理映射关系
|
|
|
- for (FieldMapping fieldMapping:fieldMappingList) {
|
|
|
- String fieldSourceName = fieldMapping.getSource().getName();
|
|
|
- String filedTargetName = fieldMapping.getTarget().getName();
|
|
|
- //找到更改的源表的名称,也就是找到了对应的映射关系,这样就可以从源表找到更改后的名称进行对应,
|
|
|
- if (fieldSourceName.equals(targetDDLConfig.getSourceColumnName())){
|
|
|
- //找到源表的字段
|
|
|
- if (targetDDLConfig.getDdlOperationEnum() == DDLOperationEnum.ALTER_MODIFY){//说明字段名没有改变,只是改变了属性
|
|
|
- Field source = originMetaInfo.getColumn().stream()
|
|
|
- .filter(x->x.getName().equals(fieldSourceName)).findFirst().get();
|
|
|
- Field target = targetMetaInfo.getColumn().stream()
|
|
|
- .filter(x->x.getName().equals(filedTargetName)).findFirst().get();
|
|
|
- //替换
|
|
|
- targetMappingList.add(new FieldMapping(source,target)) ;
|
|
|
- continue;
|
|
|
- }else if (targetDDLConfig.getDdlOperationEnum() == DDLOperationEnum.ALTER_CHANGE){//改变名称
|
|
|
- Field source = originMetaInfo.getColumn().stream()
|
|
|
- .filter(x->x.getName().equals(targetDDLConfig.getChangedColumnName())).findFirst().get();
|
|
|
- Field target = targetMetaInfo.getColumn().stream()
|
|
|
- .filter(x->x.getName().equals(targetDDLConfig.getChangedColumnName())).findFirst().get();
|
|
|
- //替换
|
|
|
- targetMappingList.add(new FieldMapping(source,target)) ;
|
|
|
- continue;
|
|
|
- }
|
|
|
- }
|
|
|
- targetMappingList.add(fieldMapping);
|
|
|
- }
|
|
|
-
|
|
|
- if (DDLOperationEnum.ALTER_ADD == targetDDLConfig.getDdlOperationEnum()){
|
|
|
- //处理新增的映射关系
|
|
|
- List<Field> addFields = targetDDLConfig.getAddFields();
|
|
|
- for (Field field:addFields) {
|
|
|
- Field source = originMetaInfo.getColumn().stream()
|
|
|
- .filter(x->x.getName().equals(field.getName())).findFirst().get();
|
|
|
- Field target = targetMetaInfo.getColumn().stream()
|
|
|
- .filter(x->x.getName().equals(field.getName())).findFirst().get();
|
|
|
- FieldMapping fieldMapping =new FieldMapping(source,target);
|
|
|
- targetMappingList.add(fieldMapping);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (DDLOperationEnum.ALTER_DROP == targetDDLConfig.getDdlOperationEnum()){
|
|
|
- //处理删除字段的映射关系
|
|
|
- List<Field> removeFields = targetDDLConfig.getRemoveFields();
|
|
|
- for (Field field:removeFields) {
|
|
|
- targetMappingList.removeIf(x->x.getSource().getName().equals(field.getName()));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
+ logger.warn("暂只支持MYSQL解析DDL");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -306,11 +245,12 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
return conn.getConfig();
|
|
|
}
|
|
|
|
|
|
- public void setGeneralExecutor(Executor generalExecutor) {
|
|
|
- this.generalExecutor = generalExecutor;
|
|
|
- }
|
|
|
-
|
|
|
- public void flushCache(TableGroup tableGroup){
|
|
|
+ /**
|
|
|
+ * 持久化驱动配置
|
|
|
+ *
|
|
|
+ * @param tableGroup
|
|
|
+ */
|
|
|
+ private void flushCache(TableGroup tableGroup) {
|
|
|
// 1、解析配置
|
|
|
ConfigModel model = tableGroup;
|
|
|
model.setCreateTime(new Date().getTime());
|
|
@@ -323,17 +263,12 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
storageService.edit(StorageEnum.CONFIG, params);
|
|
|
|
|
|
// 3、缓存
|
|
|
- // 1、缓存
|
|
|
Assert.notNull(model, "ConfigModel can not be null.");
|
|
|
- String id = model.getId();
|
|
|
- cacheService.put(id, model);
|
|
|
-
|
|
|
- // 2、分组
|
|
|
-// String groupId = tableGroup.getId();
|
|
|
-// cacheService.putIfAbsent(groupId, new Group());
|
|
|
-// Group group = cacheService.get(groupId, Group.class);
|
|
|
-// group.addIfAbsent(id);
|
|
|
-// logger.debug("Put the model [{}] for {} group into cache.", id, groupId);
|
|
|
+ cacheService.put(model.getId(), model);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setGeneralExecutor(Executor generalExecutor) {
|
|
|
+ this.generalExecutor = generalExecutor;
|
|
|
}
|
|
|
|
|
|
}
|