Browse Source

add thread pool

AE86 5 years ago
parent
commit
3c94b0a11b

+ 19 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/MonitorService.java

@@ -0,0 +1,19 @@
+package org.dbsyncer.biz;
+
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/1/14 0:02
+ */
+public interface MonitorService {
+
+    /**
+     * 获取线程信息
+     *
+     * @return
+     */
+    Map getThreadInfo();
+
+}

+ 25 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/MonitorServiceImpl.java

@@ -0,0 +1,25 @@
+package org.dbsyncer.biz.impl;
+
+import org.dbsyncer.biz.MonitorService;
+import org.dbsyncer.monitor.Monitor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/27 10:20
+ */
+@Service
+public class MonitorServiceImpl implements MonitorService {
+
+    @Autowired
+    private Monitor monitor;
+
+    @Override
+    public Map getThreadInfo() {
+        return monitor.getThreadInfo();
+    }
+}

+ 13 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/DateFormatUtil.java

@@ -0,0 +1,13 @@
+package org.dbsyncer.common.util;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+public abstract class DateFormatUtil {
+    public DateFormatUtil() {
+    }
+
+    public static String getCurrentDateTime() {
+        return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+    }
+}

+ 1 - 15
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -19,7 +19,7 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2019/9/30 20:31
  */
-public interface Manager {
+public interface Manager extends TaskExecutor{
 
     boolean alive(ConnectorConfig config);
 
@@ -88,18 +88,4 @@ public interface Manager {
     // Plugin
     List<Plugin> getPluginAll();
 
-    /**
-     * 启动同步任务
-     *
-     * @param mapping
-     */
-    void start(Mapping mapping);
-
-    /**
-     * 关闭同步任务
-     *
-     * @param mapping
-     */
-    void close(Mapping mapping);
-
 }

+ 59 - 6
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.manager;
 
+import org.dbsyncer.common.event.ClosedEvent;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
 import org.dbsyncer.connector.enums.ConnectorEnum;
@@ -10,18 +11,26 @@ import org.dbsyncer.manager.config.OperationConfig;
 import org.dbsyncer.manager.config.QueryConfig;
 import org.dbsyncer.manager.enums.GroupStrategyEnum;
 import org.dbsyncer.manager.enums.HandlerEnum;
+import org.dbsyncer.manager.extractor.Extractor;
 import org.dbsyncer.manager.template.impl.OperationTemplate;
 import org.dbsyncer.manager.template.impl.PreloadTemplate;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.enums.ConvertEnum;
+import org.dbsyncer.parser.enums.MetaEnum;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.plugin.config.Plugin;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ApplicationListener;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
 
 import java.util.List;
 import java.util.Map;
@@ -32,7 +41,7 @@ import java.util.Map;
  * @date 2019/9/16 23:59
  */
 @Component
-public class ManagerFactory implements Manager {
+public class ManagerFactory implements Manager, ApplicationContextAware, ApplicationListener<ClosedEvent> {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -46,7 +55,7 @@ public class ManagerFactory implements Manager {
     private Listener listener;
 
     @Autowired
-    private Executor executor;
+    private TaskExecutor executor;
 
     @Autowired
     private PreloadTemplate preloadTemplate;
@@ -216,16 +225,60 @@ public class ManagerFactory implements Manager {
         return pluginFactory.getPluginAll();
     }
 
+    private Map<String, Extractor> map;
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        map = applicationContext.getBeansOfType(Extractor.class);
+    }
+
     @Override
     public void start(Mapping mapping) {
-        // 启动任务
-        executor.start(mapping);
+        // 获取数据源连接器
+        Connector connector = getConnector(mapping.getSourceConnectorId());
+        Assert.notNull(connector, "数据源配置不能为空.");
+
+        Extractor extractor = getExtractor(mapping);
+
+        // 标记运行中
+        changeMetaState(mapping.getMetaId(), MetaEnum.RUNNING);
+
+        extractor.asyncStart(mapping);
     }
 
     @Override
     public void close(Mapping mapping) {
-        // 关闭任务
-        executor.close(mapping);
+        Extractor extractor = getExtractor(mapping);
+
+        // 标记停止中
+        String metaId = mapping.getMetaId();
+        changeMetaState(metaId, MetaEnum.STOPPING);
+
+        extractor.asyncClose(metaId);
+    }
+
+    @Override
+    public void onApplicationEvent(ClosedEvent event) {
+        // 异步监听任务关闭事件
+        changeMetaState(event.getId(), MetaEnum.READY);
+    }
+
+    private Extractor getExtractor(Mapping mapping) {
+        Assert.notNull(mapping, "驱动不能为空");
+        String model = mapping.getModel();
+        String metaId = mapping.getMetaId();
+        Assert.hasText(model, "同步方式不能为空");
+        Assert.hasText(metaId, "任务ID不能为空");
+
+        Extractor extractor = map.get(model.concat("Extractor"));
+        Assert.notNull(extractor, String.format("未知的同步方式: %s", model));
+        return extractor;
+    }
+
+    private void changeMetaState(String metaId, MetaEnum metaEnum){
+        Meta meta = getMeta(metaId);
+        meta.setState(metaEnum.getCode());
+        editMeta(meta);
     }
 
 }

+ 28 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/TaskExecutor.java

@@ -0,0 +1,28 @@
+package org.dbsyncer.manager;
+
+import org.dbsyncer.parser.model.Mapping;
+
+/**
+ * 同步任务执行器
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2020/04/26 16:32
+ */
+public interface TaskExecutor {
+
+    /**
+     * 启动同步任务
+     *
+     * @param mapping
+     */
+    void start(Mapping mapping);
+
+    /**
+     * 关闭同步任务
+     *
+     * @param mapping
+     */
+    void close(Mapping mapping);
+
+}

+ 39 - 0
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/AbstractExtractor.java

@@ -0,0 +1,39 @@
+package org.dbsyncer.manager.extractor;
+
+import org.dbsyncer.common.event.ClosedEvent;
+import org.dbsyncer.parser.model.Mapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+
+public abstract class AbstractExtractor implements Extractor {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    protected abstract void doTask(Mapping mapping);
+
+    @Override
+    public void asyncStart(Mapping mapping) {
+        String metaId = mapping.getMetaId();
+        logger.info("启动任务:{}", metaId);
+
+        this.doTask(mapping);
+
+        close(metaId);
+    }
+
+    @Override
+    public void asyncClose(String metaId) {
+        close(metaId);
+    }
+
+    private void close(String metaId) {
+        logger.info("结束任务:{}", metaId);
+        applicationContext.publishEvent(new ClosedEvent(applicationContext, metaId));
+    }
+
+}

+ 6 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/Extractor.java

@@ -1,11 +1,14 @@
 package org.dbsyncer.manager.extractor;
 
 import org.dbsyncer.parser.model.Mapping;
+import org.springframework.scheduling.annotation.Async;
 
 public interface Extractor {
 
-    void start(Mapping mapping);
+    @Async("taskExecutor")
+    void asyncStart(Mapping mapping);
 
-    void close(String metaId);
+    @Async("taskExecutor")
+    void asyncClose(String metaId);
 
-}
+}

+ 4 - 30
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/FullExtractor.java

@@ -1,18 +1,10 @@
 package org.dbsyncer.manager.extractor;
 
-import org.dbsyncer.common.event.ClosedEvent;
-import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.model.Mapping;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 
-import java.util.concurrent.TimeUnit;
-
 /**
  * 全量同步
  *
@@ -21,34 +13,16 @@ import java.util.concurrent.TimeUnit;
  * @date 2020/04/26 15:28
  */
 @Component
-public class FullExtractor implements Extractor {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
+public class FullExtractor extends AbstractExtractor {
 
     @Autowired
     private Parser parser;
 
-    @Autowired
-    private ApplicationContext applicationContext;
-
-    @Override
-    public void start(Mapping mapping) {
-        new Thread(()->{
-            String metaId = mapping.getMetaId();
-            logger.info("模拟同步...等待5s");
-            try {
-                TimeUnit.SECONDS.sleep(5);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-            logger.info("同步结束");
-            applicationContext.publishEvent(new ClosedEvent(applicationContext, metaId));
-        }).start();
-    }
-
     @Override
-    public void close(String metaId) {
+    protected void doTask(Mapping mapping) {
+        // 获取数据源连接配置
 
+        // 获取执行命令
     }
 
 }

+ 3 - 16
dbsyncer-manager/src/main/java/org/dbsyncer/manager/extractor/IncrementExtractor.java

@@ -1,12 +1,8 @@
 package org.dbsyncer.manager.extractor;
 
-import org.dbsyncer.common.event.ClosedEvent;
-import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.listener.Listener;
-import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.parser.model.Mapping;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 
 /**
@@ -17,23 +13,14 @@ import org.springframework.stereotype.Component;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementExtractor implements Extractor {
+public class IncrementExtractor extends AbstractExtractor {
 
     @Autowired
     private Listener listener;
 
-    @Autowired
-    private ApplicationContext applicationContext;
-
-    @Override
-    public void start(Mapping mapping) {
-        final String metaId = mapping.getMetaId();
-        applicationContext.publishEvent(new ClosedEvent(applicationContext, metaId));
-    }
-
     @Override
-    public void close(String metaId) {
-
+    protected void doTask(Mapping mapping) {
+        // 获取数据源连接配置
     }
 
 }

+ 3 - 0
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/Monitor.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.monitor;
 
+import java.util.Map;
+
 /**
  * @author AE86
  * @version 1.0.0
@@ -9,4 +11,5 @@ public interface Monitor {
 
     boolean alive(String id);
 
+    Map getThreadInfo();
 }

+ 27 - 0
dbsyncer-monitor/src/main/java/org/dbsyncer/monitor/MonitorFactory.java

@@ -1,13 +1,20 @@
 package org.dbsyncer.monitor;
 
+import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.model.Connector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.cache.annotation.Cacheable;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+
 /**
  * @author AE86
  * @version 1.0.0
@@ -21,6 +28,9 @@ public class MonitorFactory implements Monitor {
     @Autowired
     private Manager manager;
 
+    @Autowired
+    private Executor taskExecutor;
+
     @Override
     @Cacheable(value = "connector", keyGenerator = "cacheKeyGenerator")
     public boolean alive(String id) {
@@ -28,4 +38,21 @@ public class MonitorFactory implements Monitor {
         return null != connector ? manager.alive(connector.getConfig()) : false;
     }
 
+    @Override
+    public Map getThreadInfo() {
+        Map map = new HashMap();
+        if(taskExecutor instanceof ThreadPoolTaskExecutor){
+            ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) taskExecutor;
+            ThreadPoolExecutor threadPoolExecutor = threadTask.getThreadPoolExecutor();
+
+            map.put("提交任务数", threadPoolExecutor.getTaskCount());
+            map.put("完成任务数", threadPoolExecutor.getCompletedTaskCount());
+            map.put("当前有多少线程正在处理任务", threadPoolExecutor.getActiveCount());
+            map.put("还剩多少个任务未执行", threadPoolExecutor.getQueue().size());
+            map.put("当前可用队列长度", threadPoolExecutor.getQueue().remainingCapacity());
+            map.put("当前时间", DateFormatUtil.getCurrentDateTime());
+        }
+        return map;
+    }
+
 }

+ 2 - 0
dbsyncer-web/src/main/java/org/dbsyncer/web/Application.java

@@ -4,8 +4,10 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
 import org.springframework.cache.annotation.EnableCaching;
+import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
+@EnableAsync
 @EnableScheduling
 @EnableCaching
 @SpringBootApplication(scanBasePackages ="org.dbsyncer", exclude = DataSourceAutoConfiguration.class)

+ 49 - 0
dbsyncer-web/src/main/java/org/dbsyncer/web/config/TaskPoolConfig.java

@@ -0,0 +1,49 @@
+package org.dbsyncer.web.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * @version 1.0.0
+ * @author AE86
+ * @date 2020-04-26 23:40
+ */
+@Configuration
+public class TaskPoolConfig {
+
+    @Bean("taskExecutor")
+    public Executor taskExecutor() {
+        //注意这一行日志:2. do submit,taskCount [101], completedTaskCount [87], activeCount [5], queueSize [9]
+        //这说明提交任务到线程池的时候,调用的是submit(Callable task)这个方法,当前已经提交了101个任务,完成了87个,当前有5个线程在处理任务,还剩9个任务在队列中等待,线程池的基本情况一路了然;
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        //核心线程数10:线程池创建时候初始化的线程数
+        executor.setCorePoolSize(10);
+        //最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
+        //maxPoolSize 当系统负载大道最大值时,核心线程数已无法按时处理完所有任务,这是就需要增加线程.每秒200个任务需要20个线程,那么当每秒1000个任务时,则需要(1000-queueCapacity)*(20/200),即60个线程,可将maxPoolSize设置为60;
+        executor.setMaxPoolSize(30);
+        //缓冲队列200:用来缓冲执行任务的队列
+        executor.setQueueCapacity(400);
+        //允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
+        executor.setKeepAliveSeconds(60);
+        //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
+        executor.setThreadNamePrefix("taskExecutor");
+        //理线程池对拒绝任务的处策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
+        /*CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
+        这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
+        AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException
+        这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)
+        DiscardPolicy:不能执行的任务将被删除
+        这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
+        DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
+        该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心*/
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+        executor.setWaitForTasksToCompleteOnShutdown(true);
+        executor.setAwaitTerminationSeconds(60);
+        return executor;
+    }
+
+}

+ 7 - 1
dbsyncer-web/src/main/java/org/dbsyncer/web/controller/monitor/MonitorController.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.web.controller.monitor;
 
+import org.dbsyncer.biz.MonitorService;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
 import org.springframework.ui.ModelMap;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -10,9 +12,13 @@ import javax.servlet.http.HttpServletRequest;
 @RequestMapping("/monitor")
 public class MonitorController {
 
+    @Autowired
+    private MonitorService monitorService;
+
     @RequestMapping("")
     public String index(HttpServletRequest request, ModelMap model) {
+        model.put("threadInfo", monitorService.getThreadInfo());
         return "monitor/monitor.html";
     }
 
-}
+}

+ 16 - 1
dbsyncer-web/src/main/resources/templates/monitor/monitor.html

@@ -1,12 +1,27 @@
 <!DOCTYPE html>
 <html xmlns="http://www.w3.org/1999/xhtml"
-      lang="zh-CN">
+      xmlns:th="http://www.thymeleaf.org" lang="zh-CN">
 
 <!-- Monitor -->
 <div class="container-fluid">
 
     图表信息
     <p>CPU 内存 硬盘 堆内存 线程数</p>
+    <table class="table table-hover">
+        <caption>任务线程</caption>
+        <thead>
+        <tr>
+            <th>序号</th>
+            <th>指标</th>
+        </tr>
+        </thead>
+        <tr th:each="item,userStat:${threadInfo}">
+            <td th:text="${userStat.index}+1"></td>
+            <td th:text="${userStat.current.key}"></td><!-- key-->
+            <td th:text="${userStat.current.value}"></td><!-- value-->
+        </tr>
+    </table>
+
     <p>驱动增量/全量同步数据</p>
     <p>操作日志</p>