IncrementPuller.java
package org.dbsyncer.manager.puller;

import org.dbsyncer.common.event.Event;
import org.dbsyncer.common.event.RowChangedEvent;
import org.dbsyncer.common.model.AbstractConnectorConfig;
import org.dbsyncer.common.scheduled.ScheduledTaskJob;
import org.dbsyncer.common.scheduled.ScheduledTaskService;
import org.dbsyncer.common.util.CollectionUtils;
import org.dbsyncer.connector.ConnectorFactory;
import org.dbsyncer.connector.model.Field;
import org.dbsyncer.connector.model.Table;
import org.dbsyncer.listener.AbstractExtractor;
import org.dbsyncer.listener.Extractor;
import org.dbsyncer.listener.Listener;
import org.dbsyncer.listener.config.ListenerConfig;
import org.dbsyncer.listener.enums.ListenerTypeEnum;
import org.dbsyncer.listener.quartz.AbstractQuartzExtractor;
import org.dbsyncer.listener.quartz.TableGroupCommand;
import org.dbsyncer.manager.Manager;
import org.dbsyncer.manager.ManagerException;
import org.dbsyncer.manager.config.FieldPicker;
import org.dbsyncer.parser.Parser;
import org.dbsyncer.parser.logger.LogService;
import org.dbsyncer.parser.logger.LogType;
import org.dbsyncer.parser.model.*;
import org.dbsyncer.parser.util.PickerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.PostConstruct;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
 * Incremental synchronization
 *
 * @author AE86
 * @version 1.0.0
 * @date 2020/04/26 15:28
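 *
 * <p>A minimal usage sketch (illustrative only; it assumes this bean is injected as
 * {@code incrementPuller} and that {@code mapping} is an already configured {@link Mapping}):
 * <pre>{@code
 * incrementPuller.start(mapping);              // spawn a worker and begin incremental sync
 * incrementPuller.close(mapping.getMetaId());  // stop sync and release the extractor
 * }</pre>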
 */
@Component
public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private Parser parser;

    @Autowired
    private Listener listener;

    @Autowired
    private Manager manager;

    @Autowired
    private LogService logService;

    @Autowired
    private ScheduledTaskService scheduledTaskService;

    @Autowired
    private ConnectorFactory connectorFactory;

    @Qualifier("taskExecutor")
    @Autowired
    private Executor taskExecutor;

    /** Running extractors, keyed by metaId. */
    private Map<String, Extractor> map = new ConcurrentHashMap<>();

    @PostConstruct
    private void init() {
        // Flush incremental snapshots every 3 seconds (see run()).
        scheduledTaskService.start(3000, this);
    }
    @Override
    public void start(Mapping mapping) {
        final String mappingId = mapping.getId();
        final String metaId = mapping.getMetaId();
        logger.info("Starting incremental sync: {}, {}", metaId, mapping.getName());
        Connector connector = manager.getConnector(mapping.getSourceConnectorId());
        Assert.notNull(connector, "The connector must not be null.");
        List<TableGroup> list = manager.getSortedTableGroupAll(mappingId);
        Assert.notEmpty(list, "The table group mappings must not be empty.");
        Meta meta = manager.getMeta(metaId);
        Assert.notNull(meta, "The meta must not be null.");
        Thread worker = new Thread(() -> {
            try {
                long now = Instant.now().toEpochMilli();
                meta.setBeginTime(now);
                meta.setEndTime(now);
                manager.editMeta(meta);
                map.putIfAbsent(metaId, getExtractor(mapping, connector, list, meta));
                map.get(metaId).start();
            } catch (Exception e) {
                close(metaId);
                logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
                logger.error("Runtime error, stopping incremental sync {}: {}", metaId, e.getMessage());
            }
        });
        worker.setName("increment-worker-" + mapping.getId());
        worker.setDaemon(false);
        worker.start();
    }
    @Override
    public void close(String metaId) {
        Extractor extractor = map.get(metaId);
        if (null != extractor) {
            extractor.clearAllListener();
            extractor.close();
        }
        map.remove(metaId);
        publishClosedEvent(metaId);
        logger.info("Closed successfully: {}", metaId);
    }
    @Override
    public void run() {
        // Periodically flush incremental sync state.
        map.forEach((k, v) -> v.flushEvent());
    }
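    /**
     * Builds the extractor matching the mapping's listener type: a quartz-based
     * extractor for timing mode, or a change-log based extractor for log mode.
     */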
    private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta)
            throws InstantiationException, IllegalAccessException {
        AbstractConnectorConfig connectorConfig = connector.getConfig();
        ListenerConfig listenerConfig = mapping.getListener();
        // The listener type is either timing or log.
        final String listenerType = listenerConfig.getListenerType();

        // Timing mode: poll with a quartz-based extractor (the default).
        if (ListenerTypeEnum.isTiming(listenerType)) {
            AbstractQuartzExtractor extractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
            extractor.setCommands(list.stream().map(t -> {
                Picker picker = new Picker(t.getFieldMapping());
                return new TableGroupCommand(picker.getSourcePrimaryKeyName(t), t.getCommand());
            }).collect(Collectors.toList()));
            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), new QuartzListener(mapping, list));
            return extractor;
        }

        // Log mode: extract changes from the database log.
        if (ListenerTypeEnum.isLog(listenerType)) {
            AbstractExtractor extractor = listener.getExtractor(ListenerTypeEnum.LOG, connectorConfig.getConnectorType(), AbstractExtractor.class);
            Set<String> filterTable = new HashSet<>();
            LogListener logListener = new LogListener(mapping, list, extractor);
            logListener.getTablePicker().forEach((k, fieldPickers) -> filterTable.add(k));
            extractor.setFilterTable(filterTable);
            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), logListener);
            return extractor;
        }

        throw new ManagerException("Unknown listener configuration.");
    }
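    /**
     * Wires shared collaborators into the extractor and registers the listener that
     * handles row-change events and snapshot flushes.
     */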
    private void setExtractorConfig(AbstractExtractor extractor, AbstractConnectorConfig connector, ListenerConfig listener,
                                    Map<String, String> snapshot, AbstractListener event) {
        extractor.setConnectorFactory(connectorFactory);
        extractor.setScheduledTaskService(scheduledTaskService);
        extractor.setConnectorConfig(connector);
        extractor.setListenerConfig(listener);
        extractor.setSnapshot(snapshot);
        extractor.addListener(event);
        extractor.setMetaId(event.metaId);
    }
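    /**
     * Shared listener behavior: persists the incremental snapshot (checkpoint) into
     * {@link Meta} while change events keep arriving.
     *
     * <p>A worked example of the 30s flush window: if the last change event arrived at
     * 12:00:10, a {@code flushEvent} at 12:00:25 (15s later) writes the snapshot, while
     * one at 12:00:50 (40s later) is skipped until new changes refresh the update time.
     */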
    abstract class AbstractListener implements Event {
        private static final int FLUSH_DELAYED_SECONDS = 30;
        protected Mapping mapping;
        protected String metaId;
        private LocalDateTime updateTime = LocalDateTime.now();

        @Override
        public void flushEvent(Map<String, String> snapshot) {
            // Write the snapshot only if a change arrived within the last 30 seconds.
            if (updateTime.isAfter(LocalDateTime.now().minusSeconds(FLUSH_DELAYED_SECONDS))) {
                if (!CollectionUtils.isEmpty(snapshot)) {
                    logger.debug("{}", snapshot);
                }
                forceFlushEvent(snapshot);
            }
        }

        @Override
        public void forceFlushEvent(Map<String, String> snapshot) {
            Meta meta = manager.getMeta(metaId);
            if (null != meta) {
                meta.setSnapshot(snapshot);
                manager.editMeta(meta);
            }
        }

        @Override
        public void refreshFlushEventUpdateTime() {
            updateTime = LocalDateTime.now();
        }

        @Override
        public void errorEvent(Exception e) {
            logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
        }

        @Override
        public void interruptException(Exception e) {
            errorEvent(e);
            close(metaId);
        }
    }
    /**
     * <p>Timing mode
     * <ol>
     * <li>Filters rows by the configured conditions</li>
     * </ol>
     * <p>Sync relationships:
     * <p>source table >> target table
     * <ul>
     * <li>A >> B</li>
     * <li>A >> C</li>
     * <li>E >> F</li>
     * </ul>
     * <p>PS:
     * <ol>
     * <li>Relationships run sequentially: A >> B, then A >> C, and so on.</li>
     * </ol>
     */
    final class QuartzListener extends AbstractListener {

        private List<FieldPicker> tablePicker;

        public QuartzListener(Mapping mapping, List<TableGroup> list) {
            this.mapping = mapping;
            this.metaId = mapping.getMetaId();
            this.tablePicker = new LinkedList<>();
            list.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
        }

        @Override
        public void changedEvent(RowChangedEvent rowChangedEvent) {
            final FieldPicker picker = tablePicker.get(rowChangedEvent.getTableGroupIndex());
            TableGroup tableGroup = picker.getTableGroup();
            rowChangedEvent.setSourceTableName(tableGroup.getSourceTable().getName());

            // Any processing exception is propagated to the caller.
            parser.execute(mapping, tableGroup, rowChangedEvent);

            // Mark that a change was processed, keeping the flush window open.
            refreshFlushEventUpdateTime();
        }
    }
    /**
     * <p>Log mode
     * <ol>
     * <li>Listens for incremental table data</li>
     * <li>Filters rows by the configured conditions</li>
     * </ol>
     * <p>Sync relationships:
     * <p>source table >> target table
     * <ul>
     * <li>A >> B</li>
     * <li>A >> C</li>
     * <li>E >> F</li>
     * </ul>
     * <p>PS:
     * <ol>
     * <li>The listener instance is reused to cut overhead; only one source connector is created at startup.</li>
     * <li>Relationships A >> B and A >> C share the data captured for A: incremental data from A is delivered to both B and C.</li>
     * <li>In this mode, all columns of the table are listened to.</li>
     * </ol>
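     *
     * <p>Illustrative shape of the resulting {@code tablePicker} map for the
     * relationships above (values abbreviated):
     * <pre>{@code
     * "A" -> [FieldPicker(A >> B), FieldPicker(A >> C)]
     * "E" -> [FieldPicker(E >> F)]
     * }</pre>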
     */
    final class LogListener extends AbstractListener {

        private static final int MAX_LOG_CACHE_SIZE = 128;
        private Extractor extractor;
        private Map<String, List<FieldPicker>> tablePicker;
        private AtomicInteger eventCounter;

        public LogListener(Mapping mapping, List<TableGroup> list, Extractor extractor) {
            this.mapping = mapping;
            this.metaId = mapping.getMetaId();
            this.extractor = extractor;
            this.tablePicker = new LinkedHashMap<>();
            this.eventCounter = new AtomicInteger();
            list.forEach(t -> {
                final Table table = t.getSourceTable();
                final String tableName = table.getName();
                List<Field> pkList = t.getTargetTable().getColumn().stream().filter(Field::isPk).collect(Collectors.toList());
                tablePicker.putIfAbsent(tableName, new ArrayList<>());
                TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
                tablePicker.get(tableName).add(new FieldPicker(group, pkList, group.getFilter(), table.getColumn(), group.getFieldMapping()));
            });
        }

        @Override
        public void changedEvent(RowChangedEvent rowChangedEvent) {
            // Any processing exception is propagated to the caller.
            List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getSourceTableName());
            if (!CollectionUtils.isEmpty(pickers)) {
                pickers.forEach(picker -> {
                    final Map<String, Object> dataMap = picker.getColumns(rowChangedEvent.getDataList());
                    if (picker.filter(dataMap)) {
                        rowChangedEvent.setDataMap(dataMap);
                        parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
                    }
                });
                // Mark that a change was processed, keeping the flush window open.
                refreshFlushEventUpdateTime();
                eventCounter.set(0);
                return;
            }

            // Avoid a backlog of irrelevant incremental events: periodically force a
            // flush so the latest valid checkpoint is persisted.
            if (eventCounter.incrementAndGet() >= MAX_LOG_CACHE_SIZE) {
                extractor.forceFlushEvent();
                eventCounter.set(0);
            }
        }

        public Map<String, List<FieldPicker>> getTablePicker() {
            return tablePicker;
        }
    }
}