ParserFactory.java 15 KB


  1. package org.dbsyncer.parser;
  2. import com.alibaba.fastjson.JSONException;
  3. import com.alibaba.fastjson.JSONObject;
  4. import org.dbsyncer.cache.CacheService;
  5. import org.dbsyncer.common.event.RowChangedEvent;
  6. import org.dbsyncer.common.model.Result;
  7. import org.dbsyncer.common.util.CollectionUtils;
  8. import org.dbsyncer.common.util.JsonUtil;
  9. import org.dbsyncer.connector.ConnectorFactory;
  10. import org.dbsyncer.connector.ConnectorMapper;
  11. import org.dbsyncer.connector.config.CommandConfig;
  12. import org.dbsyncer.connector.config.ConnectorConfig;
  13. import org.dbsyncer.connector.config.ReaderConfig;
  14. import org.dbsyncer.connector.config.WriterBatchConfig;
  15. import org.dbsyncer.connector.constant.ConnectorConstant;
  16. import org.dbsyncer.connector.enums.ConnectorEnum;
  17. import org.dbsyncer.connector.enums.FilterEnum;
  18. import org.dbsyncer.connector.enums.OperationEnum;
  19. import org.dbsyncer.connector.model.Field;
  20. import org.dbsyncer.connector.model.MetaInfo;
  21. import org.dbsyncer.connector.model.Table;
  22. import org.dbsyncer.listener.enums.QuartzFilterEnum;
  23. import org.dbsyncer.parser.enums.ConvertEnum;
  24. import org.dbsyncer.parser.event.FullRefreshEvent;
  25. import org.dbsyncer.parser.logger.LogService;
  26. import org.dbsyncer.parser.logger.LogType;
  27. import org.dbsyncer.parser.model.*;
  28. import org.dbsyncer.parser.strategy.FlushStrategy;
  29. import org.dbsyncer.parser.strategy.ParserStrategy;
  30. import org.dbsyncer.parser.util.ConvertUtil;
  31. import org.dbsyncer.parser.util.PickerUtil;
  32. import org.dbsyncer.plugin.PluginFactory;
  33. import org.dbsyncer.storage.enums.StorageDataStatusEnum;
  34. import org.slf4j.Logger;
  35. import org.slf4j.LoggerFactory;
  36. import org.springframework.beans.factory.annotation.Autowired;
  37. import org.springframework.beans.factory.annotation.Qualifier;
  38. import org.springframework.context.ApplicationContext;
  39. import org.springframework.stereotype.Component;
  40. import org.springframework.util.Assert;
  41. import java.time.Instant;
  42. import java.util.ArrayList;
  43. import java.util.Arrays;
  44. import java.util.List;
  45. import java.util.Map;
  46. import java.util.concurrent.CountDownLatch;
  47. import java.util.concurrent.Executor;
  48. import java.util.concurrent.ExecutorService;
  49. /**
  50. * @author AE86
  51. * @version 1.0.0
  52. * @date 2019/9/29 22:38
  53. */
  54. @Component
  55. public class ParserFactory implements Parser {
  56. private final Logger logger = LoggerFactory.getLogger(getClass());
  57. @Autowired
  58. private ConnectorFactory connectorFactory;
  59. @Autowired
  60. private PluginFactory pluginFactory;
  61. @Autowired
  62. private CacheService cacheService;
  63. @Autowired
  64. private LogService logService;
  65. @Autowired
  66. private FlushStrategy flushStrategy;
  67. @Autowired
  68. @Qualifier("taskExecutor")
  69. private Executor taskExecutor;
  70. @Qualifier("webApplicationContext")
  71. @Autowired
  72. private ApplicationContext applicationContext;
  73. @Autowired
  74. private ParserStrategy parserStrategy;
  75. @Override
  76. public ConnectorMapper connect(ConnectorConfig config) {
  77. return connectorFactory.connect(config);
  78. }
  79. @Override
  80. public boolean refreshConnectorConfig(ConnectorConfig config) {
  81. return connectorFactory.refresh(config);
  82. }
  83. @Override
  84. public boolean isAliveConnectorConfig(ConnectorConfig config) {
  85. boolean alive = false;
  86. try {
  87. alive = connectorFactory.isAlive(config);
  88. } catch (Exception e) {
  89. LogType.ConnectorLog logType = LogType.ConnectorLog.FAILED;
  90. logService.log(logType, "%s%s", logType.getName(), e.getMessage());
  91. }
  92. // 断线重连
  93. if (!alive) {
  94. try {
  95. alive = connectorFactory.refresh(config);
  96. } catch (Exception e) {
  97. logger.error(e.getMessage());
  98. }
  99. if (alive) {
  100. logger.info(LogType.ConnectorLog.RECONNECT_SUCCESS.getMessage());
  101. }
  102. }
  103. return alive;
  104. }
  105. @Override
  106. public List<Table> getTable(ConnectorMapper config) {
  107. return connectorFactory.getTable(config);
  108. }
  109. @Override
  110. public MetaInfo getMetaInfo(String connectorId, String tableName) {
  111. Connector connector = getConnector(connectorId);
  112. ConnectorMapper connectorMapper = connectorFactory.connect(connector.getConfig());
  113. MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableName);
  114. if (!CollectionUtils.isEmpty(connector.getTable())) {
  115. for (Table t : connector.getTable()) {
  116. if (t.getName().equals(tableName)) {
  117. metaInfo.setTableType(t.getType());
  118. break;
  119. }
  120. }
  121. }
  122. return metaInfo;
  123. }
  124. @Override
  125. public Map<String, String> getCommand(Mapping mapping, TableGroup tableGroup) {
  126. ConnectorConfig sConnConfig = getConnectorConfig(mapping.getSourceConnectorId());
  127. ConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
  128. Table sourceTable = tableGroup.getSourceTable();
  129. Table targetTable = tableGroup.getTargetTable();
  130. Table sTable = new Table(sourceTable.getName(), sourceTable.getType(), new ArrayList<>());
  131. Table tTable = new Table(targetTable.getName(), targetTable.getType(), new ArrayList<>());
  132. List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
  133. if (!CollectionUtils.isEmpty(fieldMapping)) {
  134. fieldMapping.forEach(m -> {
  135. if (null != m.getSource()) {
  136. sTable.getColumn().add(m.getSource());
  137. }
  138. if (null != m.getTarget()) {
  139. tTable.getColumn().add(m.getTarget());
  140. }
  141. });
  142. }
  143. final CommandConfig sourceConfig = new CommandConfig(sConnConfig.getConnectorType(), sTable, sourceTable, sConnConfig, tableGroup.getFilter());
  144. final CommandConfig targetConfig = new CommandConfig(tConnConfig.getConnectorType(), tTable, targetTable, tConnConfig);
  145. // 获取连接器同步参数
  146. Map<String, String> command = connectorFactory.getCommand(sourceConfig, targetConfig);
  147. return command;
  148. }
  149. @Override
  150. public long getCount(String connectorId, Map<String, String> command) {
  151. ConnectorMapper connectorMapper = connectorFactory.connect(getConnectorConfig(connectorId));
  152. return connectorFactory.getCount(connectorMapper, command);
  153. }
  154. @Override
  155. public Connector parseConnector(String json) {
  156. try {
  157. JSONObject conn = JsonUtil.parseObject(json);
  158. JSONObject config = (JSONObject) conn.remove("config");
  159. Connector connector = JsonUtil.jsonToObj(conn.toString(), Connector.class);
  160. Assert.notNull(connector, "Connector can not be null.");
  161. String connectorType = config.getString("connectorType");
  162. Class<?> configClass = ConnectorEnum.getConfigClass(connectorType);
  163. ConnectorConfig obj = (ConnectorConfig) JsonUtil.jsonToObj(config.toString(), configClass);
  164. connector.setConfig(obj);
  165. return connector;
  166. } catch (JSONException e) {
  167. logger.error(e.getMessage());
  168. throw new ParserException(e.getMessage());
  169. }
  170. }
  171. @Override
  172. public <T> T parseObject(String json, Class<T> clazz) {
  173. T t = JsonUtil.jsonToObj(json, clazz);
  174. return t;
  175. }
  176. @Override
  177. public List<ConnectorEnum> getConnectorEnumAll() {
  178. return Arrays.asList(ConnectorEnum.values());
  179. }
  180. @Override
  181. public List<OperationEnum> getOperationEnumAll() {
  182. return Arrays.asList(OperationEnum.values());
  183. }
  184. @Override
  185. public List<QuartzFilterEnum> getQuartzFilterEnumAll() {
  186. return Arrays.asList(QuartzFilterEnum.values());
  187. }
  188. @Override
  189. public List<FilterEnum> getFilterEnumAll() {
  190. return Arrays.asList(FilterEnum.values());
  191. }
  192. @Override
  193. public List<ConvertEnum> getConvertEnumAll() {
  194. return Arrays.asList(ConvertEnum.values());
  195. }
  196. @Override
  197. public List<StorageDataStatusEnum> getStorageDataStatusEnumAll() {
  198. return Arrays.asList(StorageDataStatusEnum.values());
  199. }
  200. @Override
  201. public void execute(Task task, Mapping mapping, TableGroup tableGroup, ExecutorService executorService) {
  202. final String metaId = task.getId();
  203. final String sourceConnectorId = mapping.getSourceConnectorId();
  204. final String targetConnectorId = mapping.getTargetConnectorId();
  205. ConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
  206. Assert.notNull(sConfig, "数据源配置不能为空.");
  207. ConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
  208. Assert.notNull(tConfig, "目标源配置不能为空.");
  209. TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
  210. Map<String, String> command = group.getCommand();
  211. Assert.notEmpty(command, "执行命令不能为空.");
  212. List<FieldMapping> fieldMapping = group.getFieldMapping();
  213. String sTableName = group.getSourceTable().getName();
  214. String tTableName = group.getTargetTable().getName();
  215. Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
  216. // 获取同步字段
  217. Picker picker = new Picker(fieldMapping);
  218. String pk = picker.getSourcePrimaryKeyName(sConfig);
  219. int pageSize = mapping.getReadNum();
  220. int batchSize = mapping.getBatchNum();
  221. ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
  222. ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
  223. for (; ; ) {
  224. if (!task.isRunning()) {
  225. logger.warn("任务被中止:{}", metaId);
  226. break;
  227. }
  228. // 1、获取数据源数据
  229. Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), task.getCursor(), task.getPageIndex(), pageSize));
  230. List<Map> data = reader.getSuccessData();
  231. if (CollectionUtils.isEmpty(data)) {
  232. logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
  233. break;
  234. }
  235. // 2、映射字段
  236. List<Map> target = picker.pickData(data);
  237. // 3、参数转换
  238. ConvertUtil.convert(group.getConvert(), target);
  239. // 4、插件转换
  240. pluginFactory.convert(group.getPlugin(), data, target);
  241. // 5、写入目标源
  242. BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, tTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
  243. Result writer = writeBatch(batchWriter, executorService);
  244. // 6、更新结果
  245. task.setPageIndex(task.getPageIndex() + 1);
  246. task.setCursor(getLastCursor(data, pk));
  247. flush(task, writer);
  248. // 7、判断尾页
  249. if (data.size() < pageSize) {
  250. logger.info("完成全量:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
  251. break;
  252. }
  253. }
  254. }
  255. @Override
  256. public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
  257. logger.debug("Table[{}] {}, data:{}", event.getSourceTableName(), event.getEvent(), event.getDataMap());
  258. // 1、获取映射字段
  259. final Picker picker = new Picker(tableGroup.getFieldMapping());
  260. final Map target = picker.pickData(event.getDataMap());
  261. // 2、参数转换
  262. ConvertUtil.convert(tableGroup.getConvert(), target);
  263. // 3、插件转换
  264. pluginFactory.convert(tableGroup.getPlugin(), event.getEvent(), event.getDataMap(), target);
  265. // 4、处理数据
  266. parserStrategy.execute(tableGroup.getId(), event.getEvent(), target);
  267. }
  268. /**
  269. * 批量写入
  270. *
  271. * @param batchWriter
  272. * @return
  273. */
  274. @Override
  275. public Result writeBatch(BatchWriter batchWriter) {
  276. return writeBatch(batchWriter, taskExecutor);
  277. }
  278. /**
  279. * 批量写入
  280. *
  281. * @param batchWriter
  282. * @param taskExecutor
  283. * @return
  284. */
  285. private Result writeBatch(BatchWriter batchWriter, Executor taskExecutor) {
  286. List<Map> dataList = batchWriter.getDataList();
  287. int batchSize = batchWriter.getBatchSize();
  288. String tableName = batchWriter.getTableName();
  289. String event = batchWriter.getEvent();
  290. Map<String, String> command = batchWriter.getCommand();
  291. List<Field> fields = batchWriter.getFields();
  292. // 总数
  293. int total = dataList.size();
  294. // 单次任务
  295. if (total <= batchSize) {
  296. return connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, dataList));
  297. }
  298. // 批量任务, 拆分
  299. int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
  300. final Result result = new Result();
  301. final CountDownLatch latch = new CountDownLatch(taskSize);
  302. int fromIndex = 0;
  303. int toIndex = batchSize;
  304. for (int i = 0; i < taskSize; i++) {
  305. final List<Map> data;
  306. if (toIndex > total) {
  307. toIndex = fromIndex + (total % batchSize);
  308. data = dataList.subList(fromIndex, toIndex);
  309. } else {
  310. data = dataList.subList(fromIndex, toIndex);
  311. fromIndex += batchSize;
  312. toIndex += batchSize;
  313. }
  314. taskExecutor.execute(() -> {
  315. try {
  316. Result w = connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, data));
  317. result.addSuccessData(w.getSuccessData());
  318. result.addFailData(w.getFailData());
  319. result.getError().append(w.getError());
  320. } catch (Exception e) {
  321. logger.error(e.getMessage());
  322. } finally {
  323. latch.countDown();
  324. }
  325. });
  326. }
  327. try {
  328. latch.await();
  329. } catch (InterruptedException e) {
  330. logger.error(e.getMessage());
  331. }
  332. return result;
  333. }
  334. /**
  335. * 更新缓存
  336. *
  337. * @param task
  338. * @param writer
  339. */
  340. private void flush(Task task, Result writer) {
  341. flushStrategy.flushFullData(task.getId(), writer, ConnectorConstant.OPERTION_INSERT);
  342. // 发布刷新事件给FullExtractor
  343. task.setEndTime(Instant.now().toEpochMilli());
  344. applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
  345. }
  346. /**
  347. * 获取连接器
  348. *
  349. * @param connectorId
  350. * @return
  351. */
  352. private Connector getConnector(String connectorId) {
  353. Assert.hasText(connectorId, "Connector id can not be empty.");
  354. Connector conn = cacheService.get(connectorId, Connector.class);
  355. Assert.notNull(conn, "Connector can not be null.");
  356. return conn;
  357. }
  358. /**
  359. * 获取连接配置
  360. *
  361. * @param connectorId
  362. * @return
  363. */
  364. private ConnectorConfig getConnectorConfig(String connectorId) {
  365. return getConnector(connectorId).getConfig();
  366. }
  367. /**
  368. * 获取最新游标值
  369. *
  370. * @param data
  371. * @param pk
  372. * @return
  373. */
  374. private Object getLastCursor(List<Map> data, String pk) {
  375. return CollectionUtils.isEmpty(data) ? null : data.get(data.size() - 1).get(pk);
  376. }
  377. }