ScheduledTaskServiceImpl.java 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package org.dbsyncer.common.scheduled;
  2. import org.dbsyncer.common.CommonException;
  3. import org.dbsyncer.common.util.UUIDUtil;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.factory.DisposableBean;
  7. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  8. import org.springframework.scheduling.support.CronTrigger;
  9. import org.springframework.stereotype.Component;
  10. import javax.annotation.Resource;
  11. import java.util.Map;
  12. import java.util.concurrent.ConcurrentHashMap;
  13. import java.util.concurrent.ScheduledFuture;
  14. /**
  15. * @version 1.0.0
  16. * @Author AE86
  17. * @Date 2020-05-24 22:06
  18. */
  19. @Component
  20. public class ScheduledTaskServiceImpl implements ScheduledTaskService, DisposableBean {
  21. private final Logger logger = LoggerFactory.getLogger(getClass());
  22. /**
  23. * 定时任务线程池
  24. */
  25. @Resource
  26. private ThreadPoolTaskScheduler taskScheduler;
  27. private Map<String, ScheduledFuture> map = new ConcurrentHashMap<>();
  28. @Override
  29. public void start(String key, String cron, ScheduledTaskJob job) {
  30. logger.info("[{}], Started task [{}]", cron, job.getClass().getName());
  31. apply(key, () -> taskScheduler.schedule(job, (trigger) -> new CronTrigger(cron).nextExecutionTime(trigger)));
  32. }
  33. @Override
  34. public void start(String key, long period, ScheduledTaskJob job) {
  35. logger.info("[period={}], Started task [{}]", period, job.getClass().getName());
  36. apply(key, () -> taskScheduler.scheduleAtFixedRate(job, period));
  37. }
  38. @Override
  39. public void start(String cron, ScheduledTaskJob job) {
  40. start(UUIDUtil.getUUID(), cron, job);
  41. }
  42. @Override
  43. public void start(long period, ScheduledTaskJob job) {
  44. start(UUIDUtil.getUUID(), period, job);
  45. }
  46. @Override
  47. public void stop(String key) {
  48. ScheduledFuture job = map.get(key);
  49. if (null != job) {
  50. job.cancel(true);
  51. map.remove(key);
  52. logger.info("Stopped task [{}]", key);
  53. }
  54. }
  55. private void apply(String key, ScheduledFutureMapper scheduledFutureMapper) {
  56. final ScheduledFuture scheduledFuture = map.get(key);
  57. if (null != scheduledFuture && !scheduledFuture.isCancelled()) {
  58. String msg = String.format(">>>>>> 任务已启动 %s >>>>>>", key);
  59. logger.error(msg);
  60. throw new CommonException(msg);
  61. }
  62. map.putIfAbsent(key, scheduledFutureMapper.apply());
  63. }
  64. @Override
  65. public void destroy() {
  66. map.keySet().forEach(this::stop);
  67. }
  68. private interface ScheduledFutureMapper {
  69. /**
  70. * 返回定时任务
  71. *
  72. * @return
  73. */
  74. ScheduledFuture apply();
  75. }
  76. }