IncrementPuller.java 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package org.dbsyncer.manager.impl;
  2. import org.dbsyncer.common.util.CollectionUtils;
  3. import org.dbsyncer.connector.ConnectorFactory;
  4. import org.dbsyncer.manager.AbstractPuller;
  5. import org.dbsyncer.manager.ManagerException;
  6. import org.dbsyncer.parser.LogService;
  7. import org.dbsyncer.parser.LogType;
  8. import org.dbsyncer.parser.ProfileComponent;
  9. import org.dbsyncer.parser.consumer.impl.LogConsumer;
  10. import org.dbsyncer.parser.consumer.impl.QuartzConsumer;
  11. import org.dbsyncer.parser.event.RefreshOffsetEvent;
  12. import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
  13. import org.dbsyncer.parser.model.Connector;
  14. import org.dbsyncer.parser.model.Mapping;
  15. import org.dbsyncer.parser.model.Meta;
  16. import org.dbsyncer.parser.model.TableGroup;
  17. import org.dbsyncer.sdk.config.ListenerConfig;
  18. import org.dbsyncer.sdk.enums.ListenerTypeEnum;
  19. import org.dbsyncer.sdk.listener.AbstractListener;
  20. import org.dbsyncer.sdk.listener.AbstractQuartzListener;
  21. import org.dbsyncer.sdk.listener.Listener;
  22. import org.dbsyncer.sdk.model.ChangedOffset;
  23. import org.dbsyncer.sdk.model.ConnectorConfig;
  24. import org.dbsyncer.sdk.model.Table;
  25. import org.dbsyncer.sdk.model.TableGroupQuartzCommand;
  26. import org.dbsyncer.sdk.scheduled.ScheduledTaskJob;
  27. import org.dbsyncer.sdk.scheduled.ScheduledTaskService;
  28. import org.slf4j.Logger;
  29. import org.slf4j.LoggerFactory;
  30. import org.springframework.context.ApplicationListener;
  31. import org.springframework.stereotype.Component;
  32. import org.springframework.util.Assert;
  33. import javax.annotation.PostConstruct;
  34. import javax.annotation.Resource;
  35. import java.time.Instant;
  36. import java.util.ArrayList;
  37. import java.util.HashSet;
  38. import java.util.List;
  39. import java.util.Map;
  40. import java.util.Set;
  41. import java.util.concurrent.ConcurrentHashMap;
  42. import java.util.stream.Collectors;
  43. /**
  44. * 增量同步
  45. *
  46. * @author AE86
  47. * @version 1.0.0
  48. * @date 2020/04/26 15:28
  49. */
  50. @Component
  51. public final class IncrementPuller extends AbstractPuller implements ApplicationListener<RefreshOffsetEvent>, ScheduledTaskJob {
  52. private final Logger logger = LoggerFactory.getLogger(getClass());
  53. @Resource
  54. private BufferActuatorRouter bufferActuatorRouter;
  55. @Resource
  56. private ScheduledTaskService scheduledTaskService;
  57. @Resource
  58. private ConnectorFactory connectorFactory;
  59. @Resource
  60. private ProfileComponent profileComponent;
  61. @Resource
  62. private LogService logService;
  63. private Map<String, Listener> map = new ConcurrentHashMap<>();
  64. @PostConstruct
  65. private void init() {
  66. scheduledTaskService.start(3000, this);
  67. }
  68. @Override
  69. public void start(Mapping mapping) {
  70. final String mappingId = mapping.getId();
  71. final String metaId = mapping.getMetaId();
  72. logger.info("开始增量同步:{}, {}", metaId, mapping.getName());
  73. Connector connector = profileComponent.getConnector(mapping.getSourceConnectorId());
  74. Assert.notNull(connector, "连接器不能为空.");
  75. List<TableGroup> list = profileComponent.getSortedTableGroupAll(mappingId);
  76. Assert.notEmpty(list, "映射关系不能为空.");
  77. Meta meta = profileComponent.getMeta(metaId);
  78. Assert.notNull(meta, "Meta不能为空.");
  79. Thread worker = new Thread(() -> {
  80. try {
  81. long now = Instant.now().toEpochMilli();
  82. meta.setBeginTime(now);
  83. meta.setEndTime(now);
  84. profileComponent.editConfigModel(meta);
  85. map.putIfAbsent(metaId, getListener(mapping, connector, list, meta));
  86. map.get(metaId).start();
  87. } catch (Exception e) {
  88. close(metaId);
  89. logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
  90. logger.error("运行异常,结束增量同步{}:{}", metaId, e.getMessage());
  91. }
  92. });
  93. worker.setName(new StringBuilder("increment-worker-").append(mapping.getId()).toString());
  94. worker.setDaemon(false);
  95. worker.start();
  96. }
  97. @Override
  98. public void close(String metaId) {
  99. Listener listener = map.get(metaId);
  100. if (null != listener) {
  101. bufferActuatorRouter.unbind(metaId);
  102. listener.close();
  103. }
  104. map.remove(metaId);
  105. publishClosedEvent(metaId);
  106. logger.info("关闭成功:{}", metaId);
  107. }
  108. @Override
  109. public void onApplicationEvent(RefreshOffsetEvent event) {
  110. List<ChangedOffset> offsetList = event.getOffsetList();
  111. if (!CollectionUtils.isEmpty(offsetList)) {
  112. offsetList.forEach(offset -> {
  113. if (offset.isRefreshOffset() && map.containsKey(offset.getMetaId())) {
  114. map.get(offset.getMetaId()).refreshEvent(offset);
  115. }
  116. });
  117. }
  118. }
  119. @Override
  120. public void run() {
  121. // 定时同步增量信息
  122. map.values().forEach(listener -> listener.flushEvent());
  123. }
  124. private Listener getListener(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) {
  125. ConnectorConfig connectorConfig = connector.getConfig();
  126. ListenerConfig listenerConfig = mapping.getListener();
  127. String listenerType = listenerConfig.getListenerType();
  128. Listener listener = connectorFactory.getListener(connectorConfig.getConnectorType(), listenerType);
  129. if (null == listener) {
  130. throw new ManagerException(String.format("Unsupported listener type \"%s\".", connectorConfig.getConnectorType()));
  131. }
  132. // 默认定时抽取
  133. if (ListenerTypeEnum.isTiming(listenerType) && listener instanceof AbstractQuartzListener) {
  134. AbstractQuartzListener quartzListener = (AbstractQuartzListener) listener;
  135. quartzListener.setCommands(list.stream().map(t -> new TableGroupQuartzCommand(t.getSourceTable(), t.getCommand())).collect(Collectors.toList()));
  136. quartzListener.register(new QuartzConsumer().init(bufferActuatorRouter, profileComponent, logService, meta.getId(), mapping, list));
  137. }
  138. // 基于日志抽取
  139. if (ListenerTypeEnum.isLog(listenerType) && listener instanceof AbstractListener) {
  140. AbstractListener abstractListener = (AbstractListener) listener;
  141. abstractListener.register(new LogConsumer().init(bufferActuatorRouter, profileComponent, logService, meta.getId(), mapping, list));
  142. }
  143. if (listener instanceof AbstractListener) {
  144. AbstractListener abstractListener = (AbstractListener) listener;
  145. Set<String> filterTable = new HashSet<>();
  146. List<Table> sourceTable = new ArrayList<>();
  147. list.forEach(t -> {
  148. Table table = t.getSourceTable();
  149. if (!filterTable.contains(t.getName())) {
  150. sourceTable.add(table);
  151. }
  152. filterTable.add(table.getName());
  153. });
  154. abstractListener.setConnectorService(connectorFactory.getConnectorService(connectorConfig.getConnectorType()));
  155. abstractListener.setConnectorInstance(connectorFactory.connect(connectorConfig));
  156. abstractListener.setScheduledTaskService(scheduledTaskService);
  157. abstractListener.setConnectorConfig(connectorConfig);
  158. abstractListener.setListenerConfig(listenerConfig);
  159. abstractListener.setFilterTable(filterTable);
  160. abstractListener.setSourceTable(sourceTable);
  161. abstractListener.setSnapshot(meta.getSnapshot());
  162. abstractListener.setMetaId(meta.getId());
  163. }
  164. return listener;
  165. }
  166. }