1
0
AE86 2 жил өмнө
parent
commit
3587c3d6ed

+ 1 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/column/ColumnValue.java

@@ -30,7 +30,7 @@ public interface ColumnValue {
 
     Boolean asBoolean();
 
-    BigDecimal asDecimal();
+    BigDecimal asBigDecimal();
 
     Date asDate();
 

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/PgColumnValue.java

@@ -64,7 +64,7 @@ public final class PgColumnValue extends AbstractColumnValue<String> {
     }
 
     @Override
-    public BigDecimal asDecimal() {
+    public BigDecimal asBigDecimal() {
         return new BigDecimal(getValue());
     }
 

+ 10 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -178,10 +178,17 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
                 buffer.flip();
                 return ByteString.copyFrom(buffer, 8);
             case "java.sql.Date":
-                return ByteString.copyFromUtf8(DateFormatUtil.dateToString((Date) v));
+                buffer.clear();
+                Date date = (Date) v;
+                buffer.putLong(date.getTime());
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 8);
             case "java.sql.Time":
+                buffer.clear();
                 Time time = (Time) v;
-                return ByteString.copyFromUtf8(time.toString());
+                buffer.putLong(time.getTime());
+                buffer.flip();
+                return ByteString.copyFrom(buffer, 8);
 
             // 数字
             case "java.lang.Integer":
@@ -272,7 +279,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
                 return value.asDouble();
             case Types.DECIMAL:
             case Types.NUMERIC:
-                return value.asDecimal();
+                return value.asBigDecimal();
 
             // 布尔
             case Types.BOOLEAN:

+ 24 - 20
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogColumnValue.java

@@ -17,10 +17,7 @@ import java.sql.Timestamp;
  */
 public class BinlogColumnValue extends AbstractColumnValue<ByteString> {
 
-    private final ByteBuffer oneBytes = ByteBuffer.allocate(1);
-    private final ByteBuffer twoBytes = ByteBuffer.allocate(2);
-    private final ByteBuffer fourBytes = ByteBuffer.allocate(4);
-    private final ByteBuffer eightBytes = ByteBuffer.allocate(8);
+    private final ByteBuffer buffer = ByteBuffer.allocate(8);
 
     @Override
     public String asString() {
@@ -34,53 +31,60 @@ public class BinlogColumnValue extends AbstractColumnValue<ByteString> {
 
     @Override
     public Short asShort() {
-        oneBytes.clear();
-        oneBytes.put(getValue().toByteArray(), 0, oneBytes.capacity());
-        return oneBytes.asShortBuffer().get();
+        buffer.clear();
+        buffer.put(asByteArray(), 0, 2);
+        buffer.flip();
+        return buffer.asShortBuffer().get();
     }
 
     @Override
     public Integer asInteger() {
-        fourBytes.put(getValue().toByteArray(), 0, fourBytes.capacity());
-        return fourBytes.asIntBuffer().get();
+        buffer.clear();
+        buffer.put(asByteArray(), 0, 4);
+        buffer.flip();
+        return buffer.asIntBuffer().get();
     }
 
     @Override
     public Long asLong() {
-        final ByteBuffer buffer = ByteBuffer.allocate(32);
-        buffer.put(getValue().toByteArray());
+        buffer.clear();
+        buffer.put(asByteArray(), 0, 8);
+        buffer.flip();
         return buffer.asLongBuffer().get();
     }
 
     @Override
     public Float asFloat() {
-        final ByteBuffer buffer = ByteBuffer.allocate(32);
-        buffer.put(getValue().toByteArray());
+        buffer.clear();
+        buffer.put(asByteArray(), 0, 4);
+        buffer.flip();
         return buffer.asFloatBuffer().get();
     }
 
     @Override
     public Double asDouble() {
-        final ByteBuffer buffer = ByteBuffer.allocate(32);
-        buffer.put(getValue().toByteArray());
+        buffer.clear();
+        buffer.put(asByteArray(), 0, 8);
+        buffer.flip();
         return buffer.asDoubleBuffer().get();
     }
 
     @Override
     public Boolean asBoolean() {
-        final ByteBuffer buffer = ByteBuffer.allocate(32);
-        buffer.put(getValue().toByteArray());
+        buffer.clear();
+        buffer.put(asByteArray(), 0, 2);
+        buffer.flip();
         return buffer.asShortBuffer().get() == 1;
     }
 
     @Override
-    public BigDecimal asDecimal() {
+    public BigDecimal asBigDecimal() {
         return new BigDecimal(asString());
     }
 
     @Override
     public Date asDate() {
-        return DateFormatUtil.stringToDate(asString());
+        return new Date(asLong());
     }
 
     @Override
@@ -90,6 +94,6 @@ public class BinlogColumnValue extends AbstractColumnValue<ByteString> {
 
     @Override
     public Time asTime() {
-        return Time.valueOf(asString());
+        return new Time(asLong());
     }
 }

+ 118 - 22
dbsyncer-storage/src/main/test/BinlogMessageFieldTypeTest.java

@@ -1,10 +1,17 @@
+import com.google.protobuf.ByteString;
+import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
+import org.dbsyncer.storage.binlog.impl.BinlogColumnValue;
+import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.LocalDateTime;
+import java.util.Queue;
 
 /**
  * @author AE86
@@ -15,29 +22,118 @@ public class BinlogMessageFieldTypeTest {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private BinlogColumnValue value = new BinlogColumnValue();
+
+    private MessageTest messageTest = new MessageTest();
+
+    @Test
+    public void testMessageNumber() {
+        // short
+        short s = 32767;
+        logger.info("short1:{}", s);
+        ByteString shortBytes = messageTest.serializeValue(s);
+        logger.info("bytes:{}", shortBytes.toByteArray());
+        value.setValue(shortBytes);
+        short s2 = value.asShort();
+        logger.info("short2:{}", s2);
+
+        // int
+        int i = 1999999999;
+        logger.info("int1:{}", i);
+        ByteString intBytes = messageTest.serializeValue(i);
+        logger.info("bytes:{}", intBytes.toByteArray());
+        value.setValue(intBytes);
+        int i2 = value.asInteger();
+        logger.info("int2:{}", i2);
+
+        // long
+        long l = 8999999999999999999L;
+        logger.info("long1:{}", l);
+        ByteString longBytes = messageTest.serializeValue(l);
+        logger.info("bytes:{}", longBytes.toByteArray());
+        value.setValue(longBytes);
+        long l2 = value.asLong();
+        logger.info("long2:{}", l2);
+
+        // float
+        float f = 99999999999999999999999999999999999.99999999999999999999999999999999999f;
+        logger.info("float1:{}", f);
+        ByteString floatBytes = messageTest.serializeValue(f);
+        logger.info("bytes:{}", floatBytes.toByteArray());
+        value.setValue(floatBytes);
+        float f2 = value.asFloat();
+        logger.info("float2:{}", f2);
+
+        // double
+        double d = 999999.9999999999999999999999999d;
+        logger.info("double1:{}", d);
+        ByteString doubleBytes = messageTest.serializeValue(d);
+        logger.info("bytes:{}", doubleBytes.toByteArray());
+        value.setValue(doubleBytes);
+        double d2 = value.asDouble();
+        logger.info("double2:{}", d2);
+
+        // double
+        BigDecimal b = new BigDecimal(8888888.888888888888888f);
+        logger.info("bigDecimal1:{}", b);
+        ByteString bigDecimalBytes = messageTest.serializeValue(b);
+        logger.info("bytes:{}", bigDecimalBytes.toByteArray());
+        value.setValue(bigDecimalBytes);
+        BigDecimal b2 = value.asBigDecimal();
+        logger.info("bigDecimal2:{}", b2);
+    }
+
     @Test
-    public void testMessageFieldType() {
-        final ByteBuffer buffer = ByteBuffer.allocate(32);
+    public void testMessageDate() {
+        // timestamp
         Timestamp timestamp = Timestamp.valueOf(LocalDateTime.now());
-        ByteBuffer byteBuffer = buffer.putLong(timestamp.getTime());
-        byteBuffer.flip();
-        byte[] bytes = new byte[8];
-        byteBuffer.get(bytes);
-        logger.info("remaining:{}, position:{}, limit:{}, arrayOffset:{}, bytes:{}", byteBuffer.remaining(), byteBuffer.position(), byteBuffer.limit(), byteBuffer.arrayOffset(), bytes);
-
-        byteBuffer.clear();
-        timestamp = Timestamp.valueOf(LocalDateTime.now());
-        byteBuffer = buffer.putLong(timestamp.getTime());
-        byteBuffer.flip();
-        byteBuffer.get(bytes);
-        logger.info("remaining:{}, position:{}, limit:{}, arrayOffset:{}, bytes:{}", byteBuffer.remaining(), byteBuffer.position(), byteBuffer.limit(), byteBuffer.arrayOffset(), bytes);
-
-        byteBuffer.clear();
-        timestamp = Timestamp.valueOf(LocalDateTime.now());
-        byteBuffer = buffer.putLong(timestamp.getTime());
-        byteBuffer.flip();
-        byteBuffer.get(bytes);
-        logger.info("remaining:{}, position:{}, limit:{}, arrayOffset:{}, bytes:{}", byteBuffer.remaining(), byteBuffer.position(), byteBuffer.limit(), byteBuffer.arrayOffset(), bytes);
+        logger.info("timestamp1:{}, l:{}", timestamp, timestamp.getTime());
+        ByteString timestampBytes = messageTest.serializeValue(timestamp);
+        logger.info("bytes:{}", timestampBytes.toByteArray());
+        value.setValue(timestampBytes);
+        Timestamp timestamp2 = value.asTimestamp();
+        logger.info("timestamp2:{}, l:{}", timestamp2, timestamp2.getTime());
+
+        // date
+        Date date = new Date(timestamp.getTime());
+        logger.info("date1:{}, l:{}", date, date.getTime());
+        ByteString dateBytes = messageTest.serializeValue(date);
+        logger.info("bytes:{}", dateBytes.toByteArray());
+        value.setValue(dateBytes);
+        Date date2 = value.asDate();
+        logger.info("date2:{}, l:{}", date2, date2.getTime());
+
+        // time
+        Time time = new Time(timestamp.getTime());
+        logger.info("time1:{}, l:{}", time, time.getTime());
+        ByteString timeBytes = messageTest.serializeValue(time);
+        logger.info("bytes:{}", timeBytes.toByteArray());
+        value.setValue(timeBytes);
+        Time time2 = value.asTime();
+        logger.info("time2:{}, l:{}", time2, time2.getTime());
+    }
+
+    final class MessageTest extends AbstractBinlogRecorder {
+
+        @Override
+        protected Queue getQueue() {
+            return null;
+        }
+
+        @Override
+        protected Object deserialize(BinlogMessage message) {
+            return null;
+        }
+
+        @Override
+        protected Object resolveValue(int type, ByteString v) {
+            return super.resolveValue(type, v);
+        }
+
+        @Override
+        protected ByteString serializeValue(Object v) {
+            return super.serializeValue(v);
+        }
     }
 
 }