IncrementPuller.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. package org.dbsyncer.manager.puller;
  2. import org.dbsyncer.common.event.Event;
  3. import org.dbsyncer.common.event.RowChangedEvent;
  4. import org.dbsyncer.common.model.AbstractConnectorConfig;
  5. import org.dbsyncer.common.scheduled.ScheduledTaskJob;
  6. import org.dbsyncer.common.scheduled.ScheduledTaskService;
  7. import org.dbsyncer.common.util.CollectionUtils;
  8. import org.dbsyncer.connector.ConnectorFactory;
  9. import org.dbsyncer.connector.model.Field;
  10. import org.dbsyncer.connector.model.Table;
  11. import org.dbsyncer.connector.util.PrimaryKeyUtil;
  12. import org.dbsyncer.listener.AbstractExtractor;
  13. import org.dbsyncer.listener.Extractor;
  14. import org.dbsyncer.listener.Listener;
  15. import org.dbsyncer.listener.config.ListenerConfig;
  16. import org.dbsyncer.listener.enums.ListenerTypeEnum;
  17. import org.dbsyncer.listener.quartz.AbstractQuartzExtractor;
  18. import org.dbsyncer.listener.quartz.TableGroupCommand;
  19. import org.dbsyncer.manager.Manager;
  20. import org.dbsyncer.manager.ManagerException;
  21. import org.dbsyncer.manager.model.FieldPicker;
  22. import org.dbsyncer.parser.Parser;
  23. import org.dbsyncer.parser.logger.LogService;
  24. import org.dbsyncer.parser.logger.LogType;
  25. import org.dbsyncer.parser.model.Connector;
  26. import org.dbsyncer.parser.model.Mapping;
  27. import org.dbsyncer.parser.model.Meta;
  28. import org.dbsyncer.parser.model.TableGroup;
  29. import org.dbsyncer.parser.util.PickerUtil;
  30. import org.slf4j.Logger;
  31. import org.slf4j.LoggerFactory;
  32. import org.springframework.stereotype.Component;
  33. import org.springframework.util.Assert;
  34. import javax.annotation.PostConstruct;
  35. import javax.annotation.Resource;
  36. import java.sql.Timestamp;
  37. import java.time.Instant;
  38. import java.time.LocalDateTime;
  39. import java.util.ArrayList;
  40. import java.util.HashSet;
  41. import java.util.LinkedHashMap;
  42. import java.util.LinkedList;
  43. import java.util.List;
  44. import java.util.Map;
  45. import java.util.Set;
  46. import java.util.concurrent.ConcurrentHashMap;
  47. import java.util.concurrent.atomic.AtomicInteger;
  48. import java.util.stream.Collectors;
  49. /**
  50. * 增量同步
  51. *
  52. * @author AE86
  53. * @version 1.0.0
  54. * @date 2020/04/26 15:28
  55. */
  56. @Component
  57. public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob {
  58. private final Logger logger = LoggerFactory.getLogger(getClass());
  59. @Resource
  60. private Parser parser;
  61. @Resource
  62. private Listener listener;
  63. @Resource
  64. private Manager manager;
  65. @Resource
  66. private LogService logService;
  67. @Resource
  68. private ScheduledTaskService scheduledTaskService;
  69. @Resource
  70. private ConnectorFactory connectorFactory;
  71. private Map<String, Extractor> map = new ConcurrentHashMap<>();
/**
 * Registers this puller with the scheduled task service after construction;
 * {@link #run()} is then invoked every 3000 ms to flush increment events.
 */
@PostConstruct
private void init() {
    scheduledTaskService.start(3000, this);
}
  76. @Override
  77. public void start(Mapping mapping) {
  78. final String mappingId = mapping.getId();
  79. final String metaId = mapping.getMetaId();
  80. logger.info("开始增量同步:{}, {}", metaId, mapping.getName());
  81. Connector connector = manager.getConnector(mapping.getSourceConnectorId());
  82. Assert.notNull(connector, "连接器不能为空.");
  83. List<TableGroup> list = manager.getSortedTableGroupAll(mappingId);
  84. Assert.notEmpty(list, "映射关系不能为空.");
  85. Meta meta = manager.getMeta(metaId);
  86. Assert.notNull(meta, "Meta不能为空.");
  87. Thread worker = new Thread(() -> {
  88. try {
  89. long now = Instant.now().toEpochMilli();
  90. meta.setBeginTime(now);
  91. meta.setEndTime(now);
  92. manager.editConfigModel(meta);
  93. map.putIfAbsent(metaId, getExtractor(mapping, connector, list, meta));
  94. map.get(metaId).start();
  95. } catch (Exception e) {
  96. close(metaId);
  97. logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
  98. logger.error("运行异常,结束增量同步{}:{}", metaId, e.getMessage());
  99. }
  100. });
  101. worker.setName(new StringBuilder("increment-worker-").append(mapping.getId()).toString());
  102. worker.setDaemon(false);
  103. worker.start();
  104. }
  105. @Override
  106. public void close(String metaId) {
  107. Extractor extractor = map.get(metaId);
  108. if (null != extractor) {
  109. extractor.close();
  110. }
  111. map.remove(metaId);
  112. publishClosedEvent(metaId);
  113. logger.info("关闭成功:{}", metaId);
  114. }
  115. @Override
  116. public void run() {
  117. // 定时同步增量信息
  118. map.forEach((k, v) -> v.flushEvent());
  119. }
  120. private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) throws InstantiationException, IllegalAccessException {
  121. AbstractConnectorConfig connectorConfig = connector.getConfig();
  122. ListenerConfig listenerConfig = mapping.getListener();
  123. // timing/log
  124. final String listenerType = listenerConfig.getListenerType();
  125. AbstractExtractor extractor = null;
  126. // 默认定时抽取
  127. if (ListenerTypeEnum.isTiming(listenerType)) {
  128. AbstractQuartzExtractor quartzExtractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
  129. quartzExtractor.setCommands(list.stream().map(t -> {
  130. List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(t.getSourceTable());
  131. return new TableGroupCommand(primaryKeys, t.getCommand());
  132. }).collect(Collectors.toList()));
  133. quartzExtractor.register(new QuartzListener(mapping, list));
  134. extractor = quartzExtractor;
  135. }
  136. // 基于日志抽取
  137. if (ListenerTypeEnum.isLog(listenerType)) {
  138. extractor = listener.getExtractor(ListenerTypeEnum.LOG, connectorConfig.getConnectorType(), AbstractExtractor.class);
  139. extractor.register(new LogListener(mapping, list, extractor));
  140. }
  141. if (null != extractor) {
  142. Set<String> filterTable = new HashSet<>();
  143. List<Table> sourceTable = new ArrayList<>();
  144. list.forEach(t -> {
  145. Table table = t.getSourceTable();
  146. if (!filterTable.contains(t.getName())) {
  147. sourceTable.add(table);
  148. }
  149. filterTable.add(table.getName());
  150. });
  151. extractor.setConnectorFactory(connectorFactory);
  152. extractor.setScheduledTaskService(scheduledTaskService);
  153. extractor.setConnectorConfig(connectorConfig);
  154. extractor.setListenerConfig(listenerConfig);
  155. extractor.setFilterTable(filterTable);
  156. extractor.setSourceTable(sourceTable);
  157. extractor.setSnapshot(meta.getSnapshot());
  158. extractor.setMetaId(meta.getId());
  159. return extractor;
  160. }
  161. throw new ManagerException("未知的监听配置.");
  162. }
  163. abstract class AbstractListener implements Event {
  164. private static final int FLUSH_DELAYED_SECONDS = 30;
  165. protected Mapping mapping;
  166. protected String metaId;
  167. @Override
  168. public void flushEvent(Map<String, String> snapshot) {
  169. // 30s内更新,执行写入
  170. Meta meta = manager.getMeta(metaId);
  171. LocalDateTime lastSeconds = LocalDateTime.now().minusSeconds(FLUSH_DELAYED_SECONDS);
  172. if (meta.getUpdateTime() > Timestamp.valueOf(lastSeconds).getTime()) {
  173. if (!CollectionUtils.isEmpty(snapshot)) {
  174. logger.debug("{}", snapshot);
  175. }
  176. forceFlushEvent(snapshot);
  177. }
  178. }
  179. @Override
  180. public void forceFlushEvent(Map<String, String> snapshot) {
  181. Meta meta = manager.getMeta(metaId);
  182. if (null != meta) {
  183. meta.setSnapshot(snapshot);
  184. manager.editConfigModel(meta);
  185. }
  186. }
  187. @Override
  188. public void errorEvent(Exception e) {
  189. logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
  190. }
  191. @Override
  192. public void interruptException(Exception e) {
  193. errorEvent(e);
  194. close(metaId);
  195. }
  196. }
  197. /**
  198. * </p>定时模式
  199. * <ol>
  200. * <li>根据过滤条件筛选</li>
  201. * </ol>
  202. * </p>同步关系:
  203. * </p>数据源表 >> 目标源表
  204. * <ul>
  205. * <li>A >> B</li>
  206. * <li>A >> C</li>
  207. * <li>E >> F</li>
  208. * </ul>
  209. * </p>PS:
  210. * <ol>
  211. * <li>依次执行同步关系A >> B 然后 A >> C ...</li>
  212. * </ol>
  213. */
  214. final class QuartzListener extends AbstractListener {
  215. private List<FieldPicker> tablePicker;
  216. public QuartzListener(Mapping mapping, List<TableGroup> tableGroups) {
  217. this.mapping = mapping;
  218. this.metaId = mapping.getMetaId();
  219. this.tablePicker = new LinkedList<>();
  220. tableGroups.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
  221. }
  222. @Override
  223. public void changedEvent(RowChangedEvent rowChangedEvent) {
  224. final FieldPicker picker = tablePicker.get(rowChangedEvent.getTableGroupIndex());
  225. TableGroup tableGroup = picker.getTableGroup();
  226. rowChangedEvent.setSourceTableName(tableGroup.getSourceTable().getName());
  227. // 处理过程有异常向上抛
  228. parser.execute(mapping, tableGroup, rowChangedEvent);
  229. }
  230. }
  231. /**
  232. * </p>日志模式
  233. * <ol>
  234. * <li>监听表增量数据</li>
  235. * <li>根据过滤条件筛选</li>
  236. * </ol>
  237. * </p>同步关系:
  238. * </p>数据源表 >> 目标源表
  239. * <ul>
  240. * <li>A >> B</li>
  241. * <li>A >> C</li>
  242. * <li>E >> F</li>
  243. * </ul>
  244. * </p>PS:
  245. * <ol>
  246. * <li>为减少开销而选择复用监听器实例, 启动时只需创建一个数据源连接器.</li>
  247. * <li>关系A >> B和A >> C会复用A监听的数据, A监听到增量数据,会发送给B和C.</li>
  248. * <li>该模式下,会监听表所有字段.</li>
  249. * </ol>
  250. */
  251. final class LogListener extends AbstractListener {
  252. private Extractor extractor;
  253. private Map<String, List<FieldPicker>> tablePicker;
  254. private AtomicInteger eventCounter;
  255. private static final int MAX_LOG_CACHE_SIZE = 128;
  256. public LogListener(Mapping mapping, List<TableGroup> tableGroups, Extractor extractor) {
  257. this.mapping = mapping;
  258. this.metaId = mapping.getMetaId();
  259. this.extractor = extractor;
  260. this.tablePicker = new LinkedHashMap<>();
  261. this.eventCounter = new AtomicInteger();
  262. tableGroups.forEach(t -> {
  263. final Table table = t.getSourceTable();
  264. final String tableName = table.getName();
  265. tablePicker.putIfAbsent(tableName, new ArrayList<>());
  266. TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, t);
  267. tablePicker.get(tableName).add(new FieldPicker(group, group.getFilter(), table.getColumn(), group.getFieldMapping()));
  268. });
  269. }
  270. @Override
  271. public void changedEvent(RowChangedEvent rowChangedEvent) {
  272. // 处理过程有异常向上抛
  273. List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getSourceTableName());
  274. if (!CollectionUtils.isEmpty(pickers)) {
  275. pickers.forEach(picker -> {
  276. final Map<String, Object> dataMap = picker.getColumns(rowChangedEvent.getDataList());
  277. if (picker.filter(dataMap)) {
  278. rowChangedEvent.setDataMap(dataMap);
  279. parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
  280. }
  281. });
  282. eventCounter.set(0);
  283. return;
  284. }
  285. // 防止挤压无效的增量数据,刷新最新的有效记录点
  286. if (eventCounter.incrementAndGet() >= MAX_LOG_CACHE_SIZE) {
  287. extractor.forceFlushEvent();
  288. eventCounter.set(0);
  289. }
  290. }
  291. }
  292. }