浏览代码

resolve value for binlog

AE86 2 年之前
父节点
当前提交
d2fbe46863

+ 20 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/column/AbstractColumnValue.java

@@ -0,0 +1,20 @@
+package org.dbsyncer.common.column;
+
+public abstract class AbstractColumnValue<T> implements ColumnValue {
+
+    protected Object value;
+
+    protected T getValue() {
+        return (T) value;
+    }
+
+    public void setValue(T value) {
+        this.value = value;
+    }
+
+    @Override
+    public boolean isNull() {
+        return value == null;
+    }
+
+}

+ 40 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/column/ColumnValue.java

@@ -0,0 +1,40 @@
+package org.dbsyncer.common.column;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/4/22 22:39
+ */
+public interface ColumnValue {
+
+    boolean isNull();
+
+    String asString();
+
+    byte[] asByteArray();
+
+    Short asShort();
+
+    Integer asInteger();
+
+    Long asLong();
+
+    Float asFloat();
+
+    Double asDouble();
+
+    Boolean asBoolean();
+
+    BigDecimal asDecimal();
+
+    Date asDate();
+
+    Timestamp asTimestamp();
+
+    Time asTime();
+}

+ 2 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/AbstractMessageDecoder.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.listener.postgresql;
 
 import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.listener.postgresql.column.ColumnValue;
 import org.dbsyncer.listener.postgresql.column.PgColumnValue;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.postgresql.replication.LogSequenceNumber;
@@ -20,7 +19,7 @@ public abstract class AbstractMessageDecoder implements MessageDecoder {
 
     protected DatabaseConfig config;
 
-    private ColumnValue value = new PgColumnValue();
+    private static final PgColumnValue value = new PgColumnValue();
 
     @Override
     public boolean skipMessage(ByteBuffer buffer, LogSequenceNumber startLsn, LogSequenceNumber lastReceiveLsn) {
@@ -65,7 +64,7 @@ public abstract class AbstractMessageDecoder implements MessageDecoder {
     }
 
     /**
-     * Resolve the value of a {@link ColumnValue}.
+     * Resolve value
      *
      * @param typeName
      * @param columnValue

+ 0 - 182
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/AbstractColumnValue.java

@@ -1,182 +0,0 @@
-package org.dbsyncer.listener.postgresql.column;
-
-import org.dbsyncer.common.util.DateFormatUtil;
-import org.dbsyncer.listener.ListenerException;
-import org.postgresql.PGStatement;
-import org.postgresql.geometric.*;
-import org.postgresql.util.PGInterval;
-import org.postgresql.util.PGmoney;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.SQLException;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.OffsetDateTime;
-import java.time.OffsetTime;
-import java.time.ZoneOffset;
-import java.util.concurrent.TimeUnit;
-
-public abstract class AbstractColumnValue implements ColumnValue {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    @Override
-    public Date asDate() {
-        return DateFormatUtil.stringToDate(asString());
-    }
-
-    @Override
-    public Object asTime() {
-        return asString();
-    }
-
-    @Override
-    public Object asLocalTime() {
-        return DateFormatUtil.stringToLocalTime(asString());
-    }
-
-    @Override
-    public OffsetTime asOffsetTimeUtc() {
-        return DateFormatUtil.timeWithTimeZone(asString());
-    }
-
-    @Override
-    public OffsetDateTime asOffsetDateTimeAtUtc() {
-        if ("infinity".equals(asString())) {
-            return OffsetDateTime.ofInstant(toInstantFromMillis(PGStatement.DATE_POSITIVE_INFINITY), ZoneOffset.UTC);
-        } else if ("-infinity".equals(asString())) {
-            return OffsetDateTime.ofInstant(toInstantFromMillis(PGStatement.DATE_NEGATIVE_INFINITY), ZoneOffset.UTC);
-        }
-        return DateFormatUtil.timestampWithTimeZoneToOffsetDateTime(asString());
-    }
-
-    @Override
-    public Timestamp asTimestamp() {
-        if ("infinity".equals(asString())) {
-            return Timestamp.from(toInstantFromMicros(PGStatement.DATE_POSITIVE_INFINITY));
-        } else if ("-infinity".equals(asString())) {
-            return Timestamp.from(toInstantFromMicros(PGStatement.DATE_NEGATIVE_INFINITY));
-        }
-        return DateFormatUtil.stringToTimestamp(asString());
-    }
-
-    @Override
-    public PGbox asBox() {
-        try {
-            return new PGbox(asString());
-        } catch (final SQLException e) {
-            logger.error("Failed to parse point {}, {}", asString(), e);
-            throw new ListenerException(e);
-        }
-    }
-
-    @Override
-    public PGcircle asCircle() {
-        try {
-            return new PGcircle(asString());
-        } catch (final SQLException e) {
-            logger.error("Failed to parse circle {}, {}", asString(), e);
-            throw new ListenerException(e);
-        }
-    }
-
-    @Override
-    public Object asInterval() {
-        try {
-            return new PGInterval(asString());
-        } catch (final SQLException e) {
-            logger.error("Failed to parse point {}, {}", asString(), e);
-            throw new ListenerException(e);
-        }
-    }
-
-    @Override
-    public PGline asLine() {
-        try {
-            return new PGline(asString());
-        } catch (final SQLException e) {
-            logger.error("Failed to parse point {}, {}", asString(), e);
-            throw new ListenerException(e);
-        }
-    }
-
-    @Override
-    public PGlseg asLseg() {
-        try {
-            return new PGlseg(asString());
-        } catch (final SQLException e) {
-            logger.error("Failed to parse point {}, {}", asString(), e);
-            throw new ListenerException(e);
-        }
-    }
-
-    @Override
-    public PGmoney asMoney() {
-        try {
-            final String value = asString();
-            if (value != null && value.startsWith("-")) {
-                final String negativeMoney = "(" + value.substring(1) + ")";
-                return new PGmoney(negativeMoney);
-            }
-            return new PGmoney(asString());
-        } catch (final SQLException e) {
-            logger.error("Failed to parse money {}, {}", asString(), e);
-            throw new ListenerException(e);
-        }
-    }
-
-    @Override
-    public PGpath asPath() {
-        try {
-            return new PGpath(asString());
-        } catch (final SQLException e) {
-            logger.error("Failed to parse point {}, {}", asString(), e);
-            throw new ListenerException(e);
-        }
-    }
-
-    @Override
-    public PGpoint asPoint() {
-        try {
-            return new PGpoint(asString());
-        } catch (final SQLException e) {
-            logger.error("Failed to parse point {}, {}", asString(), e);
-            throw new ListenerException(e);
-        }
-    }
-
-    @Override
-    public PGpolygon asPolygon() {
-        try {
-            return new PGpolygon(asString());
-        } catch (final SQLException e) {
-            logger.error("Failed to parse point {}, {}", asString(), e);
-            throw new ListenerException(e);
-        }
-    }
-
-    @Override
-    public boolean isArray() {
-        return false;
-    }
-
-    @Override
-    public Object asArray() {
-        return null;
-    }
-
-    private Instant toInstantFromMicros(long microsSinceEpoch) {
-        return Instant.ofEpochSecond(
-                TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch),
-                TimeUnit.MICROSECONDS.toNanos(microsSinceEpoch % TimeUnit.SECONDS.toMicros(1)));
-    }
-
-    private Instant toInstantFromMillis(long millisecondSinceEpoch) {
-        return Instant.ofEpochSecond(
-                TimeUnit.MILLISECONDS.toSeconds(millisecondSinceEpoch),
-                TimeUnit.MILLISECONDS.toNanos(millisecondSinceEpoch % TimeUnit.SECONDS.toMillis(1)));
-    }
-
-}

+ 0 - 73
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/ColumnValue.java

@@ -1,73 +0,0 @@
-package org.dbsyncer.listener.postgresql.column;
-
-import org.postgresql.geometric.*;
-import org.postgresql.util.PGmoney;
-
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.time.OffsetDateTime;
-import java.time.OffsetTime;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/4/22 22:39
- * @see org.postgresql.jdbc.TypeInfoCache
- */
-public interface ColumnValue {
-
-    void setValue(String value);
-
-    boolean isNull();
-
-    String asString();
-
-    Boolean asBoolean();
-
-    Integer asInteger();
-
-    Long asLong();
-
-    Float asFloat();
-
-    Double asDouble();
-
-    Object asDecimal();
-
-    Date asDate();
-
-    OffsetDateTime asOffsetDateTimeAtUtc();
-
-    Timestamp asTimestamp();
-
-    Object asTime();
-
-    Object asLocalTime();
-
-    OffsetTime asOffsetTimeUtc();
-
-    byte[] asByteArray();
-
-    PGbox asBox();
-
-    PGcircle asCircle();
-
-    Object asInterval();
-
-    PGline asLine();
-
-    Object asLseg();
-
-    PGmoney asMoney();
-
-    PGpath asPath();
-
-    PGpoint asPoint();
-
-    PGpolygon asPolygon();
-
-    boolean isArray();
-
-    Object asArray();
-
-}

+ 167 - 20
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/PgColumnValue.java

@@ -1,59 +1,206 @@
 package org.dbsyncer.listener.postgresql.column;
 
+import org.dbsyncer.common.column.AbstractColumnValue;
+import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.listener.ListenerException;
+import org.postgresql.PGStatement;
+import org.postgresql.geometric.*;
+import org.postgresql.util.PGInterval;
+import org.postgresql.util.PGmoney;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.*;
+import java.util.concurrent.TimeUnit;
 
-public final class PgColumnValue extends AbstractColumnValue {
+public final class PgColumnValue extends AbstractColumnValue<String> {
 
-    private String value;
-
-    public void setValue(String value) {
-        this.value = value;
-    }
+    private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Override
-    public boolean isNull() {
-        return value == null;
+    public String asString() {
+        return getValue();
     }
 
     @Override
-    public String asString() {
-        return value;
+    public byte[] asByteArray() {
+        return StringUtil.hexStringToByteArray(getValue().substring(2));
     }
 
     @Override
-    public Boolean asBoolean() {
-        return "t".equalsIgnoreCase(value);
+    public Short asShort() {
+        return Short.valueOf(getValue());
     }
 
     @Override
     public Integer asInteger() {
-        return Integer.valueOf(value);
+        return Integer.valueOf(getValue());
     }
 
     @Override
     public Long asLong() {
-        return Long.valueOf(value);
+        return Long.valueOf(getValue());
     }
 
     @Override
     public Float asFloat() {
-        return Float.valueOf(value);
+        return Float.valueOf(getValue());
     }
 
     @Override
     public Double asDouble() {
-        return Double.valueOf(value);
+        return Double.valueOf(getValue());
     }
 
     @Override
-    public Object asDecimal() {
-        return new BigDecimal(value);
+    public Boolean asBoolean() {
+        return "t".equalsIgnoreCase(getValue());
     }
 
     @Override
-    public byte[] asByteArray() {
-        return StringUtil.hexStringToByteArray(value.substring(2));
+    public BigDecimal asDecimal() {
+        return new BigDecimal(getValue());
+    }
+
+    @Override
+    public Date asDate() {
+        return DateFormatUtil.stringToDate(asString());
+    }
+
+    @Override
+    public Timestamp asTimestamp() {
+        if ("infinity".equals(asString())) {
+            return Timestamp.from(toInstantFromMicros(PGStatement.DATE_POSITIVE_INFINITY));
+        } else if ("-infinity".equals(asString())) {
+            return Timestamp.from(toInstantFromMicros(PGStatement.DATE_NEGATIVE_INFINITY));
+        }
+        return DateFormatUtil.stringToTimestamp(asString());
+    }
+
+    @Override
+    public Time asTime() {
+        return Time.valueOf(getValue());
+    }
+
+    public LocalTime asLocalTime() {
+        return DateFormatUtil.stringToLocalTime(asString());
+    }
+
+    public OffsetTime asOffsetTimeUtc() {
+        return DateFormatUtil.timeWithTimeZone(asString());
+    }
+
+    public OffsetDateTime asOffsetDateTimeAtUtc() {
+        if ("infinity".equals(asString())) {
+            return OffsetDateTime.ofInstant(toInstantFromMillis(PGStatement.DATE_POSITIVE_INFINITY), ZoneOffset.UTC);
+        } else if ("-infinity".equals(asString())) {
+            return OffsetDateTime.ofInstant(toInstantFromMillis(PGStatement.DATE_NEGATIVE_INFINITY), ZoneOffset.UTC);
+        }
+        return DateFormatUtil.timestampWithTimeZoneToOffsetDateTime(asString());
     }
+
+    public PGbox asBox() {
+        try {
+            return new PGbox(asString());
+        } catch (final SQLException e) {
+            logger.error("Failed to parse point {}, {}", asString(), e);
+            throw new ListenerException(e);
+        }
+    }
+
+    public PGcircle asCircle() {
+        try {
+            return new PGcircle(asString());
+        } catch (final SQLException e) {
+            logger.error("Failed to parse circle {}, {}", asString(), e);
+            throw new ListenerException(e);
+        }
+    }
+
+    public Object asInterval() {
+        try {
+            return new PGInterval(asString());
+        } catch (final SQLException e) {
+            logger.error("Failed to parse point {}, {}", asString(), e);
+            throw new ListenerException(e);
+        }
+    }
+
+    public PGline asLine() {
+        try {
+            return new PGline(asString());
+        } catch (final SQLException e) {
+            logger.error("Failed to parse point {}, {}", asString(), e);
+            throw new ListenerException(e);
+        }
+    }
+
+    public PGlseg asLseg() {
+        try {
+            return new PGlseg(asString());
+        } catch (final SQLException e) {
+            logger.error("Failed to parse point {}, {}", asString(), e);
+            throw new ListenerException(e);
+        }
+    }
+
+    public PGmoney asMoney() {
+        try {
+            final String value = asString();
+            if (value != null && value.startsWith("-")) {
+                final String negativeMoney = "(" + value.substring(1) + ")";
+                return new PGmoney(negativeMoney);
+            }
+            return new PGmoney(asString());
+        } catch (final SQLException e) {
+            logger.error("Failed to parse money {}, {}", asString(), e);
+            throw new ListenerException(e);
+        }
+    }
+
+    public PGpath asPath() {
+        try {
+            return new PGpath(asString());
+        } catch (final SQLException e) {
+            logger.error("Failed to parse point {}, {}", asString(), e);
+            throw new ListenerException(e);
+        }
+    }
+
+    public PGpoint asPoint() {
+        try {
+            return new PGpoint(asString());
+        } catch (final SQLException e) {
+            logger.error("Failed to parse point {}, {}", asString(), e);
+            throw new ListenerException(e);
+        }
+    }
+
+    public PGpolygon asPolygon() {
+        try {
+            return new PGpolygon(asString());
+        } catch (final SQLException e) {
+            logger.error("Failed to parse point {}, {}", asString(), e);
+            throw new ListenerException(e);
+        }
+    }
+
+    private Instant toInstantFromMicros(long microsSinceEpoch) {
+        return Instant.ofEpochSecond(
+                TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch),
+                TimeUnit.MICROSECONDS.toNanos(microsSinceEpoch % TimeUnit.SECONDS.toMicros(1)));
+    }
+
+    private Instant toInstantFromMillis(long millisecondSinceEpoch) {
+        return Instant.ofEpochSecond(
+                TimeUnit.MILLISECONDS.toSeconds(millisecondSinceEpoch),
+                TimeUnit.MILLISECONDS.toNanos(millisecondSinceEpoch % TimeUnit.SECONDS.toMillis(1)));
+    }
+
 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -145,7 +145,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
                 } catch (Exception e) {
                     logger.error("[{}]异常{}", key);
                 }
-                logger.info("[{}]{}条,耗时{}秒", key, flushTask.getTaskSize(), (Instant.now().toEpochMilli() - now) / 1000);
+                logger.info("[{}]{}条,耗时{}秒", key, flushTask.getTaskSize(), (Instant.now().toEpochMilli() - now));
             });
             map.clear();
         }

+ 4 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -7,6 +7,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class Picker {
 
@@ -66,4 +67,7 @@ public class Picker {
         return targetFields;
     }
 
+    public Map<String, Field> getSourceFieldMap() {
+        return sourceFields.stream().collect(Collectors.toMap(Field::getName, f -> f, (k1, k2) -> k1));
+    }
 }

+ 27 - 11
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableWriterBufferActuatorStrategy.java

@@ -3,8 +3,10 @@ package org.dbsyncer.parser.strategy.impl;
 import com.google.protobuf.ByteString;
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Picker;
@@ -69,25 +71,39 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
 
     @Override
     protected WriterRequest deserialize(BinlogMessage message) {
-        String tableGroupId = message.getTableGroupId();
-        TableGroup tableGroup = cacheService.get(tableGroupId, TableGroup.class);
-        Mapping mapping = cacheService.get(tableGroup.getMappingId(), Mapping.class);
-
-        // 1、获取映射字段
-        String event = message.getEvent().name();
-        String sourceTableName = tableGroup.getSourceTable().getName();
-        String targetTableName = tableGroup.getTargetTable().getName();
+        if (CollectionUtils.isEmpty(message.getData().getRowMap())) {
+            return null;
+        }
 
+        // 1、获取配置信息
+        final String tableGroupId = message.getTableGroupId();
+        final TableGroup tableGroup = cacheService.get(tableGroupId, TableGroup.class);
+        final Mapping mapping = cacheService.get(tableGroup.getMappingId(), Mapping.class);
+        final String event = message.getEvent().name();
+        final String sourceTableName = tableGroup.getSourceTable().getName();
+        final String targetTableName = tableGroup.getTargetTable().getName();
+
+        // 2、反序列数据
+        final Picker picker = new Picker(tableGroup.getFieldMapping());
+        final Map<String, Field> fieldMap = picker.getSourceFieldMap();
         Map<String, Object> data = new HashMap<>();
-        Picker picker = new Picker(tableGroup.getFieldMapping());
+        message.getData().getRowMap().forEach((k, v) -> {
+            if (fieldMap.containsKey(k)) {
+                data.put(k, resolveValue(fieldMap.get(k).getType(), v));
+            }
+        });
+
+
+        // 3、获取目标源数据集合
         Map target = picker.pickData(data);
 
-        // 2、参数转换
+        // 4、参数转换
         ConvertUtil.convert(tableGroup.getConvert(), target);
 
-        // 3、插件转换
+        // 5、插件转换
         pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
 
         return new WriterRequest(tableGroupId, target, mapping.getMetaId(), mapping.getTargetConnectorId(), sourceTableName, targetTableName, event, picker.getTargetFields(), tableGroup.getCommand());
     }
+
 }

+ 81 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -1,7 +1,9 @@
 package org.dbsyncer.storage.binlog;
 
+import com.google.protobuf.ByteString;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.storage.binlog.impl.BinlogColumnValue;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -10,6 +12,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 
 import javax.annotation.PostConstruct;
 import java.io.IOException;
+import java.sql.Types;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -33,6 +36,8 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
     private static final long CONTEXT_PERIOD = 10_000;
 
+    private static final BinlogColumnValue value = new BinlogColumnValue();
+
     private final Lock lock = new ReentrantLock(true);
 
     private volatile boolean running;
@@ -109,7 +114,10 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         byte[] line;
         AtomicInteger batchCounter = new AtomicInteger();
         while (batchCounter.get() < MAX_BATCH_COUNT && null != (line = context.readLine())) {
-            getQueue().offer(deserialize(BinlogMessage.parseFrom(line)));
+            Message message = deserialize(BinlogMessage.parseFrom(line));
+            if (null != message) {
+                getQueue().offer(message);
+            }
             batchCounter.getAndAdd(1);
         }
 
@@ -117,4 +125,76 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
             context.flush();
         }
     }
+
+    /**
+     * Resolve value
+     *
+     * @param type
+     * @param columnValue
+     * @return
+     */
+    protected Object resolveValue(int type, ByteString columnValue) {
+        value.setValue(columnValue);
+
+        if (value.isNull()) {
+            return null;
+        }
+
+        switch (type) {
+            // 字符串
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+            case Types.NVARCHAR:
+            case Types.NCHAR:
+            case Types.CHAR:
+                return value.asString();
+
+            // 时间
+            case Types.TIMESTAMP:
+                return value.asTimestamp();
+            case Types.DATE:
+                return value.asDate();
+
+            // 数字
+            case Types.INTEGER:
+            case Types.TINYINT:
+            case Types.SMALLINT:
+                return value.asInteger();
+            case Types.BIGINT:
+                return value.asLong();
+            case Types.FLOAT:
+            case Types.REAL:
+                return value.asFloat();
+            case Types.DOUBLE:
+                return value.asDouble();
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                return value.asDecimal();
+
+            // 布尔
+            case Types.BOOLEAN:
+                return value.asBoolean();
+
+            // 字节
+            case Types.BIT:
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+                return value.asByteArray();
+
+            // TODO 待实现
+            case Types.NCLOB:
+            case Types.CLOB:
+            case Types.BLOB:
+                return null;
+
+            // 暂不支持
+            case Types.ROWID:
+                return null;
+
+            default:
+                return null;
+        }
+    }
+
 }

+ 77 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogColumnValue.java

@@ -0,0 +1,77 @@
+package org.dbsyncer.storage.binlog.impl;
+
+import com.google.protobuf.ByteString;
+import org.dbsyncer.common.column.AbstractColumnValue;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/6/30 22:39
+ */
+public class BinlogColumnValue extends AbstractColumnValue<ByteString> {
+
+    @Override
+    public String asString() {
+        return getValue().toStringUtf8();
+    }
+
+    @Override
+    public byte[] asByteArray() {
+        return new byte[0];
+    }
+
+    @Override
+    public Short asShort() {
+        return null;
+    }
+
+    @Override
+    public Integer asInteger() {
+        return null;
+    }
+
+    @Override
+    public Long asLong() {
+        return null;
+    }
+
+    @Override
+    public Float asFloat() {
+        return null;
+    }
+
+    @Override
+    public Double asDouble() {
+        return null;
+    }
+
+    @Override
+    public Boolean asBoolean() {
+        return null;
+    }
+
+    @Override
+    public BigDecimal asDecimal() {
+        return null;
+    }
+
+    @Override
+    public Date asDate() {
+        return null;
+    }
+
+    @Override
+    public Timestamp asTimestamp() {
+        return null;
+    }
+
+    @Override
+    public Time asTime() {
+        return Time.valueOf(getValue().toStringUtf8());
+    }
+}

+ 1 - 0
dbsyncer-storage/src/main/test/BinlogMessageTest.java

@@ -49,6 +49,7 @@ public class BinlogMessageTest {
             BinlogMessage binlogMessage = BinlogMessage.parseFrom(line);
             logger.info(binlogMessage.toString());
         }
+        context.flush();
     }
 
     private void write(String tableGroupId, String key) throws IOException {