1
0
AE86 2 жил өмнө
parent
commit
62fff5f828

+ 37 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/DataSyncService.java

@@ -0,0 +1,37 @@
+package org.dbsyncer.biz;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.dbsyncer.biz.vo.MessageVo;
+
+import java.util.Map;
+
+public interface DataSyncService {
+
+    /**
+     * 获取同步数据
+     *
+     * @param metaId
+     * @param messageId
+     * @return
+     */
+    MessageVo getMessageVo(String metaId, String messageId);
+
+    /**
+     * 获取Binlog
+     *
+     * @param row
+     * @param prettyBytes
+     * @return
+     * @throws InvalidProtocolBufferException
+     */
+    Map getBinlogData(Map row, boolean prettyBytes) throws InvalidProtocolBufferException;
+
+    /**
+     * 手动同步数据
+     *
+     * @param params
+     * @return
+     */
+    String sync(Map<String, String> params);
+
+}

+ 0 - 16
dbsyncer-biz/src/main/java/org/dbsyncer/biz/MonitorService.java

@@ -48,22 +48,6 @@ public interface MonitorService {
      */
     Paging queryData(Map<String, String> params);
 
-    /**
-     * 获取驱动同步数据
-     * @param metaId
-     * @param messageId
-     * @return
-     */
-    MessageVo getMessageVo(String metaId, String messageId);
-
-    /**
-     * 手动同步单条数据
-     *
-     * @param params
-     * @return
-     */
-    String sync(Map<String, String> params);
-
     /**
      * 清空驱动同步数据
      *

+ 205 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/DataSyncServiceImpl.java

@@ -0,0 +1,205 @@
+package org.dbsyncer.biz.impl;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.dbsyncer.biz.DataSyncService;
+import org.dbsyncer.biz.vo.BinlogColumnVo;
+import org.dbsyncer.biz.vo.MessageVo;
+import org.dbsyncer.cache.CacheService;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.DateFormatUtil;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.monitor.Monitor;
+import org.dbsyncer.parser.flush.BufferActuator;
+import org.dbsyncer.parser.model.Picker;
+import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.WriterRequest;
+import org.dbsyncer.storage.binlog.proto.BinlogMap;
+import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.util.BinlogMessageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.util.Assert;
+
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * 数据同步服务
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/12/19 23:56
+ */
+@Service
+public class DataSyncServiceImpl implements DataSyncService {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Resource
+    private Monitor monitor;
+
+    @Resource
+    private CacheService cacheService;
+
+    @Resource
+    private BufferActuator writerBufferActuator;
+
+    @Override
+    public MessageVo getMessageVo(String metaId, String messageId) {
+        Assert.hasText(metaId, "The metaId is null.");
+        Assert.hasText(messageId, "The messageId is null.");
+
+        MessageVo messageVo = new MessageVo();
+        try {
+            Map row = monitor.getData(metaId, messageId);
+            Map binlogData = getBinlogData(row, true);
+            String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
+            TableGroup tableGroup = monitor.getTableGroup(tableGroupId);
+            messageVo.setSourceTableName(tableGroup.getSourceTable().getName());
+            messageVo.setTargetTableName(tableGroup.getTargetTable().getName());
+            messageVo.setId(messageId);
+
+            if (!CollectionUtils.isEmpty(binlogData)) {
+                Map<String, String> columnMap = tableGroup.getTargetTable().getColumn().stream().collect(Collectors.toMap(Field::getName, Field::getTypeName));
+                List<BinlogColumnVo> columns = new ArrayList<>();
+                binlogData.forEach((k, v) -> columns.add(new BinlogColumnVo((String) k, v, columnMap.get(k))));
+                messageVo.setColumns(columns);
+            }
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage());
+        }
+        return messageVo;
+    }
+
+    @Override
+    public Map getBinlogData(Map row, boolean prettyBytes) throws InvalidProtocolBufferException {
+        String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
+        byte[] bytes = (byte[]) row.get(ConfigConstant.BINLOG_DATA);
+        if (null == bytes) {
+            if (prettyBytes) {
+                String json = (String) row.get(ConfigConstant.CONFIG_MODEL_JSON);
+                return JsonUtil.parseObject(json).toJavaObject(Map.class);
+            }
+            return Collections.EMPTY_MAP;
+        }
+        BinlogMap message = BinlogMap.parseFrom(bytes);
+
+        // 1、获取配置信息
+        final TableGroup tableGroup = cacheService.get(tableGroupId, TableGroup.class);
+
+        // 2、反序列数据
+        Map<String, Object> map = new HashMap<>();
+        final Picker picker = new Picker(tableGroup.getFieldMapping());
+        final Map<String, Field> fieldMap = picker.getSourceFieldMap();
+        message.getRowMap().forEach((k, v) -> {
+            if (fieldMap.containsKey(k)) {
+                Object val = BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v);
+                // 处理二进制对象显示
+                if (prettyBytes) {
+                    if (null != val && val instanceof byte[]) {
+                        byte[] b = (byte[]) val;
+                        if (b.length > 128) {
+                            map.put(k, String.format("byte[%d]", b.length));
+                            return;
+                        }
+                        map.put(k, Arrays.toString(b));
+                        return;
+                    }
+                }
+                map.put(k, val);
+            }
+        });
+        return map;
+    }
+
+    @Override
+    public String sync(Map<String, String> params) {
+        String metaId = params.get("metaId");
+        String messageId = params.get("messageId");
+        Assert.hasText(metaId, "The metaId is null.");
+        Assert.hasText(messageId, "The messageId is null.");
+
+        try {
+            Map row = monitor.getData(metaId, messageId);
+            Map binlogData = getBinlogData(row, false);
+            // 历史数据不支持手动同步
+            if (CollectionUtils.isEmpty(binlogData)) {
+                return messageId;
+            }
+            String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
+            String event = (String) row.get(ConfigConstant.DATA_EVENT);
+            // 有修改同步值
+            String retryDataParams = params.get("retryDataParams");
+            if (StringUtil.isNotBlank(retryDataParams)) {
+                JsonUtil.parseObject(retryDataParams).getInnerMap().forEach((k, v) -> binlogData.put(k, convertValue(binlogData.get(k), (String) v)));
+            }
+            writerBufferActuator.offer(new WriterRequest(tableGroupId, event, binlogData));
+            monitor.removeData(metaId, messageId);
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage());
+        }
+        return messageId;
+    }
+
+    private Object convertValue(Object oldValue, String newValue) {
+        if (oldValue == null) {
+            return newValue;
+        }
+
+        Object newVal;
+        String type = oldValue.getClass().getName();
+        switch (type) {
+            case "java.sql.Date":
+                newVal = DateFormatUtil.stringToDate(newValue);
+                break;
+            case "java.sql.Timestamp":
+                newVal = DateFormatUtil.stringToTimestamp(newValue);
+                break;
+            case "java.lang.Integer":
+            case "java.lang.Short":
+                newVal = NumberUtil.toInt(newValue);
+                break;
+            case "java.lang.Long":
+                newVal = NumberUtil.toLong(newValue);
+                break;
+            case "java.lang.Float":
+                newVal = Float.valueOf(newValue);
+                break;
+            case "java.lang.Double":
+                newVal = Double.valueOf(newValue);
+                break;
+            case "[B":
+                newVal = stringToBytes(newValue);
+                break;
+            default:
+                newVal = newValue;
+        }
+
+        return newVal;
+    }
+
+    private byte[] stringToBytes(String s) {
+        byte[] b = null;
+        if (s.startsWith("[") && s.endsWith("]")) {
+            s = StringUtil.substring(s, 1, s.length() - 1);
+            String[] split = StringUtil.split(s, ",");
+            int length = split.length;
+            b = new byte[length];
+            for (int i = 0; i < length; i++) {
+                b[i] = Byte.valueOf(split[i].trim());
+            }
+        }
+        return b;
+    }
+
+}

+ 6 - 170
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.biz.impl;
 
-import com.google.protobuf.InvalidProtocolBufferException;
+import org.dbsyncer.biz.DataSyncService;
 import org.dbsyncer.biz.MonitorService;
 import org.dbsyncer.biz.metric.MetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.CpuMetricDetailFormatter;
@@ -10,20 +10,15 @@ import org.dbsyncer.biz.metric.impl.GCMetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.MemoryMetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.ValueMetricDetailFormatter;
 import org.dbsyncer.biz.vo.AppReportMetricVo;
-import org.dbsyncer.biz.vo.BinlogColumnVo;
 import org.dbsyncer.biz.vo.DataVo;
 import org.dbsyncer.biz.vo.LogVo;
-import org.dbsyncer.biz.vo.MessageVo;
 import org.dbsyncer.biz.vo.MetaVo;
 import org.dbsyncer.biz.vo.MetricResponseVo;
-import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.monitor.Monitor;
 import org.dbsyncer.monitor.enums.DiskMetricEnum;
 import org.dbsyncer.monitor.enums.MetricEnum;
@@ -32,29 +27,20 @@ import org.dbsyncer.monitor.enums.ThreadPoolMetricEnum;
 import org.dbsyncer.monitor.model.AppReportMetric;
 import org.dbsyncer.monitor.model.MetricResponse;
 import org.dbsyncer.parser.enums.ModelEnum;
-import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
-import org.dbsyncer.parser.model.Picker;
-import org.dbsyncer.parser.model.TableGroup;
-import org.dbsyncer.parser.model.WriterRequest;
-import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
-import org.dbsyncer.storage.util.BinlogMessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -70,14 +56,11 @@ public class MonitorServiceImpl implements MonitorService {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    @Autowired
+    @Resource
     private Monitor monitor;
 
-    @Autowired
-    private CacheService cacheService;
-
-    @Autowired
-    private BufferActuator writerBufferActuator;
+    @Resource
+    private DataSyncService dataSyncService;
 
     private Map<String, MetricDetailFormatter> metricDetailFormatterMap = new LinkedHashMap<>();
 
@@ -141,7 +124,7 @@ public class MonitorServiceImpl implements MonitorService {
         for (Map row : data) {
             try {
                 DataVo dataVo = convert2Vo(row, DataVo.class);
-                Map binlogData = getBinlogData(row, true);
+                Map binlogData = dataSyncService.getBinlogData(row, true);
                 dataVo.setJson(JsonUtil.objToJson(binlogData));
                 list.add(dataVo);
             } catch (Exception e) {
@@ -152,62 +135,6 @@ public class MonitorServiceImpl implements MonitorService {
         return paging;
     }
 
-    @Override
-    public MessageVo getMessageVo(String metaId, String messageId) {
-        Assert.hasText(metaId, "The metaId is null.");
-        Assert.hasText(messageId, "The messageId is null.");
-
-        MessageVo messageVo = new MessageVo();
-        try {
-            Map row = monitor.getData(metaId, messageId);
-            Map binlogData = getBinlogData(row, true);
-            String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
-            TableGroup tableGroup = monitor.getTableGroup(tableGroupId);
-            messageVo.setSourceTableName(tableGroup.getSourceTable().getName());
-            messageVo.setTargetTableName(tableGroup.getTargetTable().getName());
-            messageVo.setId(messageId);
-
-            if (!CollectionUtils.isEmpty(binlogData)) {
-                Map<String, String> columnMap = tableGroup.getTargetTable().getColumn().stream().collect(Collectors.toMap(Field::getName, Field::getTypeName));
-                List<BinlogColumnVo> columns = new ArrayList<>();
-                binlogData.forEach((k, v) -> columns.add(new BinlogColumnVo((String) k, v, columnMap.get(k))));
-                messageVo.setColumns(columns);
-            }
-        } catch (Exception e) {
-            logger.error(e.getLocalizedMessage());
-        }
-        return messageVo;
-    }
-
-    @Override
-    public String sync(Map<String, String> params) {
-        String metaId = params.get("metaId");
-        String messageId = params.get("messageId");
-        Assert.hasText(metaId, "The metaId is null.");
-        Assert.hasText(messageId, "The messageId is null.");
-
-        try {
-            Map row = monitor.getData(metaId, messageId);
-            Map binlogData = getBinlogData(row, false);
-            // 历史数据不支持手动同步
-            if (CollectionUtils.isEmpty(binlogData)) {
-                return messageId;
-            }
-            String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
-            String event = (String) row.get(ConfigConstant.DATA_EVENT);
-            // 有修改同步值
-            String retryDataParams = params.get("retryDataParams");
-            if (StringUtil.isNotBlank(retryDataParams)) {
-                JsonUtil.parseObject(retryDataParams).getInnerMap().forEach((k, v) -> binlogData.put(k, convertValue(binlogData.get(k), (String) v)));
-            }
-            writerBufferActuator.offer(new WriterRequest(tableGroupId, event, binlogData));
-            monitor.removeData(metaId, messageId);
-        } catch (Exception e) {
-            logger.error(e.getLocalizedMessage());
-        }
-        return messageId;
-    }
-
     @Override
     public String clearData(String id) {
         Assert.hasText(id, "驱动不存在.");
@@ -253,97 +180,6 @@ public class MonitorServiceImpl implements MonitorService {
         return vo;
     }
 
-    private Map getBinlogData(Map row, boolean prettyBytes) throws InvalidProtocolBufferException {
-        String tableGroupId = (String) row.get(ConfigConstant.DATA_TABLE_GROUP_ID);
-        byte[] bytes = (byte[]) row.get(ConfigConstant.BINLOG_DATA);
-        if (null == bytes) {
-            if (prettyBytes) {
-                String json = (String) row.get(ConfigConstant.CONFIG_MODEL_JSON);
-                return JsonUtil.parseObject(json).toJavaObject(Map.class);
-            }
-            return Collections.EMPTY_MAP;
-        }
-        BinlogMap message = BinlogMap.parseFrom(bytes);
-
-        // 1、获取配置信息
-        final TableGroup tableGroup = cacheService.get(tableGroupId, TableGroup.class);
-
-        // 2、反序列数据
-        Map<String, Object> map = new HashMap<>();
-        final Picker picker = new Picker(tableGroup.getFieldMapping());
-        final Map<String, Field> fieldMap = picker.getSourceFieldMap();
-        message.getRowMap().forEach((k, v) -> {
-            if (fieldMap.containsKey(k)) {
-                Object val = BinlogMessageUtil.deserializeValue(fieldMap.get(k).getType(), v);
-                // 处理二进制对象显示
-                if (prettyBytes) {
-                    if (null != val && val instanceof byte[]) {
-                        byte[] b = (byte[]) val;
-                        if (b.length > 128) {
-                            map.put(k, String.format("byte[%d]", b.length));
-                            return;
-                        }
-                        map.put(k, Arrays.toString(b));
-                        return;
-                    }
-                }
-                map.put(k, val);
-            }
-        });
-        return map;
-    }
-
-    private Object convertValue(Object oldValue, String newValue) {
-        if (oldValue == null) {
-            return newValue;
-        }
-
-        Object newVal;
-        String type = oldValue.getClass().getName();
-        switch (type) {
-            case "java.sql.Date":
-                newVal = DateFormatUtil.stringToDate(newValue);
-                break;
-            case "java.sql.Timestamp":
-                newVal = DateFormatUtil.stringToTimestamp(newValue);
-                break;
-            case "java.lang.Integer":
-            case "java.lang.Short":
-                newVal = NumberUtil.toInt(newValue);
-                break;
-            case "java.lang.Long":
-                newVal = NumberUtil.toLong(newValue);
-                break;
-            case "java.lang.Float":
-                newVal = Float.valueOf(newValue);
-                break;
-            case "java.lang.Double":
-                newVal = Double.valueOf(newValue);
-                break;
-            case "[B":
-                newVal = stringToBytes(newValue);
-                break;
-            default:
-                newVal = newValue;
-        }
-
-        return newVal;
-    }
-
-    private byte[] stringToBytes(String s) {
-        byte[] b = null;
-        if (s.startsWith("[") && s.endsWith("]")) {
-            s = StringUtil.substring(s, 1, s.length() - 1);
-            String[] split = StringUtil.split(s, ",");
-            int length = split.length;
-            b = new byte[length];
-            for (int i = 0; i < length; i++) {
-                b[i] = Byte.valueOf(split[i].trim());
-            }
-        }
-        return b;
-    }
-
     private MetaVo convertMeta2Vo(Meta meta) {
         Mapping mapping = monitor.getMapping(meta.getMappingId());
         Assert.notNull(mapping, "驱动不存在.");

+ 14 - 10
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/monitor/MonitorController.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.web.controller.monitor;
 
 import org.dbsyncer.biz.ConnectorService;
+import org.dbsyncer.biz.DataSyncService;
 import org.dbsyncer.biz.MonitorService;
 import org.dbsyncer.biz.SystemConfigService;
 import org.dbsyncer.biz.vo.AppReportMetricVo;
@@ -17,7 +18,6 @@ import org.dbsyncer.monitor.model.Sample;
 import org.dbsyncer.web.controller.BaseController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.actuate.health.Health;
 import org.springframework.boot.actuate.health.HealthComponent;
 import org.springframework.boot.actuate.health.HealthEndpoint;
@@ -31,6 +31,7 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import javax.annotation.Resource;
 import javax.servlet.http.HttpServletRequest;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -47,25 +48,28 @@ public class MonitorController extends BaseController {
     private HistoryStackVo cpu = new HistoryStackVo();
     private HistoryStackVo memory = new HistoryStackVo();
 
-    @Autowired
+    @Resource
     private MonitorService monitorService;
 
-    @Autowired
+    @Resource
+    private DataSyncService dataSyncService;
+
+    @Resource
     private ConnectorService connectorService;
 
-    @Autowired
+    @Resource
     private SystemConfigService systemConfigService;
 
-    @Autowired
+    @Resource
     private MetricsEndpoint metricsEndpoint;
 
-    @Autowired
+    @Resource
     private HealthEndpoint healthEndpoint;
 
-    @Autowired
+    @Resource
     private HistoryStackValueFormatter cpuHistoryStackValueFormatterImpl;
 
-    @Autowired
+    @Resource
     private HistoryStackValueFormatter memoryHistoryStackValueFormatterImpl;
 
     @RequestMapping("")
@@ -82,7 +86,7 @@ public class MonitorController extends BaseController {
     @GetMapping("/page/retry")
     public String page(ModelMap model, String metaId, String messageId) {
         model.put("meta", monitorService.getMetaVo(metaId));
-        model.put("message", monitorService.getMessageVo(metaId, messageId));
+        model.put("message", dataSyncService.getMessageVo(metaId, messageId));
         return "monitor/retry.html";
     }
 
@@ -126,7 +130,7 @@ public class MonitorController extends BaseController {
     public RestResult sync(HttpServletRequest request) {
         try {
             Map<String, String> params = getParams(request);
-            return RestResult.restSuccess(monitorService.sync(params));
+            return RestResult.restSuccess(dataSyncService.sync(params));
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e.getClass());
             return RestResult.restFail(e.getMessage());