瀏覽代碼

Merge remote-tracking branch 'origin/V_1.0.0_RC' into V_1.0.0_RC

AE86 2 年之前
父節點
當前提交
cec39cd6af
共有 1 個文件被更改,包括 22 次插入4 次删除
  1. 22 4
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

+ 22 - 4
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

@@ -52,6 +52,7 @@ import org.springframework.util.Assert;
 import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -188,6 +189,10 @@ public class MonitorServiceImpl implements MonitorService {
         try {
             Map row = monitor.getData(metaId, messageId);
             Map binlogData = getBinlogData(row, false);
+            // 历史数据不支持手动同步
+            if (CollectionUtils.isEmpty(binlogData)) {
+                return messageId;
+            }
             String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
             String event = (String) row.get(ConfigConstant.DATA_EVENT);
             // 有修改同步值
@@ -251,6 +256,13 @@ public class MonitorServiceImpl implements MonitorService {
     private Map getBinlogData(Map row, boolean prettyBytes) throws InvalidProtocolBufferException {
         String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
         byte[] bytes = (byte[]) row.get(ConfigConstant.BINLOG_DATA);
+        if (null == bytes) {
+            if (prettyBytes) {
+                String json = (String) row.get(ConfigConstant.CONFIG_MODEL_JSON);
+                return JsonUtil.parseObject(json).toJavaObject(Map.class);
+            }
+            return Collections.EMPTY_MAP;
+        }
         BinlogMap message = BinlogMap.parseFrom(bytes);
 
         // 1、获取配置信息
@@ -268,7 +280,7 @@ public class MonitorServiceImpl implements MonitorService {
                     if (null != val && val instanceof byte[]) {
                         byte[] b = (byte[]) val;
                         if (b.length > 128) {
-                            map.put(k, String.format("bytes[%d]", b.length));
+                            map.put(k, String.format("byte[%d]", b.length));
                             return;
                         }
                         map.put(k, Arrays.toString(b));
@@ -286,7 +298,7 @@ public class MonitorServiceImpl implements MonitorService {
             return newValue;
         }
 
-        Object newVal = null;
+        Object newVal;
         String type = oldValue.getClass().getName();
         switch (type) {
             case "java.sql.Date":
@@ -295,13 +307,19 @@ public class MonitorServiceImpl implements MonitorService {
             case "java.sql.Timestamp":
                 newVal = DateFormatUtil.stringToTimestamp(newValue);
                 break;
-            case "java.sql.Integer":
-            case "java.sql.Short":
+            case "java.lang.Integer":
+            case "java.lang.Short":
                 newVal = NumberUtil.toInt(newValue);
                 break;
+            case "java.lang.Long":
+                newVal = NumberUtil.toLong(newValue);
+                break;
             case "java.lang.Float":
                 newVal = Float.valueOf(newValue);
                 break;
+            case "java.lang.Double":
+                newVal = Double.valueOf(newValue);
+                break;
             case "[B":
                 newVal = stringToBytes(newValue);
                 break;