ManagerFactory.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. package org.dbsyncer.manager;
  2. import org.dbsyncer.common.event.ClosedEvent;
  3. import org.dbsyncer.common.model.AbstractConnectorConfig;
  4. import org.dbsyncer.common.model.Paging;
  5. import org.dbsyncer.common.spi.ConnectorMapper;
  6. import org.dbsyncer.common.util.CollectionUtils;
  7. import org.dbsyncer.connector.enums.ConnectorEnum;
  8. import org.dbsyncer.connector.enums.FilterEnum;
  9. import org.dbsyncer.connector.enums.OperationEnum;
  10. import org.dbsyncer.connector.model.MetaInfo;
  11. import org.dbsyncer.connector.model.Table;
  12. import org.dbsyncer.listener.enums.QuartzFilterEnum;
  13. import org.dbsyncer.manager.enums.CommandEnum;
  14. import org.dbsyncer.manager.enums.GroupStrategyEnum;
  15. import org.dbsyncer.manager.model.OperationConfig;
  16. import org.dbsyncer.manager.model.QueryConfig;
  17. import org.dbsyncer.manager.template.OperationTemplate;
  18. import org.dbsyncer.parser.Parser;
  19. import org.dbsyncer.parser.enums.ConvertEnum;
  20. import org.dbsyncer.parser.enums.MetaEnum;
  21. import org.dbsyncer.parser.enums.ModelEnum;
  22. import org.dbsyncer.parser.logger.LogService;
  23. import org.dbsyncer.parser.logger.LogType;
  24. import org.dbsyncer.parser.model.*;
  25. import org.dbsyncer.plugin.PluginFactory;
  26. import org.dbsyncer.plugin.config.Plugin;
  27. import org.dbsyncer.storage.StorageService;
  28. import org.dbsyncer.storage.constant.ConfigConstant;
  29. import org.dbsyncer.storage.enums.StorageDataStatusEnum;
  30. import org.dbsyncer.storage.enums.StorageEnum;
  31. import org.dbsyncer.storage.query.Query;
  32. import org.springframework.beans.factory.annotation.Autowired;
  33. import org.springframework.context.ApplicationListener;
  34. import org.springframework.stereotype.Component;
  35. import org.springframework.util.Assert;
  36. import java.time.Instant;
  37. import java.util.Comparator;
  38. import java.util.List;
  39. import java.util.Map;
  40. import java.util.stream.Collectors;
  41. /**
  42. * @author AE86
  43. * @version 1.0.0
  44. * @date 2019/9/16 23:59
  45. */
  46. @Component
  47. public class ManagerFactory implements Manager, ApplicationListener<ClosedEvent> {
  48. @Autowired
  49. private Parser parser;
  50. @Autowired
  51. private PluginFactory pluginFactory;
  52. @Autowired
  53. private OperationTemplate operationTemplate;
  54. @Autowired
  55. private StorageService storageService;
  56. @Autowired
  57. private LogService logService;
  58. @Autowired
  59. private Map<String, Puller> map;
  60. @Override
  61. public String addProjectGroup(ConfigModel model) {
  62. return operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_ADD));
  63. }
  64. @Override
  65. public String editProjectGroup(ConfigModel model) {
  66. return operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_EDIT));
  67. }
  68. @Override
  69. public ProjectGroup getProjectGroup(String id) {
  70. return operationTemplate.queryObject(ProjectGroup.class, id);
  71. }
  72. @Override
  73. public void removeProjectGroup(String id) {
  74. operationTemplate.remove(new OperationConfig(id));
  75. }
  76. @Override
  77. public List<ProjectGroup> getProjectGroupAll() {
  78. ProjectGroup projectGroup = new ProjectGroup();
  79. projectGroup.setType(ConfigConstant.PROJECT_GROUP);
  80. QueryConfig<ProjectGroup> queryConfig = new QueryConfig<>(projectGroup);
  81. List<ProjectGroup> groups = operationTemplate.queryAll(queryConfig);
  82. return groups;
  83. }
  84. @Override
  85. public ConnectorMapper connect(AbstractConnectorConfig config) {
  86. return parser.connect(config);
  87. }
  88. @Override
  89. public boolean refreshConnectorConfig(AbstractConnectorConfig config) {
  90. return parser.refreshConnectorConfig(config);
  91. }
  92. @Override
  93. public boolean isAliveConnectorConfig(AbstractConnectorConfig config) {
  94. return parser.isAliveConnectorConfig(config);
  95. }
  96. @Override
  97. public List<Table> getTable(ConnectorMapper config) {
  98. return parser.getTable(config);
  99. }
  100. @Override
  101. public MetaInfo getMetaInfo(String connectorId, String tableName) {
  102. return parser.getMetaInfo(connectorId, tableName);
  103. }
  104. @Override
  105. public String addConnector(ConfigModel model) {
  106. return operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_ADD));
  107. }
  108. @Override
  109. public String editConnector(ConfigModel model) {
  110. return operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_EDIT));
  111. }
  112. @Override
  113. public void removeConnector(String connectorId) {
  114. operationTemplate.remove(new OperationConfig(connectorId));
  115. }
  116. @Override
  117. public Connector getConnector(String connectorId) {
  118. return operationTemplate.queryObject(Connector.class, connectorId);
  119. }
  120. @Override
  121. public List<Connector> getConnectorAll() {
  122. Connector connector = new Connector();
  123. connector.setType(ConfigConstant.CONNECTOR);
  124. QueryConfig<Connector> queryConfig = new QueryConfig<>(connector);
  125. List<Connector> connectors = operationTemplate.queryAll(queryConfig);
  126. return connectors;
  127. }
  128. @Override
  129. public void checkAllConnectorStatus() {
  130. List<Connector> list = getConnectorAll();
  131. if (!CollectionUtils.isEmpty(list)) {
  132. list.forEach(c -> {
  133. try {
  134. refreshConnectorConfig(c.getConfig());
  135. } catch (Exception e) {
  136. LogType.ConnectorLog logType = LogType.ConnectorLog.FAILED;
  137. logService.log(logType, "%s%s", logType.getName(), e.getMessage());
  138. }
  139. });
  140. }
  141. }
  142. @Override
  143. public String addMapping(ConfigModel model) {
  144. return operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_ADD));
  145. }
  146. @Override
  147. public String editMapping(ConfigModel model) {
  148. return operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_EDIT));
  149. }
  150. @Override
  151. public void removeMapping(String mappingId) {
  152. operationTemplate.remove(new OperationConfig(mappingId));
  153. }
  154. @Override
  155. public Mapping getMapping(String mappingId) {
  156. return operationTemplate.queryObject(Mapping.class, mappingId);
  157. }
  158. @Override
  159. public List<Mapping> getMappingAll() {
  160. Mapping mapping = new Mapping();
  161. mapping.setType(ConfigConstant.MAPPING);
  162. QueryConfig<Mapping> queryConfig = new QueryConfig<>(mapping);
  163. List<Mapping> mappings = operationTemplate.queryAll(queryConfig);
  164. return mappings;
  165. }
  166. @Override
  167. public String addTableGroup(ConfigModel model) {
  168. return operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_ADD, GroupStrategyEnum.TABLE));
  169. }
  170. @Override
  171. public String editTableGroup(ConfigModel model) {
  172. return operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_EDIT, GroupStrategyEnum.TABLE));
  173. }
  174. @Override
  175. public void removeTableGroup(String tableGroupId) {
  176. operationTemplate.remove(new OperationConfig(tableGroupId, GroupStrategyEnum.TABLE));
  177. }
  178. @Override
  179. public TableGroup getTableGroup(String tableGroupId) {
  180. return operationTemplate.queryObject(TableGroup.class, tableGroupId);
  181. }
  182. @Override
  183. public List<TableGroup> getTableGroupAll(String mappingId) {
  184. TableGroup tableGroup = new TableGroup();
  185. tableGroup.setType(ConfigConstant.TABLE_GROUP);
  186. tableGroup.setMappingId(mappingId);
  187. QueryConfig<TableGroup> queryConfig = new QueryConfig<>(tableGroup, GroupStrategyEnum.TABLE);
  188. List<TableGroup> tableGroups = operationTemplate.queryAll(queryConfig);
  189. return tableGroups;
  190. }
  191. @Override
  192. public List<TableGroup> getSortedTableGroupAll(String mappingId) {
  193. List<TableGroup> list = getTableGroupAll(mappingId)
  194. .stream()
  195. .sorted(Comparator.comparing(TableGroup::getIndex).reversed())
  196. .collect(Collectors.toList());
  197. return list;
  198. }
  199. @Override
  200. public int getTableGroupCount(String mappingId) {
  201. TableGroup tableGroup = new TableGroup();
  202. tableGroup.setType(ConfigConstant.TABLE_GROUP);
  203. tableGroup.setMappingId(mappingId);
  204. QueryConfig queryConfig = new QueryConfig<>(tableGroup, GroupStrategyEnum.TABLE);
  205. return operationTemplate.queryCount(queryConfig);
  206. }
  207. @Override
  208. public Map<String, String> getCommand(Mapping mapping, TableGroup tableGroup) {
  209. return parser.getCommand(mapping, tableGroup);
  210. }
  211. @Override
  212. public long getCount(String connectorId, Map<String, String> command) {
  213. return parser.getCount(connectorId, command);
  214. }
  215. @Override
  216. public String addMeta(ConfigModel model) {
  217. return operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_ADD));
  218. }
  219. @Override
  220. public String editMeta(ConfigModel model) {
  221. return operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_EDIT));
  222. }
  223. @Override
  224. public Meta getMeta(String metaId) {
  225. return operationTemplate.queryObject(Meta.class, metaId);
  226. }
  227. @Override
  228. public void removeMeta(String metaId) {
  229. operationTemplate.remove(new OperationConfig(metaId));
  230. }
  231. @Override
  232. public List<Meta> getMetaAll() {
  233. Meta meta = new Meta();
  234. meta.setType(ConfigConstant.META);
  235. QueryConfig<Meta> queryConfig = new QueryConfig<>(meta);
  236. return operationTemplate.queryAll(queryConfig);
  237. }
  238. @Override
  239. public String addConfig(ConfigModel model) {
  240. return operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_ADD));
  241. }
  242. @Override
  243. public String editConfig(ConfigModel model) {
  244. return operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_EDIT));
  245. }
  246. @Override
  247. public Config getConfig(String configId) {
  248. return operationTemplate.queryObject(Config.class, configId);
  249. }
  250. @Override
  251. public List<Config> getConfigAll() {
  252. Config config = new Config();
  253. config.setType(ConfigConstant.CONFIG);
  254. QueryConfig<Config> queryConfig = new QueryConfig<>(config);
  255. return operationTemplate.queryAll(queryConfig);
  256. }
  257. @Override
  258. public Paging queryData(Query query, String collectionId) {
  259. query.setType(StorageEnum.DATA);
  260. query.setCollection(collectionId);
  261. return storageService.query(query);
  262. }
  263. @Override
  264. public void clearData(String collectionId) {
  265. Meta meta = getMeta(collectionId);
  266. Mapping mapping = getMapping(meta.getMappingId());
  267. String model = ModelEnum.getModelEnum(mapping.getModel()).getName();
  268. LogType.MappingLog log = LogType.MappingLog.CLEAR_DATA;
  269. logService.log(log, "%s:%s(%s)", log.getMessage(), mapping.getName(), model);
  270. storageService.clear(StorageEnum.DATA, collectionId);
  271. }
  272. @Override
  273. public Paging queryLog(Query query) {
  274. query.setType(StorageEnum.LOG);
  275. return storageService.query(query);
  276. }
  277. @Override
  278. public void clearLog() {
  279. storageService.clear(StorageEnum.LOG, null);
  280. }
  281. @Override
  282. public List<ConnectorEnum> getConnectorEnumAll() {
  283. return parser.getConnectorEnumAll();
  284. }
  285. @Override
  286. public List<OperationEnum> getOperationEnumAll() {
  287. return parser.getOperationEnumAll();
  288. }
  289. @Override
  290. public List<QuartzFilterEnum> getQuartzFilterEnumAll() {
  291. return parser.getQuartzFilterEnumAll();
  292. }
  293. @Override
  294. public List<FilterEnum> getFilterEnumAll() {
  295. return parser.getFilterEnumAll();
  296. }
  297. @Override
  298. public List<ConvertEnum> getConvertEnumAll() {
  299. return parser.getConvertEnumAll();
  300. }
  301. @Override
  302. public List<StorageDataStatusEnum> getStorageDataStatusEnumAll() {
  303. return parser.getStorageDataStatusEnumAll();
  304. }
  305. @Override
  306. public List<Plugin> getPluginAll() {
  307. return pluginFactory.getPluginAll();
  308. }
  309. @Override
  310. public String getPluginPath() {
  311. return pluginFactory.getPluginPath();
  312. }
  313. @Override
  314. public String getLibraryPath() {
  315. return pluginFactory.getLibraryPath();
  316. }
  317. @Override
  318. public void loadPlugins() {
  319. pluginFactory.loadPlugins();
  320. }
  321. @Override
  322. public void start(Mapping mapping) {
  323. Puller puller = getPuller(mapping);
  324. // 标记运行中
  325. changeMetaState(mapping.getMetaId(), MetaEnum.RUNNING);
  326. puller.start(mapping);
  327. }
  328. @Override
  329. public void close(Mapping mapping) {
  330. Puller puller = getPuller(mapping);
  331. // 标记停止中
  332. String metaId = mapping.getMetaId();
  333. changeMetaState(metaId, MetaEnum.STOPPING);
  334. puller.close(metaId);
  335. }
  336. @Override
  337. public void changeMetaState(String metaId, MetaEnum metaEnum) {
  338. Meta meta = getMeta(metaId);
  339. int code = metaEnum.getCode();
  340. if (null != meta && meta.getState() != code) {
  341. meta.setState(code);
  342. meta.setUpdateTime(Instant.now().toEpochMilli());
  343. editMeta(meta);
  344. }
  345. }
  346. @Override
  347. public void onApplicationEvent(ClosedEvent event) {
  348. // 异步监听任务关闭事件
  349. changeMetaState(event.getId(), MetaEnum.READY);
  350. }
  351. private Puller getPuller(Mapping mapping) {
  352. Assert.notNull(mapping, "驱动不能为空");
  353. String model = mapping.getModel();
  354. String metaId = mapping.getMetaId();
  355. Assert.hasText(model, "同步方式不能为空");
  356. Assert.hasText(metaId, "任务ID不能为空");
  357. Puller puller = map.get(model.concat("Puller"));
  358. Assert.notNull(puller, String.format("未知的同步方式: %s", model));
  359. return puller;
  360. }
  361. }