/**
 * DBSyncer Copyright 2020-2024 All Rights Reserved.
 */
package org.dbsyncer.parser.impl;

import org.dbsyncer.common.model.Result;
import org.dbsyncer.common.util.CollectionUtils;
import org.dbsyncer.common.util.StringUtil;
import org.dbsyncer.connector.base.ConnectorFactory;
import org.dbsyncer.sdk.config.CommandConfig;
import org.dbsyncer.sdk.config.ReaderConfig;
import org.dbsyncer.sdk.config.WriterBatchConfig;
import org.dbsyncer.sdk.constant.ConnectorConstant;
import org.dbsyncer.sdk.model.Field;
import org.dbsyncer.sdk.model.MetaInfo;
import org.dbsyncer.sdk.model.Table;
import org.dbsyncer.sdk.util.PrimaryKeyUtil;
import org.dbsyncer.parser.ParserComponent;
import org.dbsyncer.parser.ProfileComponent;
import org.dbsyncer.parser.event.FullRefreshEvent;
import org.dbsyncer.parser.model.BatchWriter;
import org.dbsyncer.parser.model.Connector;
import org.dbsyncer.parser.model.FieldMapping;
import org.dbsyncer.parser.model.Mapping;
import org.dbsyncer.parser.model.Picker;
import org.dbsyncer.parser.model.TableGroup;
import org.dbsyncer.parser.model.Task;
import org.dbsyncer.parser.strategy.FlushStrategy;
import org.dbsyncer.parser.util.ConvertUtil;
import org.dbsyncer.parser.util.PickerUtil;
import org.dbsyncer.plugin.PluginFactory;
import org.dbsyncer.sdk.connector.ConnectorInstance;
import org.dbsyncer.sdk.model.ConnectorConfig;
import org.dbsyncer.plugin.impl.FullPluginContext;
import org.dbsyncer.sdk.plugin.PluginContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.Resource;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

/**
 * {@link ParserComponent} implementation: resolves table metadata, builds connector
 * sync commands and drives the full synchronization of a table group.
 *
 * @author AE86
 * @version 1.0.0
 * @date 2019/9/29 22:38
 */
@Component
public class ParserComponentImpl implements ParserComponent {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Resource
    private ConnectorFactory connectorFactory;

    @Resource
    private PluginFactory pluginFactory;

    @Resource
    private FlushStrategy flushStrategy;

    @Resource
    private ProfileComponent profileComponent;

    @Resource
    private ApplicationContext applicationContext;
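
    /**
     * Resolve the metadata of a table through its connector, overlaying the table
     * type and custom SQL configured on the connector, if present.
     *
     * @param connectorId connector id
     * @param tableName   table name
     * @return table metadata
     */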
    @Override
    public MetaInfo getMetaInfo(String connectorId, String tableName) {
        Connector connector = profileComponent.getConnector(connectorId);
        ConnectorInstance connectorInstance = connectorFactory.connect(connector.getConfig());
        MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorInstance, tableName);
        if (!CollectionUtils.isEmpty(connector.getTable())) {
            for (Table t : connector.getTable()) {
                if (t.getName().equals(tableName)) {
                    metaInfo.setTableType(t.getType());
                    metaInfo.setSql(t.getSql());
                    break;
                }
            }
        }
        return metaInfo;
    }
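
    /**
     * Build the source and target command configurations from the mapped columns
     * and delegate to the connector factory to generate the sync commands.
     *
     * @param mapping    mapping between the source and target connectors
     * @param tableGroup table pair with its field mappings and filter
     * @return connector sync commands
     */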
    @Override
    public Map<String, String> getCommand(Mapping mapping, TableGroup tableGroup) {
        ConnectorConfig sConnConfig = getConnectorConfig(mapping.getSourceConnectorId());
        ConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
        Table sourceTable = tableGroup.getSourceTable();
        Table targetTable = tableGroup.getTargetTable();
        Table sTable = sourceTable.clone().setColumn(new ArrayList<>());
        Table tTable = targetTable.clone().setColumn(new ArrayList<>());
        List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
        if (!CollectionUtils.isEmpty(fieldMapping)) {
            fieldMapping.forEach(m -> {
                if (null != m.getSource()) {
                    sTable.getColumn().add(m.getSource());
                }
                if (null != m.getTarget()) {
                    tTable.getColumn().add(m.getTarget());
                }
            });
        }
        final CommandConfig sourceConfig = new CommandConfig(sConnConfig.getConnectorType(), sTable, sConnConfig, tableGroup.getFilter());
        final CommandConfig targetConfig = new CommandConfig(tConnConfig.getConnectorType(), tTable, tConnConfig, null);
        // Get the connector sync commands
        return connectorFactory.getCommand(sourceConfig, targetConfig);
    }
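
    /**
     * Count the rows matched by the given query command on the connector.
     *
     * @param connectorId connector id
     * @param command     query command
     * @return total number of rows
     */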
    @Override
    public long getCount(String connectorId, Map<String, String> command) {
        ConnectorInstance connectorInstance = connectorFactory.connect(getConnectorConfig(connectorId));
        return connectorFactory.getCount(connectorInstance, command);
    }
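
    /**
     * Run the full sync of one table group: read the source page by page, map and
     * convert the rows, let plugins transform them, write them to the target and
     * flush the progress, until the task is stopped or the last page is reached.
     *
     * @param task       full-sync task state (running flag, cursors, page index)
     * @param mapping    source/target mapping configuration
     * @param tableGroup table pair to synchronize
     * @param executor   executor used to parallelize batch writes
     */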
    @Override
    public void execute(Task task, Mapping mapping, TableGroup tableGroup, Executor executor) {
        final String metaId = task.getId();
        final String sourceConnectorId = mapping.getSourceConnectorId();
        final String targetConnectorId = mapping.getTargetConnectorId();
        ConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
        Assert.notNull(sConfig, "数据源配置不能为空.");
        ConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
        Assert.notNull(tConfig, "目标源配置不能为空.");
        TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
        Map<String, String> command = group.getCommand();
        Assert.notEmpty(command, "执行命令不能为空.");
        List<FieldMapping> fieldMapping = group.getFieldMapping();
        Table sourceTable = group.getSourceTable();
        String sTableName = sourceTable.getName();
        String tTableName = group.getTargetTable().getName();
        Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
        // Resolve the fields to synchronize
        Picker picker = new Picker(fieldMapping);
        List<String> primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(sourceTable);
        boolean supportedCursor = StringUtil.isNotBlank(command.get(ConnectorConstant.OPERTION_QUERY_CURSOR));
        int pageSize = mapping.getReadNum();
        int batchSize = mapping.getBatchNum();
        final ConnectorInstance sConnectorInstance = connectorFactory.connect(sConfig);
        final ConnectorInstance tConnectorInstance = connectorFactory.connect(tConfig);
        final String event = ConnectorConstant.OPERTION_INSERT;
        final FullPluginContext context = new FullPluginContext(sConnectorInstance, tConnectorInstance, sTableName, tTableName, event, mapping.getPluginExtInfo());
        for (; ; ) {
            if (!task.isRunning()) {
                logger.warn("任务被中止:{}", metaId);
                break;
            }
            // 1. Read a page of rows from the source
            ReaderConfig readerConfig = new ReaderConfig(sourceTable, command, new ArrayList<>(), supportedCursor, task.getCursors(), task.getPageIndex(), pageSize);
            Result reader = connectorFactory.reader(sConnectorInstance, readerConfig);
            List<Map> source = reader.getSuccessData();
            if (CollectionUtils.isEmpty(source)) {
                logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                break;
            }
            // 2. Map source fields to target fields
            List<Map> target = picker.pickTargetData(source);
            // 3. Apply value conversions
            ConvertUtil.convert(group.getConvert(), target);
            // 4. Apply plugin conversions
            context.setSourceList(source);
            context.setTargetList(target);
            pluginFactory.convert(group.getPlugin(), context);
            // 5. Write to the target
            BatchWriter batchWriter = new BatchWriter(tConnectorInstance, command, tTableName, event, picker.getTargetFields(), target, batchSize, mapping.isForceUpdate());
            Result result = writeBatch(context, batchWriter, executor);
            // 6. Update progress and flush the result
            task.setPageIndex(task.getPageIndex() + 1);
            task.setCursors(PrimaryKeyUtil.getLastCursors(source, primaryKeys));
            result.setTableGroupId(tableGroup.getId());
            result.setTargetTableGroupName(tTableName);
            flush(task, result);
            // 7. Notify plugins for post-processing after the page has been synced
            pluginFactory.postProcessAfter(group.getPlugin(), context);
            // 8. Stop after the last page
            if (source.size() < pageSize) {
                logger.info("完成全量:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                break;
            }
        }
    }
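
    /**
     * Write rows to the target connector. Payloads within one batch size are written
     * in a single call; larger payloads are split into batches of {@code batchSize}
     * and written in parallel on the given executor, waiting for all of them.
     *
     * @param pluginContext plugin context; if terminated, the rows are skipped and reported as success
     * @param batchWriter   target connector instance, command and rows to write
     * @param executor      executor for parallel batch writes
     * @return aggregated write result
     */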
    @Override
    public Result writeBatch(PluginContext pluginContext, BatchWriter batchWriter, Executor executor) {
        final Result result = new Result();
        // A plugin terminated the sync, skip writing to the target
        if (pluginContext.isTerminated()) {
            result.getSuccessData().addAll(batchWriter.getDataList());
            return result;
        }
        List<Map> dataList = batchWriter.getDataList();
        int batchSize = batchWriter.getBatchSize();
        String tableName = batchWriter.getTableName();
        String event = batchWriter.getEvent();
        Map<String, String> command = batchWriter.getCommand();
        List<Field> fields = batchWriter.getFields();
        // Total number of rows
        int total = dataList.size();
        // Fits into a single batch
        if (total <= batchSize) {
            return connectorFactory.writer(batchWriter.getConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, dataList, batchWriter.isForceUpdate()));
        }
        // Split into multiple batches
        int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
        final CountDownLatch latch = new CountDownLatch(taskSize);
        int fromIndex = 0;
        int toIndex = batchSize;
        for (int i = 0; i < taskSize; i++) {
            final List<Map> data;
            if (toIndex > total) {
                // Last, partially filled batch
                toIndex = fromIndex + (total % batchSize);
                data = dataList.subList(fromIndex, toIndex);
            } else {
                data = dataList.subList(fromIndex, toIndex);
                fromIndex += batchSize;
                toIndex += batchSize;
            }
            executor.execute(() -> {
                try {
                    Result w = connectorFactory.writer(batchWriter.getConnectorInstance(), new WriterBatchConfig(tableName, event, command, fields, data, batchWriter.isForceUpdate()));
                    result.addSuccessData(w.getSuccessData());
                    result.addFailData(w.getFailData());
                    result.getError().append(w.getError());
                } catch (Exception e) {
                    logger.error(e.getMessage());
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            logger.error(e.getMessage());
        }
        return result;
    }

    /**
     * Flush the full-sync result and update the cached task state.
     *
     * @param task   current full-sync task
     * @param result write result of the current page
     */
    private void flush(Task task, Result result) {
        flushStrategy.flushFullData(task.getId(), result, ConnectorConstant.OPERTION_INSERT);
        // Publish a refresh event to the FullExtractor
        task.setEndTime(Instant.now().toEpochMilli());
        applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
    }

    /**
     * Get the connector configuration by connector id.
     *
     * @param connectorId connector id
     * @return connector configuration
     */
    private ConnectorConfig getConnectorConfig(String connectorId) {
        return profileComponent.getConnector(connectorId).getConfig();
    }
}