1
0
Эх сурвалжийг харах

!321 merge
Merge pull request !321 from AE86/v_2.0

AE86 5 сар өмнө
parent
commit
36b728fffd
19 өөрчлөгдсөн 94 нэмэгдсэн , 118 устгасан
  1. 1 37
      dbsyncer-connector/dbsyncer-connector-base/src/test/java/ConnectionTest.java
  2. 8 1
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/DeleteDeserializer.java
  3. 8 1
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/UpdateDeserializer.java
  4. 8 1
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/WriteDeserializer.java
  5. 16 1
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/MySQLSchemaResolver.java
  6. 10 2
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLBooleanType.java
  7. 1 1
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLByteType.java
  8. 13 4
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLBytesType.java
  9. 0 10
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLDecimalType.java
  10. 1 11
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLDoubleType.java
  11. 1 11
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLFloatType.java
  12. 4 11
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLIntType.java
  13. 4 11
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLLongType.java
  14. 1 11
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLShortType.java
  15. 8 1
      dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLStringType.java
  16. 1 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/AbstractConnector.java
  17. 6 2
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/schema/AbstractSchemaResolver.java
  18. 1 1
      dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/schema/support/BytesType.java
  19. 2 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/BinlogMessageUtil.java

+ 1 - 37
dbsyncer-connector/dbsyncer-connector-base/src/test/java/ConnectionTest.java

@@ -242,7 +242,7 @@ public class ConnectionTest {
         final String update = "UPDATE `test`.`vote_records_test` SET `user_id` = ?, `create_time` = now() WHERE `id` = ?";
         final String delete = "DELETE from `test`.`vote_records_test` WHERE `id` = ?";
 
-        // 模拟单表增删改事件,每个事件间隔2条数据
+        // 模拟单表增删改事件
         for (int i = 0; i < threadSize; i++) {
             final int offset = i;
             pool.submit(() -> {
@@ -250,8 +250,6 @@ public class ConnectionTest {
                     logger.info("{}-开始任务", Thread.currentThread().getName());
                     // 增删改事件密集型
                     mockData(connectorInstance, num, offset, insert, update, delete);
-                    // 增改事件密集型
-//                    mockData2(connectorInstance, num, offset, insert, update);
                     logger.info("{}-结束任务", Thread.currentThread().getName());
                 } catch (Exception e) {
                     logger.error(e.getMessage());
@@ -267,7 +265,6 @@ public class ConnectionTest {
             logger.error(e.getMessage());
         }
         pool.shutdown();
-//        logger.info("总数:{}, 耗时:{}秒", (threadSize * num * 3), (Instant.now().toEpochMilli() - begin) / 1000);
         logger.info("总数:{}, 耗时:{}秒", (threadSize * num), (Instant.now().toEpochMilli() - begin) / 1000);
     }
 
@@ -309,39 +306,6 @@ public class ConnectionTest {
         }
     }
 
-    private void mockData2(DatabaseConnectorInstance connectorInstance, int num, int offset, String insert, String update) {
-        List<Object[]> insertData = new ArrayList<>();
-        List<Object[]> updateData = new ArrayList<>();
-        final int batch = 100;
-        int start = offset * num;
-        logger.info("{}-offset:{}, start:{}", Thread.currentThread().getName(), offset, start);
-        for (int i = 1; i <= num; i++) {
-            // insert
-            Object[] insertArgs = new Object[6];
-            insertArgs[0] = i + start;
-            insertArgs[1] = randomUserId(20);
-            insertArgs[2] = RandomUtil.nextInt(1, 9999);
-            insertArgs[3] = RandomUtil.nextInt(0, 3);
-            insertArgs[4] = RandomUtil.nextInt(1, 3);
-            insertArgs[5] = Timestamp.valueOf(LocalDateTime.now());
-            insertData.add(insertArgs);
-
-            // update
-            Object[] updateArgs = new Object[2];
-            updateArgs[0] = randomUserId(20);
-            updateArgs[1] = i + start;
-            updateData.add(updateArgs);
-
-            if (i % batch == 0) {
-                connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(insert, insertData));
-                connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(update, updateData));
-                logger.info("{}, 数据行[{}, {}], 已处理:{}", Thread.currentThread().getName(), start, start + num, i + start);
-                insertData.clear();
-                updateData.clear();
-            }
-        }
-    }
-
     private final static String STR = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
 
     private String randomUserId(int i) {

+ 8 - 1
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/DeleteDeserializer.java

@@ -11,7 +11,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
-public class DeleteDeserializer extends DeleteRowsEventDataDeserializer {
+public final class DeleteDeserializer extends DeleteRowsEventDataDeserializer {
 
     private final DatetimeV2Deserialize datetimeV2Deserialize = new DatetimeV2Deserialize();
     private final JsonBinaryDeserialize jsonBinaryDeserialize = new JsonBinaryDeserialize();
@@ -20,10 +20,17 @@ public class DeleteDeserializer extends DeleteRowsEventDataDeserializer {
         super(tableMapEventByTableId);
     }
 
+    @Override
+    protected Serializable deserializeTiny(ByteArrayInputStream inputStream) throws IOException {
+        return inputStream.readInteger(1);
+    }
+
+    @Override
     protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
         return datetimeV2Deserialize.deserializeDatetimeV2(meta, inputStream);
     }
 
+    @Override
     protected byte[] deserializeJson(int meta, ByteArrayInputStream inputStream) throws IOException {
         return jsonBinaryDeserialize.deserializeJson(meta, inputStream);
     }

+ 8 - 1
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/UpdateDeserializer.java

@@ -11,7 +11,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
-public class UpdateDeserializer extends UpdateRowsEventDataDeserializer {
+public final class UpdateDeserializer extends UpdateRowsEventDataDeserializer {
 
     private final DatetimeV2Deserialize datetimeV2Deserialize = new DatetimeV2Deserialize();
     private final JsonBinaryDeserialize jsonBinaryDeserialize = new JsonBinaryDeserialize();
@@ -20,10 +20,17 @@ public class UpdateDeserializer extends UpdateRowsEventDataDeserializer {
         super(tableMapEventByTableId);
     }
 
+    @Override
+    protected Serializable deserializeTiny(ByteArrayInputStream inputStream) throws IOException {
+        return inputStream.readInteger(1);
+    }
+
+    @Override
     protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
         return datetimeV2Deserialize.deserializeDatetimeV2(meta, inputStream);
     }
 
+    @Override
     protected byte[] deserializeJson(int meta, ByteArrayInputStream inputStream) throws IOException {
         return jsonBinaryDeserialize.deserializeJson(meta, inputStream);
     }

+ 8 - 1
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/deserializer/WriteDeserializer.java

@@ -11,7 +11,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
-public class WriteDeserializer extends WriteRowsEventDataDeserializer {
+public final class WriteDeserializer extends WriteRowsEventDataDeserializer {
 
     private final DatetimeV2Deserialize datetimeV2Deserialize = new DatetimeV2Deserialize();
     private final JsonBinaryDeserialize jsonBinaryDeserialize = new JsonBinaryDeserialize();
@@ -20,10 +20,17 @@ public class WriteDeserializer extends WriteRowsEventDataDeserializer {
         super(tableMapEventByTableId);
     }
 
+    @Override
+    protected Serializable deserializeTiny(ByteArrayInputStream inputStream) throws IOException {
+        return inputStream.readInteger(1);
+    }
+
+    @Override
     protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
         return datetimeV2Deserialize.deserializeDatetimeV2(meta, inputStream);
     }
 
+    @Override
     protected byte[] deserializeJson(int meta, ByteArrayInputStream inputStream) throws IOException {
         return jsonBinaryDeserialize.deserializeJson(meta, inputStream);
     }

+ 16 - 1
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/MySQLSchemaResolver.java

@@ -5,6 +5,7 @@ package org.dbsyncer.connector.mysql.schema;
 
 import org.dbsyncer.connector.mysql.MySQLException;
 import org.dbsyncer.connector.mysql.schema.support.*;
+import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.schema.AbstractSchemaResolver;
 import org.dbsyncer.sdk.schema.DataType;
 
@@ -21,11 +22,14 @@ import java.util.stream.Stream;
  */
 public final class MySQLSchemaResolver extends AbstractSchemaResolver {
 
+    private MySQLBytesType bytesType;
+
     @Override
     protected void initDataTypeMapping(Map<String, DataType> mapping) {
+        bytesType = new MySQLBytesType();
         Stream.of(
                 new MySQLBooleanType(),
-                new MySQLBytesType(),
+                bytesType,
                 new MySQLByteType(),
                 new MySQLDateType(),
                 new MySQLDecimalType(),
@@ -45,4 +49,15 @@ public final class MySQLSchemaResolver extends AbstractSchemaResolver {
         }));
     }
 
+    @Override
+    protected DataType getDataType(Map<String, DataType> mapping, Field field) {
+        DataType dataType = super.getDataType(mapping, field);
+        // bit(n > 1)
+        if (dataType instanceof MySQLBooleanType) {
+            if (field.getColumnSize() > 1) {
+                return bytesType;
+            }
+        }
+        return dataType;
+    }
 }

+ 10 - 2
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLBooleanType.java

@@ -7,6 +7,7 @@ import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.schema.support.BooleanType;
 
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -28,17 +29,24 @@ public final class MySQLBooleanType extends BooleanType {
 
     @Override
     protected Boolean merge(Object val, Field field) {
+        if (val instanceof Integer) {
+            return (Integer) val == 1;
+        }
+        if (val instanceof BitSet) {
+            BitSet bitSet = (BitSet) val;
+            return bitSet.get(0);
+        }
         return throwUnsupportedException(val, field);
     }
 
     @Override
     protected Boolean getDefaultMergedVal() {
-        return false;
+        return null;
     }
 
     @Override
     protected Object convert(Object val, Field field) {
-        if(val instanceof Boolean) {
+        if (val instanceof Boolean) {
             Boolean b = (Boolean) val;
             return (short) (b ? 1 : 0);
         }

+ 1 - 1
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLByteType.java

@@ -36,7 +36,7 @@ public final class MySQLByteType extends ByteType {
 
     @Override
     protected Byte getDefaultMergedVal() {
-        return 0;
+        return null;
     }
 
     @Override

+ 13 - 4
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLBytesType.java

@@ -6,7 +6,9 @@ package org.dbsyncer.connector.mysql.schema.support;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.schema.support.BytesType;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -32,18 +34,25 @@ public final class MySQLBytesType extends BytesType {
     }
 
     @Override
-    protected Byte[] merge(Object val, Field field) {
+    protected byte[] merge(Object val, Field field) {
+        if (val instanceof String) {
+            return ((String) val).getBytes(StandardCharsets.UTF_8);
+        }
+        if (val instanceof BitSet) {
+            BitSet bitSet = (BitSet) val;
+            return bitSet.toByteArray();
+        }
         return throwUnsupportedException(val, field);
     }
 
     @Override
-    protected Byte[] getDefaultMergedVal() {
-        return new Byte[0];
+    protected byte[] getDefaultMergedVal() {
+        return null;
     }
 
     @Override
     protected Object convert(Object val, Field field) {
-        if (val instanceof Byte[]) {
+        if (val instanceof byte[]) {
             return val;
         }
         return throwUnsupportedException(val, field);

+ 0 - 10
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLDecimalType.java

@@ -3,7 +3,6 @@
  */
 package org.dbsyncer.connector.mysql.schema.support;
 
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.schema.support.DecimalType;
 
@@ -34,15 +33,6 @@ public final class MySQLDecimalType extends DecimalType {
             this.value = value;
         }
 
-        public TypeEnum getTypeEnum(String value) {
-            for (TypeEnum type : TypeEnum.values()) {
-                if (StringUtil.equals(value, type.value)) {
-                    return type;
-                }
-            }
-            throw new IllegalArgumentException("Can not find type:" + value);
-        }
-
         public String getValue() {
             return value;
         }

+ 1 - 11
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLDoubleType.java

@@ -3,7 +3,6 @@
  */
 package org.dbsyncer.connector.mysql.schema.support;
 
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.schema.support.DoubleType;
 
@@ -35,15 +34,6 @@ public final class MySQLDoubleType extends DoubleType {
             this.value = value;
         }
 
-        public TypeEnum getTypeEnum(String value) {
-            for (TypeEnum type : TypeEnum.values()) {
-                if (StringUtil.equals(value, type.value)) {
-                    return type;
-                }
-            }
-            throw new IllegalArgumentException("Can not find type:" + value);
-        }
-
         public String getValue() {
             return value;
         }
@@ -64,7 +54,7 @@ public final class MySQLDoubleType extends DoubleType {
 
     @Override
     protected Double getDefaultMergedVal() {
-        return 0.0;
+        return null;
     }
 
     @Override

+ 1 - 11
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLFloatType.java

@@ -3,7 +3,6 @@
  */
 package org.dbsyncer.connector.mysql.schema.support;
 
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.schema.support.FloatType;
 
@@ -29,15 +28,6 @@ public final class MySQLFloatType extends FloatType {
             this.value = value;
         }
 
-        public TypeEnum getTypeEnum(String value) {
-            for (TypeEnum type : TypeEnum.values()) {
-                if (StringUtil.equals(value, type.value)) {
-                    return type;
-                }
-            }
-            throw new IllegalArgumentException("Can not find type:" + value);
-        }
-
         public String getValue() {
             return value;
         }
@@ -58,7 +48,7 @@ public final class MySQLFloatType extends FloatType {
 
     @Override
     protected Float getDefaultMergedVal() {
-        return 0f;
+        return null;
     }
 
     @Override

+ 4 - 11
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLIntType.java

@@ -3,7 +3,6 @@
  */
 package org.dbsyncer.connector.mysql.schema.support;
 
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.schema.support.IntType;
 
@@ -32,15 +31,6 @@ public final class MySQLIntType extends IntType {
 
         private final String value;
 
-        public TypeEnum getTypeEnum(String value) {
-            for (TypeEnum type : TypeEnum.values()) {
-                if (StringUtil.equals(value, type.value)) {
-                    return type;
-                }
-            }
-            throw new IllegalArgumentException("Can not find type:" + value);
-        }
-
         TypeEnum(String value) {
             this.value = value;
         }
@@ -63,12 +53,15 @@ public final class MySQLIntType extends IntType {
             calendar.setTime(d);
             return calendar.get(Calendar.YEAR);
         }
+        if (val instanceof Number) {
+            return ((Number) val).intValue();
+        }
         return throwUnsupportedException(val, field);
     }
 
     @Override
     protected Integer getDefaultMergedVal() {
-        return 0;
+        return null;
     }
 
     @Override

+ 4 - 11
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLLongType.java

@@ -3,7 +3,6 @@
  */
 package org.dbsyncer.connector.mysql.schema.support;
 
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.schema.support.LongType;
 
@@ -30,15 +29,6 @@ public final class MySQLLongType extends LongType {
             this.value = value;
         }
 
-        public TypeEnum getTypeEnum(String value) {
-            for (TypeEnum type : TypeEnum.values()) {
-                if (StringUtil.equals(value, type.value)) {
-                    return type;
-                }
-            }
-            throw new IllegalArgumentException("Can not find type:" + value);
-        }
-
         public String getValue() {
             return value;
         }
@@ -51,12 +41,15 @@ public final class MySQLLongType extends LongType {
 
     @Override
     protected Long merge(Object val, Field field) {
+        if (val instanceof Number) {
+            return ((Number) val).longValue();
+        }
         return throwUnsupportedException(val, field);
     }
 
     @Override
     protected Long getDefaultMergedVal() {
-        return 0L;
+        return null;
     }
 
     @Override

+ 1 - 11
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLShortType.java

@@ -3,7 +3,6 @@
  */
 package org.dbsyncer.connector.mysql.schema.support;
 
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.schema.support.ShortType;
 
@@ -29,15 +28,6 @@ public final class MySQLShortType extends ShortType {
             this.value = value;
         }
 
-        public TypeEnum getTypeEnum(String value) {
-            for (TypeEnum type : TypeEnum.values()) {
-                if (StringUtil.equals(value, type.value)) {
-                    return type;
-                }
-            }
-            throw new IllegalArgumentException("Can not find type:" + value);
-        }
-
         public String getValue() {
             return value;
         }
@@ -58,7 +48,7 @@ public final class MySQLShortType extends ShortType {
 
     @Override
     protected Short getDefaultMergedVal() {
-        return 0;
+        return null;
     }
 
     @Override

+ 8 - 1
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/schema/support/MySQLStringType.java

@@ -6,6 +6,7 @@ package org.dbsyncer.connector.mysql.schema.support;
 import org.dbsyncer.sdk.model.Field;
 import org.dbsyncer.sdk.schema.support.StringType;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -35,12 +36,18 @@ public final class MySQLStringType extends StringType {
 
     @Override
     protected String merge(Object val, Field field) {
+        if (val instanceof byte[]) {
+            return new String((byte[]) val, StandardCharsets.UTF_8);
+        }
         switch (TypeEnum.valueOf(field.getTypeName())) {
             case CHAR:
             case VARCHAR:
             case TINYTEXT:
             case TEXT:
             case MEDIUMTEXT:
+            case LONGTEXT:
+            case ENUM:
+            case JSON:
                 return String.valueOf(val);
 
             default:
@@ -50,7 +57,7 @@ public final class MySQLStringType extends StringType {
 
     @Override
     protected String getDefaultMergedVal() {
-        return "";
+        return null;
     }
 
     @Override

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/AbstractConnector.java

@@ -142,7 +142,7 @@ public abstract class AbstractConnector {
                     Object o = resolver.merge(row.get(f.getName()), f);
                     row.put(f.getName(), resolver.convert(o, f));
                 } catch (Exception e) {
-                    logger.error("convert value error: ({}, {}, {})", config.getTableName(), f.getName(), row.get(f.getName()));
+                    logger.error("convert value error: ({}, {}, {}, {})", config.getTableName(), f.getName(), row.get(f.getName()), e);
                     throw new SdkException(e);
                 }
             }

+ 6 - 2
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/schema/AbstractSchemaResolver.java

@@ -26,9 +26,13 @@ public abstract class AbstractSchemaResolver implements SchemaResolver {
 
     protected abstract void initDataTypeMapping(Map<String, DataType> mapping);
 
+    protected DataType getDataType(Map<String, DataType> mapping, Field field) {
+        return mapping.get(field.getTypeName());
+    }
+
     @Override
     public Object merge(Object val, Field field) {
-        DataType dataType = mapping.get(field.getTypeName());
+        DataType dataType = getDataType(mapping, field);
         if (dataType != null) {
             return dataType.mergeValue(val, field);
         }
@@ -37,7 +41,7 @@ public abstract class AbstractSchemaResolver implements SchemaResolver {
 
     @Override
     public Object convert(Object val, Field field) {
-        DataType dataType = mapping.get(field.getTypeName());
+        DataType dataType = getDataType(mapping, field);
         if (dataType != null) {
             return dataType.convertValue(val, field);
         }

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/schema/support/BytesType.java

@@ -11,7 +11,7 @@ import org.dbsyncer.sdk.schema.AbstractDataType;
  * @Version 1.0.0
  * @Date 2024-11-21 23:56
  */
-public abstract class BytesType extends AbstractDataType<Byte[]> {
+public abstract class BytesType extends AbstractDataType<byte[]> {
 
     @Override
     public DataTypeEnum getType() {

+ 2 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/BinlogMessageUtil.java

@@ -77,6 +77,8 @@ public abstract class BinlogMessageUtil {
             // 字节
             case "[B":
                 return ByteString.copyFrom((byte[]) v);
+            case "java.lang.Byte":
+                return ByteString.copyFromUtf8(String.valueOf(v));
 
             // 字符串
             case "java.lang.String":