AE86 2 år sedan
förälder
incheckning
26e9d30ade

+ 62 - 50
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -132,6 +132,8 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         }
     }
 
+    private final ByteBuffer buffer = ByteBuffer.allocate(8);
+
     /**
      * Java语言提供了八种基本类型,六种数字类型(四个整数型,两个浮点型),一种字符类型,一种布尔型。
      * <p>
@@ -158,61 +160,71 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
      * @return
      */
     protected ByteString serializeValue(Object v) {
-        // 字节
-        if (v instanceof byte[]) {
-            return ByteString.copyFrom((byte[]) v);
-        }
-
-        // 字符串
-        if (v instanceof String) {
-            return ByteString.copyFromUtf8((String) v);
-        }
+        String type = v.getClass().getName();
+        switch (type) {
+            // 字节
+//            case "[B":
+//            return ByteString.copyFrom((byte[]) v);
 
-        final ByteBuffer buffer = ByteBuffer.allocate(32);
+            // 字符串
+            case "java.lang.String":
+                return ByteString.copyFromUtf8((String) v);
 
-        // 时间
-        if (v instanceof Timestamp) {
-            Timestamp timestamp = (Timestamp) v;
-            return ByteString.copyFrom(buffer.putLong(timestamp.getTime()).array());
-        }
-        if (v instanceof Date) {
-            Date date = (Date) v;
-            return ByteString.copyFromUtf8(DateFormatUtil.dateToString(date));
-        }
-        if (v instanceof Time) {
-            Time time = (Time) v;
-            return ByteString.copyFromUtf8(time.toString());
-        }
+            // 时间
+            case "java.sql.Timestamp":
+                buffer.clear();
+                Timestamp timestamp = (Timestamp) v;
+                buffer.putLong(timestamp.getTime());
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 8);
+            case "java.sql.Date":
+                return ByteString.copyFromUtf8(DateFormatUtil.dateToString((Date) v));
+            case "java.sql.Time":
+                Time time = (Time) v;
+                return ByteString.copyFromUtf8(time.toString());
 
-        // 数字
-        if (v instanceof Integer) {
-            return ByteString.copyFrom(buffer.putInt((Integer) v).array());
-        }
-        if (v instanceof Long) {
-            return ByteString.copyFrom(buffer.putLong((Long) v).array());
-        }
-        if (v instanceof Short) {
-            return ByteString.copyFrom(buffer.putShort((Short) v).array());
-        }
-        if (v instanceof Float) {
-            return ByteString.copyFrom(buffer.putFloat((Float) v).array());
-        }
-        if (v instanceof Double) {
-            return ByteString.copyFrom(buffer.putDouble((Double) v).array());
-        }
-        if (v instanceof BigDecimal) {
-            BigDecimal bigDecimal = (BigDecimal) v;
-            return ByteString.copyFromUtf8(bigDecimal.toString());
-        }
+            // 数字
+            case "java.lang.Integer":
+                buffer.clear();
+                buffer.putInt((Integer) v);
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 4);
+            case "java.lang.Long":
+                buffer.clear();
+                buffer.putLong((Long) v);
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 8);
+            case "java.lang.Short":
+                buffer.clear();
+                buffer.putShort((Short) v);
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 2);
+            case "java.lang.Float":
+                buffer.clear();
+                buffer.putFloat((Float) v);
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 4);
+            case "java.lang.Double":
+                buffer.clear();
+                buffer.putDouble((Double) v);
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 8);
+            case "java.math.BigDecimal":
+                BigDecimal bigDecimal = (BigDecimal) v;
+                return ByteString.copyFromUtf8(bigDecimal.toString());
+
+            // 布尔(1为true;0为false)
+            case "java.lang.Boolean":
+                buffer.clear();
+                Boolean b = (Boolean) v;
+                buffer.putShort((short) (b ? 1 : 0));
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 2);
 
-        // 布尔(1为true;0为false)
-        if (v instanceof Boolean) {
-            Boolean b = (Boolean) v;
-            return ByteString.copyFrom(buffer.putShort((short) (b ? 1 : 0)).array());
+            default:
+                logger.error("Unsupported serialize value type:{}", type);
+                return null;
         }
-
-        logger.error("Unsupported serialize value type:{}", v.getClass().getSimpleName());
-        return null;
     }
 
     /**

+ 10 - 6
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogColumnValue.java

@@ -17,6 +17,11 @@ import java.sql.Timestamp;
  */
 public class BinlogColumnValue extends AbstractColumnValue<ByteString> {
 
+    private final ByteBuffer oneBytes = ByteBuffer.allocate(1);
+    private final ByteBuffer twoBytes = ByteBuffer.allocate(2);
+    private final ByteBuffer fourBytes = ByteBuffer.allocate(4);
+    private final ByteBuffer eightBytes = ByteBuffer.allocate(8);
+
     @Override
     public String asString() {
         return getValue().toStringUtf8();
@@ -29,16 +34,15 @@ public class BinlogColumnValue extends AbstractColumnValue<ByteString> {
 
     @Override
     public Short asShort() {
-        final ByteBuffer buffer = ByteBuffer.allocate(32);
-        buffer.put(getValue().toByteArray());
-        return buffer.asShortBuffer().get();
+        oneBytes.clear();
+        oneBytes.put(getValue().toByteArray(), 0, oneBytes.capacity());
+        return oneBytes.asShortBuffer().get();
     }
 
     @Override
     public Integer asInteger() {
-        final ByteBuffer buffer = ByteBuffer.allocate(32);
-        buffer.put(getValue().toByteArray());
-        return buffer.asIntBuffer().get();
+        fourBytes.put(getValue().toByteArray(), 0, fourBytes.capacity());
+        return fourBytes.asIntBuffer().get();
     }
 
     @Override

+ 43 - 0
dbsyncer-storage/src/main/test/BinlogMessageFieldTypeTest.java

@@ -0,0 +1,43 @@
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/7/1 23:44
+ */
+public class BinlogMessageFieldTypeTest {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Test
+    public void testMessageFieldType() {
+        final ByteBuffer buffer = ByteBuffer.allocate(32);
+        Timestamp timestamp = Timestamp.valueOf(LocalDateTime.now());
+        ByteBuffer byteBuffer = buffer.putLong(timestamp.getTime());
+        byteBuffer.flip();
+        byte[] bytes = new byte[8];
+        byteBuffer.get(bytes);
+        logger.info("remaining:{}, position:{}, limit:{}, arrayOffset:{}, bytes:{}", byteBuffer.remaining(), byteBuffer.position(), byteBuffer.limit(), byteBuffer.arrayOffset(), bytes);
+
+        byteBuffer.clear();
+        timestamp = Timestamp.valueOf(LocalDateTime.now());
+        byteBuffer = buffer.putLong(timestamp.getTime());
+        byteBuffer.flip();
+        byteBuffer.get(bytes);
+        logger.info("remaining:{}, position:{}, limit:{}, arrayOffset:{}, bytes:{}", byteBuffer.remaining(), byteBuffer.position(), byteBuffer.limit(), byteBuffer.arrayOffset(), bytes);
+
+        byteBuffer.clear();
+        timestamp = Timestamp.valueOf(LocalDateTime.now());
+        byteBuffer = buffer.putLong(timestamp.getTime());
+        byteBuffer.flip();
+        byteBuffer.get(bytes);
+        logger.info("remaining:{}, position:{}, limit:{}, arrayOffset:{}, bytes:{}", byteBuffer.remaining(), byteBuffer.position(), byteBuffer.limit(), byteBuffer.arrayOffset(), bytes);
+    }
+
+}