FullPuller.java 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package org.dbsyncer.manager.puller;
  2. import org.dbsyncer.manager.Manager;
  3. import org.dbsyncer.parser.Parser;
  4. import org.dbsyncer.parser.event.FullRefreshEvent;
  5. import org.dbsyncer.parser.logger.LogService;
  6. import org.dbsyncer.parser.logger.LogType;
  7. import org.dbsyncer.parser.model.Mapping;
  8. import org.dbsyncer.parser.model.Meta;
  9. import org.dbsyncer.parser.model.TableGroup;
  10. import org.dbsyncer.parser.model.Task;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.context.ApplicationListener;
  15. import org.springframework.stereotype.Component;
  16. import org.springframework.util.Assert;
  17. import java.time.Instant;
  18. import java.util.List;
  19. import java.util.Map;
  20. import java.util.concurrent.ConcurrentHashMap;
  21. import java.util.concurrent.ExecutorService;
  22. import java.util.concurrent.Executors;
  23. /**
  24. * 全量同步
  25. *
  26. * @author AE86
  27. * @version 1.0.0
  28. * @date 2020/04/26 15:28
  29. */
  30. @Component
  31. public class FullPuller extends AbstractPuller implements ApplicationListener<FullRefreshEvent> {
  32. private final Logger logger = LoggerFactory.getLogger(getClass());
  33. @Autowired
  34. private Parser parser;
  35. @Autowired
  36. private Manager manager;
  37. @Autowired
  38. private LogService logService;
  39. private Map<String, Task> map = new ConcurrentHashMap<>();
  40. @Override
  41. public void start(Mapping mapping) {
  42. FullWorker worker = new FullWorker(mapping);
  43. worker.setName(new StringBuilder("full-worker-").append(mapping.getId()).toString());
  44. worker.setDaemon(false);
  45. worker.start();
  46. }
  47. @Override
  48. public void close(String metaId) {
  49. Task task = map.get(metaId);
  50. if (null != task) {
  51. task.stop();
  52. }
  53. }
  54. @Override
  55. public void onApplicationEvent(FullRefreshEvent event) {
  56. // 异步监听任务刷新事件
  57. flush(event.getTask());
  58. }
  59. private void doTask(Task task, Mapping mapping, List<TableGroup> list, ExecutorService executorService) {
  60. // 记录开始时间
  61. long now = Instant.now().toEpochMilli();
  62. task.setBeginTime(now);
  63. task.setEndTime(now);
  64. flush(task);
  65. for (TableGroup t : list) {
  66. if (!task.isRunning()) {
  67. break;
  68. }
  69. parser.execute(task, mapping, t, executorService);
  70. }
  71. // 记录结束时间
  72. task.setEndTime(Instant.now().toEpochMilli());
  73. flush(task);
  74. }
  75. private void flush(Task task) {
  76. Meta meta = manager.getMeta(task.getId());
  77. Assert.notNull(meta, "检查meta为空.");
  78. meta.setBeginTime(task.getBeginTime());
  79. meta.setEndTime(task.getEndTime());
  80. manager.editMeta(meta);
  81. }
  82. final class FullWorker extends Thread {
  83. Mapping mapping;
  84. List<TableGroup> list;
  85. public FullWorker(Mapping mapping) {
  86. this.mapping = mapping;
  87. this.list = manager.getTableGroupAll(mapping.getId());
  88. Assert.notEmpty(list, "映射关系不能为空");
  89. }
  90. @Override
  91. public void run() {
  92. final String metaId = mapping.getMetaId();
  93. logger.info("开始全量同步:{}, {}", metaId, mapping.getName());
  94. try {
  95. map.putIfAbsent(metaId, new Task(metaId));
  96. final ExecutorService executor = Executors.newFixedThreadPool(mapping.getThreadNum());
  97. Task task = map.get(metaId);
  98. doTask(task, mapping, list, executor);
  99. } catch (Exception e) {
  100. logger.error(e.getMessage());
  101. logService.log(LogType.SystemLog.ERROR, e.getMessage());
  102. } finally {
  103. map.remove(metaId);
  104. publishClosedEvent(metaId);
  105. logger.info("结束全量同步:{}, {}", metaId, mapping.getName());
  106. }
  107. }
  108. }
  109. }