浏览代码

gen proto

AE86 2 年之前
父节点
当前提交
f997477246

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/BufferedRandomAccessFile.java → dbsyncer-common/src/main/java/org/dbsyncer/common/file/BufferedRandomAccessFile.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.listener.file;
+package org.dbsyncer.common.file;
 
 import java.io.File;
 import java.io.FileNotFoundException;

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -2,7 +2,7 @@ package org.dbsyncer.listener.file;
 
 import org.apache.commons.io.IOUtils;
 import org.dbsyncer.common.event.RowChangedEvent;
-import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.file.BufferedRandomAccessFile;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.RandomUtil;
 import org.dbsyncer.common.util.StringUtil;

+ 0 - 6
dbsyncer-parser/pom.xml

@@ -39,12 +39,6 @@
             <version>${project.parent.version}</version>
         </dependency>
 
-        <!-- protobuf -->
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>

+ 3 - 12
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -21,14 +21,13 @@ import java.util.concurrent.locks.ReentrantLock;
 /**
  * 任务缓存执行器
  * <p>1. 任务优先进入缓存队列
- * <p>2. 任务数超过队列阈值75%时,序列化写入磁盘
- * <p>3. 内置定时同步线程,在队列空闲时,将磁盘数据刷入缓存
+ * <p>2. 将任务分区合并,批量执行
  *
  * @author AE86
  * @version 1.0.0
  * @date 2022/3/27 17:36
  */
-public abstract class AbstractBufferActuator<Request, Response> extends AbstractBinlogRecorder implements BufferActuator, ScheduledTaskJob {
+public abstract class AbstractBufferActuator<Request, Response> implements BufferActuator, ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -81,18 +80,12 @@ public abstract class AbstractBufferActuator<Request, Response> extends Abstract
     protected abstract void pull(Response response);
 
     @Override
-    protected Queue getQueue() {
+    public Queue getQueue() {
         return buffer;
     }
 
     @Override
     public void offer(BufferRequest request) {
-//        if (running || buffer.size() >= (CAPACITY * BUFFER_THRESHOLD)) {
-//            flushBinlog(request);
-//        } else {
-//            buffer.offer((Request) request);
-//        }
-
         buffer.offer((Request) request);
 
         // TODO 临时解决方案:生产大于消费问题,限制生产速度
@@ -129,8 +122,6 @@ public abstract class AbstractBufferActuator<Request, Response> extends Abstract
                 bufferLock.unlock();
             }
         }
-
-//        parseBinlog();
     }
 
     private void flush(Queue<Request> queue) throws IllegalAccessException, InstantiationException {

+ 14 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.parser.flush;
 
+import java.util.Queue;
+
 /**
  * @author AE86
  * @version 1.0.0
@@ -7,6 +9,18 @@ package org.dbsyncer.parser.flush;
  */
 public interface BufferActuator {
 
+    /**
+     * 获取缓存队列
+     *
+     * @return
+     */
+    Queue getQueue();
+
+    /**
+     * 提交任务
+     *
+     * @param request
+     */
     void offer(BufferRequest request);
 
 }

+ 0 - 26
dbsyncer-parser/src/main/proto/WriterRequestMessage.proto

@@ -1,26 +0,0 @@
-syntax = "proto3";
-
-option java_package = "org.dbsyncer.parser.proto";
-option java_outer_classname = "WriterRequestMessage";
-option optimize_for = SPEED;
-
-message WriterRequestMessage {
-    string meta_id = 1;
-    string target_connector_id = 2;
-    string source_table_name = 3;
-    string target_table_name = 4;
-    repeated string event = 5;
-    repeated Field fields = 6;
-    repeated Command command = 7;
-}
-
-message Field {
-    string name = 1;
-    string type_name = 2;
-    int32 type = 3;
-    bool pk = 4;
-}
-
-message Command {
-    map<string, string> a = 1;
-}

+ 6 - 0
dbsyncer-storage/pom.xml

@@ -32,6 +32,12 @@
             <version>${project.parent.version}</version>
         </dependency>
 
+        <!-- protobuf -->
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-log4j2</artifactId>

+ 13 - 28
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBinlogRecorder.java → dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -1,10 +1,10 @@
-package org.dbsyncer.parser.flush;
+package org.dbsyncer.storage.binlog;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.dbsyncer.common.file.BufferedRandomAccessFile;
 import org.dbsyncer.common.util.JsonUtil;
-import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.listener.file.BufferedRandomAccessFile;
+import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
@@ -12,7 +12,6 @@ import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
 import java.io.*;
-import java.lang.reflect.ParameterizedType;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Queue;
@@ -55,11 +54,8 @@ public abstract class AbstractBinlogRecorder implements BinlogRecorder, Disposab
 
     private OutputStream out;
 
-    private Class<?> requestClazz;
-
     @PostConstruct
     private void init() throws IOException {
-        requestClazz = (Class<?>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
         // /data/binlog/{BufferActuator}/
         path = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar)
                 .append("data").append(File.separatorChar)
@@ -98,14 +94,13 @@ public abstract class AbstractBinlogRecorder implements BinlogRecorder, Disposab
         cycle.set(0);
 
         try {
-            String line;
+            byte[] line;
             boolean hasLine = false;
             while (null != (line = pipeline.readLine())) {
-                if (StringUtil.isNotBlank(line)) {
-                    // TODO 替换序列化方案
-                    getQueue().offer(JsonUtil.jsonToObj(line, requestClazz));
-                    hasLine = true;
-                }
+                BinlogMessage message = BinlogMessage.parseFrom(line);
+                logger.info("parse message:{}", message.toString());
+//                getQueue().offer(message);
+                hasLine = true;
             }
 
             if (hasLine) {
@@ -118,10 +113,10 @@ public abstract class AbstractBinlogRecorder implements BinlogRecorder, Disposab
     }
 
     @Override
-    public void flushBinlog(BufferRequest request) {
+    public void flushBinlog(BinlogMessage message) {
         try {
-            // TODO 替换序列化方案
-            writeLine(out, JsonUtil.objToJson(request));
+            out.write(message.toByteArray());
+            out.write(LINE_SEPARATOR.getBytes(DEFAULT_CHARSET));
         } catch (IOException e) {
             logger.error(e.getMessage());
         }
@@ -159,16 +154,6 @@ public abstract class AbstractBinlogRecorder implements BinlogRecorder, Disposab
         out = new FileOutputStream(binlogFile, true);
     }
 
-    private void writeLine(final OutputStream output, final String line) throws IOException {
-        if (line == null) {
-            return;
-        }
-        if (line != null) {
-            output.write(line.getBytes(DEFAULT_CHARSET));
-        }
-        output.write(LINE_SEPARATOR.getBytes(DEFAULT_CHARSET));
-    }
-
     private String createBinlogName(int index) {
         return String.format("%s.%06d", BINLOG, index <= 0 ? 1 : index);
     }
@@ -182,7 +167,7 @@ public abstract class AbstractBinlogRecorder implements BinlogRecorder, Disposab
             this.raf = raf;
         }
 
-        public String readLine() throws IOException {
+        public byte[] readLine() throws IOException {
             this.filePointer = raf.getFilePointer();
             if (filePointer >= raf.length()) {
                 b = new byte[0];
@@ -208,7 +193,7 @@ public abstract class AbstractBinlogRecorder implements BinlogRecorder, Disposab
             byte[] _b = stream.toByteArray();
             stream.close();
             stream = null;
-            return new String(_b, DEFAULT_CHARSET);
+            return _b;
         }
     }
 

+ 5 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BinlogRecorder.java → dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogRecorder.java

@@ -1,4 +1,6 @@
-package org.dbsyncer.parser.flush;
+package org.dbsyncer.storage.binlog;
+
+import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 
 /**
  * @author AE86
@@ -10,8 +12,8 @@ public interface BinlogRecorder {
     /**
      * 将任务序列化刷入磁盘
      *
-     * @param request
+     * @param message
      */
-    void flushBinlog(BufferRequest request);
+    void flushBinlog(BinlogMessage message);
 
 }

+ 1095 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/proto/BinlogMessage.java

@@ -0,0 +1,1095 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: BinlogMessageProto.proto
+
+package org.dbsyncer.storage.binlog.proto;
+
+/**
+ * Protobuf type {@code BinlogMessage}
+ */
+public final class BinlogMessage extends
+        com.google.protobuf.GeneratedMessageV3 implements
+        // @@protoc_insertion_point(message_implements:BinlogMessage)
+        BinlogMessageOrBuilder {
+  private static final long serialVersionUID = 0L;
+
+  // Use BinlogMessage.newBuilder() to construct.
+  private BinlogMessage(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
+    super(builder);
+  }
+
+  private BinlogMessage() {
+    tableGroupId_ = "";
+    event_ = 0;
+    data_ = java.util.Collections.emptyList();
+  }
+
+  @java.lang.Override
+  @SuppressWarnings({"unused"})
+  protected java.lang.Object newInstance(
+          UnusedPrivateParameter unused) {
+    return new BinlogMessage();
+  }
+
+  @java.lang.Override
+  public final com.google.protobuf.UnknownFieldSet
+  getUnknownFields() {
+    return this.unknownFields;
+  }
+
+  private BinlogMessage(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+    this();
+    if (extensionRegistry == null) {
+      throw new java.lang.NullPointerException();
+    }
+    int mutable_bitField0_ = 0;
+    com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+            com.google.protobuf.UnknownFieldSet.newBuilder();
+    try {
+      boolean done = false;
+      while (!done) {
+        int tag = input.readTag();
+        switch (tag) {
+          case 0:
+            done = true;
+            break;
+          case 10: {
+            java.lang.String s = input.readStringRequireUtf8();
+
+            tableGroupId_ = s;
+            break;
+          }
+          case 16: {
+            int rawValue = input.readEnum();
+
+            event_ = rawValue;
+            break;
+          }
+          case 26: {
+            if (!((mutable_bitField0_ & 0x00000001) != 0)) {
+              data_ = new java.util.ArrayList<org.dbsyncer.storage.binlog.proto.Data>();
+              mutable_bitField0_ |= 0x00000001;
+            }
+            data_.add(
+                    input.readMessage(org.dbsyncer.storage.binlog.proto.Data.parser(), extensionRegistry));
+            break;
+          }
+          default: {
+            if (!parseUnknownField(
+                    input, unknownFields, extensionRegistry, tag)) {
+              done = true;
+            }
+            break;
+          }
+        }
+      }
+    } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+      throw e.setUnfinishedMessage(this);
+    } catch (com.google.protobuf.UninitializedMessageException e) {
+      throw e.asInvalidProtocolBufferException().setUnfinishedMessage(this);
+    } catch (java.io.IOException e) {
+      throw new com.google.protobuf.InvalidProtocolBufferException(
+              e).setUnfinishedMessage(this);
+    } finally {
+      if (((mutable_bitField0_ & 0x00000001) != 0)) {
+        data_ = java.util.Collections.unmodifiableList(data_);
+      }
+      this.unknownFields = unknownFields.build();
+      makeExtensionsImmutable();
+    }
+  }
+
+  public static final com.google.protobuf.Descriptors.Descriptor
+  getDescriptor() {
+    return org.dbsyncer.storage.binlog.proto.BinlogMessageProto.internal_static_BinlogMessage_descriptor;
+  }
+
+  @java.lang.Override
+  protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+  internalGetFieldAccessorTable() {
+    return org.dbsyncer.storage.binlog.proto.BinlogMessageProto.internal_static_BinlogMessage_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                    org.dbsyncer.storage.binlog.proto.BinlogMessage.class, org.dbsyncer.storage.binlog.proto.BinlogMessage.Builder.class);
+  }
+
+  public static final int TABLE_GROUP_ID_FIELD_NUMBER = 1;
+  private volatile java.lang.Object tableGroupId_;
+
+  /**
+   * <code>string table_group_id = 1;</code>
+   *
+   * @return The tableGroupId.
+   */
+  @java.lang.Override
+  public java.lang.String getTableGroupId() {
+    java.lang.Object ref = tableGroupId_;
+    if (ref instanceof java.lang.String) {
+      return (java.lang.String) ref;
+    } else {
+      com.google.protobuf.ByteString bs =
+              (com.google.protobuf.ByteString) ref;
+      java.lang.String s = bs.toStringUtf8();
+      tableGroupId_ = s;
+      return s;
+    }
+  }
+
+  /**
+   * <code>string table_group_id = 1;</code>
+   *
+   * @return The bytes for tableGroupId.
+   */
+  @java.lang.Override
+  public com.google.protobuf.ByteString
+  getTableGroupIdBytes() {
+    java.lang.Object ref = tableGroupId_;
+    if (ref instanceof java.lang.String) {
+      com.google.protobuf.ByteString b =
+              com.google.protobuf.ByteString.copyFromUtf8(
+                      (java.lang.String) ref);
+      tableGroupId_ = b;
+      return b;
+    } else {
+      return (com.google.protobuf.ByteString) ref;
+    }
+  }
+
+  public static final int EVENT_FIELD_NUMBER = 2;
+  private int event_;
+
+  /**
+   * <code>.Event event = 2;</code>
+   *
+   * @return The enum numeric value on the wire for event.
+   */
+  @java.lang.Override
+  public int getEventValue() {
+    return event_;
+  }
+
+  /**
+   * <code>.Event event = 2;</code>
+   *
+   * @return The event.
+   */
+  @java.lang.Override
+  public org.dbsyncer.storage.binlog.proto.Event getEvent() {
+    @SuppressWarnings("deprecation")
+    org.dbsyncer.storage.binlog.proto.Event result = org.dbsyncer.storage.binlog.proto.Event.valueOf(event_);
+    return result == null ? org.dbsyncer.storage.binlog.proto.Event.UNRECOGNIZED : result;
+  }
+
+  public static final int DATA_FIELD_NUMBER = 3;
+  private java.util.List<org.dbsyncer.storage.binlog.proto.Data> data_;
+
+  /**
+   * <code>repeated .Data data = 3;</code>
+   */
+  @java.lang.Override
+  public java.util.List<org.dbsyncer.storage.binlog.proto.Data> getDataList() {
+    return data_;
+  }
+
+  /**
+   * <code>repeated .Data data = 3;</code>
+   */
+  @java.lang.Override
+  public java.util.List<? extends org.dbsyncer.storage.binlog.proto.DataOrBuilder>
+  getDataOrBuilderList() {
+    return data_;
+  }
+
+  /**
+   * <code>repeated .Data data = 3;</code>
+   */
+  @java.lang.Override
+  public int getDataCount() {
+    return data_.size();
+  }
+
+  /**
+   * <code>repeated .Data data = 3;</code>
+   */
+  @java.lang.Override
+  public org.dbsyncer.storage.binlog.proto.Data getData(int index) {
+    return data_.get(index);
+  }
+
+  /**
+   * <code>repeated .Data data = 3;</code>
+   */
+  @java.lang.Override
+  public org.dbsyncer.storage.binlog.proto.DataOrBuilder getDataOrBuilder(
+          int index) {
+    return data_.get(index);
+  }
+
+  private byte memoizedIsInitialized = -1;
+
+  @java.lang.Override
+  public final boolean isInitialized() {
+    byte isInitialized = memoizedIsInitialized;
+    if (isInitialized == 1) return true;
+    if (isInitialized == 0) return false;
+
+    memoizedIsInitialized = 1;
+    return true;
+  }
+
+  @java.lang.Override
+  public void writeTo(com.google.protobuf.CodedOutputStream output)
+          throws java.io.IOException {
+    if (!com.google.protobuf.GeneratedMessageV3.isStringEmpty(tableGroupId_)) {
+      com.google.protobuf.GeneratedMessageV3.writeString(output, 1, tableGroupId_);
+    }
+    if (event_ != org.dbsyncer.storage.binlog.proto.Event.UPDATE.getNumber()) {
+      output.writeEnum(2, event_);
+    }
+    for (int i = 0; i < data_.size(); i++) {
+      output.writeMessage(3, data_.get(i));
+    }
+    unknownFields.writeTo(output);
+  }
+
+  @java.lang.Override
+  public int getSerializedSize() {
+    int size = memoizedSize;
+    if (size != -1) return size;
+
+    size = 0;
+    if (!com.google.protobuf.GeneratedMessageV3.isStringEmpty(tableGroupId_)) {
+      size += com.google.protobuf.GeneratedMessageV3.computeStringSize(1, tableGroupId_);
+    }
+    if (event_ != org.dbsyncer.storage.binlog.proto.Event.UPDATE.getNumber()) {
+      size += com.google.protobuf.CodedOutputStream
+              .computeEnumSize(2, event_);
+    }
+    for (int i = 0; i < data_.size(); i++) {
+      size += com.google.protobuf.CodedOutputStream
+              .computeMessageSize(3, data_.get(i));
+    }
+    size += unknownFields.getSerializedSize();
+    memoizedSize = size;
+    return size;
+  }
+
+  @java.lang.Override
+  public boolean equals(final java.lang.Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (!(obj instanceof org.dbsyncer.storage.binlog.proto.BinlogMessage)) {
+      return super.equals(obj);
+    }
+    org.dbsyncer.storage.binlog.proto.BinlogMessage other = (org.dbsyncer.storage.binlog.proto.BinlogMessage) obj;
+
+    if (!getTableGroupId()
+            .equals(other.getTableGroupId())) return false;
+    if (event_ != other.event_) return false;
+    if (!getDataList()
+            .equals(other.getDataList())) return false;
+    if (!unknownFields.equals(other.unknownFields)) return false;
+    return true;
+  }
+
+  @java.lang.Override
+  public int hashCode() {
+    if (memoizedHashCode != 0) {
+      return memoizedHashCode;
+    }
+    int hash = 41;
+    hash = (19 * hash) + getDescriptor().hashCode();
+    hash = (37 * hash) + TABLE_GROUP_ID_FIELD_NUMBER;
+    hash = (53 * hash) + getTableGroupId().hashCode();
+    hash = (37 * hash) + EVENT_FIELD_NUMBER;
+    hash = (53 * hash) + event_;
+    if (getDataCount() > 0) {
+      hash = (37 * hash) + DATA_FIELD_NUMBER;
+      hash = (53 * hash) + getDataList().hashCode();
+    }
+    hash = (29 * hash) + unknownFields.hashCode();
+    memoizedHashCode = hash;
+    return hash;
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage parseFrom(
+          java.nio.ByteBuffer data)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data);
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage parseFrom(
+          java.nio.ByteBuffer data,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data, extensionRegistry);
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage parseFrom(
+          com.google.protobuf.ByteString data)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data);
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage parseFrom(
+          com.google.protobuf.ByteString data,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data, extensionRegistry);
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage parseFrom(byte[] data)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data);
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage parseFrom(
+          byte[] data,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data, extensionRegistry);
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage parseFrom(java.io.InputStream input)
+          throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+            .parseWithIOException(PARSER, input);
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage parseFrom(
+          java.io.InputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+            .parseWithIOException(PARSER, input, extensionRegistry);
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage parseDelimitedFrom(java.io.InputStream input)
+          throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+            .parseDelimitedWithIOException(PARSER, input);
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage parseDelimitedFrom(
+          java.io.InputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+            .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage parseFrom(
+          com.google.protobuf.CodedInputStream input)
+          throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+            .parseWithIOException(PARSER, input);
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage parseFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+            .parseWithIOException(PARSER, input, extensionRegistry);
+  }
+
+  @java.lang.Override
+  public Builder newBuilderForType() {
+    return newBuilder();
+  }
+
+  public static Builder newBuilder() {
+    return DEFAULT_INSTANCE.toBuilder();
+  }
+
+  public static Builder newBuilder(org.dbsyncer.storage.binlog.proto.BinlogMessage prototype) {
+    return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
+  }
+
+  @java.lang.Override
+  public Builder toBuilder() {
+    return this == DEFAULT_INSTANCE
+            ? new Builder() : new Builder().mergeFrom(this);
+  }
+
+  @java.lang.Override
+  protected Builder newBuilderForType(
+          com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+    Builder builder = new Builder(parent);
+    return builder;
+  }
+
+  /**
+   * Protobuf type {@code BinlogMessage}
+   */
+  public static final class Builder extends
+          com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
+          // @@protoc_insertion_point(builder_implements:BinlogMessage)
+          org.dbsyncer.storage.binlog.proto.BinlogMessageOrBuilder {
+    public static final com.google.protobuf.Descriptors.Descriptor
+    getDescriptor() {
+      return org.dbsyncer.storage.binlog.proto.BinlogMessageProto.internal_static_BinlogMessage_descriptor;
+    }
+
+    @java.lang.Override
+    protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+    internalGetFieldAccessorTable() {
+      return org.dbsyncer.storage.binlog.proto.BinlogMessageProto.internal_static_BinlogMessage_fieldAccessorTable
+              .ensureFieldAccessorsInitialized(
+                      org.dbsyncer.storage.binlog.proto.BinlogMessage.class, org.dbsyncer.storage.binlog.proto.BinlogMessage.Builder.class);
+    }
+
+    // Construct using org.dbsyncer.storage.binlog.proto.BinlogMessage.newBuilder()
+    private Builder() {
+      maybeForceBuilderInitialization();
+    }
+
+    private Builder(
+            com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+      super(parent);
+      maybeForceBuilderInitialization();
+    }
+
+    private void maybeForceBuilderInitialization() {
+      if (com.google.protobuf.GeneratedMessageV3
+              .alwaysUseFieldBuilders) {
+        getDataFieldBuilder();
+      }
+    }
+
+    @java.lang.Override
+    public Builder clear() {
+      super.clear();
+      tableGroupId_ = "";
+
+      event_ = 0;
+
+      if (dataBuilder_ == null) {
+        data_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000001);
+      } else {
+        dataBuilder_.clear();
+      }
+      return this;
+    }
+
+    @java.lang.Override
+    public com.google.protobuf.Descriptors.Descriptor
+    getDescriptorForType() {
+      return org.dbsyncer.storage.binlog.proto.BinlogMessageProto.internal_static_BinlogMessage_descriptor;
+    }
+
+    @java.lang.Override
+    public org.dbsyncer.storage.binlog.proto.BinlogMessage getDefaultInstanceForType() {
+      return org.dbsyncer.storage.binlog.proto.BinlogMessage.getDefaultInstance();
+    }
+
+    @java.lang.Override
+    public org.dbsyncer.storage.binlog.proto.BinlogMessage build() {
+      org.dbsyncer.storage.binlog.proto.BinlogMessage result = buildPartial();
+      if (!result.isInitialized()) {
+        throw newUninitializedMessageException(result);
+      }
+      return result;
+    }
+
+    @java.lang.Override
+    public org.dbsyncer.storage.binlog.proto.BinlogMessage buildPartial() {
+      org.dbsyncer.storage.binlog.proto.BinlogMessage result = new org.dbsyncer.storage.binlog.proto.BinlogMessage(this);
+      int from_bitField0_ = bitField0_;
+      result.tableGroupId_ = tableGroupId_;
+      result.event_ = event_;
+      if (dataBuilder_ == null) {
+        if (((bitField0_ & 0x00000001) != 0)) {
+          data_ = java.util.Collections.unmodifiableList(data_);
+          bitField0_ = (bitField0_ & ~0x00000001);
+        }
+        result.data_ = data_;
+      } else {
+        result.data_ = dataBuilder_.build();
+      }
+      onBuilt();
+      return result;
+    }
+
+    @java.lang.Override
+    public Builder clone() {
+      return super.clone();
+    }
+
+    @java.lang.Override
+    public Builder setField(
+            com.google.protobuf.Descriptors.FieldDescriptor field,
+            java.lang.Object value) {
+      return super.setField(field, value);
+    }
+
+    @java.lang.Override
+    public Builder clearField(
+            com.google.protobuf.Descriptors.FieldDescriptor field) {
+      return super.clearField(field);
+    }
+
+    @java.lang.Override
+    public Builder clearOneof(
+            com.google.protobuf.Descriptors.OneofDescriptor oneof) {
+      return super.clearOneof(oneof);
+    }
+
+    @java.lang.Override
+    public Builder setRepeatedField(
+            com.google.protobuf.Descriptors.FieldDescriptor field,
+            int index, java.lang.Object value) {
+      return super.setRepeatedField(field, index, value);
+    }
+
+    @java.lang.Override
+    public Builder addRepeatedField(
+            com.google.protobuf.Descriptors.FieldDescriptor field,
+            java.lang.Object value) {
+      return super.addRepeatedField(field, value);
+    }
+
+    @java.lang.Override
+    public Builder mergeFrom(com.google.protobuf.Message other) {
+      if (other instanceof org.dbsyncer.storage.binlog.proto.BinlogMessage) {
+        return mergeFrom((org.dbsyncer.storage.binlog.proto.BinlogMessage) other);
+      } else {
+        super.mergeFrom(other);
+        return this;
+      }
+    }
+
+    public Builder mergeFrom(org.dbsyncer.storage.binlog.proto.BinlogMessage other) {
+      if (other == org.dbsyncer.storage.binlog.proto.BinlogMessage.getDefaultInstance()) return this;
+      if (!other.getTableGroupId().isEmpty()) {
+        tableGroupId_ = other.tableGroupId_;
+        onChanged();
+      }
+      if (other.event_ != 0) {
+        setEventValue(other.getEventValue());
+      }
+      if (dataBuilder_ == null) {
+        if (!other.data_.isEmpty()) {
+          if (data_.isEmpty()) {
+            data_ = other.data_;
+            bitField0_ = (bitField0_ & ~0x00000001);
+          } else {
+            ensureDataIsMutable();
+            data_.addAll(other.data_);
+          }
+          onChanged();
+        }
+      } else {
+        if (!other.data_.isEmpty()) {
+          if (dataBuilder_.isEmpty()) {
+            dataBuilder_.dispose();
+            dataBuilder_ = null;
+            data_ = other.data_;
+            bitField0_ = (bitField0_ & ~0x00000001);
+            dataBuilder_ =
+                    com.google.protobuf.GeneratedMessageV3.alwaysUseFieldBuilders ?
+                            getDataFieldBuilder() : null;
+          } else {
+            dataBuilder_.addAllMessages(other.data_);
+          }
+        }
+      }
+      this.mergeUnknownFields(other.unknownFields);
+      onChanged();
+      return this;
+    }
+
+    @java.lang.Override
+    public final boolean isInitialized() {
+      return true;
+    }
+
+    @java.lang.Override
+    public Builder mergeFrom(
+            com.google.protobuf.CodedInputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws java.io.IOException {
+      org.dbsyncer.storage.binlog.proto.BinlogMessage parsedMessage = null;
+      try {
+        parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        parsedMessage = (org.dbsyncer.storage.binlog.proto.BinlogMessage) e.getUnfinishedMessage();
+        throw e.unwrapIOException();
+      } finally {
+        if (parsedMessage != null) {
+          mergeFrom(parsedMessage);
+        }
+      }
+      return this;
+    }
+
+    private int bitField0_;
+
+    private java.lang.Object tableGroupId_ = "";
+
+    /**
+     * <code>string table_group_id = 1;</code>
+     *
+     * @return The tableGroupId.
+     */
+    public java.lang.String getTableGroupId() {
+      java.lang.Object ref = tableGroupId_;
+      if (!(ref instanceof java.lang.String)) {
+        com.google.protobuf.ByteString bs =
+                (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        tableGroupId_ = s;
+        return s;
+      } else {
+        return (java.lang.String) ref;
+      }
+    }
+
+    /**
+     * <code>string table_group_id = 1;</code>
+     *
+     * @return The bytes for tableGroupId.
+     */
+    public com.google.protobuf.ByteString
+    getTableGroupIdBytes() {
+      java.lang.Object ref = tableGroupId_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b =
+                com.google.protobuf.ByteString.copyFromUtf8(
+                        (java.lang.String) ref);
+        tableGroupId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    /**
+     * <code>string table_group_id = 1;</code>
+     *
+     * @param value The tableGroupId to set.
+     * @return This builder for chaining.
+     */
+    public Builder setTableGroupId(
+            java.lang.String value) {
+      if (value == null) {
+        throw new NullPointerException();
+      }
+
+      tableGroupId_ = value;
+      onChanged();
+      return this;
+    }
+
+    /**
+     * <code>string table_group_id = 1;</code>
+     *
+     * @return This builder for chaining.
+     */
+    public Builder clearTableGroupId() {
+
+      tableGroupId_ = getDefaultInstance().getTableGroupId();
+      onChanged();
+      return this;
+    }
+
+    /**
+     * <code>string table_group_id = 1;</code>
+     *
+     * @param value The bytes for tableGroupId to set.
+     * @return This builder for chaining.
+     */
+    public Builder setTableGroupIdBytes(
+            com.google.protobuf.ByteString value) {
+      if (value == null) {
+        throw new NullPointerException();
+      }
+      checkByteStringIsUtf8(value);
+
+      tableGroupId_ = value;
+      onChanged();
+      return this;
+    }
+
+    private int event_ = 0;
+
+    /**
+     * <code>.Event event = 2;</code>
+     *
+     * @return The enum numeric value on the wire for event.
+     */
+    @java.lang.Override
+    public int getEventValue() {
+      return event_;
+    }
+
+    /**
+     * <code>.Event event = 2;</code>
+     *
+     * @param value The enum numeric value on the wire for event to set.
+     * @return This builder for chaining.
+     */
+    public Builder setEventValue(int value) {
+
+      event_ = value;
+      onChanged();
+      return this;
+    }
+
+    /**
+     * <code>.Event event = 2;</code>
+     *
+     * @return The event.
+     */
+    @java.lang.Override
+    public org.dbsyncer.storage.binlog.proto.Event getEvent() {
+      @SuppressWarnings("deprecation")
+      org.dbsyncer.storage.binlog.proto.Event result = org.dbsyncer.storage.binlog.proto.Event.valueOf(event_);
+      return result == null ? org.dbsyncer.storage.binlog.proto.Event.UNRECOGNIZED : result;
+    }
+
+    /**
+     * <code>.Event event = 2;</code>
+     *
+     * @param value The event to set.
+     * @return This builder for chaining.
+     */
+    public Builder setEvent(org.dbsyncer.storage.binlog.proto.Event value) {
+      if (value == null) {
+        throw new NullPointerException();
+      }
+
+      event_ = value.getNumber();
+      onChanged();
+      return this;
+    }
+
+    /**
+     * <code>.Event event = 2;</code>
+     *
+     * @return This builder for chaining.
+     */
+    public Builder clearEvent() {
+
+      event_ = 0;
+      onChanged();
+      return this;
+    }
+
+    private java.util.List<org.dbsyncer.storage.binlog.proto.Data> data_ =
+            java.util.Collections.emptyList();
+
+    private void ensureDataIsMutable() {
+      if (!((bitField0_ & 0x00000001) != 0)) {
+        data_ = new java.util.ArrayList<org.dbsyncer.storage.binlog.proto.Data>(data_);
+        bitField0_ |= 0x00000001;
+      }
+    }
+
+    private com.google.protobuf.RepeatedFieldBuilderV3<
+            org.dbsyncer.storage.binlog.proto.Data, org.dbsyncer.storage.binlog.proto.Data.Builder, org.dbsyncer.storage.binlog.proto.DataOrBuilder> dataBuilder_;
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public java.util.List<org.dbsyncer.storage.binlog.proto.Data> getDataList() {
+      if (dataBuilder_ == null) {
+        return java.util.Collections.unmodifiableList(data_);
+      } else {
+        return dataBuilder_.getMessageList();
+      }
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public int getDataCount() {
+      if (dataBuilder_ == null) {
+        return data_.size();
+      } else {
+        return dataBuilder_.getCount();
+      }
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public org.dbsyncer.storage.binlog.proto.Data getData(int index) {
+      if (dataBuilder_ == null) {
+        return data_.get(index);
+      } else {
+        return dataBuilder_.getMessage(index);
+      }
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public Builder setData(
+            int index, org.dbsyncer.storage.binlog.proto.Data value) {
+      if (dataBuilder_ == null) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureDataIsMutable();
+        data_.set(index, value);
+        onChanged();
+      } else {
+        dataBuilder_.setMessage(index, value);
+      }
+      return this;
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public Builder setData(
+            int index, org.dbsyncer.storage.binlog.proto.Data.Builder builderForValue) {
+      if (dataBuilder_ == null) {
+        ensureDataIsMutable();
+        data_.set(index, builderForValue.build());
+        onChanged();
+      } else {
+        dataBuilder_.setMessage(index, builderForValue.build());
+      }
+      return this;
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public Builder addData(org.dbsyncer.storage.binlog.proto.Data value) {
+      if (dataBuilder_ == null) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureDataIsMutable();
+        data_.add(value);
+        onChanged();
+      } else {
+        dataBuilder_.addMessage(value);
+      }
+      return this;
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public Builder addData(
+            int index, org.dbsyncer.storage.binlog.proto.Data value) {
+      if (dataBuilder_ == null) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureDataIsMutable();
+        data_.add(index, value);
+        onChanged();
+      } else {
+        dataBuilder_.addMessage(index, value);
+      }
+      return this;
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public Builder addData(
+            org.dbsyncer.storage.binlog.proto.Data.Builder builderForValue) {
+      if (dataBuilder_ == null) {
+        ensureDataIsMutable();
+        data_.add(builderForValue.build());
+        onChanged();
+      } else {
+        dataBuilder_.addMessage(builderForValue.build());
+      }
+      return this;
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public Builder addData(
+            int index, org.dbsyncer.storage.binlog.proto.Data.Builder builderForValue) {
+      if (dataBuilder_ == null) {
+        ensureDataIsMutable();
+        data_.add(index, builderForValue.build());
+        onChanged();
+      } else {
+        dataBuilder_.addMessage(index, builderForValue.build());
+      }
+      return this;
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public Builder addAllData(
+            java.lang.Iterable<? extends org.dbsyncer.storage.binlog.proto.Data> values) {
+      if (dataBuilder_ == null) {
+        ensureDataIsMutable();
+        com.google.protobuf.AbstractMessageLite.Builder.addAll(
+                values, data_);
+        onChanged();
+      } else {
+        dataBuilder_.addAllMessages(values);
+      }
+      return this;
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public Builder clearData() {
+      if (dataBuilder_ == null) {
+        data_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000001);
+        onChanged();
+      } else {
+        dataBuilder_.clear();
+      }
+      return this;
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public Builder removeData(int index) {
+      if (dataBuilder_ == null) {
+        ensureDataIsMutable();
+        data_.remove(index);
+        onChanged();
+      } else {
+        dataBuilder_.remove(index);
+      }
+      return this;
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public org.dbsyncer.storage.binlog.proto.Data.Builder getDataBuilder(
+            int index) {
+      return getDataFieldBuilder().getBuilder(index);
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public org.dbsyncer.storage.binlog.proto.DataOrBuilder getDataOrBuilder(
+            int index) {
+      if (dataBuilder_ == null) {
+        return data_.get(index);
+      } else {
+        return dataBuilder_.getMessageOrBuilder(index);
+      }
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public java.util.List<? extends org.dbsyncer.storage.binlog.proto.DataOrBuilder>
+    getDataOrBuilderList() {
+      if (dataBuilder_ != null) {
+        return dataBuilder_.getMessageOrBuilderList();
+      } else {
+        return java.util.Collections.unmodifiableList(data_);
+      }
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public org.dbsyncer.storage.binlog.proto.Data.Builder addDataBuilder() {
+      return getDataFieldBuilder().addBuilder(
+              org.dbsyncer.storage.binlog.proto.Data.getDefaultInstance());
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public org.dbsyncer.storage.binlog.proto.Data.Builder addDataBuilder(
+            int index) {
+      return getDataFieldBuilder().addBuilder(
+              index, org.dbsyncer.storage.binlog.proto.Data.getDefaultInstance());
+    }
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    public java.util.List<org.dbsyncer.storage.binlog.proto.Data.Builder>
+    getDataBuilderList() {
+      return getDataFieldBuilder().getBuilderList();
+    }
+
+    private com.google.protobuf.RepeatedFieldBuilderV3<
+            org.dbsyncer.storage.binlog.proto.Data, org.dbsyncer.storage.binlog.proto.Data.Builder, org.dbsyncer.storage.binlog.proto.DataOrBuilder>
+    getDataFieldBuilder() {
+      if (dataBuilder_ == null) {
+        dataBuilder_ = new com.google.protobuf.RepeatedFieldBuilderV3<
+                org.dbsyncer.storage.binlog.proto.Data, org.dbsyncer.storage.binlog.proto.Data.Builder, org.dbsyncer.storage.binlog.proto.DataOrBuilder>(
+                data_,
+                ((bitField0_ & 0x00000001) != 0),
+                getParentForChildren(),
+                isClean());
+        data_ = null;
+      }
+      return dataBuilder_;
+    }
+
+    @java.lang.Override
+    public final Builder setUnknownFields(
+            final com.google.protobuf.UnknownFieldSet unknownFields) {
+      return super.setUnknownFields(unknownFields);
+    }
+
+    @java.lang.Override
+    public final Builder mergeUnknownFields(
+            final com.google.protobuf.UnknownFieldSet unknownFields) {
+      return super.mergeUnknownFields(unknownFields);
+    }
+
+
+    // @@protoc_insertion_point(builder_scope:BinlogMessage)
+  }
+
+  // @@protoc_insertion_point(class_scope:BinlogMessage)
+  private static final org.dbsyncer.storage.binlog.proto.BinlogMessage DEFAULT_INSTANCE;
+
+  static {
+    DEFAULT_INSTANCE = new org.dbsyncer.storage.binlog.proto.BinlogMessage();
+  }
+
+  public static org.dbsyncer.storage.binlog.proto.BinlogMessage getDefaultInstance() {
+    return DEFAULT_INSTANCE;
+  }
+
+  private static final com.google.protobuf.Parser<BinlogMessage>
+          PARSER = new com.google.protobuf.AbstractParser<BinlogMessage>() {
+    @java.lang.Override
+    public BinlogMessage parsePartialFrom(
+            com.google.protobuf.CodedInputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+      return new BinlogMessage(input, extensionRegistry);
+    }
+  };
+
+  public static com.google.protobuf.Parser<BinlogMessage> parser() {
+    return PARSER;
+  }
+
+  @java.lang.Override
+  public com.google.protobuf.Parser<BinlogMessage> getParserForType() {
+    return PARSER;
+  }
+
+  @java.lang.Override
+  public org.dbsyncer.storage.binlog.proto.BinlogMessage getDefaultInstanceForType() {
+    return DEFAULT_INSTANCE;
+  }
+
+}
+

+ 66 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/proto/BinlogMessageOrBuilder.java

@@ -0,0 +1,66 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: BinlogMessageProto.proto
+
+package org.dbsyncer.storage.binlog.proto;
+
+public interface BinlogMessageOrBuilder extends
+        // @@protoc_insertion_point(interface_extends:BinlogMessage)
+        com.google.protobuf.MessageOrBuilder {
+
+    /**
+     * <code>string table_group_id = 1;</code>
+     *
+     * @return The tableGroupId.
+     */
+    java.lang.String getTableGroupId();
+
+    /**
+     * <code>string table_group_id = 1;</code>
+     *
+     * @return The bytes for tableGroupId.
+     */
+    com.google.protobuf.ByteString
+    getTableGroupIdBytes();
+
+    /**
+     * <code>.Event event = 2;</code>
+     *
+     * @return The enum numeric value on the wire for event.
+     */
+    int getEventValue();
+
+    /**
+     * <code>.Event event = 2;</code>
+     *
+     * @return The event.
+     */
+    org.dbsyncer.storage.binlog.proto.Event getEvent();
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    java.util.List<org.dbsyncer.storage.binlog.proto.Data>
+    getDataList();
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    org.dbsyncer.storage.binlog.proto.Data getData(int index);
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    int getDataCount();
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    java.util.List<? extends org.dbsyncer.storage.binlog.proto.DataOrBuilder>
+    getDataOrBuilderList();
+
+    /**
+     * <code>repeated .Data data = 3;</code>
+     */
+    org.dbsyncer.storage.binlog.proto.DataOrBuilder getDataOrBuilder(
+            int index);
+}

+ 80 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/proto/BinlogMessageProto.java

@@ -0,0 +1,80 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: BinlogMessageProto.proto
+
+package org.dbsyncer.storage.binlog.proto;
+
+public final class BinlogMessageProto {
+    private BinlogMessageProto() {
+    }
+
+    public static void registerAllExtensions(
+            com.google.protobuf.ExtensionRegistryLite registry) {
+    }
+
+    public static void registerAllExtensions(
+            com.google.protobuf.ExtensionRegistry registry) {
+        registerAllExtensions(
+                (com.google.protobuf.ExtensionRegistryLite) registry);
+    }
+
+    static final com.google.protobuf.Descriptors.Descriptor
+            internal_static_BinlogMessage_descriptor;
+    static final
+    com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+            internal_static_BinlogMessage_fieldAccessorTable;
+    static final com.google.protobuf.Descriptors.Descriptor
+            internal_static_Data_descriptor;
+    static final
+    com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+            internal_static_Data_fieldAccessorTable;
+    static final com.google.protobuf.Descriptors.Descriptor
+            internal_static_Data_RowEntry_descriptor;
+    static final
+    com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+            internal_static_Data_RowEntry_fieldAccessorTable;
+
+    public static com.google.protobuf.Descriptors.FileDescriptor
+    getDescriptor() {
+        return descriptor;
+    }
+
+    private static com.google.protobuf.Descriptors.FileDescriptor
+            descriptor;
+
+    static {
+        java.lang.String[] descriptorData = {
+                "\n\030BinlogMessageProto.proto\"S\n\rBinlogMess" +
+                        "age\022\026\n\016table_group_id\030\001 \001(\t\022\025\n\005event\030\002 \001" +
+                        "(\0162\006.Event\022\023\n\004data\030\003 \003(\0132\005.Data\"O\n\004Data\022" +
+                        "\033\n\003row\030\001 \003(\0132\016.Data.RowEntry\032*\n\010RowEntry" +
+                        "\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\014:\0028\001*+\n\005Even" +
+                        "t\022\n\n\006UPDATE\020\000\022\n\n\006INSERT\020\001\022\n\n\006DELETE\020\002B;\n" +
+                        "!org.dbsyncer.storage.binlog.protoB\022Binl" +
+                        "ogMessageProtoH\001P\001b\006proto3"
+        };
+        descriptor = com.google.protobuf.Descriptors.FileDescriptor
+                .internalBuildGeneratedFileFrom(descriptorData,
+                        new com.google.protobuf.Descriptors.FileDescriptor[]{
+                        });
+        internal_static_BinlogMessage_descriptor =
+                getDescriptor().getMessageTypes().get(0);
+        internal_static_BinlogMessage_fieldAccessorTable = new
+                com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
+                internal_static_BinlogMessage_descriptor,
+                new java.lang.String[]{"TableGroupId", "Event", "Data",});
+        internal_static_Data_descriptor =
+                getDescriptor().getMessageTypes().get(1);
+        internal_static_Data_fieldAccessorTable = new
+                com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
+                internal_static_Data_descriptor,
+                new java.lang.String[]{"Row",});
+        internal_static_Data_RowEntry_descriptor =
+                internal_static_Data_descriptor.getNestedTypes().get(0);
+        internal_static_Data_RowEntry_fieldAccessorTable = new
+                com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
+                internal_static_Data_RowEntry_descriptor,
+                new java.lang.String[]{"Key", "Value",});
+    }
+
+    // @@protoc_insertion_point(outer_class_scope)
+}

+ 782 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/proto/Data.java

@@ -0,0 +1,782 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: BinlogMessageProto.proto
+
+package org.dbsyncer.storage.binlog.proto;
+
+/**
+ * Protobuf type {@code Data}
+ */
+public final class Data extends
+        com.google.protobuf.GeneratedMessageV3 implements
+        // @@protoc_insertion_point(message_implements:Data)
+        DataOrBuilder {
+    private static final long serialVersionUID = 0L;
+
+    // Use Data.newBuilder() to construct.
+    private Data(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
+        super(builder);
+    }
+
+    private Data() {
+    }
+
+    @java.lang.Override
+    @SuppressWarnings({"unused"})
+    protected java.lang.Object newInstance(
+            UnusedPrivateParameter unused) {
+        return new Data();
+    }
+
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+    getUnknownFields() {
+        return this.unknownFields;
+    }
+
+    private Data(
+            com.google.protobuf.CodedInputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+        this();
+        if (extensionRegistry == null) {
+            throw new java.lang.NullPointerException();
+        }
+        int mutable_bitField0_ = 0;
+        com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+                com.google.protobuf.UnknownFieldSet.newBuilder();
+        try {
+            boolean done = false;
+            while (!done) {
+                int tag = input.readTag();
+                switch (tag) {
+                    case 0:
+                        done = true;
+                        break;
+                    case 10: {
+                        if (!((mutable_bitField0_ & 0x00000001) != 0)) {
+                            row_ = com.google.protobuf.MapField.newMapField(
+                                    RowDefaultEntryHolder.defaultEntry);
+                            mutable_bitField0_ |= 0x00000001;
+                        }
+                        com.google.protobuf.MapEntry<java.lang.String, com.google.protobuf.ByteString>
+                                row__ = input.readMessage(
+                                RowDefaultEntryHolder.defaultEntry.getParserForType(), extensionRegistry);
+                        row_.getMutableMap().put(
+                                row__.getKey(), row__.getValue());
+                        break;
+                    }
+                    default: {
+                        if (!parseUnknownField(
+                                input, unknownFields, extensionRegistry, tag)) {
+                            done = true;
+                        }
+                        break;
+                    }
+                }
+            }
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+            throw e.setUnfinishedMessage(this);
+        } catch (com.google.protobuf.UninitializedMessageException e) {
+            throw e.asInvalidProtocolBufferException().setUnfinishedMessage(this);
+        } catch (java.io.IOException e) {
+            throw new com.google.protobuf.InvalidProtocolBufferException(
+                    e).setUnfinishedMessage(this);
+        } finally {
+            this.unknownFields = unknownFields.build();
+            makeExtensionsImmutable();
+        }
+    }
+
+    public static final com.google.protobuf.Descriptors.Descriptor
+    getDescriptor() {
+        return org.dbsyncer.storage.binlog.proto.BinlogMessageProto.internal_static_Data_descriptor;
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    @java.lang.Override
+    protected com.google.protobuf.MapField internalGetMapField(
+            int number) {
+        switch (number) {
+            case 1:
+                return internalGetRow();
+            default:
+                throw new RuntimeException(
+                        "Invalid map field number: " + number);
+        }
+    }
+
+    @java.lang.Override
+    protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+    internalGetFieldAccessorTable() {
+        return org.dbsyncer.storage.binlog.proto.BinlogMessageProto.internal_static_Data_fieldAccessorTable
+                .ensureFieldAccessorsInitialized(
+                        org.dbsyncer.storage.binlog.proto.Data.class, org.dbsyncer.storage.binlog.proto.Data.Builder.class);
+    }
+
+    public static final int ROW_FIELD_NUMBER = 1;
+
+    private static final class RowDefaultEntryHolder {
+        static final com.google.protobuf.MapEntry<
+                java.lang.String, com.google.protobuf.ByteString> defaultEntry =
+                com.google.protobuf.MapEntry
+                        .<java.lang.String, com.google.protobuf.ByteString>newDefaultInstance(
+                                org.dbsyncer.storage.binlog.proto.BinlogMessageProto.internal_static_Data_RowEntry_descriptor,
+                                com.google.protobuf.WireFormat.FieldType.STRING,
+                                "",
+                                com.google.protobuf.WireFormat.FieldType.BYTES,
+                                com.google.protobuf.ByteString.EMPTY);
+    }
+
+    private com.google.protobuf.MapField<
+            java.lang.String, com.google.protobuf.ByteString> row_;
+
+    private com.google.protobuf.MapField<java.lang.String, com.google.protobuf.ByteString>
+    internalGetRow() {
+        if (row_ == null) {
+            return com.google.protobuf.MapField.emptyMapField(
+                    RowDefaultEntryHolder.defaultEntry);
+        }
+        return row_;
+    }
+
+    public int getRowCount() {
+        return internalGetRow().getMap().size();
+    }
+
+    /**
+     * <code>map&lt;string, bytes&gt; row = 1;</code>
+     */
+
+    @java.lang.Override
+    public boolean containsRow(
+            java.lang.String key) {
+        if (key == null) {
+            throw new NullPointerException("map key");
+        }
+        return internalGetRow().getMap().containsKey(key);
+    }
+
+    /**
+     * Use {@link #getRowMap()} instead.
+     */
+    @java.lang.Override
+    @java.lang.Deprecated
+    public java.util.Map<java.lang.String, com.google.protobuf.ByteString> getRow() {
+        return getRowMap();
+    }
+
+    /**
+     * <code>map&lt;string, bytes&gt; row = 1;</code>
+     */
+    @java.lang.Override
+
+    public java.util.Map<java.lang.String, com.google.protobuf.ByteString> getRowMap() {
+        return internalGetRow().getMap();
+    }
+
+    /**
+     * <code>map&lt;string, bytes&gt; row = 1;</code>
+     */
+    @java.lang.Override
+
+    public com.google.protobuf.ByteString getRowOrDefault(
+            java.lang.String key,
+            com.google.protobuf.ByteString defaultValue) {
+        if (key == null) {
+            throw new NullPointerException("map key");
+        }
+        java.util.Map<java.lang.String, com.google.protobuf.ByteString> map =
+                internalGetRow().getMap();
+        return map.containsKey(key) ? map.get(key) : defaultValue;
+    }
+
+    /**
+     * <code>map&lt;string, bytes&gt; row = 1;</code>
+     */
+    @java.lang.Override
+
+    public com.google.protobuf.ByteString getRowOrThrow(
+            java.lang.String key) {
+        if (key == null) {
+            throw new NullPointerException("map key");
+        }
+        java.util.Map<java.lang.String, com.google.protobuf.ByteString> map =
+                internalGetRow().getMap();
+        if (!map.containsKey(key)) {
+            throw new java.lang.IllegalArgumentException();
+        }
+        return map.get(key);
+    }
+
+    private byte memoizedIsInitialized = -1;
+
+    @java.lang.Override
+    public final boolean isInitialized() {
+        byte isInitialized = memoizedIsInitialized;
+        if (isInitialized == 1) return true;
+        if (isInitialized == 0) return false;
+
+        memoizedIsInitialized = 1;
+        return true;
+    }
+
+    @java.lang.Override
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+            throws java.io.IOException {
+        com.google.protobuf.GeneratedMessageV3
+                .serializeStringMapTo(
+                        output,
+                        internalGetRow(),
+                        RowDefaultEntryHolder.defaultEntry,
+                        1);
+        unknownFields.writeTo(output);
+    }
+
+    @java.lang.Override
+    public int getSerializedSize() {
+        int size = memoizedSize;
+        if (size != -1) return size;
+
+        size = 0;
+        for (java.util.Map.Entry<java.lang.String, com.google.protobuf.ByteString> entry
+                : internalGetRow().getMap().entrySet()) {
+            com.google.protobuf.MapEntry<java.lang.String, com.google.protobuf.ByteString>
+                    row__ = RowDefaultEntryHolder.defaultEntry.newBuilderForType()
+                    .setKey(entry.getKey())
+                    .setValue(entry.getValue())
+                    .build();
+            size += com.google.protobuf.CodedOutputStream
+                    .computeMessageSize(1, row__);
+        }
+        size += unknownFields.getSerializedSize();
+        memoizedSize = size;
+        return size;
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof org.dbsyncer.storage.binlog.proto.Data)) {
+            return super.equals(obj);
+        }
+        org.dbsyncer.storage.binlog.proto.Data other = (org.dbsyncer.storage.binlog.proto.Data) obj;
+
+        if (!internalGetRow().equals(
+                other.internalGetRow())) return false;
+        if (!unknownFields.equals(other.unknownFields)) return false;
+        return true;
+    }
+
+    @java.lang.Override
+    public int hashCode() {
+        if (memoizedHashCode != 0) {
+            return memoizedHashCode;
+        }
+        int hash = 41;
+        hash = (19 * hash) + getDescriptor().hashCode();
+        if (!internalGetRow().getMap().isEmpty()) {
+            hash = (37 * hash) + ROW_FIELD_NUMBER;
+            hash = (53 * hash) + internalGetRow().hashCode();
+        }
+        hash = (29 * hash) + unknownFields.hashCode();
+        memoizedHashCode = hash;
+        return hash;
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data parseFrom(
+            java.nio.ByteBuffer data)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+        return PARSER.parseFrom(data);
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data parseFrom(
+            java.nio.ByteBuffer data,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+        return PARSER.parseFrom(data, extensionRegistry);
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data parseFrom(
+            com.google.protobuf.ByteString data)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+        return PARSER.parseFrom(data);
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data parseFrom(
+            com.google.protobuf.ByteString data,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+        return PARSER.parseFrom(data, extensionRegistry);
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data parseFrom(byte[] data)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+        return PARSER.parseFrom(data);
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data parseFrom(
+            byte[] data,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+        return PARSER.parseFrom(data, extensionRegistry);
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data parseFrom(java.io.InputStream input)
+            throws java.io.IOException {
+        return com.google.protobuf.GeneratedMessageV3
+                .parseWithIOException(PARSER, input);
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data parseFrom(
+            java.io.InputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws java.io.IOException {
+        return com.google.protobuf.GeneratedMessageV3
+                .parseWithIOException(PARSER, input, extensionRegistry);
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data parseDelimitedFrom(java.io.InputStream input)
+            throws java.io.IOException {
+        return com.google.protobuf.GeneratedMessageV3
+                .parseDelimitedWithIOException(PARSER, input);
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data parseDelimitedFrom(
+            java.io.InputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws java.io.IOException {
+        return com.google.protobuf.GeneratedMessageV3
+                .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data parseFrom(
+            com.google.protobuf.CodedInputStream input)
+            throws java.io.IOException {
+        return com.google.protobuf.GeneratedMessageV3
+                .parseWithIOException(PARSER, input);
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data parseFrom(
+            com.google.protobuf.CodedInputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws java.io.IOException {
+        return com.google.protobuf.GeneratedMessageV3
+                .parseWithIOException(PARSER, input, extensionRegistry);
+    }
+
+    @java.lang.Override
+    public Builder newBuilderForType() {
+        return newBuilder();
+    }
+
+    public static Builder newBuilder() {
+        return DEFAULT_INSTANCE.toBuilder();
+    }
+
+    public static Builder newBuilder(org.dbsyncer.storage.binlog.proto.Data prototype) {
+        return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
+    }
+
+    @java.lang.Override
+    public Builder toBuilder() {
+        return this == DEFAULT_INSTANCE
+                ? new Builder() : new Builder().mergeFrom(this);
+    }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+            com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+        Builder builder = new Builder(parent);
+        return builder;
+    }
+
+    /**
+     * Protobuf type {@code Data}
+     */
+    public static final class Builder extends
+            com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
+            // @@protoc_insertion_point(builder_implements:Data)
+            org.dbsyncer.storage.binlog.proto.DataOrBuilder {
+        public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+            return org.dbsyncer.storage.binlog.proto.BinlogMessageProto.internal_static_Data_descriptor;
+        }
+
+        @SuppressWarnings({"rawtypes"})
+        protected com.google.protobuf.MapField internalGetMapField(
+                int number) {
+            switch (number) {
+                case 1:
+                    return internalGetRow();
+                default:
+                    throw new RuntimeException(
+                            "Invalid map field number: " + number);
+            }
+        }
+
+        @SuppressWarnings({"rawtypes"})
+        protected com.google.protobuf.MapField internalGetMutableMapField(
+                int number) {
+            switch (number) {
+                case 1:
+                    return internalGetMutableRow();
+                default:
+                    throw new RuntimeException(
+                            "Invalid map field number: " + number);
+            }
+        }
+
+        @java.lang.Override
+        protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+            return org.dbsyncer.storage.binlog.proto.BinlogMessageProto.internal_static_Data_fieldAccessorTable
+                    .ensureFieldAccessorsInitialized(
+                            org.dbsyncer.storage.binlog.proto.Data.class, org.dbsyncer.storage.binlog.proto.Data.Builder.class);
+        }
+
+        // Construct using org.dbsyncer.storage.binlog.proto.Data.newBuilder()
+        private Builder() {
+            maybeForceBuilderInitialization();
+        }
+
+        private Builder(
+                com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+            super(parent);
+            maybeForceBuilderInitialization();
+        }
+
+        private void maybeForceBuilderInitialization() {
+            if (com.google.protobuf.GeneratedMessageV3
+                    .alwaysUseFieldBuilders) {
+            }
+        }
+
+        @java.lang.Override
+        public Builder clear() {
+            super.clear();
+            internalGetMutableRow().clear();
+            return this;
+        }
+
+        @java.lang.Override
+        public com.google.protobuf.Descriptors.Descriptor
+        getDescriptorForType() {
+            return org.dbsyncer.storage.binlog.proto.BinlogMessageProto.internal_static_Data_descriptor;
+        }
+
+        @java.lang.Override
+        public org.dbsyncer.storage.binlog.proto.Data getDefaultInstanceForType() {
+            return org.dbsyncer.storage.binlog.proto.Data.getDefaultInstance();
+        }
+
+        @java.lang.Override
+        public org.dbsyncer.storage.binlog.proto.Data build() {
+            org.dbsyncer.storage.binlog.proto.Data result = buildPartial();
+            if (!result.isInitialized()) {
+                throw newUninitializedMessageException(result);
+            }
+            return result;
+        }
+
+        @java.lang.Override
+        public org.dbsyncer.storage.binlog.proto.Data buildPartial() {
+            org.dbsyncer.storage.binlog.proto.Data result = new org.dbsyncer.storage.binlog.proto.Data(this);
+            int from_bitField0_ = bitField0_;
+            result.row_ = internalGetRow();
+            result.row_.makeImmutable();
+            onBuilt();
+            return result;
+        }
+
+        @java.lang.Override
+        public Builder clone() {
+            return super.clone();
+        }
+
+        @java.lang.Override
+        public Builder setField(
+                com.google.protobuf.Descriptors.FieldDescriptor field,
+                java.lang.Object value) {
+            return super.setField(field, value);
+        }
+
+        @java.lang.Override
+        public Builder clearField(
+                com.google.protobuf.Descriptors.FieldDescriptor field) {
+            return super.clearField(field);
+        }
+
+        @java.lang.Override
+        public Builder clearOneof(
+                com.google.protobuf.Descriptors.OneofDescriptor oneof) {
+            return super.clearOneof(oneof);
+        }
+
+        @java.lang.Override
+        public Builder setRepeatedField(
+                com.google.protobuf.Descriptors.FieldDescriptor field,
+                int index, java.lang.Object value) {
+            return super.setRepeatedField(field, index, value);
+        }
+
+        @java.lang.Override
+        public Builder addRepeatedField(
+                com.google.protobuf.Descriptors.FieldDescriptor field,
+                java.lang.Object value) {
+            return super.addRepeatedField(field, value);
+        }
+
+        @java.lang.Override
+        public Builder mergeFrom(com.google.protobuf.Message other) {
+            if (other instanceof org.dbsyncer.storage.binlog.proto.Data) {
+                return mergeFrom((org.dbsyncer.storage.binlog.proto.Data) other);
+            } else {
+                super.mergeFrom(other);
+                return this;
+            }
+        }
+
+        public Builder mergeFrom(org.dbsyncer.storage.binlog.proto.Data other) {
+            if (other == org.dbsyncer.storage.binlog.proto.Data.getDefaultInstance()) return this;
+            internalGetMutableRow().mergeFrom(
+                    other.internalGetRow());
+            this.mergeUnknownFields(other.unknownFields);
+            onChanged();
+            return this;
+        }
+
+        @java.lang.Override
+        public final boolean isInitialized() {
+            return true;
+        }
+
+        @java.lang.Override
+        public Builder mergeFrom(
+                com.google.protobuf.CodedInputStream input,
+                com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                throws java.io.IOException {
+            org.dbsyncer.storage.binlog.proto.Data parsedMessage = null;
+            try {
+                parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+            } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+                parsedMessage = (org.dbsyncer.storage.binlog.proto.Data) e.getUnfinishedMessage();
+                throw e.unwrapIOException();
+            } finally {
+                if (parsedMessage != null) {
+                    mergeFrom(parsedMessage);
+                }
+            }
+            return this;
+        }
+
+        private int bitField0_;
+
+        private com.google.protobuf.MapField<
+                java.lang.String, com.google.protobuf.ByteString> row_;
+
+        private com.google.protobuf.MapField<java.lang.String, com.google.protobuf.ByteString>
+        internalGetRow() {
+            if (row_ == null) {
+                return com.google.protobuf.MapField.emptyMapField(
+                        RowDefaultEntryHolder.defaultEntry);
+            }
+            return row_;
+        }
+
+        private com.google.protobuf.MapField<java.lang.String, com.google.protobuf.ByteString>
+        internalGetMutableRow() {
+            onChanged();
+            ;
+            if (row_ == null) {
+                row_ = com.google.protobuf.MapField.newMapField(
+                        RowDefaultEntryHolder.defaultEntry);
+            }
+            if (!row_.isMutable()) {
+                row_ = row_.copy();
+            }
+            return row_;
+        }
+
+        public int getRowCount() {
+            return internalGetRow().getMap().size();
+        }
+
+        /**
+         * <code>map&lt;string, bytes&gt; row = 1;</code>
+         */
+
+        @java.lang.Override
+        public boolean containsRow(
+                java.lang.String key) {
+            if (key == null) {
+                throw new NullPointerException("map key");
+            }
+            return internalGetRow().getMap().containsKey(key);
+        }
+
+        /**
+         * Use {@link #getRowMap()} instead.
+         */
+        @java.lang.Override
+        @java.lang.Deprecated
+        public java.util.Map<java.lang.String, com.google.protobuf.ByteString> getRow() {
+            return getRowMap();
+        }
+
+        /**
+         * <code>map&lt;string, bytes&gt; row = 1;</code>
+         */
+        @java.lang.Override
+
+        public java.util.Map<java.lang.String, com.google.protobuf.ByteString> getRowMap() {
+            return internalGetRow().getMap();
+        }
+
+        /**
+         * <code>map&lt;string, bytes&gt; row = 1;</code>
+         */
+        @java.lang.Override
+
+        public com.google.protobuf.ByteString getRowOrDefault(
+                java.lang.String key,
+                com.google.protobuf.ByteString defaultValue) {
+            if (key == null) {
+                throw new NullPointerException("map key");
+            }
+            java.util.Map<java.lang.String, com.google.protobuf.ByteString> map =
+                    internalGetRow().getMap();
+            return map.containsKey(key) ? map.get(key) : defaultValue;
+        }
+
+        /**
+         * <code>map&lt;string, bytes&gt; row = 1;</code>
+         */
+        @java.lang.Override
+
+        public com.google.protobuf.ByteString getRowOrThrow(
+                java.lang.String key) {
+            if (key == null) {
+                throw new NullPointerException("map key");
+            }
+            java.util.Map<java.lang.String, com.google.protobuf.ByteString> map =
+                    internalGetRow().getMap();
+            if (!map.containsKey(key)) {
+                throw new java.lang.IllegalArgumentException();
+            }
+            return map.get(key);
+        }
+
+        public Builder clearRow() {
+            internalGetMutableRow().getMutableMap()
+                    .clear();
+            return this;
+        }
+
+        /**
+         * <code>map&lt;string, bytes&gt; row = 1;</code>
+         */
+
+        public Builder removeRow(
+                java.lang.String key) {
+            if (key == null) {
+                throw new NullPointerException("map key");
+            }
+            internalGetMutableRow().getMutableMap()
+                    .remove(key);
+            return this;
+        }
+
+        /**
+         * Use alternate mutation accessors instead.
+         */
+        @java.lang.Deprecated
+        public java.util.Map<java.lang.String, com.google.protobuf.ByteString>
+        getMutableRow() {
+            return internalGetMutableRow().getMutableMap();
+        }
+
+        /**
+         * <code>map&lt;string, bytes&gt; row = 1;</code>
+         */
+        public Builder putRow(
+                java.lang.String key,
+                com.google.protobuf.ByteString value) {
+            if (key == null) {
+                throw new NullPointerException("map key");
+            }
+            if (value == null) {
+                throw new NullPointerException("map value");
+            }
+
+            internalGetMutableRow().getMutableMap()
+                    .put(key, value);
+            return this;
+        }
+
+        /**
+         * <code>map&lt;string, bytes&gt; row = 1;</code>
+         */
+
+        public Builder putAllRow(
+                java.util.Map<java.lang.String, com.google.protobuf.ByteString> values) {
+            internalGetMutableRow().getMutableMap()
+                    .putAll(values);
+            return this;
+        }
+
+        @java.lang.Override
+        public final Builder setUnknownFields(
+                final com.google.protobuf.UnknownFieldSet unknownFields) {
+            return super.setUnknownFields(unknownFields);
+        }
+
+        @java.lang.Override
+        public final Builder mergeUnknownFields(
+                final com.google.protobuf.UnknownFieldSet unknownFields) {
+            return super.mergeUnknownFields(unknownFields);
+        }
+
+
+        // @@protoc_insertion_point(builder_scope:Data)
+    }
+
+    // @@protoc_insertion_point(class_scope:Data)
+    private static final org.dbsyncer.storage.binlog.proto.Data DEFAULT_INSTANCE;
+
+    static {
+        DEFAULT_INSTANCE = new org.dbsyncer.storage.binlog.proto.Data();
+    }
+
+    public static org.dbsyncer.storage.binlog.proto.Data getDefaultInstance() {
+        return DEFAULT_INSTANCE;
+    }
+
+    private static final com.google.protobuf.Parser<Data>
+            PARSER = new com.google.protobuf.AbstractParser<Data>() {
+        @java.lang.Override
+        public Data parsePartialFrom(
+                com.google.protobuf.CodedInputStream input,
+                com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                throws com.google.protobuf.InvalidProtocolBufferException {
+            return new Data(input, extensionRegistry);
+        }
+    };
+
+    public static com.google.protobuf.Parser<Data> parser() {
+        return PARSER;
+    }
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<Data> getParserForType() {
+        return PARSER;
+    }
+
+    @java.lang.Override
+    public org.dbsyncer.storage.binlog.proto.Data getDefaultInstanceForType() {
+        return DEFAULT_INSTANCE;
+    }
+
+}
+

+ 50 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/proto/DataOrBuilder.java

@@ -0,0 +1,50 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: BinlogMessageProto.proto
+
+package org.dbsyncer.storage.binlog.proto;
+
+public interface DataOrBuilder extends
+        // @@protoc_insertion_point(interface_extends:Data)
+        com.google.protobuf.MessageOrBuilder {
+
+    /**
+     * <code>map&lt;string, bytes&gt; row = 1;</code>
+     */
+    int getRowCount();
+
+    /**
+     * <code>map&lt;string, bytes&gt; row = 1;</code>
+     */
+    boolean containsRow(
+            java.lang.String key);
+
+    /**
+     * Use {@link #getRowMap()} instead.
+     */
+    @java.lang.Deprecated
+    java.util.Map<java.lang.String, com.google.protobuf.ByteString>
+    getRow();
+
+    /**
+     * <code>map&lt;string, bytes&gt; row = 1;</code>
+     */
+    java.util.Map<java.lang.String, com.google.protobuf.ByteString>
+    getRowMap();
+
+    /**
+     * <code>map&lt;string, bytes&gt; row = 1;</code>
+     */
+
+    /* nullable */
+    com.google.protobuf.ByteString getRowOrDefault(
+            java.lang.String key,
+            /* nullable */
+            com.google.protobuf.ByteString defaultValue);
+
+    /**
+     * <code>map&lt;string, bytes&gt; row = 1;</code>
+     */
+
+    com.google.protobuf.ByteString getRowOrThrow(
+            java.lang.String key);
+}

+ 129 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/proto/Event.java

@@ -0,0 +1,129 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: BinlogMessageProto.proto
+
+package org.dbsyncer.storage.binlog.proto;
+
+/**
+ * Protobuf enum {@code Event}
+ */
+public enum Event
+        implements com.google.protobuf.ProtocolMessageEnum {
+  /**
+   * <code>UPDATE = 0;</code>
+   */
+  UPDATE(0),
+  /**
+   * <code>INSERT = 1;</code>
+   */
+  INSERT(1),
+  /**
+   * <code>DELETE = 2;</code>
+   */
+  DELETE(2),
+  UNRECOGNIZED(-1),
+  ;
+
+  /**
+   * <code>UPDATE = 0;</code>
+   */
+  public static final int UPDATE_VALUE = 0;
+  /**
+   * <code>INSERT = 1;</code>
+   */
+  public static final int INSERT_VALUE = 1;
+  /**
+   * <code>DELETE = 2;</code>
+   */
+  public static final int DELETE_VALUE = 2;
+
+
+  public final int getNumber() {
+    if (this == UNRECOGNIZED) {
+      throw new java.lang.IllegalArgumentException(
+              "Can't get the number of an unknown enum value.");
+    }
+    return value;
+  }
+
+  /**
+   * @param value The numeric wire value of the corresponding enum entry.
+   * @return The enum associated with the given numeric wire value.
+   * @deprecated Use {@link #forNumber(int)} instead.
+   */
+  @java.lang.Deprecated
+  public static Event valueOf(int value) {
+    return forNumber(value);
+  }
+
+  /**
+   * @param value The numeric wire value of the corresponding enum entry.
+   * @return The enum associated with the given numeric wire value.
+   */
+  public static Event forNumber(int value) {
+    switch (value) {
+      case 0:
+        return UPDATE;
+      case 1:
+        return INSERT;
+      case 2:
+        return DELETE;
+      default:
+        return null;
+    }
+  }
+
+  public static com.google.protobuf.Internal.EnumLiteMap<Event>
+  internalGetValueMap() {
+    return internalValueMap;
+  }
+
+  private static final com.google.protobuf.Internal.EnumLiteMap<
+          Event> internalValueMap =
+          new com.google.protobuf.Internal.EnumLiteMap<Event>() {
+            public Event findValueByNumber(int number) {
+              return Event.forNumber(number);
+            }
+          };
+
+  public final com.google.protobuf.Descriptors.EnumValueDescriptor
+  getValueDescriptor() {
+    if (this == UNRECOGNIZED) {
+      throw new java.lang.IllegalStateException(
+              "Can't get the descriptor of an unrecognized enum value.");
+    }
+    return getDescriptor().getValues().get(ordinal());
+  }
+
+  public final com.google.protobuf.Descriptors.EnumDescriptor
+  getDescriptorForType() {
+    return getDescriptor();
+  }
+
+  public static final com.google.protobuf.Descriptors.EnumDescriptor
+  getDescriptor() {
+    return org.dbsyncer.storage.binlog.proto.BinlogMessageProto.getDescriptor().getEnumTypes().get(0);
+  }
+
+  private static final Event[] VALUES = values();
+
+  public static Event valueOf(
+          com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+    if (desc.getType() != getDescriptor()) {
+      throw new java.lang.IllegalArgumentException(
+              "EnumValueDescriptor is not for this type.");
+    }
+    if (desc.getIndex() == -1) {
+      return UNRECOGNIZED;
+    }
+    return VALUES[desc.getIndex()];
+  }
+
+  private final int value;
+
+  private Event(int value) {
+    this.value = value;
+  }
+
+  // @@protoc_insertion_point(enum_scope:Event)
+}
+

+ 22 - 0
dbsyncer-storage/src/main/proto/BinlogMessageProto.proto

@@ -0,0 +1,22 @@
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.dbsyncer.storage.binlog.proto";
+option java_outer_classname = "BinlogMessageProto";
+option optimize_for = SPEED;
+
+message BinlogMessage {
+    string table_group_id = 1;
+    Event event = 2;
+    repeated Data data = 3;
+}
+
+enum Event {
+    UPDATE = 0;
+    INSERT = 1;
+    DELETE = 2;
+}
+
+message Data {
+    map<string, bytes> row = 1;
+}

+ 43 - 0
dbsyncer-storage/src/main/test/BinlogMessageTest.java

@@ -0,0 +1,43 @@
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.dbsyncer.common.util.DateFormatUtil;
+import org.dbsyncer.storage.binlog.proto.BinlogMessage;
+import org.dbsyncer.storage.binlog.proto.Data;
+import org.dbsyncer.storage.binlog.proto.Event;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalTime;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/6/18 23:46
+ */
+public class BinlogMessageTest {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Test
+    public void testBinlogMessage() throws InvalidProtocolBufferException {
+        LocalTime localTime = DateFormatUtil.stringToLocalTime("2022-06-18 22:59:59");
+        String s = localTime.toString();
+
+        BinlogMessage build = BinlogMessage.newBuilder()
+                .setTableGroupId("123456788888")
+                .setEvent(Event.UPDATE)
+                .addData(Data.newBuilder().putRow("aaa", ByteString.copyFromUtf8("hello,中国")).putRow("aaa111", ByteString.copyFromUtf8(s)))
+                .build();
+
+        byte[] bytes = build.toByteArray();
+        logger.info("序列化长度:{}", bytes.length);
+        logger.info("{}", bytes);
+
+        BinlogMessage message = BinlogMessage.parseFrom(bytes);
+        logger.info(message.getTableGroupId());
+        logger.info(message.getEvent().name());
+        logger.info(message.getDataList().toString());
+    }
+
+}