Quellcode durchsuchen

!176 merge
Merge pull request !176 from AE86/v_1.2.5_dev

AE86 vor 1 Jahr
Ursprung
Commit
bf6818f651

+ 1 - 1
dbsyncer-biz/pom.xml

@@ -5,7 +5,7 @@
 	<parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.5</version>
+        <version>1.2.6</version>
     </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-biz</artifactId>

+ 9 - 8
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -103,9 +103,9 @@ public class DataSyncServiceImpl implements DataSyncService {
         }
 
         // 3、反序列
-        Map<String, Object> map = new HashMap<>();
+        Map<String, Object> target = new HashMap<>();
         final Picker picker = new Picker(tableGroup.getFieldMapping());
-        final Map<String, Field> fieldMap = picker.getSourceFieldMap();
+        final Map<String, Field> fieldMap = picker.getTargetFieldMap();
         BinlogMap message = BinlogMap.parseFrom(bytes);
         message.getRowMap().forEach((k, v) -> {
             if (fieldMap.containsKey(k)) {
@@ -115,17 +115,17 @@ public class DataSyncServiceImpl implements DataSyncService {
                     if (null != val && val instanceof byte[]) {
                         byte[] b = (byte[]) val;
                         if (b.length > 128) {
-                            map.put(k, String.format("byte[%d]", b.length));
+                            target.put(k, String.format("byte[%d]", b.length));
                             return;
                         }
-                        map.put(k, Arrays.toString(b));
+                        target.put(k, Arrays.toString(b));
                         return;
                     }
                 }
-                map.put(k, val);
+                target.put(k, val);
             }
         });
-        return map;
+        return target;
     }
 
     @Override
@@ -138,7 +138,6 @@ public class DataSyncServiceImpl implements DataSyncService {
         try {
             Map row = monitor.getData(metaId, messageId);
             Map binlogData = getBinlogData(row, false);
-            // 历史数据不支持手动同步
             if (CollectionUtils.isEmpty(binlogData)) {
                 return messageId;
             }
@@ -152,7 +151,9 @@ public class DataSyncServiceImpl implements DataSyncService {
             TableGroup tableGroup = manager.getTableGroup(tableGroupId);
             String sourceTableName = tableGroup.getSourceTable().getName();
             RowChangedEvent changedEvent = new RowChangedEvent(sourceTableName, event, Collections.EMPTY_LIST);
-            changedEvent.setChangedRow(binlogData);
+            // 转换为源字段
+            final Picker picker = new Picker(tableGroup.getFieldMapping());
+            changedEvent.setChangedRow(picker.pickSourceData(binlogData));
             parser.execute(tableGroupId, changedEvent);
             monitor.removeData(metaId, messageId);
             // 更新失败数

+ 1 - 1
dbsyncer-cache/pom.xml

@@ -4,7 +4,7 @@
 	<parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.5</version>
+        <version>1.2.6</version>
     </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-cache</artifactId>

+ 1 - 1
dbsyncer-cluster/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.5</version>
+        <version>1.2.6</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-cluster</artifactId>

+ 1 - 1
dbsyncer-common/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.5</version>
+        <version>1.2.6</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-common</artifactId>

+ 1 - 1
dbsyncer-connector/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.5</version>
+        <version>1.2.6</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-connector</artifactId>

+ 1 - 1
dbsyncer-listener/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.5</version>
+        <version>1.2.6</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-listener</artifactId>

+ 1 - 1
dbsyncer-manager/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.5</version>
+        <version>1.2.6</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-manager</artifactId>

+ 1 - 1
dbsyncer-monitor/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.5</version>
+        <version>1.2.6</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-monitor</artifactId>

+ 1 - 1
dbsyncer-parser/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.5</version>
+        <version>1.2.6</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-parser</artifactId>

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -269,7 +269,7 @@ public class ParserFactory implements Parser {
             }
 
             // 2、映射字段
-            List<Map> target = picker.pickData(source);
+            List<Map> target = picker.pickTargetData(source);
 
             // 3、参数转换
             ConvertUtil.convert(group.getConvert(), target);

+ 10 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -139,7 +139,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         final Picker picker = new Picker(group.getFieldMapping());
         final List<Map> sourceDataList = response.getDataList();
         // 2、映射字段
-        List<Map> targetDataList = picker.pickData(sourceDataList);
+        List<Map> targetDataList = picker.pickTargetData(sourceDataList);
 
         // 3、参数转换
         ConvertUtil.convert(group.getConvert(), targetDataList);
@@ -190,6 +190,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
             String tConnType = tConnConfig.getConnectorType();
             // 0.生成目标表执行SQL(暂支持MySQL) fixme AE86 暂内测MySQL作为试运行版本
             if (StringUtil.equals(sConnType, tConnType) && StringUtil.equals(ConnectorEnum.MYSQL.getType(), tConnType)) {
+                // 1.转换为目标SQL,执行到目标库
                 String targetTableName = tableGroup.getTargetTable().getName();
                 List<FieldMapping> originalFieldMappings = tableGroup.getFieldMapping();
                 DDLConfig targetDDLConfig = ddlParser.parseDDlConfig(response.getSql(), tConnType, targetTableName, originalFieldMappings);
@@ -198,30 +199,23 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
                 result.setTableGroupId(tableGroup.getId());
                 result.setTargetTableGroupName(targetTableName);
 
-                // TODO life
-                // 1.获取目标表最新的属性字段
+                // 2.获取目标表最新的属性字段
                 MetaInfo targetMetaInfo = parser.getMetaInfo(mapping.getTargetConnectorId(), targetTableName);
                 MetaInfo originMetaInfo = parser.getMetaInfo(mapping.getSourceConnectorId(), tableGroup.getSourceTable().getName());
-                // 1.1 参考 org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.refreshTableFields
-                //上面已经是刷新了
 
-                // 1.2 要注意,表支持自定义主键,要兼容处理
-                //主键问题还未涉及到这种情况,可能还没写,可能要是删除主键会需要考虑,其他的情况我应该不会动
-
-                // 2.更新TableGroup.targetTable
+                // 3.更新表字段映射(根据保留的更改的属性,进行更改)
                 tableGroup.getSourceTable().setColumn(originMetaInfo.getColumn());
                 tableGroup.getTargetTable().setColumn(targetMetaInfo.getColumn());
-
-                // 3.更新表字段映射(根据保留的更改的属性,进行更改)
                 tableGroup.setFieldMapping(ddlParser.refreshFiledMappings(originalFieldMappings, originMetaInfo, targetMetaInfo, targetDDLConfig));
-                // 4.合并驱动配置 & 更新TableGroup.command 合并驱动应该不需要了,我只是把该替换的地方替换掉了,原来的还是保持一致,应该需要更新TableGroup.command
-                // 4.1 参考 org.dbsyncer.biz.checker.impl.tablegroup.TableGroupChecker.mergeConfig
+
+                // 4.更新执行命令
                 Map<String, String> commands = parser.getCommand(mapping, tableGroup);
                 tableGroup.setCommand(commands);
-                // 5.持久化存储 & 更新缓存
-                // 5.1 参考 org.dbsyncer.manager.ManagerFactory.editConfigModel
-                // 将方法移动到parser模块,就可以复用实现
+
+                // 5.持久化存储 & 更新缓存配置
                 flushCache(tableGroup);
+
+                // 6.发布更新事件,持久化增量数据
                 applicationContext.publishEvent(new RefreshOffsetEvent(applicationContext, response.getOffsetList()));
                 flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
                 return;

+ 16 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -31,28 +31,35 @@ public class Picker {
         }
     }
 
-    public List<Map> pickData(List<Map> data) {
+    public List<Map> pickTargetData(List<Map> source) {
         List<Map> targetMapList = new ArrayList<>();
-        if (!CollectionUtils.isEmpty(data)) {
-            final int size = data.size();
+        if (!CollectionUtils.isEmpty(source)) {
+            final int size = source.size();
             final int sFieldSize = sourceFields.size();
             Map<String, Object> target = null;
             for (int i = 0; i < size; i++) {
                 target = new HashMap<>();
-                exchange(sFieldSize, sourceFields, targetFields, data.get(i), target);
+                exchange(sFieldSize, sourceFields, targetFields, source.get(i), target);
                 targetMapList.add(target);
             }
         }
         return targetMapList;
     }
 
-    private void exchange(int sFieldSize, List<Field> sFields, List<Field> tFields, Map<String, Object> source,
-                          Map<String, Object> target) {
+    public Map pickSourceData(Map target) {
+        Map<String, Object> source = new HashMap<>();
+        if (!CollectionUtils.isEmpty(target)) {
+            exchange(targetFields.size(), targetFields, sourceFields, target, source);
+        }
+        return source;
+    }
+
+    private void exchange(int fieldSize, List<Field> sFields, List<Field> tFields, Map<String, Object> source, Map<String, Object> target) {
         Field sField = null;
         Field tField = null;
         Object v = null;
         String tFieldName = null;
-        for (int k = 0; k < sFieldSize; k++) {
+        for (int k = 0; k < fieldSize; k++) {
             sField = sFields.get(k);
             tField = tFields.get(k);
             if (null != sField && null != tField) {
@@ -82,7 +89,7 @@ public class Picker {
         return Collections.unmodifiableList(fields);
     }
 
-    public Map<String, Field> getSourceFieldMap() {
-        return sourceFields.stream().filter(f -> null != f).collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
+    public Map<String, Field> getTargetFieldMap() {
+        return targetFields.stream().filter(f -> null != f).collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
     }
 }

+ 1 - 1
dbsyncer-plugin/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.5</version>
+        <version>1.2.6</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-plugin</artifactId>

+ 1 - 1
dbsyncer-storage/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.5</version>
+        <version>1.2.6</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-storage</artifactId>

+ 1 - 1
dbsyncer-web/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.5</version>
+        <version>1.2.6</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-web</artifactId>

+ 1 - 1
pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>org.ghi</groupId>
     <artifactId>dbsyncer</artifactId>
-    <version>1.2.5</version>
+    <version>1.2.6</version>
     <packaging>pom</packaging>
     <name>dbsyncer</name>
     <url>https://gitee.com/ghi/dbsyncer</url>

+ 1 - 1
version.cmd

@@ -1,7 +1,7 @@
 @echo off
 
 set CURRENT_DATE=%date:~5,2%%date:~8,2%
-set VERSION=1.2.5_%CURRENT_DATE%
+set VERSION=1.2.6_%CURRENT_DATE%
 set /p APP_VERSION=Please enter a new version number(%VERSION%): || set APP_VERSION=%VERSION%
 echo %APP_VERSION%