DataSyncServiceImpl.java 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. /**
  2. * DBSyncer Copyright 2020-2024 All Rights Reserved.
  3. */
  4. package org.dbsyncer.biz.impl;
  5. import com.google.protobuf.InvalidProtocolBufferException;
  6. import org.apache.lucene.index.IndexableField;
  7. import org.dbsyncer.biz.DataSyncService;
  8. import org.dbsyncer.biz.vo.BinlogColumnVo;
  9. import org.dbsyncer.biz.vo.MessageVo;
  10. import org.dbsyncer.common.model.Paging;
  11. import org.dbsyncer.common.util.CollectionUtils;
  12. import org.dbsyncer.common.util.DateFormatUtil;
  13. import org.dbsyncer.common.util.JsonUtil;
  14. import org.dbsyncer.common.util.NumberUtil;
  15. import org.dbsyncer.common.util.StringUtil;
  16. import org.dbsyncer.parser.ProfileComponent;
  17. import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
  18. import org.dbsyncer.parser.model.Meta;
  19. import org.dbsyncer.parser.model.Picker;
  20. import org.dbsyncer.parser.model.TableGroup;
  21. import org.dbsyncer.sdk.constant.ConfigConstant;
  22. import org.dbsyncer.sdk.constant.ConnectorConstant;
  23. import org.dbsyncer.sdk.enums.StorageEnum;
  24. import org.dbsyncer.sdk.filter.FieldResolver;
  25. import org.dbsyncer.sdk.filter.Query;
  26. import org.dbsyncer.sdk.listener.event.RowChangedEvent;
  27. import org.dbsyncer.sdk.model.Field;
  28. import org.dbsyncer.sdk.storage.StorageService;
  29. import org.dbsyncer.storage.binlog.proto.BinlogMap;
  30. import org.dbsyncer.storage.util.BinlogMessageUtil;
  31. import org.slf4j.Logger;
  32. import org.slf4j.LoggerFactory;
  33. import org.springframework.stereotype.Service;
  34. import org.springframework.util.Assert;
  35. import javax.annotation.Resource;
  36. import java.time.Instant;
  37. import java.util.ArrayList;
  38. import java.util.Arrays;
  39. import java.util.Collections;
  40. import java.util.HashMap;
  41. import java.util.List;
  42. import java.util.Map;
  43. import java.util.concurrent.ConcurrentHashMap;
  44. import java.util.stream.Collectors;
  45. /**
  46. * 数据同步服务
  47. *
  48. * @author AE86
  49. * @version 1.0.0
  50. * @date 2022/12/19 23:56
  51. */
  52. @Service
  53. public class DataSyncServiceImpl implements DataSyncService {
  54. private final Logger logger = LoggerFactory.getLogger(getClass());
  55. @Resource
  56. private BufferActuatorRouter bufferActuatorRouter;
  57. @Resource
  58. private ProfileComponent profileComponent;
  59. @Resource
  60. private StorageService storageService;
  61. @Override
  62. public MessageVo getMessageVo(String metaId, String messageId) {
  63. Assert.hasText(metaId, "The metaId is null.");
  64. Assert.hasText(messageId, "The messageId is null.");
  65. MessageVo messageVo = new MessageVo();
  66. try {
  67. Map row = getData(metaId, messageId);
  68. Map binlogData = getBinlogData(row, true);
  69. String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
  70. TableGroup tableGroup = profileComponent.getTableGroup(tableGroupId);
  71. messageVo.setSourceTableName(tableGroup.getSourceTable().getName());
  72. messageVo.setTargetTableName(tableGroup.getTargetTable().getName());
  73. messageVo.setId(messageId);
  74. if (!CollectionUtils.isEmpty(binlogData)) {
  75. Map<String, String> columnMap = tableGroup.getTargetTable().getColumn().stream().collect(Collectors.toMap(Field::getName, Field::getTypeName));
  76. List<BinlogColumnVo> columns = new ArrayList<>();
  77. binlogData.forEach((k, v) -> columns.add(new BinlogColumnVo((String) k, v, columnMap.get(k))));
  78. messageVo.setColumns(columns);
  79. }
  80. } catch (Exception e) {
  81. logger.error(e.getLocalizedMessage());
  82. }
  83. return messageVo;
  84. }
  85. @Override
  86. public Map getBinlogData(Map row, boolean prettyBytes) throws InvalidProtocolBufferException {
  87. String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
  88. // 1、获取配置信息
  89. final TableGroup tableGroup = profileComponent.getTableGroup(tableGroupId);
  90. if (tableGroup == null) {
  91. return Collections.EMPTY_MAP;
  92. }
  93. // 2、获取记录的数据
  94. byte[] bytes = (byte[]) row.get(ConfigConstant.BINLOG_DATA);
  95. if (null == bytes) {
  96. if (prettyBytes) {
  97. String json = (String) row.get(ConfigConstant.CONFIG_MODEL_JSON);
  98. return JsonUtil.parseMap(json);
  99. }
  100. return Collections.EMPTY_MAP;
  101. }
  102. // 3、获取DDL
  103. Map<String, Object> target = new HashMap<>();
  104. BinlogMap message = BinlogMap.parseFrom(bytes);
  105. String event = (String) row.get(ConfigConstant.DATA_EVENT);
  106. if (StringUtil.equals(event, ConnectorConstant.OPERTION_ALTER)) {
  107. message.getRowMap().forEach((k, v) -> target.put(k, v.toStringUtf8()));
  108. return target;
  109. }
  110. // 4、反序列
  111. final Picker picker = new Picker(tableGroup);
  112. final Map<String, Field> fieldMap = picker.getTargetFieldMap();
  113. message.getRowMap().forEach((k, v) -> {
  114. if (fieldMap.containsKey(k)) {
  115. try {
  116. Object val = BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v);
  117. // 处理二进制对象显示
  118. if (prettyBytes) {
  119. if (val instanceof byte[]) {
  120. byte[] b = (byte[]) val;
  121. if (b.length > 128) {
  122. target.put(k, String.format("byte[%d]", b.length));
  123. return;
  124. }
  125. target.put(k, Arrays.toString(b));
  126. return;
  127. }
  128. }
  129. target.put(k, val);
  130. } catch (Exception e) {
  131. logger.warn("解析Binlog数据类型异常:type=[{}], valueType=[{}], value=[{}]", fieldMap.get(k).getType(),
  132. (v == null ? null : v.getClass().getName()), v);
  133. }
  134. }
  135. });
  136. return target;
  137. }
  138. @Override
  139. public String sync(Map<String, String> params) throws InvalidProtocolBufferException {
  140. String metaId = params.get("metaId");
  141. String messageId = params.get("messageId");
  142. Assert.hasText(metaId, "The metaId is null.");
  143. Assert.hasText(messageId, "The messageId is null.");
  144. Map row = getData(metaId, messageId);
  145. Map binlogData = getBinlogData(row, false);
  146. if (CollectionUtils.isEmpty(binlogData)) {
  147. return messageId;
  148. }
  149. String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
  150. String event = (String) row.get(ConfigConstant.DATA_EVENT);
  151. // 有修改同步值
  152. String retryDataParams = params.get("retryDataParams");
  153. if (StringUtil.isNotBlank(retryDataParams)) {
  154. JsonUtil.parseMap(retryDataParams).forEach((k, v) -> binlogData.put(k, convertValue(binlogData.get(k), (String) v)));
  155. }
  156. TableGroup tableGroup = profileComponent.getTableGroup(tableGroupId);
  157. String sourceTableName = tableGroup.getSourceTable().getName();
  158. // 转换为源字段
  159. final Picker picker = new Picker(tableGroup);
  160. List<Object> changedRow = picker.pickSourceData(binlogData);
  161. RowChangedEvent changedEvent = new RowChangedEvent(sourceTableName, event, changedRow);
  162. // 执行同步是否成功
  163. bufferActuatorRouter.execute(metaId, changedEvent);
  164. storageService.remove(StorageEnum.DATA, metaId, messageId);
  165. // 更新失败数
  166. Meta meta = profileComponent.getMeta(metaId);
  167. Assert.notNull(meta, "Meta can not be null.");
  168. meta.getFail().decrementAndGet();
  169. meta.setUpdateTime(Instant.now().toEpochMilli());
  170. profileComponent.editConfigModel(meta);
  171. return messageId;
  172. }
  173. private Map getData(String metaId, String messageId) {
  174. Query query = new Query(1, 1);
  175. Map<String, FieldResolver> fieldResolvers = new ConcurrentHashMap<>();
  176. fieldResolvers.put(ConfigConstant.BINLOG_DATA, (FieldResolver<IndexableField>) field -> field.binaryValue().bytes);
  177. query.setFieldResolverMap(fieldResolvers);
  178. query.addFilter(ConfigConstant.CONFIG_MODEL_ID, messageId);
  179. query.setMetaId(metaId);
  180. query.setType(StorageEnum.DATA);
  181. Paging paging = storageService.query(query);
  182. if (!CollectionUtils.isEmpty(paging.getData())) {
  183. List<Map> data = (List<Map>) paging.getData();
  184. return data.get(0);
  185. }
  186. return Collections.EMPTY_MAP;
  187. }
  188. private Object convertValue(Object oldValue, String newValue) {
  189. if (oldValue == null) {
  190. return newValue;
  191. }
  192. Object newVal;
  193. String type = oldValue.getClass().getName();
  194. switch (type) {
  195. case "java.sql.Date":
  196. newVal = DateFormatUtil.stringToDate(newValue);
  197. break;
  198. case "java.sql.Timestamp":
  199. newVal = DateFormatUtil.stringToTimestamp(newValue);
  200. break;
  201. case "java.lang.Integer":
  202. case "java.lang.Short":
  203. newVal = NumberUtil.toInt(newValue);
  204. break;
  205. case "java.lang.Long":
  206. newVal = NumberUtil.toLong(newValue);
  207. break;
  208. case "java.lang.Float":
  209. newVal = Float.valueOf(newValue);
  210. break;
  211. case "java.lang.Double":
  212. newVal = Double.valueOf(newValue);
  213. break;
  214. case "[B":
  215. newVal = stringToBytes(newValue);
  216. break;
  217. default:
  218. newVal = newValue;
  219. }
  220. return newVal;
  221. }
  222. private byte[] stringToBytes(String s) {
  223. byte[] b = null;
  224. if (s.startsWith("[") && s.endsWith("]")) {
  225. s = StringUtil.substring(s, 1, s.length() - 1);
  226. String[] split = StringUtil.split(s, ",");
  227. int length = split.length;
  228. b = new byte[length];
  229. for (int i = 0; i < length; i++) {
  230. b[i] = Byte.valueOf(split[i].trim());
  231. }
  232. }
  233. return b;
  234. }
  235. }