@@ -1,5 +1,7 @@
 package org.dbsyncer.parser.flush.impl;
 
+import java.util.Date;
+import java.util.LinkedList;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.config.GeneralBufferConfig;
 import org.dbsyncer.common.event.RefreshOffsetEvent;
@@ -13,24 +15,31 @@ import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.DDLConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.ddl.DDLParser;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.BatchWriter;
+import org.dbsyncer.parser.model.ConfigModel;
 import org.dbsyncer.parser.model.Connector;
+import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Picker;
 import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.model.WriterRequest;
 import org.dbsyncer.parser.model.WriterResponse;
 import org.dbsyncer.parser.strategy.FlushStrategy;
+import org.dbsyncer.parser.util.ConfigModelUtil;
 import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
+import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.enums.StorageEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
@@ -74,6 +83,9 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Resource
     private CacheService cacheService;
 
+    @Autowired
+    private StorageService storageService;
+
     @Resource
     private ApplicationContext applicationContext;
 
@@ -172,7 +184,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
      *
      * @param response
      * @param mapping
-     * @param group
+     * @param tableGroup
      */
     private void parseDDl(WriterResponse response, Mapping mapping, TableGroup tableGroup) {
         try {
@@ -183,7 +195,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
             // 0. Generate the SQL to execute on the target table (only MySQL for now) fixme AE86: MySQL-only internal trial run
             if (StringUtil.equals(sConnType, tConnType) && StringUtil.equals(ConnectorEnum.MYSQL.getType(), tConnType)) {
                 String targetTableName = tableGroup.getTargetTable().getName();
-                DDLConfig targetDDLConfig = ddlParser.parseDDlConfig(response.getSql(), tConnType, targetTableName);
+                DDLConfig targetDDLConfig = ddlParser.parseDDlConfig(response.getSql(), tConnType, targetTableName, tableGroup.getFieldMapping());
                 final ConnectorMapper tConnectorMapper = connectorFactory.connect(tConnConfig);
                 Result result = connectorFactory.writerDDL(tConnectorMapper, targetDDLConfig);
                 result.setTableGroupId(tableGroup.getId());
@@ -191,23 +203,48 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
 
                 // TODO life
                 // 1. Fetch the latest fields of the target table
-                MetaInfo metaInfo = parser.getMetaInfo(mapping.getTargetConnectorId(), targetTableName);
+                MetaInfo targetMetaInfo = parser.getMetaInfo(mapping.getTargetConnectorId(), targetTableName);
+                MetaInfo originMetaInfo = parser.getMetaInfo(mapping.getSourceConnectorId(), tableGroup.getSourceTable().getName());
                 // 1.1 See org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.refreshTableFields
+                // The refresh is already done by the getMetaInfo calls above.
+
                 // 1.2 Note: tables support custom primary keys, which must be handled for compatibility
+                // Custom primary keys are not covered here yet; this probably only matters when a primary key is dropped, other cases should be unaffected.
 
                 // 2. Update TableGroup.targetTable
-                tableGroup.getTargetTable().setColumn(metaInfo.getColumn());
-
-                // 3. Update the field mappings (add similar fields)
-                // 3.1 See org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.matchSimilarFieldMapping
-
-                // 4. Merge the mapping configuration & update TableGroup.command
+                tableGroup.getSourceTable().setColumn(originMetaInfo.getColumn());
+                tableGroup.getTargetTable().setColumn(targetMetaInfo.getColumn());
+
+                // 3. Update the field mappings (apply the changed attributes while keeping the rest)
+                List<FieldMapping> fieldMappingList = tableGroup.getFieldMapping(); // field mappings before the DDL change
+                List<FieldMapping> targetMappingList = new LinkedList<>(); // field mappings after replacement
+                for (FieldMapping fieldMapping : fieldMappingList) {
+                    String fieldSourceName = fieldMapping.getSource().getName();
+                    String fieldTargetName = fieldMapping.getTarget().getName();
+                    // Match the changed source column name to locate its mapping, so the altered source field can be re-linked to its target field.
+                    if (fieldSourceName.equals(targetDDLConfig.getSourceColumnName())) {
+                        // Found the source field; look up the refreshed definitions (assumes the column still exists)
+                        if (targetDDLConfig.getSourceColumnName().equals(targetDDLConfig.getChangedColumnName())) { // the column name is unchanged, only its attributes changed
+                            Field source = originMetaInfo.getColumn().stream()
+                                    .filter(x -> x.getName().equals(fieldSourceName)).findFirst().get();
+                            Field target = targetMetaInfo.getColumn().stream()
+                                    .filter(x -> x.getName().equals(fieldTargetName)).findFirst().get();
+                            // Replace with the refreshed field definitions
+                            targetMappingList.add(new FieldMapping(source, target));
+                            continue;
+                        }
+                    }
+                    targetMappingList.add(fieldMapping);
+                }
+                tableGroup.setFieldMapping(targetMappingList);
+                // 4. Merging the mapping configuration should no longer be needed since only the affected entries were replaced and the rest stays the same, but TableGroup.command still has to be rebuilt.
                 // 4.1 See org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.mergeConfig
-
+                Map<String, String> commands = parser.getCommand(mapping, tableGroup);
+                tableGroup.setCommand(commands);
                 // 5. Persist & refresh the cache
                 // 5.1 See org.dbsyncer.manager.ManagerFactory.editConfigModel
                 // Moving that method into the parser module would allow reusing the implementation
-
+                flushCache(tableGroup);
                 applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
                 flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
             }
@@ -233,4 +270,28 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         this.generalExecutor = generalExecutor;
     }
 
+    public void flushCache(TableGroup tableGroup) {
+        // 1. Parse the configuration
+        ConfigModel model = tableGroup;
+        Assert.notNull(model, "ConfigModel can not be null.");
+        model.setCreateTime(new Date().getTime());
+        model.setUpdateTime(new Date().getTime());
+
+        // 2. Persist the model
+        Map<String, Object> params = ConfigModelUtil.convertModelToMap(model);
+        logger.debug("params:{}", params);
+        storageService.edit(StorageEnum.CONFIG, params);
+
+        // 3. Refresh the cache
+        String id = model.getId();
+        cacheService.put(id, model);
+
+        // 4. Grouping (kept from ManagerFactory.editConfigModel; not needed for a TableGroup)
+//        String groupId = tableGroup.getId();
+//        cacheService.putIfAbsent(groupId, new Group());
+//        Group group = cacheService.get(groupId, Group.class);
+//        group.addIfAbsent(id);
+//        logger.debug("Put the model [{}] for {} group into cache.", id, groupId);
+    }
+
 }