FullPuller.java 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package org.dbsyncer.manager.puller;
  2. import org.dbsyncer.common.util.NumberUtil;
  3. import org.dbsyncer.common.util.StringUtil;
  4. import org.dbsyncer.connector.util.PrimaryKeyUtil;
  5. import org.dbsyncer.manager.Manager;
  6. import org.dbsyncer.parser.Parser;
  7. import org.dbsyncer.parser.enums.ParserEnum;
  8. import org.dbsyncer.parser.event.FullRefreshEvent;
  9. import org.dbsyncer.parser.logger.LogService;
  10. import org.dbsyncer.parser.logger.LogType;
  11. import org.dbsyncer.parser.model.Mapping;
  12. import org.dbsyncer.parser.model.Meta;
  13. import org.dbsyncer.parser.model.TableGroup;
  14. import org.dbsyncer.parser.model.Task;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import org.springframework.context.ApplicationListener;
  18. import org.springframework.stereotype.Component;
  19. import org.springframework.util.Assert;
  20. import javax.annotation.Resource;
  21. import java.time.Instant;
  22. import java.util.List;
  23. import java.util.Map;
  24. import java.util.concurrent.ConcurrentHashMap;
  25. import java.util.concurrent.ExecutorService;
  26. import java.util.concurrent.Executors;
  27. /**
  28. * 全量同步
  29. *
  30. * @author AE86
  31. * @version 1.0.0
  32. * @date 2020/04/26 15:28
  33. */
  34. @Component
  35. public class FullPuller extends AbstractPuller implements ApplicationListener<FullRefreshEvent> {
  36. private final Logger logger = LoggerFactory.getLogger(getClass());
  37. @Resource
  38. private Parser parser;
  39. @Resource
  40. private Manager manager;
  41. @Resource
  42. private LogService logService;
  43. private Map<String, Task> map = new ConcurrentHashMap<>();
  44. @Override
  45. public void start(Mapping mapping) {
  46. Thread worker = new Thread(() -> {
  47. final String metaId = mapping.getMetaId();
  48. try {
  49. List<TableGroup> list = manager.getSortedTableGroupAll(mapping.getId());
  50. Assert.notEmpty(list, "映射关系不能为空");
  51. logger.info("开始全量同步:{}, {}", metaId, mapping.getName());
  52. map.putIfAbsent(metaId, new Task(metaId));
  53. final ExecutorService executor = Executors.newFixedThreadPool(mapping.getThreadNum());
  54. Task task = map.get(metaId);
  55. doTask(task, mapping, list, executor);
  56. } catch (Exception e) {
  57. logger.error(e.getMessage());
  58. logService.log(LogType.SystemLog.ERROR, e.getMessage());
  59. } finally {
  60. map.remove(metaId);
  61. publishClosedEvent(metaId);
  62. logger.info("结束全量同步:{}, {}", metaId, mapping.getName());
  63. }
  64. });
  65. worker.setName(new StringBuilder("full-worker-").append(mapping.getId()).toString());
  66. worker.setDaemon(false);
  67. worker.start();
  68. }
  69. @Override
  70. public void close(String metaId) {
  71. Task task = map.get(metaId);
  72. if (null != task) {
  73. task.stop();
  74. }
  75. }
  76. @Override
  77. public void onApplicationEvent(FullRefreshEvent event) {
  78. // 异步监听任务刷新事件
  79. flush(event.getTask());
  80. }
  81. private void doTask(Task task, Mapping mapping, List<TableGroup> list, ExecutorService executorService) {
  82. // 记录开始时间
  83. long now = Instant.now().toEpochMilli();
  84. task.setBeginTime(now);
  85. task.setEndTime(now);
  86. // 获取上次同步点
  87. Meta meta = manager.getMeta(task.getId());
  88. Map<String, String> snapshot = meta.getSnapshot();
  89. task.setPageIndex(NumberUtil.toInt(snapshot.get(ParserEnum.PAGE_INDEX.getCode()), ParserEnum.PAGE_INDEX.getDefaultValue()));
  90. // 反序列化游标值类型(通常为数字或字符串类型)
  91. String cursorValue = snapshot.get(ParserEnum.CURSOR.getCode());
  92. task.setCursors(PrimaryKeyUtil.getLastCursors(cursorValue));
  93. task.setTableGroupIndex(NumberUtil.toInt(snapshot.get(ParserEnum.TABLE_GROUP_INDEX.getCode()), ParserEnum.TABLE_GROUP_INDEX.getDefaultValue()));
  94. flush(task);
  95. int i = task.getTableGroupIndex();
  96. while (i < list.size()) {
  97. parser.execute(task, mapping, list.get(i), executorService);
  98. if (!task.isRunning()) {
  99. break;
  100. }
  101. task.setPageIndex(ParserEnum.PAGE_INDEX.getDefaultValue());
  102. task.setCursors(null);
  103. task.setTableGroupIndex(++i);
  104. flush(task);
  105. }
  106. // 记录结束时间
  107. task.setEndTime(Instant.now().toEpochMilli());
  108. task.setTableGroupIndex(ParserEnum.TABLE_GROUP_INDEX.getDefaultValue());
  109. flush(task);
  110. }
  111. private void flush(Task task) {
  112. Meta meta = manager.getMeta(task.getId());
  113. Assert.notNull(meta, "检查meta为空.");
  114. // 全量的过程中,有新数据则更新总数
  115. long finished = meta.getSuccess().get() + meta.getFail().get();
  116. if (meta.getTotal().get() < finished) {
  117. meta.getTotal().set(finished);
  118. }
  119. meta.setBeginTime(task.getBeginTime());
  120. meta.setEndTime(task.getEndTime());
  121. Map<String, String> snapshot = meta.getSnapshot();
  122. snapshot.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(task.getPageIndex()));
  123. snapshot.put(ParserEnum.CURSOR.getCode(), StringUtil.join(task.getCursors(), ","));
  124. snapshot.put(ParserEnum.TABLE_GROUP_INDEX.getCode(), String.valueOf(task.getTableGroupIndex()));
  125. manager.editConfigModel(meta);
  126. }
  127. }