Browse Source

simple code

AE86 1 năm trước cách đây
mục cha
commit
cb48612fe3

+ 13 - 23
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

@@ -3,6 +3,9 @@ package org.dbsyncer.biz.impl;
 import org.dbsyncer.biz.DataSyncService;
 import org.dbsyncer.biz.MonitorService;
 import org.dbsyncer.biz.SystemConfigService;
+import org.dbsyncer.biz.enums.BufferActuatorMetricEnum;
+import org.dbsyncer.biz.enums.DiskMetricEnum;
+import org.dbsyncer.biz.enums.MetricEnum;
 import org.dbsyncer.biz.metric.MetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.CpuMetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.DiskMetricDetailFormatter;
@@ -10,32 +13,28 @@ import org.dbsyncer.biz.metric.impl.DoubleRoundMetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.GCMetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.MemoryMetricDetailFormatter;
 import org.dbsyncer.biz.metric.impl.ValueMetricDetailFormatter;
+import org.dbsyncer.biz.model.AppReportMetric;
+import org.dbsyncer.biz.model.MetricResponse;
 import org.dbsyncer.biz.vo.AppReportMetricVo;
 import org.dbsyncer.biz.vo.DataVo;
 import org.dbsyncer.biz.vo.LogVo;
 import org.dbsyncer.biz.vo.MetaVo;
 import org.dbsyncer.biz.vo.MetricResponseVo;
-import org.dbsyncer.manager.event.PreloadCompletedEvent;
 import org.dbsyncer.common.model.Paging;
-import org.dbsyncer.connector.scheduled.ScheduledTaskJob;
-import org.dbsyncer.connector.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.enums.FilterEnum;
-import org.dbsyncer.biz.enums.BufferActuatorMetricEnum;
-import org.dbsyncer.biz.enums.DiskMetricEnum;
-import org.dbsyncer.biz.enums.MetricEnum;
-import org.dbsyncer.biz.model.AppReportMetric;
-import org.dbsyncer.biz.model.MetricResponse;
-import org.dbsyncer.parser.ProfileComponent;
-import org.dbsyncer.parser.enums.MetaEnum;
-import org.dbsyncer.sdk.enums.ModelEnum;
+import org.dbsyncer.connector.scheduled.ScheduledTaskJob;
+import org.dbsyncer.connector.scheduled.ScheduledTaskService;
 import org.dbsyncer.parser.LogService;
 import org.dbsyncer.parser.LogType;
+import org.dbsyncer.parser.ProfileComponent;
+import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
+import org.dbsyncer.sdk.enums.ModelEnum;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
@@ -47,7 +46,6 @@ import org.dbsyncer.storage.query.filter.LongFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
-import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 
@@ -69,7 +67,7 @@ import java.util.stream.Collectors;
  * @date 2020/04/27 10:20
  */
 @Service
-public class MonitorServiceImpl extends BaseServiceImpl implements MonitorService, ScheduledTaskJob, ApplicationListener<PreloadCompletedEvent> {
+public class MonitorServiceImpl extends BaseServiceImpl implements MonitorService, ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -94,8 +92,6 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
     @Resource
     private SystemConfigService systemConfigService;
 
-    private boolean preloadCompleted;
-
     private Map<String, MetricDetailFormatter> metricDetailFormatterMap = new LinkedHashMap<>();
 
     @PostConstruct
@@ -205,10 +201,8 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
 
     @Override
     public void deleteExpiredDataAndLog() {
-        if (preloadCompleted) {
-            deleteExpiredData();
-            deleteExpiredLog();
-        }
+        deleteExpiredData();
+        deleteExpiredLog();
     }
 
     @Override
@@ -350,8 +344,4 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
         }).collect(Collectors.toList());
     }
 
-    @Override
-    public void onApplicationEvent(PreloadCompletedEvent event) {
-        preloadCompleted = true;
-    }
 }

+ 6 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseConnectorMapper.java

@@ -36,6 +36,11 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
         }
     }
 
+    @Override
+    public String getServiceUrl() {
+        return config.getUrl();
+    }
+
     @Override
     public DatabaseConfig getConfig() {
         return config;
@@ -61,4 +66,4 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
         return super.clone();
     }
 
-}
+}

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnectorMapper.java

@@ -23,6 +23,11 @@ public final class ESConnectorMapper implements ConnectorMapper<ESConfig, EasyRe
         }
     }
 
+    @Override
+    public String getServiceUrl() {
+        return config.getUrl();
+    }
+
     @Override
     public ESConfig getConfig() {
         return config;

+ 6 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnectorMapper.java

@@ -37,6 +37,11 @@ public final class FileConnectorMapper implements ConnectorMapper<FileConfig, St
         }
     }
 
+    @Override
+    public String getServiceUrl() {
+        return config.getFileDir();
+    }
+
     public FileConfig getConfig() {
         return config;
     }
@@ -88,4 +93,4 @@ public final class FileConnectorMapper implements ConnectorMapper<FileConfig, St
             Assert.isTrue(file.exists(), String.format("found not file '%s'", filePath));
         }
     }
-}
+}

+ 6 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnectorMapper.java

@@ -13,6 +13,11 @@ public final class KafkaConnectorMapper implements ConnectorMapper<KafkaConfig,
         this.client = KafkaUtil.getConnection(config);
     }
 
+    @Override
+    public String getServiceUrl() {
+        return config.getBootstrapServers();
+    }
+
     @Override
     public KafkaConfig getConfig() {
         return config;
@@ -37,4 +42,4 @@ public final class KafkaConnectorMapper implements ConnectorMapper<KafkaConfig,
     public Object clone() throws CloneNotSupportedException {
         return super.clone();
     }
-}
+}

+ 1 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/event/PreloadCompletedEvent.java

@@ -10,6 +10,7 @@ import org.springframework.context.event.ApplicationContextEvent;
  * @Author AE86
  * @Date 2020-08-26 22:45
  */
+@Deprecated
 public final class PreloadCompletedEvent extends ApplicationContextEvent {
 
     /**

+ 57 - 19
dbsyncer-manager/src/main/java/org/dbsyncer/manager/impl/PreloadTemplate.java

@@ -1,28 +1,31 @@
 package org.dbsyncer.manager.impl;
 
-import org.dbsyncer.manager.event.PreloadCompletedEvent;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.manager.ManagerFactory;
+import org.dbsyncer.parser.LogService;
+import org.dbsyncer.parser.LogType;
 import org.dbsyncer.parser.ProfileComponent;
 import org.dbsyncer.parser.command.PreloadCommand;
 import org.dbsyncer.parser.enums.CommandEnum;
 import org.dbsyncer.parser.enums.GroupStrategyEnum;
 import org.dbsyncer.parser.enums.MetaEnum;
+import org.dbsyncer.parser.impl.OperationTemplate;
 import org.dbsyncer.parser.model.ConfigModel;
+import org.dbsyncer.parser.model.Connector;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.Meta;
 import org.dbsyncer.parser.model.OperationConfig;
-import org.dbsyncer.parser.impl.OperationTemplate;
 import org.dbsyncer.plugin.PluginFactory;
+import org.dbsyncer.sdk.spi.ConnectorMapper;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.dbsyncer.storage.query.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationListener;
 import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.stereotype.Component;
@@ -31,6 +34,8 @@ import javax.annotation.Resource;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.stream.Stream;
 
 /**
  * 预加载配置模板
@@ -53,6 +58,9 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
     @Resource
     private ManagerFactory managerFactory;
 
+    @Resource
+    private ConnectorFactory connectorFactory;
+
     @Resource
     private PluginFactory pluginFactory;
 
@@ -60,7 +68,12 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
     private StorageService storageService;
 
     @Resource
-    private ApplicationContext applicationContext;
+    private LogService logService;
+
+    @Resource
+    private Executor generalExecutor;
+
+    private boolean preloadCompleted;
 
     @Override
     public void onApplicationEvent(ContextRefreshedEvent event) {
@@ -70,11 +83,22 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
         // Load plugins
         pluginFactory.loadPlugins();
 
+        // Load connectorMappers
+        loadConnectorMapper();
+
         // Launch drivers
         launch();
 
-        // publish event
-        applicationContext.publishEvent(new PreloadCompletedEvent(applicationContext));
+        preloadCompleted = true;
+    }
+
+    /**
+     * 是否完成预加载配置
+     *
+     * @return
+     */
+    public boolean isPreloadCompleted() {
+        return preloadCompleted;
     }
 
     public void reload(String json) {
@@ -83,18 +107,14 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
             return;
         }
 
-        // Load system
-        reload(map, CommandEnum.PRELOAD_SYSTEM);
-        // Load user
-        reload(map, CommandEnum.PRELOAD_USER);
-        // Load connectors
-        reload(map, CommandEnum.PRELOAD_CONNECTOR);
-        // Load mappings
-        reload(map, CommandEnum.PRELOAD_MAPPING);
-        // Load metas
-        reload(map, CommandEnum.PRELOAD_META);
-        // Load projectGroups
-        reload(map, CommandEnum.PRELOAD_PROJECT_GROUP);
+        // Load configModels
+        Stream.of(CommandEnum.PRELOAD_SYSTEM, CommandEnum.PRELOAD_USER, CommandEnum.PRELOAD_CONNECTOR, CommandEnum.PRELOAD_MAPPING,
+                CommandEnum.PRELOAD_META, CommandEnum.PRELOAD_PROJECT_GROUP).forEach(commandEnum -> reload(map, commandEnum));
+
+        // Load connectorMappers
+        loadConnectorMapper();
+
+        // Launch drivers
         launch();
     }
 
@@ -140,7 +160,7 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
             total += paging.getTotal();
             pageNum++;
         }
-        logger.info("PreLoad {}:{}", modelType, total);
+        logger.info("{}:{}", modelType, total);
     }
 
     private void reload(Map<String, Map> map, CommandEnum commandEnum) {
@@ -172,4 +192,22 @@ public final class PreloadTemplate implements ApplicationListener<ContextRefresh
             }
         }
     }
+
+    private void loadConnectorMapper() {
+        List<Connector> list = profileComponent.getConnectorAll();
+        if (!CollectionUtils.isEmpty(list)) {
+            list.forEach(connector -> {
+                generalExecutor.execute(() -> {
+                    try {
+                        connectorFactory.disconnect(connector.getConfig());
+                        ConnectorMapper mapper = connectorFactory.connect(connector.getConfig());
+                        logger.info("Completed connection {} {}", connector.getConfig().getConnectorType(), mapper.getServiceUrl());
+                    } catch (Exception e) {
+                        logger.error("连接配置异常", e);
+                        logService.log(LogType.ConnectorLog.FAILED, e.getMessage());
+                    }
+                });
+            });
+        }
+    }
 }

+ 7 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/spi/ConnectorMapper.java

@@ -11,6 +11,13 @@ package org.dbsyncer.sdk.spi;
  */
 public interface ConnectorMapper<K, V> extends Cloneable {
 
+    /**
+     * 获取服务地址
+     *
+     * @return
+     */
+    String getServiceUrl();
+
     /**
      * 获取连接配置
      *

+ 10 - 2
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/monitor/MonitorController.java

@@ -15,6 +15,7 @@ import org.dbsyncer.biz.enums.MetricEnum;
 import org.dbsyncer.biz.enums.StatisticEnum;
 import org.dbsyncer.biz.model.MetricResponse;
 import org.dbsyncer.biz.model.Sample;
+import org.dbsyncer.manager.impl.PreloadTemplate;
 import org.dbsyncer.web.controller.BaseController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +61,9 @@ public class MonitorController extends BaseController {
     @Resource
     private SystemConfigService systemConfigService;
 
+    @Resource
+    private PreloadTemplate preloadTemplate;
+
     @Resource
     private MetricsEndpoint metricsEndpoint;
 
@@ -98,12 +102,16 @@ public class MonitorController extends BaseController {
 
     @Scheduled(fixedRate = 10000)
     public void refreshConnectorHealth() {
-        connectorService.refreshHealth();
+        if (preloadTemplate.isPreloadCompleted()) {
+            connectorService.refreshHealth();
+        }
     }
 
     @Scheduled(fixedRate = 30000)
     public void deleteExpiredDataAndLog() {
-        monitorService.deleteExpiredDataAndLog();
+        if (preloadTemplate.isPreloadCompleted()) {
+            monitorService.deleteExpiredDataAndLog();
+        }
     }
 
     @GetMapping("/queryData")