瀏覽代碼

读写binlog

AE86 2 年之前
父節點
當前提交
b9e0d6e7ca

+ 10 - 4
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -18,6 +18,7 @@ import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
+import java.util.BitSet;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -162,8 +163,8 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         String type = v.getClass().getName();
         switch (type) {
             // 字节
-//            case "[B":
-//            return ByteString.copyFrom((byte[]) v);
+            case "[B":
+            return ByteString.copyFrom((byte[]) v);
 
             // 字符串
             case "java.lang.String":
@@ -201,8 +202,10 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
                 buffer.flip();
                 return ByteString.copyFrom(buffer, 8);
             case "java.lang.Short":
-                Short aShort = (Short) v;
-                return ByteString.copyFromUtf8(aShort.toString());
+                buffer.clear();
+                buffer.putShort((Short) v);
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 2);
             case "java.lang.Float":
                 buffer.clear();
                 buffer.putFloat((Float) v);
@@ -216,6 +219,9 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
             case "java.math.BigDecimal":
                 BigDecimal bigDecimal = (BigDecimal) v;
                 return ByteString.copyFromUtf8(bigDecimal.toString());
+            case "java.util.BitSet":
+                BitSet bitSet = (BitSet) v;
+                return ByteString.copyFrom(bitSet.toByteArray());
 
             // 布尔(1为true;0为false)
             case "java.lang.Boolean":

+ 5 - 2
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogReader.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.storage.binlog.impl;
 
+import com.google.protobuf.CodedInputStream;
 import org.apache.commons.io.IOUtils;
 import org.dbsyncer.common.file.BufferedRandomAccessFile;
 import org.dbsyncer.storage.binlog.AbstractBinlogActuator;
@@ -16,9 +17,10 @@ import java.io.RandomAccessFile;
  */
 public class BinlogReader extends AbstractBinlogActuator {
     private final RandomAccessFile raf;
-    private final byte[] h = new byte[1];
+    private final byte[] h = new byte[4];
     private byte[] b;
     private long offset;
+    private CodedInputStream cis;
 
     public BinlogReader(String path, BinlogIndex binlogIndex, long position) throws IOException {
         initBinlogIndex(binlogIndex);
@@ -32,7 +34,8 @@ public class BinlogReader extends AbstractBinlogActuator {
             return null;
         }
         raf.read(h);
-        b = new byte[Byte.toUnsignedInt(h[0])];
+        cis = CodedInputStream.newInstance(h);
+        b = new byte[cis.readFixed32()];
         raf.read(b);
         raf.seek(this.offset + (h.length + b.length));
         refreshBinlogIndexUpdateTime();

+ 8 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogWriter.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.storage.binlog.impl;
 
+import com.google.protobuf.CodedOutputStream;
 import org.apache.commons.io.IOUtils;
 import org.dbsyncer.storage.binlog.AbstractBinlogActuator;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
@@ -26,7 +27,13 @@ public class BinlogWriter extends AbstractBinlogActuator {
 
     public void write(BinlogMessage message) throws IOException {
         if(null != message){
-            message.writeDelimitedTo(out);
+            // 选择固定长度int32作为tag标志位,4bytes, 最多可容纳2^31-1字节(2047MB左右, 建议上限64~128M内最佳),
+            final int serialized = message.getSerializedSize();
+            final int bufferSize = CodedOutputStream.computeFixed32SizeNoTag(serialized) + serialized;
+            final CodedOutputStream codedOutput = CodedOutputStream.newInstance(out, bufferSize);
+            codedOutput.writeFixed32NoTag(serialized);
+            message.writeTo(codedOutput);
+            codedOutput.flush();
             refreshBinlogIndexUpdateTime();
         }
     }

+ 10 - 4
dbsyncer-storage/src/main/test/BinlogMessageTest.java

@@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.charset.Charset;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
@@ -55,21 +56,26 @@ public class BinlogMessageTest {
 
         byte[] line;
         while (null != (line = context.readLine())) {
-            BinlogMessage binlogMessage = BinlogMessage.parseFrom(line);
-            logger.info(binlogMessage.toString());
+            logger.info("size:{}, {}", line.length, line);
+            BinlogMessage message = BinlogMessage.parseFrom(line);
+            logger.info(message.toString());
         }
         context.flush();
     }
 
     private void write(String tableGroupId, String key) throws IOException {
         Map<String, Object> data = new HashMap<>();
-        data.put("id", 1);
+        data.put("id", 1L);
         data.put("name", key + "中文");
         data.put("age", 88);
-        data.put("number", 123456789L);
+        data.put("bd", new BigDecimal(88));
+        data.put("sex", 1);
         data.put("f", 88.88f);
         data.put("d", 999.99d);
         data.put("b", true);
+        short ss = 32767;
+        data.put("ss", ss);
+        data.put("bytes", "中文666".getBytes(Charset.defaultCharset()));
         data.put("create_date", new Date(Timestamp.valueOf(LocalDateTime.now()).getTime()));
         data.put("update_time", Timestamp.valueOf(LocalDateTime.now()).getTime());