ParserFactory.java 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package org.dbsyncer.parser;
  2. import org.dbsyncer.cache.CacheService;
  3. import org.dbsyncer.common.event.RowChangedEvent;
  4. import org.dbsyncer.common.model.Result;
  5. import org.dbsyncer.common.util.CollectionUtils;
  6. import org.dbsyncer.common.util.JsonUtil;
  7. import org.dbsyncer.common.util.StringUtil;
  8. import org.dbsyncer.connector.ConnectorFactory;
  9. import org.dbsyncer.connector.ConnectorMapper;
  10. import org.dbsyncer.connector.config.*;
  11. import org.dbsyncer.connector.constant.ConnectorConstant;
  12. import org.dbsyncer.connector.enums.ConnectorEnum;
  13. import org.dbsyncer.connector.enums.FilterEnum;
  14. import org.dbsyncer.connector.enums.OperationEnum;
  15. import org.dbsyncer.listener.enums.QuartzFilterEnum;
  16. import org.dbsyncer.parser.enums.ConvertEnum;
  17. import org.dbsyncer.parser.enums.ParserEnum;
  18. import org.dbsyncer.parser.event.FullRefreshEvent;
  19. import org.dbsyncer.parser.logger.LogService;
  20. import org.dbsyncer.parser.logger.LogType;
  21. import org.dbsyncer.parser.model.*;
  22. import org.dbsyncer.parser.strategy.FlushStrategy;
  23. import org.dbsyncer.parser.util.ConvertUtil;
  24. import org.dbsyncer.parser.util.PickerUtil;
  25. import org.dbsyncer.plugin.PluginFactory;
  26. import org.dbsyncer.storage.enums.StorageDataStatusEnum;
  27. import org.json.JSONArray;
  28. import org.json.JSONException;
  29. import org.json.JSONObject;
  30. import org.slf4j.Logger;
  31. import org.slf4j.LoggerFactory;
  32. import org.springframework.beans.BeanUtils;
  33. import org.springframework.beans.factory.annotation.Autowired;
  34. import org.springframework.beans.factory.annotation.Qualifier;
  35. import org.springframework.context.ApplicationContext;
  36. import org.springframework.stereotype.Component;
  37. import org.springframework.util.Assert;
  38. import java.time.Instant;
  39. import java.util.ArrayList;
  40. import java.util.Arrays;
  41. import java.util.List;
  42. import java.util.Map;
  43. import java.util.concurrent.CountDownLatch;
  44. import java.util.concurrent.Executor;
  45. /**
  46. * @author AE86
  47. * @version 1.0.0
  48. * @date 2019/9/29 22:38
  49. */
  50. @Component
  51. public class ParserFactory implements Parser {
  52. private final Logger logger = LoggerFactory.getLogger(getClass());
  53. @Autowired
  54. private ConnectorFactory connectorFactory;
  55. @Autowired
  56. private PluginFactory pluginFactory;
  57. @Autowired
  58. private CacheService cacheService;
  59. @Autowired
  60. private LogService logService;
  61. @Autowired
  62. private FlushStrategy flushStrategy;
  63. @Autowired
  64. @Qualifier("taskExecutor")
  65. private Executor taskExecutor;
  66. @Autowired
  67. private ApplicationContext applicationContext;
  68. @Override
  69. public ConnectorMapper connect(ConnectorConfig config) {
  70. return connectorFactory.connect(config);
  71. }
  72. @Override
  73. public boolean refreshConnectorConfig(ConnectorConfig config) {
  74. return connectorFactory.refresh(config);
  75. }
  76. @Override
  77. public boolean isAliveConnectorConfig(ConnectorConfig config) {
  78. boolean alive = false;
  79. try {
  80. alive = connectorFactory.isAlive(config);
  81. } catch (Exception e) {
  82. LogType.ConnectorLog logType = LogType.ConnectorLog.FAILED;
  83. logService.log(logType, "%s%s", logType.getName(), e.getMessage());
  84. }
  85. // 断线重连
  86. if (!alive) {
  87. try {
  88. alive = connectorFactory.refresh(config);
  89. } catch (Exception e) {
  90. logger.error(e.getMessage());
  91. }
  92. if (alive) {
  93. logger.info(LogType.ConnectorLog.RECONNECT_SUCCESS.getMessage());
  94. }
  95. }
  96. return alive;
  97. }
  98. @Override
  99. public List<Table> getTable(ConnectorMapper config) {
  100. return connectorFactory.getTable(config);
  101. }
  102. @Override
  103. public MetaInfo getMetaInfo(String connectorId, String tableName) {
  104. Connector connector = getConnector(connectorId);
  105. ConnectorMapper connectorMapper = connectorFactory.connect(connector.getConfig());
  106. MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableName);
  107. if (!CollectionUtils.isEmpty(connector.getTable())) {
  108. for (Table t : connector.getTable()) {
  109. if (t.getName().equals(tableName)) {
  110. metaInfo.setTableType(t.getType());
  111. break;
  112. }
  113. }
  114. }
  115. return metaInfo;
  116. }
  117. @Override
  118. public Map<String, String> getCommand(Mapping mapping, TableGroup tableGroup) {
  119. ConnectorConfig connectorConfig = getConnectorConfig(mapping.getSourceConnectorId());
  120. String sType = connectorConfig.getConnectorType();
  121. String tType = getConnectorConfig(mapping.getTargetConnectorId()).getConnectorType();
  122. Table sourceTable = tableGroup.getSourceTable();
  123. Table targetTable = tableGroup.getTargetTable();
  124. Table sTable = new Table(sourceTable.getName(), sourceTable.getType(), new ArrayList<>());
  125. Table tTable = new Table(targetTable.getName(), targetTable.getType(), new ArrayList<>());
  126. List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
  127. if (!CollectionUtils.isEmpty(fieldMapping)) {
  128. fieldMapping.forEach(m -> {
  129. if (null != m.getSource()) {
  130. sTable.getColumn().add(m.getSource());
  131. }
  132. if (null != m.getTarget()) {
  133. tTable.getColumn().add(m.getTarget());
  134. }
  135. });
  136. }
  137. final CommandConfig sourceConfig = new CommandConfig(sType, sTable, sourceTable, tableGroup.getFilter(), connectorConfig);
  138. final CommandConfig targetConfig = new CommandConfig(tType, tTable, targetTable);
  139. // 获取连接器同步参数
  140. Map<String, String> command = connectorFactory.getCommand(sourceConfig, targetConfig);
  141. return command;
  142. }
  143. @Override
  144. public long getCount(String connectorId, Map<String, String> command) {
  145. ConnectorMapper connectorMapper = connectorFactory.connect(getConnectorConfig(connectorId));
  146. return connectorFactory.getCount(connectorMapper, command);
  147. }
  148. @Override
  149. public Connector parseConnector(String json) {
  150. try {
  151. JSONObject conn = new JSONObject(json);
  152. JSONObject config = (JSONObject) conn.remove("config");
  153. JSONArray table = (JSONArray) conn.remove("table");
  154. Connector connector = JsonUtil.jsonToObj(conn.toString(), Connector.class);
  155. Assert.notNull(connector, "Connector can not be null.");
  156. String connectorType = config.getString("connectorType");
  157. Class<?> configClass = ConnectorEnum.getConfigClass(connectorType);
  158. ConnectorConfig obj = (ConnectorConfig) JsonUtil.jsonToObj(config.toString(), configClass);
  159. connector.setConfig(obj);
  160. List<Table> tableList = new ArrayList<>();
  161. boolean exist = false;
  162. for (int i = 0; i < table.length(); i++) {
  163. if (table.get(i) instanceof String) {
  164. tableList.add(new Table(table.getString(i)));
  165. exist = true;
  166. }
  167. }
  168. if (!exist) {
  169. tableList = JsonUtil.jsonToArray(table.toString(), Table.class);
  170. }
  171. connector.setTable(tableList);
  172. return connector;
  173. } catch (JSONException e) {
  174. logger.error(e.getMessage());
  175. throw new ParserException(e.getMessage());
  176. }
  177. }
  178. @Override
  179. public <T> T parseObject(String json, Class<T> clazz) {
  180. try {
  181. JSONObject obj = new JSONObject(json);
  182. T t = JsonUtil.jsonToObj(obj.toString(), clazz);
  183. String format = String.format("%s can not be null.", clazz.getSimpleName());
  184. Assert.notNull(t, format);
  185. return t;
  186. } catch (JSONException e) {
  187. logger.error(e.getMessage());
  188. throw new ParserException(e.getMessage());
  189. }
  190. }
  191. @Override
  192. public List<ConnectorEnum> getConnectorEnumAll() {
  193. return Arrays.asList(ConnectorEnum.values());
  194. }
  195. @Override
  196. public List<OperationEnum> getOperationEnumAll() {
  197. return Arrays.asList(OperationEnum.values());
  198. }
  199. @Override
  200. public List<QuartzFilterEnum> getQuartzFilterEnumAll() {
  201. return Arrays.asList(QuartzFilterEnum.values());
  202. }
  203. @Override
  204. public List<FilterEnum> getFilterEnumAll() {
  205. return Arrays.asList(FilterEnum.values());
  206. }
  207. @Override
  208. public List<ConvertEnum> getConvertEnumAll() {
  209. return Arrays.asList(ConvertEnum.values());
  210. }
  211. @Override
  212. public List<StorageDataStatusEnum> getStorageDataStatusEnumAll() {
  213. return Arrays.asList(StorageDataStatusEnum.values());
  214. }
  215. @Override
  216. public void execute(Task task, Mapping mapping, TableGroup tableGroup) {
  217. final String metaId = task.getId();
  218. final String sourceConnectorId = mapping.getSourceConnectorId();
  219. final String targetConnectorId = mapping.getTargetConnectorId();
  220. ConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
  221. Assert.notNull(sConfig, "数据源配置不能为空.");
  222. ConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
  223. Assert.notNull(tConfig, "目标源配置不能为空.");
  224. TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
  225. Map<String, String> command = group.getCommand();
  226. Assert.notEmpty(command, "执行命令不能为空.");
  227. List<FieldMapping> fieldMapping = group.getFieldMapping();
  228. String sTableName = group.getSourceTable().getName();
  229. String tTableName = group.getTargetTable().getName();
  230. Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
  231. // 获取同步字段
  232. Picker picker = new Picker(fieldMapping);
  233. // 检查分页参数
  234. Map<String, String> params = getMeta(metaId).getMap();
  235. params.putIfAbsent(ParserEnum.PAGE_INDEX.getCode(), ParserEnum.PAGE_INDEX.getDefaultValue());
  236. int pageSize = mapping.getReadNum();
  237. int batchSize = mapping.getBatchNum();
  238. ConnectorMapper sConnectionMapper = connectorFactory.connect(sConfig);
  239. ConnectorMapper tConnectionMapper = connectorFactory.connect(tConfig);
  240. for (; ; ) {
  241. if (!task.isRunning()) {
  242. logger.warn("任务被中止:{}", metaId);
  243. break;
  244. }
  245. // 1、获取数据源数据
  246. int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
  247. Result reader = connectorFactory.reader(sConnectionMapper, new ReaderConfig(command, new ArrayList<>(), pageIndex, pageSize));
  248. List<Map> data = reader.getData();
  249. if (CollectionUtils.isEmpty(data)) {
  250. params.clear();
  251. logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
  252. break;
  253. }
  254. // 2、映射字段
  255. List<Map> target = picker.pickData(reader.getData());
  256. // 3、参数转换
  257. ConvertUtil.convert(group.getConvert(), target);
  258. // 4、插件转换
  259. pluginFactory.convert(group.getPlugin(), data, target);
  260. // 5、写入目标源
  261. Result writer = writeBatch(tConnectionMapper, command, picker.getTargetFields(), target, batchSize);
  262. // 6、更新结果
  263. flush(task, writer, target);
  264. // 7、更新分页数
  265. params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
  266. }
  267. }
  268. @Override
  269. public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent) {
  270. logger.info("Table[{}] {}, before:{}, after:{}", rowChangedEvent.getTableName(), rowChangedEvent.getEvent(),
  271. rowChangedEvent.getBefore(), rowChangedEvent.getAfter());
  272. final String metaId = mapping.getMetaId();
  273. ConnectorMapper tConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
  274. // 1、获取映射字段
  275. final String event = rowChangedEvent.getEvent();
  276. Map<String, Object> data = StringUtil.equals(ConnectorConstant.OPERTION_DELETE, event) ? rowChangedEvent.getBefore() : rowChangedEvent.getAfter();
  277. Picker picker = new Picker(tableGroup.getFieldMapping(), data);
  278. Map target = picker.getTargetMap();
  279. // 2、参数转换
  280. ConvertUtil.convert(tableGroup.getConvert(), target);
  281. // 3、插件转换
  282. pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
  283. // 4、写入目标源
  284. Result writer = connectorFactory.writer(tConnectorMapper, new WriterSingleConfig(picker.getTargetFields(), tableGroup.getCommand(), event, target, rowChangedEvent.getTableName(), rowChangedEvent.isForceUpdate()));
  285. // 5、更新结果
  286. flushStrategy.flushIncrementData(metaId, writer, event, picker.getTargetMapList());
  287. }
  288. /**
  289. * 更新缓存
  290. *
  291. * @param task
  292. * @param writer
  293. * @param data
  294. */
  295. private void flush(Task task, Result writer, List<Map> data) {
  296. flushStrategy.flushFullData(task.getId(), writer, ConnectorConstant.OPERTION_INSERT, data);
  297. // 发布刷新事件给FullExtractor
  298. task.setEndTime(Instant.now().toEpochMilli());
  299. applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
  300. }
  301. /**
  302. * 获取Meta(注: 没有bean拷贝, 便于直接更新缓存)
  303. *
  304. * @param metaId
  305. * @return
  306. */
  307. private Meta getMeta(String metaId) {
  308. Assert.hasText(metaId, "Meta id can not be empty.");
  309. Meta meta = cacheService.get(metaId, Meta.class);
  310. Assert.notNull(meta, "Meta can not be null.");
  311. return meta;
  312. }
  313. /**
  314. * 获取连接器
  315. *
  316. * @param connectorId
  317. * @return
  318. */
  319. private Connector getConnector(String connectorId) {
  320. Assert.hasText(connectorId, "Connector id can not be empty.");
  321. Connector conn = cacheService.get(connectorId, Connector.class);
  322. Assert.notNull(conn, "Connector can not be null.");
  323. Connector connector = new Connector();
  324. BeanUtils.copyProperties(conn, connector);
  325. return connector;
  326. }
  327. /**
  328. * 获取连接配置
  329. *
  330. * @param connectorId
  331. * @return
  332. */
  333. private ConnectorConfig getConnectorConfig(String connectorId) {
  334. Connector connector = getConnector(connectorId);
  335. return connector.getConfig();
  336. }
  337. /**
  338. * 批量写入
  339. *
  340. * @param connectorMapper
  341. * @param command
  342. * @param fields
  343. * @param target
  344. * @param batchSize
  345. * @return
  346. */
  347. private Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, List<Field> fields, List<Map> target, int batchSize) {
  348. // 总数
  349. int total = target.size();
  350. // 单次任务
  351. if (total <= batchSize) {
  352. return connectorFactory.writer(connectorMapper, new WriterBatchConfig(command, fields, target));
  353. }
  354. // 批量任务, 拆分
  355. int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
  356. final Result result = new Result();
  357. final CountDownLatch latch = new CountDownLatch(taskSize);
  358. int fromIndex = 0;
  359. int toIndex = batchSize;
  360. for (int i = 0; i < taskSize; i++) {
  361. final List<Map> data;
  362. if (toIndex > total) {
  363. toIndex = fromIndex + (total % batchSize);
  364. data = target.subList(fromIndex, toIndex);
  365. } else {
  366. data = target.subList(fromIndex, toIndex);
  367. fromIndex += batchSize;
  368. toIndex += batchSize;
  369. }
  370. taskExecutor.execute(() -> {
  371. try {
  372. Result w = connectorFactory.writer(connectorMapper, new WriterBatchConfig(command, fields, data));
  373. // CAS
  374. result.getFailData().addAll(w.getFailData());
  375. result.getFail().getAndAdd(w.getFail().get());
  376. result.getError().append(w.getError());
  377. } catch (Exception e) {
  378. result.getError().append(e.getMessage()).append(System.lineSeparator());
  379. } finally {
  380. latch.countDown();
  381. }
  382. });
  383. }
  384. try {
  385. latch.await();
  386. } catch (InterruptedException e) {
  387. logger.error(e.getMessage());
  388. }
  389. return result;
  390. }
  391. }