ParserFactory.java 15 KB


  1. package org.dbsyncer.parser;
  2. import org.dbsyncer.cache.CacheService;
  3. import org.dbsyncer.common.event.RowChangedEvent;
  4. import org.dbsyncer.common.model.Result;
  5. import org.dbsyncer.common.util.CollectionUtils;
  6. import org.dbsyncer.common.util.JsonUtil;
  7. import org.dbsyncer.common.util.StringUtil;
  8. import org.dbsyncer.connector.ConnectorFactory;
  9. import org.dbsyncer.connector.ConnectorMapper;
  10. import org.dbsyncer.connector.config.*;
  11. import org.dbsyncer.connector.constant.ConnectorConstant;
  12. import org.dbsyncer.connector.enums.ConnectorEnum;
  13. import org.dbsyncer.connector.enums.FilterEnum;
  14. import org.dbsyncer.connector.enums.OperationEnum;
  15. import org.dbsyncer.connector.model.Field;
  16. import org.dbsyncer.connector.model.MetaInfo;
  17. import org.dbsyncer.connector.model.Table;
  18. import org.dbsyncer.listener.enums.QuartzFilterEnum;
  19. import org.dbsyncer.parser.enums.ConvertEnum;
  20. import org.dbsyncer.parser.enums.ParserEnum;
  21. import org.dbsyncer.parser.event.FullRefreshEvent;
  22. import org.dbsyncer.parser.flush.BufferActuator;
  23. import org.dbsyncer.parser.flush.FlushStrategy;
  24. import org.dbsyncer.parser.flush.model.WriterRequest;
  25. import org.dbsyncer.parser.logger.LogService;
  26. import org.dbsyncer.parser.logger.LogType;
  27. import org.dbsyncer.parser.model.*;
  28. import org.dbsyncer.parser.util.ConvertUtil;
  29. import org.dbsyncer.parser.util.PickerUtil;
  30. import org.dbsyncer.plugin.PluginFactory;
  31. import org.dbsyncer.storage.enums.StorageDataStatusEnum;
  32. import org.json.JSONException;
  33. import org.json.JSONObject;
  34. import org.slf4j.Logger;
  35. import org.slf4j.LoggerFactory;
  36. import org.springframework.beans.factory.annotation.Autowired;
  37. import org.springframework.beans.factory.annotation.Qualifier;
  38. import org.springframework.context.ApplicationContext;
  39. import org.springframework.stereotype.Component;
  40. import org.springframework.util.Assert;
  41. import java.time.Instant;
  42. import java.util.ArrayList;
  43. import java.util.Arrays;
  44. import java.util.List;
  45. import java.util.Map;
  46. import java.util.concurrent.CountDownLatch;
  47. import java.util.concurrent.Executor;
  48. /**
  49. * @author AE86
  50. * @version 1.0.0
  51. * @date 2019/9/29 22:38
  52. */
  53. @Component
  54. public class ParserFactory implements Parser {
  55. private final Logger logger = LoggerFactory.getLogger(getClass());
  56. @Autowired
  57. private ConnectorFactory connectorFactory;
  58. @Autowired
  59. private PluginFactory pluginFactory;
  60. @Autowired
  61. private CacheService cacheService;
  62. @Autowired
  63. private LogService logService;
  64. @Autowired
  65. private FlushStrategy flushStrategy;
  66. @Autowired
  67. @Qualifier("taskExecutor")
  68. private Executor taskExecutor;
  69. @Autowired
  70. private ApplicationContext applicationContext;
  71. @Autowired
  72. private BufferActuator writerBufferActuator;
  73. @Override
  74. public ConnectorMapper connect(ConnectorConfig config) {
  75. return connectorFactory.connect(config);
  76. }
  77. @Override
  78. public boolean refreshConnectorConfig(ConnectorConfig config) {
  79. return connectorFactory.refresh(config);
  80. }
  81. @Override
  82. public boolean isAliveConnectorConfig(ConnectorConfig config) {
  83. boolean alive = false;
  84. try {
  85. alive = connectorFactory.isAlive(config);
  86. } catch (Exception e) {
  87. LogType.ConnectorLog logType = LogType.ConnectorLog.FAILED;
  88. logService.log(logType, "%s%s", logType.getName(), e.getMessage());
  89. }
  90. // 断线重连
  91. if (!alive) {
  92. try {
  93. alive = connectorFactory.refresh(config);
  94. } catch (Exception e) {
  95. logger.error(e.getMessage());
  96. }
  97. if (alive) {
  98. logger.info(LogType.ConnectorLog.RECONNECT_SUCCESS.getMessage());
  99. }
  100. }
  101. return alive;
  102. }
  103. @Override
  104. public List<Table> getTable(ConnectorMapper config) {
  105. return connectorFactory.getTable(config);
  106. }
  107. @Override
  108. public MetaInfo getMetaInfo(String connectorId, String tableName) {
  109. Connector connector = getConnector(connectorId);
  110. ConnectorMapper connectorMapper = connectorFactory.connect(connector.getConfig());
  111. MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableName);
  112. if (!CollectionUtils.isEmpty(connector.getTable())) {
  113. for (Table t : connector.getTable()) {
  114. if (t.getName().equals(tableName)) {
  115. metaInfo.setTableType(t.getType());
  116. break;
  117. }
  118. }
  119. }
  120. return metaInfo;
  121. }
  122. @Override
  123. public Map<String, String> getCommand(Mapping mapping, TableGroup tableGroup) {
  124. ConnectorConfig connectorConfig = getConnectorConfig(mapping.getSourceConnectorId());
  125. String sType = connectorConfig.getConnectorType();
  126. String tType = getConnectorConfig(mapping.getTargetConnectorId()).getConnectorType();
  127. Table sourceTable = tableGroup.getSourceTable();
  128. Table targetTable = tableGroup.getTargetTable();
  129. Table sTable = new Table(sourceTable.getName(), sourceTable.getType(), new ArrayList<>());
  130. Table tTable = new Table(targetTable.getName(), targetTable.getType(), new ArrayList<>());
  131. List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
  132. if (!CollectionUtils.isEmpty(fieldMapping)) {
  133. fieldMapping.forEach(m -> {
  134. if (null != m.getSource()) {
  135. sTable.getColumn().add(m.getSource());
  136. }
  137. if (null != m.getTarget()) {
  138. tTable.getColumn().add(m.getTarget());
  139. }
  140. });
  141. }
  142. final CommandConfig sourceConfig = new CommandConfig(sType, sTable, sourceTable, tableGroup.getFilter(), connectorConfig);
  143. final CommandConfig targetConfig = new CommandConfig(tType, tTable, targetTable);
  144. // 获取连接器同步参数
  145. Map<String, String> command = connectorFactory.getCommand(sourceConfig, targetConfig);
  146. return command;
  147. }
  148. @Override
  149. public long getCount(String connectorId, Map<String, String> command) {
  150. ConnectorMapper connectorMapper = connectorFactory.connect(getConnectorConfig(connectorId));
  151. return connectorFactory.getCount(connectorMapper, command);
  152. }
  153. @Override
  154. public Connector parseConnector(String json) {
  155. try {
  156. JSONObject conn = new JSONObject(json);
  157. JSONObject config = (JSONObject) conn.remove("config");
  158. Connector connector = JsonUtil.jsonToObj(conn.toString(), Connector.class);
  159. Assert.notNull(connector, "Connector can not be null.");
  160. String connectorType = config.getString("connectorType");
  161. Class<?> configClass = ConnectorEnum.getConfigClass(connectorType);
  162. ConnectorConfig obj = (ConnectorConfig) JsonUtil.jsonToObj(config.toString(), configClass);
  163. connector.setConfig(obj);
  164. return connector;
  165. } catch (JSONException e) {
  166. logger.error(e.getMessage());
  167. throw new ParserException(e.getMessage());
  168. }
  169. }
  170. @Override
  171. public <T> T parseObject(String json, Class<T> clazz) {
  172. T t = JsonUtil.jsonToObj(json, clazz);
  173. return t;
  174. }
  175. @Override
  176. public List<ConnectorEnum> getConnectorEnumAll() {
  177. return Arrays.asList(ConnectorEnum.values());
  178. }
  179. @Override
  180. public List<OperationEnum> getOperationEnumAll() {
  181. return Arrays.asList(OperationEnum.values());
  182. }
  183. @Override
  184. public List<QuartzFilterEnum> getQuartzFilterEnumAll() {
  185. return Arrays.asList(QuartzFilterEnum.values());
  186. }
  187. @Override
  188. public List<FilterEnum> getFilterEnumAll() {
  189. return Arrays.asList(FilterEnum.values());
  190. }
  191. @Override
  192. public List<ConvertEnum> getConvertEnumAll() {
  193. return Arrays.asList(ConvertEnum.values());
  194. }
  195. @Override
  196. public List<StorageDataStatusEnum> getStorageDataStatusEnumAll() {
  197. return Arrays.asList(StorageDataStatusEnum.values());
  198. }
  199. @Override
  200. public void execute(Task task, Mapping mapping, TableGroup tableGroup) {
  201. final String metaId = task.getId();
  202. final String sourceConnectorId = mapping.getSourceConnectorId();
  203. final String targetConnectorId = mapping.getTargetConnectorId();
  204. ConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
  205. Assert.notNull(sConfig, "数据源配置不能为空.");
  206. ConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
  207. Assert.notNull(tConfig, "目标源配置不能为空.");
  208. TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
  209. Map<String, String> command = group.getCommand();
  210. Assert.notEmpty(command, "执行命令不能为空.");
  211. List<FieldMapping> fieldMapping = group.getFieldMapping();
  212. String sTableName = group.getSourceTable().getName();
  213. String tTableName = group.getTargetTable().getName();
  214. Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
  215. // 获取同步字段
  216. Picker picker = new Picker(fieldMapping);
  217. // 检查分页参数
  218. Map<String, String> params = getMeta(metaId).getMap();
  219. params.putIfAbsent(ParserEnum.PAGE_INDEX.getCode(), ParserEnum.PAGE_INDEX.getDefaultValue());
  220. int pageSize = mapping.getReadNum();
  221. int batchSize = mapping.getBatchNum();
  222. ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
  223. ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
  224. for (; ; ) {
  225. if (!task.isRunning()) {
  226. logger.warn("任务被中止:{}", metaId);
  227. break;
  228. }
  229. // 1、获取数据源数据
  230. int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
  231. Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), pageIndex, pageSize));
  232. List<Map> data = reader.getSuccessData();
  233. if (CollectionUtils.isEmpty(data)) {
  234. params.clear();
  235. logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
  236. break;
  237. }
  238. // 2、映射字段
  239. List<Map> target = picker.pickData(data);
  240. // 3、参数转换
  241. ConvertUtil.convert(group.getConvert(), target);
  242. // 4、插件转换
  243. pluginFactory.convert(group.getPlugin(), data, target);
  244. // 5、写入目标源
  245. Result writer = writeBatch(new BatchWriter(tConnectorMapper, command, sTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize));
  246. // 6、更新结果
  247. flush(task, writer);
  248. // 7、更新分页数
  249. params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
  250. }
  251. }
  252. @Override
  253. public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
  254. logger.info("Table[{}] {}, before:{}, after:{}", event.getSourceTableName(), event.getEvent(),
  255. event.getBefore(), event.getAfter());
  256. // 1、获取映射字段
  257. final String eventName = event.getEvent();
  258. Map<String, Object> data = StringUtil.equals(ConnectorConstant.OPERTION_DELETE, eventName) ? event.getBefore() : event.getAfter();
  259. Picker picker = new Picker(tableGroup.getFieldMapping(), data);
  260. Map target = picker.getTargetMap();
  261. // 2、参数转换
  262. ConvertUtil.convert(tableGroup.getConvert(), target);
  263. // 3、插件转换
  264. pluginFactory.convert(tableGroup.getPlugin(), eventName, data, target);
  265. // 4、写入缓冲执行器
  266. writerBufferActuator.offer(new WriterRequest(tableGroup.getId(), target, mapping.getMetaId(), mapping.getTargetConnectorId(), event.getSourceTableName(), event.getTargetTableName(), eventName, picker.getTargetFields(), tableGroup.getCommand()));
  267. }
  268. /**
  269. * 批量写入
  270. *
  271. * @param batchWriter
  272. * @return
  273. */
  274. @Override
  275. public Result writeBatch(BatchWriter batchWriter) {
  276. List<Map> dataList = batchWriter.getDataList();
  277. int batchSize = batchWriter.getBatchSize();
  278. String tableName = batchWriter.getTableName();
  279. String event = batchWriter.getEvent();
  280. Map<String, String> command = batchWriter.getCommand();
  281. List<Field> fields = batchWriter.getFields();
  282. boolean forceUpdate = batchWriter.isForceUpdate();
  283. // 总数
  284. int total = dataList.size();
  285. // 单次任务
  286. if (total <= batchSize) {
  287. return connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, dataList, forceUpdate));
  288. }
  289. // 批量任务, 拆分
  290. int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
  291. final Result result = new Result();
  292. final CountDownLatch latch = new CountDownLatch(taskSize);
  293. int fromIndex = 0;
  294. int toIndex = batchSize;
  295. for (int i = 0; i < taskSize; i++) {
  296. final List<Map> data;
  297. if (toIndex > total) {
  298. toIndex = fromIndex + (total % batchSize);
  299. data = dataList.subList(fromIndex, toIndex);
  300. } else {
  301. data = dataList.subList(fromIndex, toIndex);
  302. fromIndex += batchSize;
  303. toIndex += batchSize;
  304. }
  305. taskExecutor.execute(() -> {
  306. try {
  307. Result w = connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, data, forceUpdate));
  308. result.addSuccessData(w.getSuccessData());
  309. result.addFailData(w.getFailData());
  310. result.getError().append(w.getError());
  311. } catch (Exception e) {
  312. logger.error(e.getMessage());
  313. } finally {
  314. latch.countDown();
  315. }
  316. });
  317. }
  318. try {
  319. latch.await();
  320. } catch (InterruptedException e) {
  321. logger.error(e.getMessage());
  322. }
  323. return result;
  324. }
  325. /**
  326. * 更新缓存
  327. *
  328. * @param task
  329. * @param writer
  330. */
  331. private void flush(Task task, Result writer) {
  332. flushStrategy.flushFullData(task.getId(), writer, ConnectorConstant.OPERTION_INSERT);
  333. // 发布刷新事件给FullExtractor
  334. task.setEndTime(Instant.now().toEpochMilli());
  335. applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
  336. }
  337. /**
  338. * 获取Meta(注: 没有bean拷贝, 便于直接更新缓存)
  339. *
  340. * @param metaId
  341. * @return
  342. */
  343. private Meta getMeta(String metaId) {
  344. Assert.hasText(metaId, "Meta id can not be empty.");
  345. Meta meta = cacheService.get(metaId, Meta.class);
  346. Assert.notNull(meta, "Meta can not be null.");
  347. return meta;
  348. }
  349. /**
  350. * 获取连接器
  351. *
  352. * @param connectorId
  353. * @return
  354. */
  355. private Connector getConnector(String connectorId) {
  356. Assert.hasText(connectorId, "Connector id can not be empty.");
  357. Connector conn = cacheService.get(connectorId, Connector.class);
  358. Assert.notNull(conn, "Connector can not be null.");
  359. return conn;
  360. }
  361. /**
  362. * 获取连接配置
  363. *
  364. * @param connectorId
  365. * @return
  366. */
  367. private ConnectorConfig getConnectorConfig(String connectorId) {
  368. return getConnector(connectorId).getConfig();
  369. }
  370. }