ParserFactory.java

package org.dbsyncer.parser;

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.dbsyncer.cache.CacheService;
import org.dbsyncer.common.event.RowChangedEvent;
import org.dbsyncer.common.model.AbstractConnectorConfig;
import org.dbsyncer.common.model.FullConvertContext;
import org.dbsyncer.common.model.Result;
import org.dbsyncer.common.spi.ConnectorMapper;
import org.dbsyncer.common.spi.ConvertContext;
import org.dbsyncer.common.util.CollectionUtils;
import org.dbsyncer.common.util.JsonUtil;
import org.dbsyncer.connector.ConnectorFactory;
import org.dbsyncer.connector.config.CommandConfig;
import org.dbsyncer.connector.config.ReaderConfig;
import org.dbsyncer.connector.config.WriterBatchConfig;
import org.dbsyncer.connector.constant.ConnectorConstant;
import org.dbsyncer.connector.enums.ConnectorEnum;
import org.dbsyncer.connector.enums.FilterEnum;
import org.dbsyncer.connector.enums.OperationEnum;
import org.dbsyncer.connector.model.Field;
import org.dbsyncer.connector.model.MetaInfo;
import org.dbsyncer.connector.model.Table;
import org.dbsyncer.connector.util.PrimaryKeyUtil;
import org.dbsyncer.listener.enums.QuartzFilterEnum;
import org.dbsyncer.parser.enums.ConvertEnum;
import org.dbsyncer.parser.event.FullRefreshEvent;
import org.dbsyncer.parser.logger.LogService;
import org.dbsyncer.parser.logger.LogType;
import org.dbsyncer.parser.model.*;
import org.dbsyncer.parser.strategy.FlushStrategy;
import org.dbsyncer.parser.strategy.ParserStrategy;
import org.dbsyncer.parser.util.ConvertUtil;
import org.dbsyncer.parser.util.PickerUtil;
import org.dbsyncer.plugin.PluginFactory;
import org.dbsyncer.storage.enums.StorageDataStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
 * @author AE86
 * @version 1.0.0
 * @date 2019/9/29 22:38
 */
@Component
public class ParserFactory implements Parser {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private ConnectorFactory connectorFactory;

    @Autowired
    private PluginFactory pluginFactory;

    @Autowired
    private CacheService cacheService;

    @Autowired
    private LogService logService;

    @Autowired
    private FlushStrategy flushStrategy;

    @Autowired
    @Qualifier("taskExecutor")
    private Executor taskExecutor;

    @Autowired
    @Qualifier("webApplicationContext")
    private ApplicationContext applicationContext;

    @Autowired
    private ParserStrategy parserStrategy;
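
    /**
     * Establish a connection for the given connector config and return its mapper.
     */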
    @Override
    public ConnectorMapper connect(AbstractConnectorConfig config) {
        return connectorFactory.connect(config);
    }
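
    /**
     * Refresh (re-establish) the connection for the given config and report whether
     * it is alive afterwards.
     */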
    @Override
    public boolean refreshConnectorConfig(AbstractConnectorConfig config) {
        return connectorFactory.refresh(config);
    }
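
    /**
     * Check whether the connector is reachable; if the health check fails, try to
     * re-establish the connection once and log the result.
     */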
    @Override
    public boolean isAliveConnectorConfig(AbstractConnectorConfig config) {
        boolean alive = false;
        try {
            alive = connectorFactory.isAlive(config);
        } catch (Exception e) {
            LogType.ConnectorLog logType = LogType.ConnectorLog.FAILED;
            logService.log(logType, "%s%s", logType.getName(), e.getMessage());
        }
        // Reconnect after the connection is lost
        if (!alive) {
            try {
                alive = connectorFactory.refresh(config);
            } catch (Exception e) {
                logger.error(e.getMessage());
            }
            if (alive) {
                logger.info(LogType.ConnectorLog.RECONNECT_SUCCESS.getMessage());
            }
        }
        return alive;
    }
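
    /**
     * List the tables of the connected data source.
     */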
    @Override
    public List<Table> getTable(ConnectorMapper config) {
        return connectorFactory.getTable(config);
    }
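
    /**
     * Load the column metadata of a table and, if the connector config already
     * holds a matching table definition, copy its table type onto the result.
     */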
    @Override
    public MetaInfo getMetaInfo(String connectorId, String tableName) {
        Connector connector = getConnector(connectorId);
        ConnectorMapper connectorMapper = connectorFactory.connect(connector.getConfig());
        MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableName);
        if (!CollectionUtils.isEmpty(connector.getTable())) {
            for (Table t : connector.getTable()) {
                if (t.getName().equals(tableName)) {
                    metaInfo.setTableType(t.getType());
                    break;
                }
            }
        }
        return metaInfo;
    }
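
    /**
     * Build the source and target command configs from the mapped columns of a
     * table group and ask the connector factory for the corresponding sync commands.
     */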
    @Override
    public Map<String, String> getCommand(Mapping mapping, TableGroup tableGroup) {
        AbstractConnectorConfig sConnConfig = getConnectorConfig(mapping.getSourceConnectorId());
        AbstractConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
        Table sourceTable = tableGroup.getSourceTable();
        Table targetTable = tableGroup.getTargetTable();
        Table sTable = new Table(sourceTable.getName(), sourceTable.getType(), sourceTable.getPrimaryKey(), new ArrayList<>());
        Table tTable = new Table(targetTable.getName(), targetTable.getType(), targetTable.getPrimaryKey(), new ArrayList<>());
        List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
        if (!CollectionUtils.isEmpty(fieldMapping)) {
            // Only mapped columns take part in the generated commands
            fieldMapping.forEach(m -> {
                if (null != m.getSource()) {
                    sTable.getColumn().add(m.getSource());
                }
                if (null != m.getTarget()) {
                    tTable.getColumn().add(m.getTarget());
                }
            });
        }
        final CommandConfig sourceConfig = new CommandConfig(sConnConfig.getConnectorType(), sTable, sourceTable, sConnConfig, tableGroup.getFilter());
        final CommandConfig targetConfig = new CommandConfig(tConnConfig.getConnectorType(), tTable, targetTable, tConnConfig);
        // Ask the connectors for their sync commands
        return connectorFactory.getCommand(sourceConfig, targetConfig);
    }
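
    /**
     * Count the rows matched by the sync command on the given connector.
     */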
    @Override
    public long getCount(String connectorId, Map<String, String> command) {
        ConnectorMapper connectorMapper = connectorFactory.connect(getConnectorConfig(connectorId));
        return connectorFactory.getCount(connectorMapper, command);
    }
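
    /**
     * Deserialize a connector from JSON. The nested "config" object is parsed
     * separately because its concrete class depends on the connector type.
     */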
    @Override
    public Connector parseConnector(String json) {
        try {
            JSONObject conn = JsonUtil.parseObject(json);
            JSONObject config = (JSONObject) conn.remove("config");
            Connector connector = JsonUtil.jsonToObj(conn.toString(), Connector.class);
            Assert.notNull(connector, "Connector can not be null.");
            String connectorType = config.getString("connectorType");
            Class<?> configClass = ConnectorEnum.getConfigClass(connectorType);
            AbstractConnectorConfig obj = (AbstractConnectorConfig) JsonUtil.jsonToObj(config.toString(), configClass);
            connector.setConfig(obj);
            return connector;
        } catch (JSONException e) {
            logger.error(e.getMessage());
            throw new ParserException(e.getMessage());
        }
    }
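
    /**
     * Deserialize JSON into an instance of the given class.
     */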
    @Override
    public <T> T parseObject(String json, Class<T> clazz) {
        return JsonUtil.jsonToObj(json, clazz);
    }

    @Override
    public List<ConnectorEnum> getConnectorEnumAll() {
        return Arrays.asList(ConnectorEnum.values());
    }

    @Override
    public List<OperationEnum> getOperationEnumAll() {
        return Arrays.asList(OperationEnum.values());
    }

    @Override
    public List<QuartzFilterEnum> getQuartzFilterEnumAll() {
        return Arrays.asList(QuartzFilterEnum.values());
    }

    @Override
    public List<FilterEnum> getFilterEnumAll() {
        return Arrays.asList(FilterEnum.values());
    }

    @Override
    public List<ConvertEnum> getConvertEnumAll() {
        return Arrays.asList(ConvertEnum.values());
    }

    @Override
    public List<StorageDataStatusEnum> getStorageDataStatusEnumAll() {
        return Arrays.asList(StorageDataStatusEnum.values());
    }
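
    /**
     * Run a full synchronization task for one table group: page through the source
     * table, map and convert each page, hand it to the plugins, write it to the
     * target, then flush the result and advance the cursor until the last page.
     */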
    @Override
    public void execute(Task task, Mapping mapping, TableGroup tableGroup, ExecutorService executorService) {
        final String metaId = task.getId();
        final String sourceConnectorId = mapping.getSourceConnectorId();
        final String targetConnectorId = mapping.getTargetConnectorId();
        AbstractConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
        Assert.notNull(sConfig, "数据源配置不能为空.");
        AbstractConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
        Assert.notNull(tConfig, "目标源配置不能为空.");
        TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
        Map<String, String> command = group.getCommand();
        Assert.notEmpty(command, "执行命令不能为空.");
        List<FieldMapping> fieldMapping = group.getFieldMapping();
        String sTableName = group.getSourceTable().getName();
        String tTableName = group.getTargetTable().getName();
        Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
        // Fields to synchronize
        Picker picker = new Picker(fieldMapping);
        String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(tableGroup.getSourceTable());
        int pageSize = mapping.getReadNum();
        int batchSize = mapping.getBatchNum();
        final ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
        final ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
        final String event = ConnectorConstant.OPERTION_INSERT;
        final FullConvertContext context = new FullConvertContext(sConnectorMapper, tConnectorMapper, sTableName, tTableName, event);

        for (; ; ) {
            if (!task.isRunning()) {
                logger.warn("任务被中止:{}", metaId);
                break;
            }
            // 1. Read a page of rows from the source
            Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), task.getCursor(), task.getPageIndex(), pageSize));
            List<Map> source = reader.getSuccessData();
            if (CollectionUtils.isEmpty(source)) {
                logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                break;
            }
            // 2. Map source columns to target columns
            List<Map> target = picker.pickData(source);
            // 3. Apply value conversions
            ConvertUtil.convert(group.getConvert(), target);
            // 4. Let plugins transform the page
            context.setSourceList(source);
            context.setTargetList(target);
            pluginFactory.convert(group.getPlugin(), context);
            // 5. Write the page to the target
            BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, tTableName, event, picker.getTargetFields(), target, batchSize);
            Result result = writeBatch(context, batchWriter, executorService);
            // 6. Notify plugins for post-processing
            pluginFactory.postProcessAfter(group.getPlugin(), context);
            // 7. Record progress and advance the cursor
            task.setPageIndex(task.getPageIndex() + 1);
            task.setCursor(getLastCursor(source, pk));
            result.setTableGroupId(tableGroup.getId());
            result.setTargetTableGroupName(tTableName);
            flush(task, result);
            // 8. Stop on the last page
            if (source.size() < pageSize) {
                logger.info("完成全量:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
                break;
            }
        }
    }
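
    /**
     * Handle one incremental change event by delegating it to the configured
     * {@link ParserStrategy} for the table group.
     */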
    @Override
    public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
        logger.debug("Table[{}] {}, data:{}", event.getSourceTableName(), event.getEvent(), event.getDataMap());
        parserStrategy.execute(tableGroup.getId(), event.getEvent(), event.getDataMap());
    }

    /**
     * Write a batch of rows using the shared task executor.
     *
     * @param context     convert context shared with the plugins
     * @param batchWriter rows, fields and connector to write with
     * @return aggregated write result
     */
    @Override
    public Result writeBatch(ConvertContext context, BatchWriter batchWriter) {
        return writeBatch(context, batchWriter, taskExecutor);
    }

    /**
     * Write a batch of rows. If the batch fits into a single write it is executed
     * inline; otherwise it is split into sub-batches that run on the given executor
     * and the results are merged once all of them complete.
     *
     * @param context      convert context shared with the plugins
     * @param batchWriter  rows, fields and connector to write with
     * @param taskExecutor executor used for the sub-batches
     * @return aggregated write result
     */
    private Result writeBatch(ConvertContext context, BatchWriter batchWriter, Executor taskExecutor) {
        final Result result = new Result();
        // Stop syncing data to the target
        if (context.isTerminated()) {
            result.getSuccessData().addAll(batchWriter.getDataList());
            return result;
        }
        List<Map> dataList = batchWriter.getDataList();
        int batchSize = batchWriter.getBatchSize();
        String tableName = batchWriter.getTableName();
        String event = batchWriter.getEvent();
        Map<String, String> command = batchWriter.getCommand();
        List<Field> fields = batchWriter.getFields();
        // Total number of rows
        int total = dataList.size();
        // Small batch: write in the current thread
        if (total <= batchSize) {
            return connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, dataList));
        }
        // Large batch: split into sub-batches and write them concurrently
        int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
        final CountDownLatch latch = new CountDownLatch(taskSize);
        int fromIndex = 0;
        int toIndex = batchSize;
        for (int i = 0; i < taskSize; i++) {
            final List<Map> data;
            if (toIndex > total) {
                // The last sub-batch holds the remaining rows
                toIndex = fromIndex + (total % batchSize);
                data = dataList.subList(fromIndex, toIndex);
            } else {
                data = dataList.subList(fromIndex, toIndex);
                fromIndex += batchSize;
                toIndex += batchSize;
            }
            taskExecutor.execute(() -> {
                try {
                    Result w = connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, data));
                    result.addSuccessData(w.getSuccessData());
                    result.addFailData(w.getFailData());
                    result.getError().append(w.getError());
                } catch (Exception e) {
                    logger.error(e.getMessage());
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            logger.error(e.getMessage());
        }
        return result;
    }

    /**
     * Update the cache: flush the result of the current page via the flush strategy
     * and publish a refresh event to the FullExtractor.
     *
     * @param task   running full-sync task
     * @param result write result of the current page
     */
    private void flush(Task task, Result result) {
        flushStrategy.flushFullData(task.getId(), result, ConnectorConstant.OPERTION_INSERT);
        // Publish a refresh event for the FullExtractor
        task.setEndTime(Instant.now().toEpochMilli());
        applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
    }

    /**
     * Get the connector from the cache.
     *
     * @param connectorId connector id
     * @return cached connector, never null
     */
    private Connector getConnector(String connectorId) {
        Assert.hasText(connectorId, "Connector id can not be empty.");
        Connector conn = cacheService.get(connectorId, Connector.class);
        Assert.notNull(conn, "Connector can not be null.");
        return conn;
    }

    /**
     * Get the connection config of a cached connector.
     *
     * @param connectorId connector id
     * @return connector config
     */
    private AbstractConnectorConfig getConnectorConfig(String connectorId) {
        return getConnector(connectorId).getConfig();
    }

    /**
     * Get the latest cursor value, i.e. the primary-key value of the last row of
     * the current page, used to read the next page.
     *
     * @param source rows of the current page
     * @param pk     primary-key column name
     * @return cursor value, or null if the page is empty
     */
    private Object getLastCursor(List<Map> source, String pk) {
        return CollectionUtils.isEmpty(source) ? null : source.get(source.size() - 1).get(pk);
    }
}