AE86 5 年 前
コミット
5dd76dc10c

+ 0 - 8
dbsyncer-biz/src/main/java/org/dbsyncer/biz/ConnectorService.java

@@ -12,14 +12,6 @@ import java.util.Map;
  */
 public interface ConnectorService {
 
-    /**
-     * 检查连接器是否可用
-     *
-     * @param params
-     * @return
-     */
-    boolean alive(Map<String, String> params);
-
     /**
      * 新增连接器
      *

+ 0 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/mapping/MetaChecker.java

@@ -2,7 +2,6 @@ package org.dbsyncer.biz.checker.impl.mapping;
 
 import org.dbsyncer.biz.BizException;
 import org.dbsyncer.biz.checker.AbstractChecker;
-import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.parser.model.ConfigModel;
@@ -15,7 +14,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;

+ 0 - 5
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/ConnectorServiceImpl.java

@@ -31,11 +31,6 @@ public class ConnectorServiceImpl implements ConnectorService {
     @Autowired
     private Checker connectorChecker;
 
-    @Override
-    public boolean alive(Map<String, String> params) {
-        return manager.alive(null);
-    }
-
     @Override
     public String add(Map<String, String> params) {
         ConfigModel model = connectorChecker.checkAddConfigModel(params);

+ 21 - 21
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MappingServiceImpl.java

@@ -8,6 +8,7 @@ import org.dbsyncer.biz.vo.MappingVo;
 import org.dbsyncer.biz.vo.MetaVo;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.manager.Manager;
+import org.dbsyncer.monitor.Monitor;
 import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.parser.enums.ModelEnum;
 import org.dbsyncer.parser.model.*;
@@ -16,7 +17,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 
@@ -38,12 +38,18 @@ public class MappingServiceImpl implements MappingService {
     @Autowired
     private Manager manager;
 
+    @Autowired
+    private Monitor monitor;
+
     @Autowired
     private Checker mappingChecker;
 
     @Autowired
     private Checker metaChecker;
 
+    // 驱动启停锁
+    private final static Object LOCK = new Object();
+
     @Override
     public String add(Map<String, String> params) {
         ConfigModel model = mappingChecker.checkAddConfigModel(params);
@@ -94,8 +100,11 @@ public class MappingServiceImpl implements MappingService {
     public String start(String id) {
         Map<String, String> params = new HashMap<>();
         params.put(ConfigConstant.CONFIG_MODEL_ID, id);
-        ConfigModel model = metaChecker.checkAddConfigModel(params);
-        manager.addMeta(model);
+
+        synchronized (LOCK){
+            ConfigModel model = metaChecker.checkAddConfigModel(params);
+            manager.addMeta(model);
+        }
         return "驱动启动成功";
     }
 
@@ -104,12 +113,13 @@ public class MappingServiceImpl implements MappingService {
         Mapping mapping = manager.getMapping(id);
         Assert.notNull(mapping, "驱动不存在.");
 
-        String metaId = mapping.getMetaId();
-        Meta meta = manager.getMeta(metaId);
-        if (null != meta) {
-            manager.removeMeta(metaId);
-        } else {
-            throw new BizException("驱动已停止.");
+        synchronized (LOCK){
+            String metaId = mapping.getMetaId();
+            if (null != manager.getMeta(metaId)) {
+                manager.removeMeta(metaId);
+            } else {
+                throw new BizException("驱动已停止.");
+            }
         }
         return "驱动停止成功";
     }
@@ -124,23 +134,13 @@ public class MappingServiceImpl implements MappingService {
         return temp;
     }
 
-    boolean running = false;
-
-    /**
-     * 定时推送消息
-     */
-    @Scheduled(fixedRate = 5000)
-    public void callback() {
-        running = running ? false : true;
-    }
-
     private MappingVo convertMapping2Vo(Mapping mapping) {
         Assert.notNull(mapping, "Mapping can not be null.");
         Connector s = manager.getConnector(mapping.getSourceConnectorId());
         Connector t = manager.getConnector(mapping.getTargetConnectorId());
-        ConnectorVo sConn = new ConnectorVo(running);
+        ConnectorVo sConn = new ConnectorVo(monitor.alive(s.getId()));
         BeanUtils.copyProperties(s, sConn);
-        ConnectorVo tConn = new ConnectorVo(running);
+        ConnectorVo tConn = new ConnectorVo(monitor.alive(t.getId()));
         BeanUtils.copyProperties(t, tConn);
 
         boolean isRunning = null != manager.getMeta(mapping.getMetaId());

+ 16 - 1
dbsyncer-common/pom.xml

@@ -40,6 +40,15 @@
             <groupId>org.springframework</groupId>
             <artifactId>spring-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-cache</artifactId>
+        </dependency>
+        <!-- caffeine -->
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+        </dependency>
 
         <!-- apache 字符工具类 -->
         <dependency>
@@ -57,6 +66,12 @@
             <artifactId>json</artifactId>
         </dependency>
 
+        <!-- lombok -->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
     </dependencies>
 
-</project>
+</project>

+ 12 - 0
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/Monitor.java

@@ -0,0 +1,12 @@
+package org.dbsyncer.monitor;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/23 11:30
+ */
+public interface Monitor {
+
+    boolean alive(String id);
+
+}

+ 34 - 0
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -0,0 +1,34 @@
+package org.dbsyncer.monitor;
+
+import org.dbsyncer.manager.Manager;
+import org.dbsyncer.parser.model.Connector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/23 11:30
+ */
+@Component
+public class MonitorFactory implements Monitor {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private Manager manager;
+
+    @Override
+    @Cacheable(value = "connector", keyGenerator = "cacheKeyGenerator")
+    public boolean alive(String id) {
+        logger.info("{}从DB检查alive,id:{}", LocalDateTime.now(), id);
+        Connector connector = manager.getConnector(id);
+        return null != connector ? manager.alive(connector.getConfig()) : false;
+    }
+
+}

+ 4 - 1
dbsyncer-web/src/main/java/org/dbsyncer/web/Application.java

@@ -3,9 +3,12 @@ package org.dbsyncer.web;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cache.annotation.EnableCaching;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
 @EnableScheduling
+@EnableCaching
 @SpringBootApplication(scanBasePackages ="org.dbsyncer", exclude = DataSourceAutoConfiguration.class)
 public class Application {
 
@@ -13,4 +16,4 @@ public class Application {
         SpringApplication.run(Application.class, args);
     }
 
-}
+}

+ 84 - 0
dbsyncer-web/src/main/java/org/dbsyncer/web/config/CacheConfiguration.java

@@ -0,0 +1,84 @@
+package org.dbsyncer.web.config;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Ticker;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.cache.CacheManager;
+import org.springframework.cache.caffeine.CaffeineCache;
+import org.springframework.cache.interceptor.KeyGenerator;
+import org.springframework.cache.support.SimpleCacheManager;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/23 11:30
+ */
+@Data
+@Slf4j
+@Configuration
+@ConfigurationProperties(prefix = "dbsyncer.caching")
+public class CacheConfiguration {
+
+    private Map<String, CacheConfig> cache;
+
+    @Data
+    static class CacheConfig {
+        private Integer timeout;
+        private Integer max = 200;
+    }
+
+    @Bean
+    public KeyGenerator cacheKeyGenerator() {
+        return new KeyGenerator(){
+            @Override
+            public Object generate(Object target, Method method, Object... params) {
+                String className = method.getDeclaringClass().getSimpleName();
+                String methodName = method.getName();
+                String paramHash = String.valueOf(Arrays.toString(params).hashCode());
+                String cacheKey = new StringJoiner("_").add(className).add(methodName).add(paramHash).toString();
+                log.debug("generate cache key : {}", cacheKey);
+                return cacheKey;
+            }
+        };
+    }
+
+    @Bean
+    public Ticker ticker() {
+        return Ticker.systemTicker();
+    }
+
+    @Bean
+    public CacheManager cacheManager(Ticker ticker) {
+        SimpleCacheManager manager = new SimpleCacheManager();
+        if (cache != null) {
+            List<CaffeineCache> caches = cache.entrySet()
+                    .stream()
+                    .map(entry -> buildCache(entry.getKey(), entry.getValue(), ticker))
+                    .collect(Collectors.toList());
+            manager.setCaches(caches);
+        }
+        return manager;
+    }
+
+    private CaffeineCache buildCache(String key, CacheConfig config, Ticker ticker) {
+        log.info("Cache key {} specified timeout of {} min, max of {}", key, config.getTimeout(), config.getMax());
+        final Caffeine<Object, Object> caffeineBuilder = Caffeine.newBuilder()
+                .expireAfterWrite(config.getTimeout(), TimeUnit.SECONDS)
+                .maximumSize(config.getMax())
+                .ticker(ticker);
+        return new CaffeineCache(key, caffeineBuilder.build());
+    }
+
+}

+ 0 - 12
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/index/ConnectorController.java

@@ -39,18 +39,6 @@ public class ConnectorController extends BaseController {
         return "connector/edit";
     }
 
-    @PostMapping(value = "/alive")
-    @ResponseBody
-    public RestResult alive(HttpServletRequest request) {
-        try {
-            Map<String, String> params = getParams(request);
-            return RestResult.restSuccess(connectorService.alive(params));
-        } catch (Exception e) {
-            logger.error(e.getLocalizedMessage(), e.getClass());
-            return RestResult.restFail(e.getMessage());
-        }
-    }
-
     @PostMapping("/add")
     @ResponseBody
     public RestResult add(HttpServletRequest request) {

+ 3 - 0
dbsyncer-web/src/main/resources/application.properties

@@ -7,6 +7,9 @@ dbsyncer.config.login.username=admin
 dbsyncer.config.login.password=0DPiKuNIrrVmD8IUCuw1hQxNqZc=
 server.servlet.session.timeout=1800
 
+#cache
+dbsyncer.caching.cache.connector.timeout=5
+
 #storage
 dbsyncer.storage.support.disk=true
 #dbsyncer.storage.support.mysql=true