AbstractWriterBinlog.java 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package org.dbsyncer.parser;
  2. import com.google.protobuf.ByteString;
  3. import org.dbsyncer.cache.CacheService;
  4. import org.dbsyncer.common.util.CollectionUtils;
  5. import org.dbsyncer.connector.model.Field;
  6. import org.dbsyncer.parser.flush.BufferActuator;
  7. import org.dbsyncer.parser.model.Picker;
  8. import org.dbsyncer.parser.model.TableGroup;
  9. import org.dbsyncer.parser.model.WriterRequest;
  10. import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
  11. import org.dbsyncer.storage.binlog.proto.BinlogMap;
  12. import org.dbsyncer.storage.binlog.proto.BinlogMessage;
  13. import org.dbsyncer.storage.binlog.proto.EventEnum;
  14. import org.dbsyncer.storage.util.BinlogMessageUtil;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import java.util.HashMap;
  19. import java.util.Map;
  20. import java.util.Queue;
  21. public abstract class AbstractWriterBinlog extends AbstractBinlogRecorder<WriterRequest> {
  22. private final Logger logger = LoggerFactory.getLogger(getClass());
  23. @Autowired
  24. private BufferActuator writerBufferActuator;
  25. @Autowired
  26. private CacheService cacheService;
  27. protected void flush(String tableGroupId, String event, Map<String, Object> data) {
  28. try {
  29. BinlogMap.Builder dataBuilder = BinlogMap.newBuilder();
  30. data.forEach((k, v) -> {
  31. if (null != v) {
  32. ByteString bytes = BinlogMessageUtil.serializeValue(v);
  33. if (null != bytes) {
  34. dataBuilder.putRow(k, bytes);
  35. }
  36. }
  37. });
  38. BinlogMessage builder = BinlogMessage.newBuilder()
  39. .setTableGroupId(tableGroupId)
  40. .setEvent(EventEnum.valueOf(event))
  41. .setData(dataBuilder.build())
  42. .build();
  43. super.flush(builder);
  44. } catch (Exception e) {
  45. logger.error(e.getMessage());
  46. }
  47. }
  48. @Override
  49. protected WriterRequest deserialize(String messageId, BinlogMessage message) {
  50. if (CollectionUtils.isEmpty(message.getData().getRowMap())) {
  51. return null;
  52. }
  53. // 1、获取配置信息
  54. final TableGroup tableGroup = cacheService.get(message.getTableGroupId(), TableGroup.class);
  55. // 2、反序列数据
  56. try {
  57. final Picker picker = new Picker(tableGroup.getFieldMapping());
  58. final Map<String, Field> fieldMap = picker.getTargetFieldMap();
  59. Map<String, Object> data = new HashMap<>();
  60. message.getData().getRowMap().forEach((k, v) -> {
  61. if (fieldMap.containsKey(k)) {
  62. data.put(k, BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v));
  63. }
  64. });
  65. return new WriterRequest(messageId, message.getTableGroupId(), message.getEvent().name(), data);
  66. } catch (Exception e) {
  67. logger.error(e.getMessage());
  68. }
  69. return null;
  70. }
  71. @Override
  72. public String getTaskName() {
  73. return "WriterBinlog";
  74. }
  75. @Override
  76. public Queue getQueue() {
  77. return writerBufferActuator.getQueue();
  78. }
  79. @Override
  80. public int getQueueCapacity() {
  81. return writerBufferActuator.getQueueCapacity();
  82. }
  83. }