IncrementPuller.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. package org.dbsyncer.manager.puller;
  2. import org.dbsyncer.common.event.Event;
  3. import org.dbsyncer.common.event.RowChangedEvent;
  4. import org.dbsyncer.common.scheduled.ScheduledTaskJob;
  5. import org.dbsyncer.common.scheduled.ScheduledTaskService;
  6. import org.dbsyncer.common.util.CollectionUtils;
  7. import org.dbsyncer.common.util.StringUtil;
  8. import org.dbsyncer.connector.ConnectorFactory;
  9. import org.dbsyncer.connector.config.ConnectorConfig;
  10. import org.dbsyncer.connector.model.Field;
  11. import org.dbsyncer.connector.model.Table;
  12. import org.dbsyncer.connector.constant.ConnectorConstant;
  13. import org.dbsyncer.listener.AbstractExtractor;
  14. import org.dbsyncer.listener.Extractor;
  15. import org.dbsyncer.listener.Listener;
  16. import org.dbsyncer.listener.config.ListenerConfig;
  17. import org.dbsyncer.listener.enums.ListenerTypeEnum;
  18. import org.dbsyncer.listener.quartz.AbstractQuartzExtractor;
  19. import org.dbsyncer.manager.Manager;
  20. import org.dbsyncer.manager.ManagerException;
  21. import org.dbsyncer.manager.config.FieldPicker;
  22. import org.dbsyncer.parser.Parser;
  23. import org.dbsyncer.parser.logger.LogService;
  24. import org.dbsyncer.parser.logger.LogType;
  25. import org.dbsyncer.parser.model.Connector;
  26. import org.dbsyncer.parser.model.Mapping;
  27. import org.dbsyncer.parser.model.Meta;
  28. import org.dbsyncer.parser.model.TableGroup;
  29. import org.dbsyncer.parser.util.PickerUtil;
  30. import org.slf4j.Logger;
  31. import org.slf4j.LoggerFactory;
  32. import org.springframework.beans.factory.annotation.Autowired;
  33. import org.springframework.beans.factory.annotation.Qualifier;
  34. import org.springframework.stereotype.Component;
  35. import org.springframework.util.Assert;
  36. import javax.annotation.PostConstruct;
  37. import java.time.Instant;
  38. import java.time.LocalDateTime;
  39. import java.util.*;
  40. import java.util.concurrent.ConcurrentHashMap;
  41. import java.util.concurrent.Executor;
  42. import java.util.concurrent.atomic.AtomicInteger;
  43. import java.util.stream.Collectors;
  44. /**
  45. * 增量同步
  46. *
  47. * @author AE86
  48. * @version 1.0.0
  49. * @date 2020/04/26 15:28
  50. */
  51. @Component
  52. public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob {
  53. private final Logger logger = LoggerFactory.getLogger(getClass());
  54. @Autowired
  55. private Parser parser;
  56. @Autowired
  57. private Listener listener;
  58. @Autowired
  59. private Manager manager;
  60. @Autowired
  61. private LogService logService;
  62. @Autowired
  63. private ScheduledTaskService scheduledTaskService;
  64. @Autowired
  65. private ConnectorFactory connectorFactory;
  66. @Qualifier("taskExecutor")
  67. @Autowired
  68. private Executor taskExecutor;
  69. private Map<String, Extractor> map = new ConcurrentHashMap<>();
  70. @PostConstruct
  71. private void init() {
  72. scheduledTaskService.start(3000, this);
  73. }
  74. @Override
  75. public void asyncStart(Mapping mapping) {
  76. final String mappingId = mapping.getId();
  77. final String metaId = mapping.getMetaId();
  78. try {
  79. Connector connector = manager.getConnector(mapping.getSourceConnectorId());
  80. Assert.notNull(connector, "连接器不能为空.");
  81. List<TableGroup> list = manager.getTableGroupAll(mappingId);
  82. Assert.notEmpty(list, "映射关系不能为空.");
  83. Meta meta = manager.getMeta(metaId);
  84. Assert.notNull(meta, "Meta不能为空.");
  85. AbstractExtractor extractor = getExtractor(mapping, connector, list, meta);
  86. long now = Instant.now().toEpochMilli();
  87. meta.setBeginTime(now);
  88. meta.setEndTime(now);
  89. manager.editMeta(meta);
  90. map.putIfAbsent(metaId, extractor);
  91. // 执行任务
  92. logger.info("启动成功:{}", metaId);
  93. map.get(metaId).start();
  94. } catch (Exception e) {
  95. close(metaId);
  96. logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
  97. logger.error("运行异常,结束任务{}:{}", metaId, e.getMessage());
  98. }
  99. }
  100. @Override
  101. public void close(String metaId) {
  102. Extractor extractor = map.get(metaId);
  103. if (null != extractor) {
  104. extractor.clearAllListener();
  105. extractor.close();
  106. }
  107. map.remove(metaId);
  108. publishClosedEvent(metaId);
  109. logger.info("关闭成功:{}", metaId);
  110. }
  111. @Override
  112. public void run() {
  113. // 定时同步增量信息
  114. map.forEach((k, v) -> v.flushEvent());
  115. }
  116. private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta)
  117. throws InstantiationException, IllegalAccessException {
  118. ConnectorConfig connectorConfig = connector.getConfig();
  119. ListenerConfig listenerConfig = mapping.getListener();
  120. // timing/log
  121. final String listenerType = listenerConfig.getListenerType();
  122. // 默认定时抽取
  123. if (ListenerTypeEnum.isTiming(listenerType)) {
  124. AbstractQuartzExtractor extractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
  125. List<Map<String, String>> commands = list.stream().map(t -> t.getCommand()).collect(Collectors.toList());
  126. extractor.setCommands(commands);
  127. setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), new QuartzListener(mapping, list));
  128. return extractor;
  129. }
  130. // 基于日志抽取
  131. if (ListenerTypeEnum.isLog(listenerType)) {
  132. AbstractExtractor extractor = listener.getExtractor(ListenerTypeEnum.LOG, connectorConfig.getConnectorType(), AbstractExtractor.class);
  133. Set<String> filterTable = new HashSet<>();
  134. LogListener logListener = new LogListener(mapping, list, extractor);
  135. logListener.getTablePicker().forEach((k, fieldPickers) -> filterTable.add(k));
  136. extractor.setFilterTable(filterTable);
  137. setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getMap(), logListener);
  138. return extractor;
  139. }
  140. throw new ManagerException("未知的监听配置.");
  141. }
  142. private void setExtractorConfig(AbstractExtractor extractor, ConnectorConfig connector, ListenerConfig listener,
  143. Map<String, String> snapshot, AbstractListener event) {
  144. extractor.setTaskExecutor(taskExecutor);
  145. extractor.setConnectorFactory(connectorFactory);
  146. extractor.setScheduledTaskService(scheduledTaskService);
  147. extractor.setConnectorConfig(connector);
  148. extractor.setListenerConfig(listener);
  149. extractor.setSnapshot(snapshot);
  150. extractor.addListener(event);
  151. extractor.setMetaId(event.metaId);
  152. }
  153. abstract class AbstractListener implements Event {
  154. private static final int FLUSH_DELAYED_SECONDS = 30;
  155. protected Mapping mapping;
  156. protected String metaId;
  157. private LocalDateTime updateTime = LocalDateTime.now();
  158. @Override
  159. public void flushEvent(Map<String, String> map) {
  160. // 30s内更新,执行写入
  161. if (updateTime.isAfter(LocalDateTime.now().minusSeconds(FLUSH_DELAYED_SECONDS))) {
  162. if (!CollectionUtils.isEmpty(map)) {
  163. logger.debug("{}", map);
  164. }
  165. forceFlushEvent(map);
  166. }
  167. }
  168. @Override
  169. public void forceFlushEvent(Map<String, String> map) {
  170. Meta meta = manager.getMeta(metaId);
  171. if (null != meta) {
  172. meta.setMap(map);
  173. manager.editMeta(meta);
  174. }
  175. }
  176. @Override
  177. public void refreshFlushEventUpdateTime() {
  178. updateTime = LocalDateTime.now();
  179. }
  180. @Override
  181. public void errorEvent(Exception e) {
  182. logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
  183. }
  184. @Override
  185. public void interruptException(Exception e) {
  186. errorEvent(e);
  187. close(metaId);
  188. }
  189. }
  190. /**
  191. * </p>定时模式
  192. * <ol>
  193. * <li>根据过滤条件筛选</li>
  194. * </ol>
  195. * </p>同步关系:
  196. * </p>数据源表 >> 目标源表
  197. * <ul>
  198. * <li>A >> B</li>
  199. * <li>A >> C</li>
  200. * <li>E >> F</li>
  201. * </ul>
  202. * </p>PS:
  203. * <ol>
  204. * <li>依次执行同步关系A >> B 然后 A >> C ...</li>
  205. * </ol>
  206. */
  207. final class QuartzListener extends AbstractListener {
  208. private List<FieldPicker> tablePicker;
  209. public QuartzListener(Mapping mapping, List<TableGroup> list) {
  210. this.mapping = mapping;
  211. this.metaId = mapping.getMetaId();
  212. this.tablePicker = new LinkedList<>();
  213. list.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
  214. }
  215. @Override
  216. public void changedEvent(RowChangedEvent rowChangedEvent) {
  217. final FieldPicker picker = tablePicker.get(rowChangedEvent.getTableGroupIndex());
  218. TableGroup tableGroup = picker.getTableGroup();
  219. rowChangedEvent.setSourceTableName(tableGroup.getSourceTable().getName());
  220. // 处理过程有异常向上抛
  221. parser.execute(mapping, tableGroup, rowChangedEvent);
  222. // 标记有变更记录
  223. refreshFlushEventUpdateTime();
  224. }
  225. }
  226. /**
  227. * </p>日志模式
  228. * <ol>
  229. * <li>监听表增量数据</li>
  230. * <li>根据过滤条件筛选</li>
  231. * </ol>
  232. * </p>同步关系:
  233. * </p>数据源表 >> 目标源表
  234. * <ul>
  235. * <li>A >> B</li>
  236. * <li>A >> C</li>
  237. * <li>E >> F</li>
  238. * </ul>
  239. * </p>PS:
  240. * <ol>
  241. * <li>为减少开销而选择复用监听器实例, 启动时只需创建一个数据源连接器.</li>
  242. * <li>关系A >> B和A >> C会复用A监听的数据, A监听到增量数据,会发送给B和C.</li>
  243. * <li>该模式下,会监听表所有字段.</li>
  244. * </ol>
  245. */
  246. final class LogListener extends AbstractListener {
  247. private Extractor extractor;
  248. private Map<String, List<FieldPicker>> tablePicker;
  249. private AtomicInteger eventCounter;
  250. private static final int MAX_LOG_CACHE_SIZE = 128;
  251. public LogListener(Mapping mapping, List<TableGroup> list, Extractor extractor) {
  252. this.mapping = mapping;
  253. this.metaId = mapping.getMetaId();
  254. this.extractor = extractor;
  255. this.tablePicker = new LinkedHashMap<>();
  256. this.eventCounter = new AtomicInteger();
  257. list.forEach(t -> {
  258. final Table table = t.getSourceTable();
  259. final String tableName = table.getName();
  260. List<Field> pkList = t.getTargetTable().getColumn().stream().filter(field -> field.isPk()).collect(Collectors.toList());
  261. tablePicker.putIfAbsent(tableName, new ArrayList<>());
  262. TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
  263. tablePicker.get(tableName).add(new FieldPicker(group, pkList, group.getFilter(), table.getColumn(), group.getFieldMapping()));
  264. });
  265. }
  266. @Override
  267. public void changedEvent(RowChangedEvent rowChangedEvent) {
  268. // 处理过程有异常向上抛
  269. List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getSourceTableName());
  270. if (!CollectionUtils.isEmpty(pickers)) {
  271. pickers.forEach(picker -> {
  272. final Map<String, Object> dataMap = picker.getColumns(rowChangedEvent.getDataList());
  273. if (picker.filter(dataMap)) {
  274. rowChangedEvent.setDataMap(dataMap);
  275. parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
  276. }
  277. });
  278. // 标记有变更记录
  279. refreshFlushEventUpdateTime();
  280. eventCounter.set(0);
  281. return;
  282. }
  283. // 防止挤压无效的增量数据,刷新最新的有效记录点
  284. if (eventCounter.incrementAndGet() >= MAX_LOG_CACHE_SIZE) {
  285. extractor.forceFlushEvent();
  286. eventCounter.set(0);
  287. }
  288. }
  289. public Map<String, List<FieldPicker>> getTablePicker() {
  290. return tablePicker;
  291. }
  292. }
  293. }