123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- package org.dbsyncer.manager.puller;
- import org.dbsyncer.common.util.NumberUtil;
- import org.dbsyncer.common.util.StringUtil;
- import org.dbsyncer.connector.util.PrimaryKeyUtil;
- import org.dbsyncer.manager.Manager;
- import org.dbsyncer.parser.Parser;
- import org.dbsyncer.parser.enums.ParserEnum;
- import org.dbsyncer.parser.event.FullRefreshEvent;
- import org.dbsyncer.parser.logger.LogService;
- import org.dbsyncer.parser.logger.LogType;
- import org.dbsyncer.parser.model.Mapping;
- import org.dbsyncer.parser.model.Meta;
- import org.dbsyncer.parser.model.TableGroup;
- import org.dbsyncer.parser.model.Task;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.context.ApplicationListener;
- import org.springframework.stereotype.Component;
- import org.springframework.util.Assert;
- import javax.annotation.Resource;
- import java.time.Instant;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- /**
- * 全量同步
- *
- * @author AE86
- * @version 1.0.0
- * @date 2020/04/26 15:28
- */
- @Component
- public class FullPuller extends AbstractPuller implements ApplicationListener<FullRefreshEvent> {
- private final Logger logger = LoggerFactory.getLogger(getClass());
- @Resource
- private Parser parser;
- @Resource
- private Manager manager;
- @Resource
- private LogService logService;
- private Map<String, Task> map = new ConcurrentHashMap<>();
- @Override
- public void start(Mapping mapping) {
- Thread worker = new Thread(() -> {
- final String metaId = mapping.getMetaId();
- try {
- List<TableGroup> list = manager.getSortedTableGroupAll(mapping.getId());
- Assert.notEmpty(list, "映射关系不能为空");
- logger.info("开始全量同步:{}, {}", metaId, mapping.getName());
- map.putIfAbsent(metaId, new Task(metaId));
- final ExecutorService executor = Executors.newFixedThreadPool(mapping.getThreadNum());
- Task task = map.get(metaId);
- doTask(task, mapping, list, executor);
- } catch (Exception e) {
- logger.error(e.getMessage());
- logService.log(LogType.SystemLog.ERROR, e.getMessage());
- } finally {
- map.remove(metaId);
- publishClosedEvent(metaId);
- logger.info("结束全量同步:{}, {}", metaId, mapping.getName());
- }
- });
- worker.setName(new StringBuilder("full-worker-").append(mapping.getId()).toString());
- worker.setDaemon(false);
- worker.start();
- }
- @Override
- public void close(String metaId) {
- Task task = map.get(metaId);
- if (null != task) {
- task.stop();
- }
- }
- @Override
- public void onApplicationEvent(FullRefreshEvent event) {
- // 异步监听任务刷新事件
- flush(event.getTask());
- }
- private void doTask(Task task, Mapping mapping, List<TableGroup> list, ExecutorService executorService) {
- // 记录开始时间
- long now = Instant.now().toEpochMilli();
- task.setBeginTime(now);
- task.setEndTime(now);
- // 获取上次同步点
- Meta meta = manager.getMeta(task.getId());
- Map<String, String> snapshot = meta.getSnapshot();
- task.setPageIndex(NumberUtil.toInt(snapshot.get(ParserEnum.PAGE_INDEX.getCode()), ParserEnum.PAGE_INDEX.getDefaultValue()));
- // 反序列化游标值类型(通常为数字或字符串类型)
- String cursorValue = snapshot.get(ParserEnum.CURSOR.getCode());
- task.setCursors(PrimaryKeyUtil.getLastCursors(cursorValue));
- task.setTableGroupIndex(NumberUtil.toInt(snapshot.get(ParserEnum.TABLE_GROUP_INDEX.getCode()), ParserEnum.TABLE_GROUP_INDEX.getDefaultValue()));
- flush(task);
- int i = task.getTableGroupIndex();
- while (i < list.size()) {
- parser.execute(task, mapping, list.get(i), executorService);
- if (!task.isRunning()) {
- break;
- }
- task.setPageIndex(ParserEnum.PAGE_INDEX.getDefaultValue());
- task.setCursors(null);
- task.setTableGroupIndex(++i);
- flush(task);
- }
- // 记录结束时间
- task.setEndTime(Instant.now().toEpochMilli());
- task.setTableGroupIndex(ParserEnum.TABLE_GROUP_INDEX.getDefaultValue());
- flush(task);
- }
- private void flush(Task task) {
- Meta meta = manager.getMeta(task.getId());
- Assert.notNull(meta, "检查meta为空.");
- // 全量的过程中,有新数据则更新总数
- long finished = meta.getSuccess().get() + meta.getFail().get();
- if (meta.getTotal().get() < finished) {
- meta.getTotal().set(finished);
- }
- meta.setBeginTime(task.getBeginTime());
- meta.setEndTime(task.getEndTime());
- Map<String, String> snapshot = meta.getSnapshot();
- snapshot.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(task.getPageIndex()));
- snapshot.put(ParserEnum.CURSOR.getCode(), StringUtil.join(task.getCursors(), ","));
- snapshot.put(ParserEnum.TABLE_GROUP_INDEX.getCode(), String.valueOf(task.getTableGroupIndex()));
- manager.editConfigModel(meta);
- }
- }
|