AbstractWriterBinlog.java 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package org.dbsyncer.parser;
  2. import org.dbsyncer.cache.CacheService;
  3. import org.dbsyncer.common.util.CollectionUtils;
  4. import org.dbsyncer.connector.model.Field;
  5. import org.dbsyncer.connector.model.Table;
  6. import org.dbsyncer.parser.flush.BufferActuator;
  7. import org.dbsyncer.parser.model.Picker;
  8. import org.dbsyncer.parser.model.TableGroup;
  9. import org.dbsyncer.parser.model.WriterRequest;
  10. import org.dbsyncer.storage.binlog.AbstractBinlogService;
  11. import org.dbsyncer.storage.binlog.proto.BinlogMessage;
  12. import org.dbsyncer.storage.binlog.proto.EventEnum;
  13. import org.dbsyncer.storage.util.BinlogMessageUtil;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import java.util.HashMap;
  18. import java.util.Map;
  19. import java.util.Queue;
  20. public abstract class AbstractWriterBinlog extends AbstractBinlogService<WriterRequest> {
  21. private final Logger logger = LoggerFactory.getLogger(getClass());
  22. @Autowired
  23. private BufferActuator writerBufferActuator;
  24. @Autowired
  25. private CacheService cacheService;
  26. protected void flush(String tableGroupId, String event, Map<String, Object> data) {
  27. try {
  28. BinlogMessage builder = BinlogMessage.newBuilder()
  29. .setTableGroupId(tableGroupId)
  30. .setEvent(EventEnum.valueOf(event))
  31. .setData(BinlogMessageUtil.toBinlogMap(data))
  32. .build();
  33. super.flush(builder);
  34. } catch (Exception e) {
  35. logger.error(e.getMessage());
  36. }
  37. }
  38. @Override
  39. protected WriterRequest deserialize(String messageId, BinlogMessage message) {
  40. if (CollectionUtils.isEmpty(message.getData().getRowMap())) {
  41. return null;
  42. }
  43. // 1、获取配置信息
  44. final TableGroup tableGroup = cacheService.get(message.getTableGroupId(), TableGroup.class);
  45. // 2、反序列数据
  46. Map<String, Object> data = new HashMap<>();
  47. try {
  48. final Picker picker = new Picker(tableGroup.getFieldMapping());
  49. final Map<String, Field> fieldMap = picker.getSourceFieldMap();
  50. message.getData().getRowMap().forEach((k, v) -> {
  51. if (fieldMap.containsKey(k)) {
  52. data.put(k, BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v));
  53. }
  54. });
  55. return new WriterRequest(messageId, message.getTableGroupId(), message.getEvent().name(), data);
  56. } catch (Exception e) {
  57. Table sTable = tableGroup.getSourceTable();
  58. Table tTable = tableGroup.getTargetTable();
  59. logger.error("messageId:{}, sTable:{}, tTable:{}, event:{}, data:{}", messageId, sTable.getName(), tTable.getName(), message.getEvent().name(), data);
  60. logger.error(messageId, e);
  61. }
  62. return null;
  63. }
  64. @Override
  65. public String getTaskName() {
  66. return "WriterBinlog";
  67. }
  68. @Override
  69. public Queue getQueue() {
  70. return writerBufferActuator.getQueue();
  71. }
  72. @Override
  73. public int getQueueCapacity() {
  74. return writerBufferActuator.getQueueCapacity();
  75. }
  76. }