ParserFactory.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. package org.dbsyncer.parser;
  2. import org.apache.commons.lang.StringUtils;
  3. import org.dbsyncer.cache.CacheService;
  4. import org.dbsyncer.common.event.FullRefreshEvent;
  5. import org.dbsyncer.common.event.RowChangedEvent;
  6. import org.dbsyncer.common.model.Result;
  7. import org.dbsyncer.common.model.Task;
  8. import org.dbsyncer.common.util.CollectionUtils;
  9. import org.dbsyncer.common.util.JsonUtil;
  10. import org.dbsyncer.connector.ConnectorFactory;
  11. import org.dbsyncer.connector.ConnectorMapper;
  12. import org.dbsyncer.connector.config.*;
  13. import org.dbsyncer.connector.constant.ConnectorConstant;
  14. import org.dbsyncer.connector.enums.ConnectorEnum;
  15. import org.dbsyncer.connector.enums.FilterEnum;
  16. import org.dbsyncer.connector.enums.OperationEnum;
  17. import org.dbsyncer.listener.enums.QuartzFilterEnum;
  18. import org.dbsyncer.parser.enums.ConvertEnum;
  19. import org.dbsyncer.parser.enums.ParserEnum;
  20. import org.dbsyncer.parser.flush.FlushService;
  21. import org.dbsyncer.parser.logger.LogType;
  22. import org.dbsyncer.parser.model.*;
  23. import org.dbsyncer.parser.util.ConvertUtil;
  24. import org.dbsyncer.parser.util.PickerUtil;
  25. import org.dbsyncer.plugin.PluginFactory;
  26. import org.dbsyncer.storage.enums.StorageDataStatusEnum;
  27. import org.json.JSONException;
  28. import org.json.JSONObject;
  29. import org.slf4j.Logger;
  30. import org.slf4j.LoggerFactory;
  31. import org.springframework.beans.BeanUtils;
  32. import org.springframework.beans.factory.annotation.Autowired;
  33. import org.springframework.context.ApplicationContext;
  34. import org.springframework.stereotype.Component;
  35. import org.springframework.util.Assert;
  36. import java.time.Instant;
  37. import java.util.*;
  38. import java.util.concurrent.ConcurrentLinkedQueue;
  39. import java.util.concurrent.CountDownLatch;
  40. import java.util.concurrent.Executor;
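/**
 * {@link Parser} implementation registered as a Spring component. It delegates connector
 * operations to {@code ConnectorFactory} and drives both the paged full synchronization and
 * the per-row incremental synchronization (field picking, conversion, plugin, write, flush).
 *
 * @author AE86
 * @version 1.0.0
 * @date 2019/9/29 22:38
 */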
  46. @Component
  47. public class ParserFactory implements Parser {
  48. private final Logger logger = LoggerFactory.getLogger(getClass());
  49. @Autowired
  50. private ConnectorFactory connectorFactory;
  51. @Autowired
  52. private PluginFactory pluginFactory;
  53. @Autowired
  54. private CacheService cacheService;
  55. @Autowired
  56. private FlushService flushService;
  57. @Autowired
  58. private Executor taskExecutor;
  59. @Autowired
  60. private ApplicationContext applicationContext;
  61. @Override
  62. public ConnectorMapper connect(ConnectorConfig config) {
  63. return connectorFactory.connect(config);
  64. }
  65. @Override
  66. public boolean refreshConnectorConfig(ConnectorConfig config) {
  67. return connectorFactory.refresh(config);
  68. }
  69. @Override
  70. public boolean isAliveConnectorConfig(ConnectorConfig config) {
  71. try {
  72. return connectorFactory.isAlive(config);
  73. } catch (Exception e) {
  74. LogType.ConnectorLog logType = LogType.ConnectorLog.FAILED;
  75. flushService.asyncWrite(logType.getType(), String.format("%s%s", logType.getName(), e.getMessage()));
  76. }
  77. return false;
  78. }
  79. @Override
  80. public List<String> getTable(ConnectorMapper config) {
  81. return connectorFactory.getTable(config);
  82. }
  83. @Override
  84. public MetaInfo getMetaInfo(String connectorId, String tableName) {
  85. ConnectorMapper connectorMapper = connectorFactory.connect(getConnectorConfig(connectorId));
  86. return connectorFactory.getMetaInfo(connectorMapper, tableName);
  87. }
  88. @Override
  89. public Map<String, String> getCommand(Mapping mapping, TableGroup tableGroup) {
  90. String sType = getConnectorConfig(mapping.getSourceConnectorId()).getConnectorType();
  91. String tType = getConnectorConfig(mapping.getTargetConnectorId()).getConnectorType();
  92. Table sourceTable = tableGroup.getSourceTable();
  93. Table targetTable = tableGroup.getTargetTable();
  94. Table sTable = new Table().setName(sourceTable.getName()).setColumn(new ArrayList<>());
  95. Table tTable = new Table().setName(targetTable.getName()).setColumn(new ArrayList<>());
  96. List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
  97. if (!CollectionUtils.isEmpty(fieldMapping)) {
  98. fieldMapping.forEach(m -> {
  99. if (null != m.getSource()) {
  100. sTable.getColumn().add(m.getSource());
  101. }
  102. if (null != m.getTarget()) {
  103. tTable.getColumn().add(m.getTarget());
  104. }
  105. });
  106. }
  107. final CommandConfig sourceConfig = new CommandConfig(sType, sTable, sourceTable, tableGroup.getFilter());
  108. final CommandConfig targetConfig = new CommandConfig(tType, tTable, targetTable);
  109. // 获取连接器同步参数
  110. Map<String, String> command = connectorFactory.getCommand(sourceConfig, targetConfig);
  111. return command;
  112. }
  113. @Override
  114. public long getCount(String connectorId, Map<String, String> command) {
  115. ConnectorMapper connectorMapper = connectorFactory.connect(getConnectorConfig(connectorId));
  116. return connectorFactory.getCount(connectorMapper, command);
  117. }
  118. @Override
  119. public Connector parseConnector(String json) {
  120. try {
  121. JSONObject conn = new JSONObject(json);
  122. JSONObject config = (JSONObject) conn.remove("config");
  123. Connector connector = JsonUtil.jsonToObj(conn.toString(), Connector.class);
  124. Assert.notNull(connector, "Connector can not be null.");
  125. String connectorType = config.getString("connectorType");
  126. Class<?> configClass = ConnectorEnum.getConfigClass(connectorType);
  127. ConnectorConfig obj = (ConnectorConfig) JsonUtil.jsonToObj(config.toString(), configClass);
  128. connector.setConfig(obj);
  129. return connector;
  130. } catch (JSONException e) {
  131. logger.error(e.getMessage());
  132. throw new ParserException(e.getMessage());
  133. }
  134. }
  135. @Override
  136. public <T> T parseObject(String json, Class<T> clazz) {
  137. try {
  138. JSONObject obj = new JSONObject(json);
  139. T t = JsonUtil.jsonToObj(obj.toString(), clazz);
  140. String format = String.format("%s can not be null.", clazz.getSimpleName());
  141. Assert.notNull(t, format);
  142. return t;
  143. } catch (JSONException e) {
  144. logger.error(e.getMessage());
  145. throw new ParserException(e.getMessage());
  146. }
  147. }
  148. @Override
  149. public List<ConnectorEnum> getConnectorEnumAll() {
  150. return Arrays.asList(ConnectorEnum.values());
  151. }
  152. @Override
  153. public List<OperationEnum> getOperationEnumAll() {
  154. return Arrays.asList(OperationEnum.values());
  155. }
  156. @Override
  157. public List<QuartzFilterEnum> getQuartzFilterEnumAll() {
  158. return Arrays.asList(QuartzFilterEnum.values());
  159. }
  160. @Override
  161. public List<FilterEnum> getFilterEnumAll() {
  162. return Arrays.asList(FilterEnum.values());
  163. }
  164. @Override
  165. public List<ConvertEnum> getConvertEnumAll() {
  166. return Arrays.asList(ConvertEnum.values());
  167. }
  168. @Override
  169. public List<StorageDataStatusEnum> getStorageDataStatusEnumAll() {
  170. return Arrays.asList(StorageDataStatusEnum.values());
  171. }
  172. @Override
  173. public void execute(Task task, Mapping mapping, TableGroup tableGroup) {
  174. final String metaId = task.getId();
  175. final String sourceConnectorId = mapping.getSourceConnectorId();
  176. final String targetConnectorId = mapping.getTargetConnectorId();
  177. ConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
  178. Assert.notNull(sConfig, "数据源配置不能为空.");
  179. ConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
  180. Assert.notNull(tConfig, "目标源配置不能为空.");
  181. TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
  182. Map<String, String> command = group.getCommand();
  183. Assert.notEmpty(command, "执行命令不能为空.");
  184. List<FieldMapping> fieldMapping = group.getFieldMapping();
  185. String sTableName = group.getSourceTable().getName();
  186. String tTableName = group.getTargetTable().getName();
  187. Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
  188. // 获取同步字段
  189. Picker picker = new Picker(fieldMapping);
  190. // 检查分页参数
  191. Map<String, String> params = getMeta(metaId).getMap();
  192. params.putIfAbsent(ParserEnum.PAGE_INDEX.getCode(), ParserEnum.PAGE_INDEX.getDefaultValue());
  193. int pageSize = mapping.getReadNum();
  194. int batchSize = mapping.getBatchNum();
  195. ConnectorMapper sConnectionMapper = connectorFactory.connect(sConfig);
  196. ConnectorMapper tConnectionMapper = connectorFactory.connect(tConfig);
  197. for (; ; ) {
  198. if (!task.isRunning()) {
  199. logger.warn("任务被中止:{}", metaId);
  200. break;
  201. }
  202. // 1、获取数据源数据
  203. int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
  204. Result reader = connectorFactory.reader(new ReaderConfig(sConnectionMapper, command, new ArrayList<>(), pageIndex, pageSize));
  205. List<Map> data = reader.getData();
  206. if (CollectionUtils.isEmpty(data)) {
  207. params.clear();
  208. logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
  209. break;
  210. }
  211. // 2、映射字段
  212. List<Map> target = picker.pickData(reader.getData());
  213. // 3、参数转换
  214. ConvertUtil.convert(group.getConvert(), target);
  215. // 4、插件转换
  216. pluginFactory.convert(group.getPlugin(), data, target);
  217. // 5、写入目标源
  218. Result writer = writeBatch(tConnectionMapper, command, picker.getTargetFields(), target, batchSize);
  219. // 6、更新结果
  220. flush(task, writer, target);
  221. // 7、更新分页数
  222. params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
  223. }
  224. }
  225. @Override
  226. public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent) {
  227. logger.info("Table[{}] {}, before:{}, after:{}", rowChangedEvent.getTableName(), rowChangedEvent.getEvent(),
  228. rowChangedEvent.getBefore(), rowChangedEvent.getAfter());
  229. final String metaId = mapping.getMetaId();
  230. ConnectorMapper tConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
  231. // 1、获取映射字段
  232. final String event = rowChangedEvent.getEvent();
  233. Map<String, Object> data = StringUtils.equals(ConnectorConstant.OPERTION_DELETE, event) ? rowChangedEvent.getBefore() : rowChangedEvent.getAfter();
  234. Picker picker = new Picker(tableGroup.getFieldMapping(), data);
  235. Map target = picker.getTargetMap();
  236. // 2、参数转换
  237. ConvertUtil.convert(tableGroup.getConvert(), target);
  238. // 3、插件转换
  239. pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
  240. // 4、写入目标源
  241. Result writer = connectorFactory.writer(new WriterSingleConfig(tConnectorMapper, picker.getTargetFields(), tableGroup.getCommand(), event, target, rowChangedEvent.getTableName()));
  242. // 5、更新结果
  243. flush(metaId, writer, event, picker.getTargetMapList());
  244. }
  245. /**
  246. * 更新缓存
  247. *
  248. * @param task
  249. * @param writer
  250. * @param data
  251. */
  252. private void flush(Task task, Result writer, List<Map> data) {
  253. flush(task.getId(), writer, ConnectorConstant.OPERTION_INSERT, data);
  254. // 发布刷新事件给FullExtractor
  255. task.setEndTime(Instant.now().toEpochMilli());
  256. applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
  257. }
  258. private void flush(String metaId, Result writer, String event, List<Map> data) {
  259. // 引用传递
  260. long total = data.size();
  261. long fail = writer.getFail().get();
  262. Meta meta = getMeta(metaId);
  263. meta.getFail().getAndAdd(fail);
  264. meta.getSuccess().getAndAdd(total - fail);
  265. // 记录错误数据
  266. Queue<Map> failData = writer.getFailData();
  267. boolean success = CollectionUtils.isEmpty(failData);
  268. if (!success) {
  269. data.clear();
  270. data.addAll(failData);
  271. }
  272. String error = writer.getError().toString();
  273. flushService.asyncWrite(metaId, event, success, data, error);
  274. }
  275. /**
  276. * 获取Meta(注: 没有bean拷贝, 便于直接更新缓存)
  277. *
  278. * @param metaId
  279. * @return
  280. */
  281. private Meta getMeta(String metaId) {
  282. Assert.hasText(metaId, "Meta id can not be empty.");
  283. Meta meta = cacheService.get(metaId, Meta.class);
  284. Assert.notNull(meta, "Meta can not be null.");
  285. return meta;
  286. }
  287. /**
  288. * 获取连接配置
  289. *
  290. * @param connectorId
  291. * @return
  292. */
  293. private ConnectorConfig getConnectorConfig(String connectorId) {
  294. Assert.hasText(connectorId, "Connector id can not be empty.");
  295. Connector conn = cacheService.get(connectorId, Connector.class);
  296. Assert.notNull(conn, "Connector can not be null.");
  297. Connector connector = new Connector();
  298. BeanUtils.copyProperties(conn, connector);
  299. return connector.getConfig();
  300. }
  301. /**
  302. * 批量写入
  303. *
  304. * @param connectorMapper
  305. * @param command
  306. * @param fields
  307. * @param target
  308. * @param batchSize
  309. * @return
  310. */
  311. private Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, List<Field> fields, List<Map> target, int batchSize) {
  312. // 总数
  313. int total = target.size();
  314. // 单次任务
  315. if (total <= batchSize) {
  316. return connectorFactory.writer(new WriterBatchConfig(connectorMapper, command, fields, target));
  317. }
  318. // 批量任务, 拆分
  319. int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
  320. // 转换为消息队列,并发写入
  321. Queue<Map> queue = new ConcurrentLinkedQueue<>(target);
  322. final Result result = new Result();
  323. final CountDownLatch latch = new CountDownLatch(taskSize);
  324. for (int i = 0; i < taskSize; i++) {
  325. taskExecutor.execute(() -> {
  326. try {
  327. Result w = parallelTask(batchSize, queue, connectorMapper, command, fields);
  328. // CAS
  329. result.getFailData().addAll(w.getFailData());
  330. result.getFail().getAndAdd(w.getFail().get());
  331. result.getError().append(w.getError());
  332. } catch (Exception e) {
  333. result.getError().append(e.getMessage()).append(System.lineSeparator());
  334. } finally {
  335. latch.countDown();
  336. }
  337. });
  338. }
  339. try {
  340. latch.await();
  341. } catch (InterruptedException e) {
  342. logger.error(e.getMessage());
  343. }
  344. return result;
  345. }
  346. private Result parallelTask(int batchSize, Queue<Map> queue, ConnectorMapper connectorMapper, Map<String, String> command,
  347. List<Field> fields) {
  348. List<Map> data = new ArrayList<>();
  349. for (int j = 0; j < batchSize; j++) {
  350. Map poll = queue.poll();
  351. if (null == poll) {
  352. break;
  353. }
  354. data.add(poll);
  355. }
  356. return connectorFactory.writer(new WriterBatchConfig(connectorMapper, command, fields, data));
  357. }
  358. }