AE86 3 年之前
父节点
当前提交
8373d4f98a

+ 2 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskService.java

@@ -19,6 +19,8 @@ public interface ScheduledTaskService {
 
     void start(String key, long period, ScheduledTaskJob job);
 
+    void start(String cron, ScheduledTaskJob job);
+
     void stop(String key);
 
 }

+ 16 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java

@@ -1,8 +1,10 @@
 package org.dbsyncer.common.scheduled;
 
 import org.dbsyncer.common.CommonException;
+import org.dbsyncer.common.util.UUIDUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.scheduling.support.CronTrigger;
@@ -18,7 +20,7 @@ import java.util.concurrent.ScheduledFuture;
  * @Date 2020-05-24 22:06
  */
 @Component
-public class ScheduledTaskServiceImpl implements ScheduledTaskService {
+public class ScheduledTaskServiceImpl implements ScheduledTaskService, DisposableBean {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -32,20 +34,28 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService {
 
     @Override
     public void start(String key, String cron, ScheduledTaskJob job) {
+        logger.info("{}-[{}], Started task [{}]", key, cron, job.getClass().getName());
         apply(key, () -> taskScheduler.schedule(job, (trigger) -> new CronTrigger(cron).nextExecutionTime(trigger)));
     }
 
     @Override
     public void start(String key, long period, ScheduledTaskJob job) {
+        logger.info("[period={}], Started task [{}]", period, key);
         apply(key, () -> taskScheduler.scheduleAtFixedRate(job, period));
     }
 
+    @Override
+    public void start(String cron, ScheduledTaskJob job) {
+        start(UUIDUtil.getUUID(), cron, job);
+    }
+
     @Override
     public void stop(String key) {
         ScheduledFuture job = map.get(key);
         if (null != job) {
             job.cancel(true);
             map.remove(key);
+            logger.info("Stopped task [{}]", key);
         }
     }
 
@@ -59,6 +69,11 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService {
         map.putIfAbsent(key, scheduledFutureMapper.apply());
     }
 
+    @Override
+    public void destroy() {
+        map.keySet().forEach(this::stop);
+    }
+
     private interface ScheduledFutureMapper {
         /**
          * 返回定时任务

+ 2 - 15
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -6,7 +6,6 @@ import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.Field;
@@ -31,7 +30,6 @@ import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
@@ -53,7 +51,7 @@ import java.util.stream.Collectors;
  * @date 2020/04/26 15:28
  */
 @Component
-public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob, DisposableBean {
+public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -78,16 +76,11 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
     @Autowired
     private Executor taskExecutor;
 
-    private String key;
-
     private Map<String, Extractor> map = new ConcurrentHashMap<>();
 
     @PostConstruct
     private void init() {
-        key = UUIDUtil.getUUID();
-        String cron = "*/10 * * * * ?";
-        scheduledTaskService.start(key, cron, this);
-        logger.info("[{}], Started scheduled task", cron);
+        scheduledTaskService.start("*/10 * * * * ?", this);
     }
 
     @Override
@@ -137,12 +130,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob,
         map.forEach((k, v) -> v.flushEvent());
     }
 
-    @Override
-    public void destroy() {
-        scheduledTaskService.stop(key);
-        logger.info("Stopped scheduled task.");
-    }
-
     private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta)
             throws InstantiationException, IllegalAccessException {
         ConnectorConfig connectorConfig = connector.getConfig();

+ 2 - 15
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushServiceImpl.java

@@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSONException;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.util.JsonUtil;
-import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.storage.SnowflakeIdWorker;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
@@ -12,7 +11,6 @@ import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -35,7 +33,7 @@ import java.util.stream.Collectors;
  * @date 2020/05/19 18:38
  */
 @Component
-public class FlushServiceImpl implements FlushService, ScheduledTaskJob, DisposableBean {
+public class FlushServiceImpl implements FlushService, ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -59,14 +57,9 @@ public class FlushServiceImpl implements FlushService, ScheduledTaskJob, Disposa
 
     private volatile boolean running;
 
-    private String key;
-
     @PostConstruct
     private void init() {
-        key = UUIDUtil.getUUID();
-        String cron = "*/3 * * * * ?";
-        scheduledTaskService.start(key, cron, this);
-        logger.info("[{}], Started scheduled task", cron);
+        scheduledTaskService.start("*/3 * * * * ?", this);
     }
 
     @Override
@@ -154,12 +147,6 @@ public class FlushServiceImpl implements FlushService, ScheduledTaskJob, Disposa
         }
     }
 
-    @Override
-    public void destroy() {
-        scheduledTaskService.stop(key);
-        logger.info("Stopped scheduled task.");
-    }
-
     final class Task {
         String metaId;
         List<Map> list;

+ 1 - 1
dbsyncer-web/src/main/resources/application.properties

@@ -4,7 +4,7 @@ server.port=18686
 #web
 dbsyncer.web.login.username=admin
 dbsyncer.web.login.password=0DPiKuNIrrVmD8IUCuw1hQxNqZc=
-dbsyncer.web.thread.pool.core.size=32
+dbsyncer.web.thread.pool.core.size=10
 dbsyncer.web.thread.pool.queue.capacity=5000
 server.servlet.session.timeout=1800
 server.servlet.context-path=/