Kaynağa Gözat

修复手动同步

AE86 1 yıl önce
ebeveyn
işleme
8b0e546b63

+ 8 - 8
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -103,7 +103,7 @@ public class DataSyncServiceImpl implements DataSyncService {
         }
 
         // 3、反序列
-        Map<String, Object> map = new HashMap<>();
+        Map<String, Object> target = new HashMap<>();
         final Picker picker = new Picker(tableGroup.getFieldMapping());
         final Map<String, Field> fieldMap = picker.getTargetFieldMap();
         BinlogMap message = BinlogMap.parseFrom(bytes);
@@ -115,17 +115,17 @@ public class DataSyncServiceImpl implements DataSyncService {
                     if (null != val && val instanceof byte[]) {
                         byte[] b = (byte[]) val;
                         if (b.length > 128) {
-                            map.put(k, String.format("byte[%d]", b.length));
+                            target.put(k, String.format("byte[%d]", b.length));
                             return;
                         }
-                        map.put(k, Arrays.toString(b));
+                        target.put(k, Arrays.toString(b));
                         return;
                     }
                 }
-                map.put(k, val);
+                target.put(k, val);
             }
         });
-        return map;
+        return target;
     }
 
     @Override
@@ -138,7 +138,6 @@ public class DataSyncServiceImpl implements DataSyncService {
         try {
             Map row = monitor.getData(metaId, messageId);
             Map binlogData = getBinlogData(row, false);
-            // 历史数据不支持手动同步
             if (CollectionUtils.isEmpty(binlogData)) {
                 return messageId;
             }
@@ -152,8 +151,9 @@ public class DataSyncServiceImpl implements DataSyncService {
             TableGroup tableGroup = manager.getTableGroup(tableGroupId);
             String sourceTableName = tableGroup.getSourceTable().getName();
             RowChangedEvent changedEvent = new RowChangedEvent(sourceTableName, event, Collections.EMPTY_LIST);
-            // TODO 应转换为源字段
-            changedEvent.setChangedRow(binlogData);
+            // 转换为源字段
+            final Picker picker = new Picker(tableGroup.getFieldMapping());
+            changedEvent.setChangedRow(picker.pickSourceData(binlogData));
             parser.execute(tableGroupId, changedEvent);
             monitor.removeData(metaId, messageId);
             // 更新失败数

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -269,7 +269,7 @@ public class ParserFactory implements Parser {
             }
 
             // 2、映射字段
-            List<Map> target = picker.pickData(source);
+            List<Map> target = picker.pickTargetData(source);
 
             // 3、参数转换
             ConvertUtil.convert(group.getConvert(), target);

+ 10 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -139,7 +139,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         final Picker picker = new Picker(group.getFieldMapping());
         final List<Map> sourceDataList = response.getDataList();
         // 2、映射字段
-        List<Map> targetDataList = picker.pickData(sourceDataList);
+        List<Map> targetDataList = picker.pickTargetData(sourceDataList);
 
         // 3、参数转换
         ConvertUtil.convert(group.getConvert(), targetDataList);
@@ -190,6 +190,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
             String tConnType = tConnConfig.getConnectorType();
             // 0.生成目标表执行SQL(暂支持MySQL) fixme AE86 暂内测MySQL作为试运行版本
             if (StringUtil.equals(sConnType, tConnType) && StringUtil.equals(ConnectorEnum.MYSQL.getType(), tConnType)) {
+                // 1.转换为目标SQL,执行到目标库
                 String targetTableName = tableGroup.getTargetTable().getName();
                 List<FieldMapping> originalFieldMappings = tableGroup.getFieldMapping();
                 DDLConfig targetDDLConfig = ddlParser.parseDDlConfig(response.getSql(), tConnType, targetTableName, originalFieldMappings);
@@ -198,30 +199,23 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
                 result.setTableGroupId(tableGroup.getId());
                 result.setTargetTableGroupName(targetTableName);
 
-                // TODO life
-                // 1.获取目标表最新的属性字段
+                // 2.获取目标表最新的属性字段
                 MetaInfo targetMetaInfo = parser.getMetaInfo(mapping.getTargetConnectorId(), targetTableName);
                 MetaInfo originMetaInfo = parser.getMetaInfo(mapping.getSourceConnectorId(), tableGroup.getSourceTable().getName());
-                // 1.1 参考 org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.refreshTableFields
-                //上面已经是刷新了
 
-                // 1.2 要注意,表支持自定义主键,要兼容处理
-                //主键问题还未涉及到这种情况,可能还没写,可能要是删除主键会需要考虑,其他的情况我应该不会动
-
-                // 2.更新TableGroup.targetTable
+                // 3.更新表字段映射(根据保留的更改的属性,进行更改)
                 tableGroup.getSourceTable().setColumn(originMetaInfo.getColumn());
                 tableGroup.getTargetTable().setColumn(targetMetaInfo.getColumn());
-
-                // 3.更新表字段映射(根据保留的更改的属性,进行更改)
                 tableGroup.setFieldMapping(ddlParser.refreshFiledMappings(originalFieldMappings, originMetaInfo, targetMetaInfo, targetDDLConfig));
-                // 4.合并驱动配置 & 更新TableGroup.command 合并驱动应该不需要了,我只是把该替换的地方替换掉了,原来的还是保持一致,应该需要更新TableGroup.command
-                // 4.1 参考 org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.mergeConfig
+
+                // 4.更新执行命令
                 Map<String, String> commands = parser.getCommand(mapping, tableGroup);
                 tableGroup.setCommand(commands);
-                // 5.持久化存储 & 更新缓存
-                // 5.1 参考 org.dbsyncer.manager.ManagerFactory.editConfigModel
-                // 将方法移动到parser模块,就可以复用实现
+
+                // 5.持久化存储 & 更新缓存配置
                 flushCache(tableGroup);
+
+                // 6.发布更新事件,持久化增量数据
                 applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
                 flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
                 return;

+ 14 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -31,28 +31,35 @@ public class Picker {
         }
     }
 
-    public List<Map> pickData(List<Map> data) {
+    public List<Map> pickTargetData(List<Map> source) {
         List<Map> targetMapList = new ArrayList<>();
-        if (!CollectionUtils.isEmpty(data)) {
-            final int size = data.size();
+        if (!CollectionUtils.isEmpty(source)) {
+            final int size = source.size();
             final int sFieldSize = sourceFields.size();
             Map<String, Object> target = null;
             for (int i = 0; i < size; i++) {
                 target = new HashMap<>();
-                exchange(sFieldSize, sourceFields, targetFields, data.get(i), target);
+                exchange(sFieldSize, sourceFields, targetFields, source.get(i), target);
                 targetMapList.add(target);
             }
         }
         return targetMapList;
     }
 
-    private void exchange(int sFieldSize, List<Field> sFields, List<Field> tFields, Map<String, Object> source,
-                          Map<String, Object> target) {
+    public Map pickSourceData(Map target) {
+        Map<String, Object> source = new HashMap<>();
+        if (!CollectionUtils.isEmpty(target)) {
+            exchange(targetFields.size(), targetFields, sourceFields, target, source);
+        }
+        return source;
+    }
+
+    private void exchange(int fieldSize, List<Field> sFields, List<Field> tFields, Map<String, Object> source, Map<String, Object> target) {
         Field sField = null;
         Field tField = null;
         Object v = null;
         String tFieldName = null;
-        for (int k = 0; k < sFieldSize; k++) {
+        for (int k = 0; k < fieldSize; k++) {
             sField = sFields.get(k);
             tField = tFields.get(k);
             if (null != sField && null != tField) {