IncrementPuller.java 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. /**
  2. * DBSyncer Copyright 2020-2023 All Rights Reserved.
  3. */
  4. package org.dbsyncer.manager.impl;
  5. import org.dbsyncer.common.util.CollectionUtils;
  6. import org.dbsyncer.connector.base.ConnectorFactory;
  7. import org.dbsyncer.manager.AbstractPuller;
  8. import org.dbsyncer.manager.ManagerException;
  9. import org.dbsyncer.parser.LogService;
  10. import org.dbsyncer.parser.LogType;
  11. import org.dbsyncer.parser.ProfileComponent;
  12. import org.dbsyncer.parser.consumer.impl.LogConsumer;
  13. import org.dbsyncer.parser.consumer.impl.QuartzConsumer;
  14. import org.dbsyncer.parser.event.RefreshOffsetEvent;
  15. import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
  16. import org.dbsyncer.parser.model.Connector;
  17. import org.dbsyncer.parser.model.Mapping;
  18. import org.dbsyncer.parser.model.Meta;
  19. import org.dbsyncer.parser.model.TableGroup;
  20. import org.dbsyncer.sdk.config.ListenerConfig;
  21. import org.dbsyncer.sdk.enums.ListenerTypeEnum;
  22. import org.dbsyncer.sdk.listener.AbstractListener;
  23. import org.dbsyncer.sdk.listener.AbstractQuartzListener;
  24. import org.dbsyncer.sdk.listener.Listener;
  25. import org.dbsyncer.sdk.model.ChangedOffset;
  26. import org.dbsyncer.sdk.model.ConnectorConfig;
  27. import org.dbsyncer.sdk.model.Table;
  28. import org.dbsyncer.sdk.model.TableGroupQuartzCommand;
  29. import org.dbsyncer.common.scheduled.ScheduledTaskJob;
  30. import org.dbsyncer.common.scheduled.ScheduledTaskService;
  31. import org.slf4j.Logger;
  32. import org.slf4j.LoggerFactory;
  33. import org.springframework.context.ApplicationListener;
  34. import org.springframework.stereotype.Component;
  35. import org.springframework.util.Assert;
  36. import javax.annotation.PostConstruct;
  37. import javax.annotation.Resource;
  38. import java.time.Instant;
  39. import java.util.ArrayList;
  40. import java.util.HashSet;
  41. import java.util.List;
  42. import java.util.Map;
  43. import java.util.Set;
  44. import java.util.concurrent.ConcurrentHashMap;
  45. import java.util.stream.Collectors;
  46. /**
  47. * 增量同步
  48. *
  49. * @Version 1.0.0
  50. * @Author AE86
  51. * @Date 2020-04-26 15:28
  52. */
  53. @Component
  54. public final class IncrementPuller extends AbstractPuller implements ApplicationListener<RefreshOffsetEvent>, ScheduledTaskJob {
  55. private final Logger logger = LoggerFactory.getLogger(getClass());
  56. @Resource
  57. private BufferActuatorRouter bufferActuatorRouter;
  58. @Resource
  59. private ScheduledTaskService scheduledTaskService;
  60. @Resource
  61. private ConnectorFactory connectorFactory;
  62. @Resource
  63. private ProfileComponent profileComponent;
  64. @Resource
  65. private LogService logService;
  66. private Map<String, Listener> map = new ConcurrentHashMap<>();
  67. @PostConstruct
  68. private void init() {
  69. scheduledTaskService.start(3000, this);
  70. }
  71. @Override
  72. public void start(Mapping mapping) {
  73. final String mappingId = mapping.getId();
  74. final String metaId = mapping.getMetaId();
  75. logger.info("开始增量同步:{}, {}", metaId, mapping.getName());
  76. Connector connector = profileComponent.getConnector(mapping.getSourceConnectorId());
  77. Assert.notNull(connector, "连接器不能为空.");
  78. List<TableGroup> list = profileComponent.getSortedTableGroupAll(mappingId);
  79. Assert.notEmpty(list, "表映射关系不能为空,请先添加源表到目标表关系.");
  80. Meta meta = profileComponent.getMeta(metaId);
  81. Assert.notNull(meta, "Meta不能为空.");
  82. Thread worker = new Thread(() -> {
  83. try {
  84. long now = Instant.now().toEpochMilli();
  85. meta.setBeginTime(now);
  86. meta.setEndTime(now);
  87. profileComponent.editConfigModel(meta);
  88. map.putIfAbsent(metaId, getListener(mapping, connector, list, meta));
  89. map.get(metaId).start();
  90. } catch (Exception e) {
  91. close(metaId);
  92. logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
  93. logger.error("运行异常,结束增量同步{}:{}", metaId, e.getMessage());
  94. }
  95. });
  96. worker.setName(new StringBuilder("increment-worker-").append(mapping.getId()).toString());
  97. worker.setDaemon(false);
  98. worker.start();
  99. }
  100. @Override
  101. public void close(String metaId) {
  102. Listener listener = map.get(metaId);
  103. if (null != listener) {
  104. bufferActuatorRouter.unbind(metaId);
  105. listener.close();
  106. }
  107. map.remove(metaId);
  108. publishClosedEvent(metaId);
  109. logger.info("关闭成功:{}", metaId);
  110. }
  111. @Override
  112. public void onApplicationEvent(RefreshOffsetEvent event) {
  113. List<ChangedOffset> offsetList = event.getOffsetList();
  114. if (!CollectionUtils.isEmpty(offsetList)) {
  115. offsetList.forEach(offset -> {
  116. if (offset.isRefreshOffset() && map.containsKey(offset.getMetaId())) {
  117. map.get(offset.getMetaId()).refreshEvent(offset);
  118. }
  119. });
  120. }
  121. }
  122. @Override
  123. public void run() {
  124. // 定时同步增量信息
  125. map.values().forEach(listener -> listener.flushEvent());
  126. }
  127. private Listener getListener(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) {
  128. ConnectorConfig connectorConfig = connector.getConfig();
  129. ListenerConfig listenerConfig = mapping.getListener();
  130. String listenerType = listenerConfig.getListenerType();
  131. Listener listener = connectorFactory.getListener(connectorConfig.getConnectorType(), listenerType);
  132. if (null == listener) {
  133. throw new ManagerException(String.format("Unsupported listener type \"%s\".", connectorConfig.getConnectorType()));
  134. }
  135. // 默认定时抽取
  136. if (ListenerTypeEnum.isTiming(listenerType) && listener instanceof AbstractQuartzListener) {
  137. AbstractQuartzListener quartzListener = (AbstractQuartzListener) listener;
  138. quartzListener.setCommands(list.stream().map(t -> new TableGroupQuartzCommand(t.getSourceTable(), t.getCommand())).collect(Collectors.toList()));
  139. quartzListener.register(new QuartzConsumer().init(bufferActuatorRouter, profileComponent, logService, meta.getId(), mapping, list));
  140. }
  141. // 基于日志抽取
  142. if (ListenerTypeEnum.isLog(listenerType) && listener instanceof AbstractListener) {
  143. AbstractListener abstractListener = (AbstractListener) listener;
  144. abstractListener.register(new LogConsumer().init(bufferActuatorRouter, profileComponent, logService, meta.getId(), mapping, list));
  145. }
  146. if (listener instanceof AbstractListener) {
  147. AbstractListener abstractListener = (AbstractListener) listener;
  148. Set<String> filterTable = new HashSet<>();
  149. List<Table> sourceTable = new ArrayList<>();
  150. list.forEach(t -> {
  151. Table table = t.getSourceTable();
  152. if (!filterTable.contains(t.getName())) {
  153. sourceTable.add(table);
  154. }
  155. filterTable.add(table.getName());
  156. });
  157. abstractListener.setConnectorService(connectorFactory.getConnectorService(connectorConfig.getConnectorType()));
  158. abstractListener.setConnectorInstance(connectorFactory.connect(connectorConfig));
  159. abstractListener.setScheduledTaskService(scheduledTaskService);
  160. abstractListener.setConnectorConfig(connectorConfig);
  161. abstractListener.setListenerConfig(listenerConfig);
  162. abstractListener.setFilterTable(filterTable);
  163. abstractListener.setSourceTable(sourceTable);
  164. abstractListener.setSnapshot(meta.getSnapshot());
  165. abstractListener.setMetaId(meta.getId());
  166. }
  167. listener.init();
  168. return listener;
  169. }
  170. }