Ver Fonte

修复binlog序列化

Signed-off-by: AE86 <836391306@qq.com>
AE86 há 2 anos atrás
pai
commit
b090113731

+ 9 - 5
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -86,6 +86,13 @@ public class DataSyncServiceImpl implements DataSyncService {
     @Override
     @Override
     public Map getBinlogData(Map row, boolean prettyBytes) throws InvalidProtocolBufferException {
     public Map getBinlogData(Map row, boolean prettyBytes) throws InvalidProtocolBufferException {
         String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
         String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
+        // 1、获取配置信息
+        final TableGroup tableGroup = cacheService.get(tableGroupId, TableGroup.class);
+        if (tableGroup == null) {
+            return Collections.EMPTY_MAP;
+        }
+
+        // 2、获取记录的数据
         byte[] bytes = (byte[]) row.get(ConfigConstant.BINLOG_DATA);
         byte[] bytes = (byte[]) row.get(ConfigConstant.BINLOG_DATA);
         if (null == bytes) {
         if (null == bytes) {
             if (prettyBytes) {
             if (prettyBytes) {
@@ -94,15 +101,12 @@ public class DataSyncServiceImpl implements DataSyncService {
             }
             }
             return Collections.EMPTY_MAP;
             return Collections.EMPTY_MAP;
         }
         }
-        BinlogMap message = BinlogMap.parseFrom(bytes);
 
 
-        // 1、获取配置信息
-        final TableGroup tableGroup = cacheService.get(tableGroupId, TableGroup.class);
-
-        // 2、反序列数据
+        // 3、反序列
         Map<String, Object> map = new HashMap<>();
         Map<String, Object> map = new HashMap<>();
         final Picker picker = new Picker(tableGroup.getFieldMapping());
         final Picker picker = new Picker(tableGroup.getFieldMapping());
         final Map<String, Field> fieldMap = picker.getSourceFieldMap();
         final Map<String, Field> fieldMap = picker.getSourceFieldMap();
+        BinlogMap message = BinlogMap.parseFrom(bytes);
         message.getRowMap().forEach((k, v) -> {
         message.getRowMap().forEach((k, v) -> {
             if (fieldMap.containsKey(k)) {
             if (fieldMap.containsKey(k)) {
                 Object val = BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v);
                 Object val = BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v);

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

@@ -136,7 +136,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
                 dataVo.setJson(JsonUtil.objToJson(binlogData));
                 dataVo.setJson(JsonUtil.objToJson(binlogData));
                 list.add(dataVo);
                 list.add(dataVo);
             } catch (Exception e) {
             } catch (Exception e) {
-                logger.error(e.getLocalizedMessage());
+                logger.error(e.getLocalizedMessage(), e);
             }
             }
         }
         }
         paging.setData(list);
         paging.setData(list);

+ 1 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/AbstractMessageDecoder.java

@@ -19,8 +19,6 @@ public abstract class AbstractMessageDecoder implements MessageDecoder {
 
 
     protected DatabaseConfig config;
     protected DatabaseConfig config;
 
 
-    private static final PgColumnValue value = new PgColumnValue();
-
     @Override
     @Override
     public boolean skipMessage(ByteBuffer buffer, LogSequenceNumber startLsn, LogSequenceNumber lastReceiveLsn) {
     public boolean skipMessage(ByteBuffer buffer, LogSequenceNumber startLsn, LogSequenceNumber lastReceiveLsn) {
         if (null == lastReceiveLsn || lastReceiveLsn.asLong() == 0 || startLsn.equals(lastReceiveLsn)) {
         if (null == lastReceiveLsn || lastReceiveLsn.asLong() == 0 || startLsn.equals(lastReceiveLsn)) {
@@ -71,8 +69,7 @@ public abstract class AbstractMessageDecoder implements MessageDecoder {
      * @return
      * @return
      */
      */
     protected Object resolveValue(String typeName, String columnValue) {
     protected Object resolveValue(String typeName, String columnValue) {
-        value.setValue(columnValue);
-
+        PgColumnValue value = new PgColumnValue(columnValue);
         if (value.isNull()) {
         if (value.isNull()) {
             // nulls are null
             // nulls are null
             return null;
             return null;

+ 4 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/PgColumnValue.java

@@ -23,6 +23,10 @@ public final class PgColumnValue extends AbstractColumnValue<String> {
 
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
 
+    public PgColumnValue(String value) {
+        setValue(value);
+    }
+
     @Override
     @Override
     public String asString() {
     public String asString() {
         return getValue();
         return getValue();

+ 17 - 14
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogColumnValue.java

@@ -2,6 +2,7 @@ package org.dbsyncer.storage.binlog;
 
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString;
 import org.dbsyncer.common.column.AbstractColumnValue;
 import org.dbsyncer.common.column.AbstractColumnValue;
+import org.dbsyncer.storage.enums.BinlogByteEnum;
 
 
 import java.math.BigDecimal;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
@@ -16,7 +17,9 @@ import java.sql.Timestamp;
  */
  */
 public class BinlogColumnValue extends AbstractColumnValue<ByteString> {
 public class BinlogColumnValue extends AbstractColumnValue<ByteString> {
 
 
-    private final ByteBuffer buffer = ByteBuffer.allocate(8);
+    public BinlogColumnValue(ByteString v) {
+        setValue(v);
+    }
 
 
     @Override
     @Override
     public String asString() {
     public String asString() {
@@ -30,8 +33,8 @@ public class BinlogColumnValue extends AbstractColumnValue<ByteString> {
 
 
     @Override
     @Override
     public Short asShort() {
     public Short asShort() {
-        buffer.clear();
-        buffer.put(asByteArray(), 0, 2);
+        ByteBuffer buffer = ByteBuffer.allocate(BinlogByteEnum.SHORT.getByteLength());
+        buffer.put(asByteArray(), 0, buffer.capacity());
         buffer.flip();
         buffer.flip();
         return buffer.asShortBuffer().get();
         return buffer.asShortBuffer().get();
     }
     }
@@ -39,45 +42,45 @@ public class BinlogColumnValue extends AbstractColumnValue<ByteString> {
     @Override
     @Override
     public Integer asInteger() {
     public Integer asInteger() {
         byte[] bytes = asByteArray();
         byte[] bytes = asByteArray();
-        if (bytes.length == 2) {
+        if (bytes.length == BinlogByteEnum.SHORT.getByteLength()) {
             Short aShort = asShort();
             Short aShort = asShort();
             return new Integer(aShort);
             return new Integer(aShort);
         }
         }
 
 
-        buffer.clear();
-        buffer.put(bytes, 0, 4);
+        ByteBuffer buffer = ByteBuffer.allocate(BinlogByteEnum.INTEGER.getByteLength());
+        buffer.put(bytes, 0, buffer.capacity());
         buffer.flip();
         buffer.flip();
         return buffer.asIntBuffer().get();
         return buffer.asIntBuffer().get();
     }
     }
 
 
     @Override
     @Override
     public Long asLong() {
     public Long asLong() {
-        buffer.clear();
-        buffer.put(asByteArray(), 0, 8);
+        ByteBuffer buffer = ByteBuffer.allocate(BinlogByteEnum.LONG.getByteLength());
+        buffer.put(asByteArray(), 0, buffer.capacity());
         buffer.flip();
         buffer.flip();
         return buffer.asLongBuffer().get();
         return buffer.asLongBuffer().get();
     }
     }
 
 
     @Override
     @Override
     public Float asFloat() {
     public Float asFloat() {
-        buffer.clear();
-        buffer.put(asByteArray(), 0, 4);
+        ByteBuffer buffer = ByteBuffer.allocate(BinlogByteEnum.FLOAT.getByteLength());
+        buffer.put(asByteArray(), 0, buffer.capacity());
         buffer.flip();
         buffer.flip();
         return buffer.asFloatBuffer().get();
         return buffer.asFloatBuffer().get();
     }
     }
 
 
     @Override
     @Override
     public Double asDouble() {
     public Double asDouble() {
-        buffer.clear();
-        buffer.put(asByteArray(), 0, 8);
+        ByteBuffer buffer = ByteBuffer.allocate(BinlogByteEnum.DOUBLE.getByteLength());
+        buffer.put(asByteArray(), 0, buffer.capacity());
         buffer.flip();
         buffer.flip();
         return buffer.asDoubleBuffer().get();
         return buffer.asDoubleBuffer().get();
     }
     }
 
 
     @Override
     @Override
     public Boolean asBoolean() {
     public Boolean asBoolean() {
-        buffer.clear();
-        buffer.put(asByteArray(), 0, 2);
+        ByteBuffer buffer = ByteBuffer.allocate(BinlogByteEnum.SHORT.getByteLength());
+        buffer.put(asByteArray(), 0, buffer.capacity());
         buffer.flip();
         buffer.flip();
         return buffer.asShortBuffer().get() == 1;
         return buffer.asShortBuffer().get() == 1;
     }
     }

+ 56 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/BinlogByteEnum.java

@@ -0,0 +1,56 @@
+package org.dbsyncer.storage.enums;
+
+/**
+ * 支持的序列化/反序列化字节码类型
+ *
+ * <pre>
+ * 类型     长度     大小      最小值     最大值
+ * byte     1Byte    8-bit     -128       +127
+ * short    2Byte    16-bit    -2^15      +2^15-1
+ * int      4Byte    32-bit    -2^31      +2^31-1
+ * long     8Byte    64-bit    -2^63      +2^63-1
+ * float    4Byte    32-bit    IEEE754    IEEE754
+ * double   8Byte    64-bit    IEEE754    IEEE754
+ * char     2Byte    16-bit    Unicode 0  Unicode 2^16-1
+ * boolean  8Byte    64-bit
+ * </pre>
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2023/4/21 22:07
+ */
+public enum BinlogByteEnum {
+
+    /**
+     * 8Byte
+     */
+    LONG(8),
+    /**
+     * 8Byte
+     */
+    DOUBLE(8),
+    /**
+     * 4Byte
+     */
+    INTEGER(4),
+    /**
+     * 4Byte
+     */
+    FLOAT(4),
+    /**
+     * 2Byte
+     */
+    SHORT(2),
+    ;
+
+    BinlogByteEnum(int byteLength) {
+        this.byteLength = byteLength;
+    }
+
+    int byteLength;
+
+    public int getByteLength() {
+        return byteLength;
+    }
+
+}

+ 50 - 61
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/BinlogMessageUtil.java

@@ -9,6 +9,7 @@ import org.apache.commons.io.IOUtils;
 import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.StorageException;
 import org.dbsyncer.storage.binlog.BinlogColumnValue;
 import org.dbsyncer.storage.binlog.BinlogColumnValue;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
+import org.dbsyncer.storage.enums.BinlogByteEnum;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
@@ -56,10 +57,6 @@ public abstract class BinlogMessageUtil {
 
 
     private static final Logger logger = LoggerFactory.getLogger(BinlogMessageUtil.class);
     private static final Logger logger = LoggerFactory.getLogger(BinlogMessageUtil.class);
 
 
-    private static final ByteBuffer buffer = ByteBuffer.allocate(8);
-
-    private static final BinlogColumnValue value = new BinlogColumnValue();
-
     public static BinlogMap toBinlogMap(Map<String, Object> data) {
     public static BinlogMap toBinlogMap(Map<String, Object> data) {
         BinlogMap.Builder dataBuilder = BinlogMap.newBuilder();
         BinlogMap.Builder dataBuilder = BinlogMap.newBuilder();
         data.forEach((k, v) -> {
         data.forEach((k, v) -> {
@@ -86,54 +83,40 @@ public abstract class BinlogMessageUtil {
 
 
             // 时间
             // 时间
             case "java.sql.Timestamp":
             case "java.sql.Timestamp":
-                buffer.clear();
-                Timestamp timestamp = (Timestamp) v;
-                buffer.putLong(timestamp.getTime());
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 8);
+                return allocateByteBufferToByteString(BinlogByteEnum.LONG, buffer -> {
+                    Timestamp timestamp = (Timestamp) v;
+                    buffer.putLong(timestamp.getTime());
+                });
             case "java.sql.Date":
             case "java.sql.Date":
-                buffer.clear();
-                Date date = (Date) v;
-                buffer.putLong(date.getTime());
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 8);
+                return allocateByteBufferToByteString(BinlogByteEnum.LONG, buffer -> {
+                    Date date = (Date) v;
+                    buffer.putLong(date.getTime());
+                });
+            case "java.util.Date":
+                return allocateByteBufferToByteString(BinlogByteEnum.LONG, buffer -> {
+                    java.util.Date uDate = (java.util.Date) v;
+                    buffer.putLong(uDate.getTime());
+                });
             case "java.sql.Time":
             case "java.sql.Time":
-                buffer.clear();
-                Time time = (Time) v;
-                buffer.putLong(time.getTime());
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 8);
+                return allocateByteBufferToByteString(BinlogByteEnum.LONG, buffer -> {
+                    Time time = (Time) v;
+                    buffer.putLong(time.getTime());
+                });
 
 
             // 数字
             // 数字
             case "java.lang.Integer":
             case "java.lang.Integer":
-                buffer.clear();
-                buffer.putInt((Integer) v);
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 4);
+                return allocateByteBufferToByteString(BinlogByteEnum.INTEGER, buffer -> buffer.putInt((Integer) v));
             case "java.math.BigInteger":
             case "java.math.BigInteger":
                 BigInteger bigInteger = (BigInteger) v;
                 BigInteger bigInteger = (BigInteger) v;
-                byte[] bytes = bigInteger.toByteArray();
-                return ByteString.copyFrom(bytes, 0, 8);
+                return ByteString.copyFrom(bigInteger.toByteArray());
             case "java.lang.Long":
             case "java.lang.Long":
-                buffer.clear();
-                buffer.putLong((Long) v);
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 8);
+                return allocateByteBufferToByteString(BinlogByteEnum.LONG, buffer -> buffer.putLong((Long) v));
             case "java.lang.Short":
             case "java.lang.Short":
-                buffer.clear();
-                buffer.putShort((Short) v);
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 2);
+                return allocateByteBufferToByteString(BinlogByteEnum.SHORT, buffer -> buffer.putShort((Short) v));
             case "java.lang.Float":
             case "java.lang.Float":
-                buffer.clear();
-                buffer.putFloat((Float) v);
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 4);
+                return allocateByteBufferToByteString(BinlogByteEnum.FLOAT, buffer -> buffer.putFloat((Float) v));
             case "java.lang.Double":
             case "java.lang.Double":
-                buffer.clear();
-                buffer.putDouble((Double) v);
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 8);
+                return allocateByteBufferToByteString(BinlogByteEnum.DOUBLE, buffer -> buffer.putDouble((Double) v));
             case "java.math.BigDecimal":
             case "java.math.BigDecimal":
                 BigDecimal bigDecimal = (BigDecimal) v;
                 BigDecimal bigDecimal = (BigDecimal) v;
                 return ByteString.copyFromUtf8(bigDecimal.toString());
                 return ByteString.copyFromUtf8(bigDecimal.toString());
@@ -143,26 +126,21 @@ public abstract class BinlogMessageUtil {
 
 
             // 布尔(1为true;0为false)
             // 布尔(1为true;0为false)
             case "java.lang.Boolean":
             case "java.lang.Boolean":
-                buffer.clear();
-                Boolean b = (Boolean) v;
-                buffer.putShort((short) (b ? 1 : 0));
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 2);
+                return allocateByteBufferToByteString(BinlogByteEnum.SHORT, buffer -> {
+                    Boolean b = (Boolean) v;
+                    buffer.putShort((short) (b ? 1 : 0));
+                });
             case "java.time.LocalDateTime":
             case "java.time.LocalDateTime":
-                buffer.clear();
-                buffer.putLong(Timestamp.valueOf((LocalDateTime) v).getTime());
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 8);
+                return allocateByteBufferToByteString(BinlogByteEnum.LONG, buffer -> buffer.putLong(Timestamp.valueOf((LocalDateTime) v).getTime()));
             case "oracle.sql.TIMESTAMP":
             case "oracle.sql.TIMESTAMP":
-                buffer.clear();
-                TIMESTAMP timeStamp = (TIMESTAMP) v;
-                try {
-                    buffer.putLong(timeStamp.timestampValue().getTime());
-                } catch (SQLException e) {
-                    logger.error(e.getMessage());
-                }
-                buffer.flip();
-                return ByteString.copyFrom(buffer, 8);
+                return allocateByteBufferToByteString(BinlogByteEnum.LONG, buffer -> {
+                    TIMESTAMP timeStamp = (TIMESTAMP) v;
+                    try {
+                        buffer.putLong(timeStamp.timestampValue().getTime());
+                    } catch (SQLException e) {
+                        logger.error(e.getMessage());
+                    }
+                });
             case "oracle.sql.BLOB":
             case "oracle.sql.BLOB":
                 return ByteString.copyFrom(getBytes((BLOB) v));
                 return ByteString.copyFrom(getBytes((BLOB) v));
             case "oracle.sql.CLOB":
             case "oracle.sql.CLOB":
@@ -176,8 +154,7 @@ public abstract class BinlogMessageUtil {
     }
     }
 
 
     public static Object deserializeValue(int type, ByteString v) {
     public static Object deserializeValue(int type, ByteString v) {
-        value.setValue(v);
-
+        BinlogColumnValue value = new BinlogColumnValue(v);
         if (value.isNull()) {
         if (value.isNull()) {
             return null;
             return null;
         }
         }
@@ -240,6 +217,13 @@ public abstract class BinlogMessageUtil {
         }
         }
     }
     }
 
 
+    private static ByteString allocateByteBufferToByteString(BinlogByteEnum byteType, ByteStringMapper mapper) {
+        ByteBuffer buffer = ByteBuffer.allocate(byteType.getByteLength());
+        mapper.apply(buffer);
+        buffer.flip();
+        return ByteString.copyFrom(buffer, byteType.getByteLength());
+    }
+
     private static byte[] getBytes(BLOB blob) {
     private static byte[] getBytes(BLOB blob) {
         InputStream is = null;
         InputStream is = null;
         byte[] b = null;
         byte[] b = null;
@@ -278,4 +262,9 @@ public abstract class BinlogMessageUtil {
             throw new StorageException(e);
             throw new StorageException(e);
         }
         }
     }
     }
+
+    interface ByteStringMapper {
+        void apply(ByteBuffer buffer);
+    }
+
 }
 }