// ParserFactory.java
  1. package org.dbsyncer.parser;
  2. import org.apache.commons.lang.StringUtils;
  3. import org.dbsyncer.cache.CacheService;
  4. import org.dbsyncer.common.event.FullRefreshEvent;
  5. import org.dbsyncer.common.model.Result;
  6. import org.dbsyncer.common.model.Task;
  7. import org.dbsyncer.common.util.CollectionUtils;
  8. import org.dbsyncer.common.util.JsonUtil;
  9. import org.dbsyncer.connector.ConnectorFactory;
  10. import org.dbsyncer.connector.config.*;
  11. import org.dbsyncer.connector.enums.ConnectorEnum;
  12. import org.dbsyncer.connector.enums.FilterEnum;
  13. import org.dbsyncer.connector.enums.OperationEnum;
  14. import org.dbsyncer.parser.enums.ConvertEnum;
  15. import org.dbsyncer.parser.enums.ParserEnum;
  16. import org.dbsyncer.parser.flush.FlushService;
  17. import org.dbsyncer.parser.model.*;
  18. import org.dbsyncer.parser.util.ConvertUtil;
  19. import org.dbsyncer.parser.util.PickerUtil;
  20. import org.dbsyncer.plugin.PluginFactory;
  21. import org.dbsyncer.plugin.config.Plugin;
  22. import org.json.JSONException;
  23. import org.json.JSONObject;
  24. import org.slf4j.Logger;
  25. import org.slf4j.LoggerFactory;
  26. import org.springframework.beans.BeanUtils;
  27. import org.springframework.beans.factory.annotation.Autowired;
  28. import org.springframework.context.ApplicationContext;
  29. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  30. import org.springframework.stereotype.Component;
  31. import org.springframework.util.Assert;
  32. import java.util.*;
  33. import java.util.concurrent.ConcurrentLinkedQueue;
  34. import java.util.concurrent.CountDownLatch;
  35. import java.util.concurrent.ThreadPoolExecutor;
  36. /**
  37. * @author AE86
  38. * @version 1.0.0
  39. * @date 2019/9/29 22:38
  40. */
  41. @Component
  42. public class ParserFactory implements Parser {
  43. private final Logger logger = LoggerFactory.getLogger(getClass());
  44. @Autowired
  45. private ConnectorFactory connectorFactory;
  46. @Autowired
  47. private PluginFactory pluginFactory;
  48. @Autowired
  49. private CacheService cacheService;
  50. @Autowired
  51. private FlushService flushService;
  52. @Autowired
  53. private ApplicationContext applicationContext;
  54. @Override
  55. public boolean alive(ConnectorConfig config) {
  56. return connectorFactory.isAlive(config);
  57. }
  58. @Override
  59. public List<String> getTable(ConnectorConfig config) {
  60. return connectorFactory.getTable(config);
  61. }
  62. @Override
  63. public MetaInfo getMetaInfo(String connectorId, String tableName) {
  64. ConnectorConfig config = getConnectorConfig(connectorId);
  65. return connectorFactory.getMetaInfo(config, tableName);
  66. }
  67. @Override
  68. public Map<String, String> getCommand(Mapping mapping, TableGroup tableGroup) {
  69. final String sourceConnectorId = mapping.getSourceConnectorId();
  70. final String targetConnectorId = mapping.getTargetConnectorId();
  71. List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
  72. if (CollectionUtils.isEmpty(fieldMapping)) {
  73. return null;
  74. }
  75. String sType = getConnectorConfig(sourceConnectorId).getConnectorType();
  76. String tType = getConnectorConfig(targetConnectorId).getConnectorType();
  77. String sTableName = tableGroup.getSourceTable().getName();
  78. String tTableName = tableGroup.getTargetTable().getName();
  79. Table sTable = new Table().setName(sTableName).setColumn(new ArrayList<>());
  80. Table tTable = new Table().setName(tTableName).setColumn(new ArrayList<>());
  81. fieldMapping.forEach(m -> {
  82. sTable.getColumn().add(m.getSource());
  83. tTable.getColumn().add(m.getTarget());
  84. });
  85. final CommandConfig sourceConfig = new CommandConfig(sType, sTable, tableGroup.getFilter());
  86. final CommandConfig targetConfig = new CommandConfig(tType, tTable);
  87. // 获取连接器同步参数
  88. Map<String, String> command = connectorFactory.getCommand(sourceConfig, targetConfig);
  89. return command;
  90. }
  91. @Override
  92. public long getCount(String connectorId, Map<String, String> command) {
  93. ConnectorConfig config = getConnectorConfig(connectorId);
  94. return connectorFactory.getCount(config, command);
  95. }
  96. @Override
  97. public Connector parseConnector(String json) {
  98. try {
  99. JSONObject conn = new JSONObject(json);
  100. JSONObject config = (JSONObject) conn.remove("config");
  101. Connector connector = JsonUtil.jsonToObj(conn.toString(), Connector.class);
  102. Assert.notNull(connector, "Connector can not be null.");
  103. String connectorType = config.getString("connectorType");
  104. Class<?> configClass = ConnectorEnum.getConfigClass(connectorType);
  105. ConnectorConfig obj = (ConnectorConfig) JsonUtil.jsonToObj(config.toString(), configClass);
  106. connector.setConfig(obj);
  107. return connector;
  108. } catch (JSONException e) {
  109. logger.error(e.getMessage());
  110. throw new ParserException(e.getMessage());
  111. }
  112. }
  113. @Override
  114. public <T> T parseObject(String json, Class<T> clazz) {
  115. try {
  116. JSONObject obj = new JSONObject(json);
  117. T t = JsonUtil.jsonToObj(obj.toString(), clazz);
  118. String format = String.format("%s can not be null.", clazz.getSimpleName());
  119. Assert.notNull(t, format);
  120. return t;
  121. } catch (JSONException e) {
  122. logger.error(e.getMessage());
  123. throw new ParserException(e.getMessage());
  124. }
  125. }
  126. @Override
  127. public List<ConnectorEnum> getConnectorEnumAll() {
  128. return Arrays.asList(ConnectorEnum.values());
  129. }
  130. @Override
  131. public List<OperationEnum> getOperationEnumAll() {
  132. return Arrays.asList(OperationEnum.values());
  133. }
  134. @Override
  135. public List<FilterEnum> getFilterEnumAll() {
  136. return Arrays.asList(FilterEnum.values());
  137. }
  138. @Override
  139. public List<ConvertEnum> getConvertEnumAll() {
  140. return Arrays.asList(ConvertEnum.values());
  141. }
  142. @Override
  143. public void execute(Task task, Mapping mapping, TableGroup tableGroup) {
  144. final String metaId = task.getId();
  145. final String sourceConnectorId = mapping.getSourceConnectorId();
  146. final String targetConnectorId = mapping.getTargetConnectorId();
  147. ConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
  148. Assert.notNull(sConfig, "数据源配置不能为空.");
  149. ConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
  150. Assert.notNull(tConfig, "目标源配置不能为空.");
  151. Map<String, String> command = tableGroup.getCommand();
  152. Assert.notEmpty(command, "执行命令不能为空.");
  153. List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
  154. String sTableName = tableGroup.getSourceTable().getName();
  155. String tTableName = tableGroup.getTargetTable().getName();
  156. Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
  157. // 转换配置(默认使用全局)
  158. List<Convert> convert = CollectionUtils.isEmpty(tableGroup.getConvert()) ? mapping.getConvert() : tableGroup.getConvert();
  159. // 插件配置(默认使用全局)
  160. Plugin plugin = null == tableGroup.getPlugin() ? mapping.getPlugin() : tableGroup.getPlugin();
  161. // 获取同步字段
  162. Picker picker = new Picker();
  163. PickerUtil.pickFields(picker, fieldMapping);
  164. // 检查分页参数
  165. Map<String, String> params = getMeta(metaId).getMap();
  166. params.putIfAbsent(ParserEnum.PAGE_INDEX.getCode(), ParserEnum.PAGE_INDEX.getDefaultValue());
  167. int pageSize = mapping.getReadNum();
  168. int threadSize = mapping.getThreadNum();
  169. int batchSize = mapping.getBatchNum();
  170. for (; ; ) {
  171. if (!task.isRunning()) {
  172. logger.warn("任务被中止:{}", metaId);
  173. break;
  174. }
  175. // 1、获取数据源数据
  176. int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
  177. Result reader = connectorFactory.reader(sConfig, command, pageIndex, pageSize);
  178. List<Map<String, Object>> data = reader.getData();
  179. if (CollectionUtils.isEmpty(data)) {
  180. params.clear();
  181. logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
  182. break;
  183. }
  184. // 2、映射字段
  185. PickerUtil.pickData(picker, data);
  186. // 3、参数转换
  187. List<Map<String, Object>> target = picker.getTargetList();
  188. ConvertUtil.convert(convert, target);
  189. // 4、插件转换
  190. pluginFactory.convert(plugin, data, target);
  191. // 5、写入目标源
  192. Result writer = writeBatch(tConfig, command, picker.getTargetFields(), target, threadSize, batchSize);
  193. // 6、更新结果
  194. flush(task, writer, target);
  195. // 7、更新分页数
  196. params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
  197. }
  198. }
  199. @Override
  200. public void execute(Mapping mapping, TableGroup tableGroup, DataEvent dataEvent) {
  201. logger.info("同步数据=> dataEvent:{}", dataEvent);
  202. final String metaId = mapping.getMetaId();
  203. ConnectorConfig tConfig = getConnectorConfig(mapping.getTargetConnectorId());
  204. Map<String, String> command = tableGroup.getCommand();
  205. List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
  206. // 转换配置(默认使用全局)
  207. List<Convert> convert = CollectionUtils.isEmpty(tableGroup.getConvert()) ? mapping.getConvert() : tableGroup.getConvert();
  208. // 插件配置(默认使用全局)
  209. Plugin plugin = null == tableGroup.getPlugin() ? mapping.getPlugin() : tableGroup.getPlugin();
  210. // 获取同步字段
  211. Picker picker = new Picker();
  212. PickerUtil.pickFields(picker, fieldMapping);
  213. // 1、映射字段
  214. String event = dataEvent.getEvent();
  215. Map<String, Object> data = dataEvent.getData();
  216. PickerUtil.pickData(picker, data);
  217. // 2、参数转换
  218. Map<String, Object> target = picker.getTarget();
  219. ConvertUtil.convert(convert, target);
  220. // 3、插件转换
  221. pluginFactory.convert(plugin, event, data, target);
  222. // 4、写入目标源
  223. Result writer = connectorFactory.writer(tConfig, picker.getTargetFields(), command, event, target);
  224. // 5、更新结果
  225. List<Map<String, Object>> list = new ArrayList<>(1);
  226. list.add(target);
  227. flush(metaId, writer, list);
  228. }
  229. /**
  230. * 更新缓存
  231. *
  232. * @param task
  233. * @param writer
  234. * @param data
  235. */
  236. private void flush(Task task, Result writer, List<Map<String, Object>> data) {
  237. flush(task.getId(), writer, data);
  238. // 发布刷新事件给FullExtractor
  239. task.setEndTime(System.currentTimeMillis());
  240. applicationContext.publishEvent(new FullRefreshEvent(applicationContext, task));
  241. }
  242. private void flush(String metaId, Result writer, List<Map<String, Object>> data) {
  243. // 引用传递
  244. long total = data.size();
  245. long fail = writer.getFail().get();
  246. Meta meta = getMeta(metaId);
  247. meta.getFail().getAndAdd(fail);
  248. meta.getSuccess().getAndAdd(total - fail);
  249. // print process
  250. logger.info("任务:{}, 成功:{}, 失败:{}", metaId, meta.getSuccess(), meta.getFail());
  251. // 记录错误数据
  252. Queue<Map<String, Object>> failData = writer.getFailData();
  253. boolean success = CollectionUtils.isEmpty(failData);
  254. if (!success) {
  255. data.clear();
  256. data.addAll(failData);
  257. }
  258. flushService.asyncWrite(metaId, success, data);
  259. // 记录错误日志
  260. String error = writer.getError().toString();
  261. if (StringUtils.isNotBlank(error)) {
  262. flushService.asyncWrite(metaId, error);
  263. }
  264. }
  265. /**
  266. * 获取Meta(注: 没有bean拷贝, 便于直接更新缓存)
  267. *
  268. * @param metaId
  269. * @return
  270. */
  271. private Meta getMeta(String metaId) {
  272. Assert.hasText(metaId, "Meta id can not be empty.");
  273. Meta meta = cacheService.get(metaId, Meta.class);
  274. Assert.notNull(meta, "Meta can not be null.");
  275. return meta;
  276. }
  277. /**
  278. * 获取连接配置
  279. *
  280. * @param connectorId
  281. * @return
  282. */
  283. private ConnectorConfig getConnectorConfig(String connectorId) {
  284. Assert.hasText(connectorId, "Connector id can not be empty.");
  285. Connector conn = cacheService.get(connectorId, Connector.class);
  286. Assert.notNull(conn, "Connector can not be null.");
  287. Connector connector = new Connector();
  288. BeanUtils.copyProperties(conn, connector);
  289. return connector.getConfig();
  290. }
  291. /**
  292. * 批量写入
  293. *
  294. * @param config
  295. * @param command
  296. * @param fields
  297. * @param target
  298. * @param threadSize
  299. * @param batchSize
  300. * @return
  301. */
  302. private Result writeBatch(ConnectorConfig config, Map<String, String> command, List<Field> fields, List<Map<String, Object>> target,
  303. int threadSize, int batchSize) {
  304. // 总数
  305. int total = target.size();
  306. // 单次任务
  307. if (total <= batchSize) {
  308. return connectorFactory.writer(config, command, fields, target);
  309. }
  310. // 批量任务, 拆分
  311. int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
  312. threadSize = taskSize <= threadSize ? taskSize : threadSize;
  313. // 转换为消息队列,根据batchSize获取数据,并发写入
  314. Queue<Map<String, Object>> queue = new ConcurrentLinkedQueue<>(target);
  315. // 创建线程池
  316. final ThreadPoolTaskExecutor executor = getThreadPoolTaskExecutor(threadSize, taskSize - threadSize);
  317. final Result result = new Result();
  318. for (; ; ) {
  319. if (taskSize <= 0) {
  320. break;
  321. }
  322. // TODO 优化 CountDownLatch
  323. final CountDownLatch latch = new CountDownLatch(threadSize);
  324. for (int i = 0; i < threadSize; i++) {
  325. executor.execute(() -> {
  326. try {
  327. Result w = parallelTask(batchSize, queue, config, command, fields);
  328. // CAS
  329. result.getFailData().addAll(w.getFailData());
  330. result.getFail().getAndAdd(w.getFail().get());
  331. result.getError().append(w.getError());
  332. } catch (Exception e) {
  333. result.getError().append(e.getMessage()).append("\r\n");
  334. } finally {
  335. latch.countDown();
  336. }
  337. });
  338. }
  339. try {
  340. latch.await();
  341. } catch (InterruptedException e) {
  342. logger.error(e.getMessage());
  343. }
  344. taskSize -= threadSize;
  345. }
  346. executor.shutdown();
  347. return result;
  348. }
  349. private Result parallelTask(int batchSize, Queue<Map<String, Object>> queue, ConnectorConfig config, Map<String, String> command,
  350. List<Field> fields) {
  351. List<Map<String, Object>> data = new ArrayList<>();
  352. for (int j = 0; j < batchSize; j++) {
  353. Map<String, Object> poll = queue.poll();
  354. if (null == poll) {
  355. break;
  356. }
  357. data.add(poll);
  358. }
  359. return connectorFactory.writer(config, command, fields, data);
  360. }
  361. private ThreadPoolTaskExecutor getThreadPoolTaskExecutor(int threadSize, int queueCapacity) {
  362. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  363. executor.setCorePoolSize(threadSize);
  364. executor.setMaxPoolSize(threadSize);
  365. executor.setQueueCapacity(queueCapacity);
  366. executor.setKeepAliveSeconds(30);
  367. executor.setAwaitTerminationSeconds(30);
  368. executor.setThreadNamePrefix("ParserExecutor");
  369. executor.setWaitForTasksToCompleteOnShutdown(true);
  370. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
  371. executor.initialize();
  372. return executor;
  373. }
  374. }