123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- /**
- * DBSyncer Copyright 2020-2023 All Rights Reserved.
- */
- package org.dbsyncer.manager.impl;
- import org.dbsyncer.common.util.CollectionUtils;
- import org.dbsyncer.connector.base.ConnectorFactory;
- import org.dbsyncer.manager.AbstractPuller;
- import org.dbsyncer.manager.ManagerException;
- import org.dbsyncer.parser.LogService;
- import org.dbsyncer.parser.LogType;
- import org.dbsyncer.parser.ProfileComponent;
- import org.dbsyncer.parser.consumer.impl.LogConsumer;
- import org.dbsyncer.parser.consumer.impl.QuartzConsumer;
- import org.dbsyncer.parser.event.RefreshOffsetEvent;
- import org.dbsyncer.parser.flush.impl.BufferActuatorRouter;
- import org.dbsyncer.parser.model.Connector;
- import org.dbsyncer.parser.model.Mapping;
- import org.dbsyncer.parser.model.Meta;
- import org.dbsyncer.parser.model.TableGroup;
- import org.dbsyncer.sdk.config.ListenerConfig;
- import org.dbsyncer.sdk.enums.ListenerTypeEnum;
- import org.dbsyncer.sdk.listener.AbstractListener;
- import org.dbsyncer.sdk.listener.AbstractQuartzListener;
- import org.dbsyncer.sdk.listener.Listener;
- import org.dbsyncer.sdk.model.ChangedOffset;
- import org.dbsyncer.sdk.model.ConnectorConfig;
- import org.dbsyncer.sdk.model.Table;
- import org.dbsyncer.sdk.model.TableGroupQuartzCommand;
- import org.dbsyncer.common.scheduled.ScheduledTaskJob;
- import org.dbsyncer.common.scheduled.ScheduledTaskService;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.context.ApplicationListener;
- import org.springframework.stereotype.Component;
- import org.springframework.util.Assert;
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
- import java.time.Instant;
- import java.util.ArrayList;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.stream.Collectors;
- /**
- * 增量同步
- *
- * @Version 1.0.0
- * @Author AE86
- * @Date 2020-04-26 15:28
- */
- @Component
- public final class IncrementPuller extends AbstractPuller implements ApplicationListener<RefreshOffsetEvent>, ScheduledTaskJob {
- private final Logger logger = LoggerFactory.getLogger(getClass());
- @Resource
- private BufferActuatorRouter bufferActuatorRouter;
- @Resource
- private ScheduledTaskService scheduledTaskService;
- @Resource
- private ConnectorFactory connectorFactory;
- @Resource
- private ProfileComponent profileComponent;
- @Resource
- private LogService logService;
- private Map<String, Listener> map = new ConcurrentHashMap<>();
- @PostConstruct
- private void init() {
- scheduledTaskService.start(3000, this);
- }
- @Override
- public void start(Mapping mapping) {
- final String mappingId = mapping.getId();
- final String metaId = mapping.getMetaId();
- logger.info("开始增量同步:{}, {}", metaId, mapping.getName());
- Connector connector = profileComponent.getConnector(mapping.getSourceConnectorId());
- Assert.notNull(connector, "连接器不能为空.");
- List<TableGroup> list = profileComponent.getSortedTableGroupAll(mappingId);
- Assert.notEmpty(list, "表映射关系不能为空,请先添加源表到目标表关系.");
- Meta meta = profileComponent.getMeta(metaId);
- Assert.notNull(meta, "Meta不能为空.");
- Thread worker = new Thread(() -> {
- try {
- long now = Instant.now().toEpochMilli();
- meta.setBeginTime(now);
- meta.setEndTime(now);
- profileComponent.editConfigModel(meta);
- map.putIfAbsent(metaId, getListener(mapping, connector, list, meta));
- map.get(metaId).start();
- } catch (Exception e) {
- close(metaId);
- logService.log(LogType.TableGroupLog.INCREMENT_FAILED, e.getMessage());
- logger.error("运行异常,结束增量同步{}:{}", metaId, e.getMessage());
- }
- });
- worker.setName(new StringBuilder("increment-worker-").append(mapping.getId()).toString());
- worker.setDaemon(false);
- worker.start();
- }
- @Override
- public void close(String metaId) {
- Listener listener = map.get(metaId);
- if (null != listener) {
- bufferActuatorRouter.unbind(metaId);
- listener.close();
- }
- map.remove(metaId);
- publishClosedEvent(metaId);
- logger.info("关闭成功:{}", metaId);
- }
- @Override
- public void onApplicationEvent(RefreshOffsetEvent event) {
- List<ChangedOffset> offsetList = event.getOffsetList();
- if (!CollectionUtils.isEmpty(offsetList)) {
- offsetList.forEach(offset -> {
- if (offset.isRefreshOffset() && map.containsKey(offset.getMetaId())) {
- map.get(offset.getMetaId()).refreshEvent(offset);
- }
- });
- }
- }
- @Override
- public void run() {
- // 定时同步增量信息
- map.values().forEach(listener -> listener.flushEvent());
- }
- private Listener getListener(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) {
- ConnectorConfig connectorConfig = connector.getConfig();
- ListenerConfig listenerConfig = mapping.getListener();
- String listenerType = listenerConfig.getListenerType();
- Listener listener = connectorFactory.getListener(connectorConfig.getConnectorType(), listenerType);
- if (null == listener) {
- throw new ManagerException(String.format("Unsupported listener type \"%s\".", connectorConfig.getConnectorType()));
- }
- // 默认定时抽取
- if (ListenerTypeEnum.isTiming(listenerType) && listener instanceof AbstractQuartzListener) {
- AbstractQuartzListener quartzListener = (AbstractQuartzListener) listener;
- quartzListener.setCommands(list.stream().map(t -> new TableGroupQuartzCommand(t.getSourceTable(), t.getCommand())).collect(Collectors.toList()));
- quartzListener.register(new QuartzConsumer().init(bufferActuatorRouter, profileComponent, logService, meta.getId(), mapping, list));
- }
- // 基于日志抽取
- if (ListenerTypeEnum.isLog(listenerType) && listener instanceof AbstractListener) {
- AbstractListener abstractListener = (AbstractListener) listener;
- abstractListener.register(new LogConsumer().init(bufferActuatorRouter, profileComponent, logService, meta.getId(), mapping, list));
- }
- if (listener instanceof AbstractListener) {
- AbstractListener abstractListener = (AbstractListener) listener;
- Set<String> filterTable = new HashSet<>();
- List<Table> sourceTable = new ArrayList<>();
- list.forEach(t -> {
- Table table = t.getSourceTable();
- if (!filterTable.contains(t.getName())) {
- sourceTable.add(table);
- }
- filterTable.add(table.getName());
- });
- abstractListener.setConnectorService(connectorFactory.getConnectorService(connectorConfig.getConnectorType()));
- abstractListener.setConnectorInstance(connectorFactory.connect(connectorConfig));
- abstractListener.setScheduledTaskService(scheduledTaskService);
- abstractListener.setConnectorConfig(connectorConfig);
- abstractListener.setListenerConfig(listenerConfig);
- abstractListener.setFilterTable(filterTable);
- abstractListener.setSourceTable(sourceTable);
- abstractListener.setSnapshot(meta.getSnapshot());
- abstractListener.setMetaId(meta.getId());
- }
- listener.init();
- return listener;
- }
- }
|