PreloadTemplate.java 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package org.dbsyncer.manager.impl;
  2. import org.dbsyncer.common.model.Paging;
  3. import org.dbsyncer.common.util.CollectionUtils;
  4. import org.dbsyncer.common.util.JsonUtil;
  5. import org.dbsyncer.connector.ConnectorFactory;
  6. import org.dbsyncer.manager.ManagerFactory;
  7. import org.dbsyncer.parser.LogService;
  8. import org.dbsyncer.parser.LogType;
  9. import org.dbsyncer.parser.ProfileComponent;
  10. import org.dbsyncer.parser.command.PreloadCommand;
  11. import org.dbsyncer.parser.enums.CommandEnum;
  12. import org.dbsyncer.parser.enums.GroupStrategyEnum;
  13. import org.dbsyncer.parser.enums.MetaEnum;
  14. import org.dbsyncer.parser.impl.OperationTemplate;
  15. import org.dbsyncer.parser.model.ConfigModel;
  16. import org.dbsyncer.parser.model.Connector;
  17. import org.dbsyncer.parser.model.Mapping;
  18. import org.dbsyncer.parser.model.Meta;
  19. import org.dbsyncer.parser.model.OperationConfig;
  20. import org.dbsyncer.plugin.PluginFactory;
  21. import org.dbsyncer.sdk.connector.ConnectorInstance;
  22. import org.dbsyncer.storage.StorageService;
  23. import org.dbsyncer.storage.constant.ConfigConstant;
  24. import org.dbsyncer.storage.enums.StorageEnum;
  25. import org.dbsyncer.storage.query.Query;
  26. import org.slf4j.Logger;
  27. import org.slf4j.LoggerFactory;
  28. import org.springframework.context.ApplicationListener;
  29. import org.springframework.context.event.ContextRefreshedEvent;
  30. import org.springframework.stereotype.Component;
  31. import javax.annotation.Resource;
  32. import java.util.Arrays;
  33. import java.util.List;
  34. import java.util.Map;
  35. import java.util.concurrent.Executor;
  36. import java.util.stream.Stream;
  37. /**
  38. * 预加载配置模板
  39. *
  40. * @author AE86
  41. * @version 1.0.0
  42. * @date 2019/9/16 23:59
  43. */
  44. @Component
  45. public final class PreloadTemplate implements ApplicationListener<ContextRefreshedEvent> {
  46. private final Logger logger = LoggerFactory.getLogger(getClass());
  47. @Resource
  48. private OperationTemplate operationTemplate;
  49. @Resource
  50. private ProfileComponent profileComponent;
  51. @Resource
  52. private ManagerFactory managerFactory;
  53. @Resource
  54. private ConnectorFactory connectorFactory;
  55. @Resource
  56. private PluginFactory pluginFactory;
  57. @Resource
  58. private StorageService storageService;
  59. @Resource
  60. private LogService logService;
  61. @Resource
  62. private Executor generalExecutor;
  63. private boolean preloadCompleted;
  64. @Override
  65. public void onApplicationEvent(ContextRefreshedEvent event) {
  66. // Load configModels
  67. Arrays.stream(CommandEnum.values()).filter(commandEnum -> commandEnum.isPreload()).forEach(commandEnum -> execute(commandEnum));
  68. // Load plugins
  69. pluginFactory.loadPlugins();
  70. // Load connectorInstances
  71. loadConnectorInstance();
  72. // Launch drivers
  73. launch();
  74. preloadCompleted = true;
  75. }
  76. /**
  77. * 是否完成预加载配置
  78. *
  79. * @return
  80. */
  81. public boolean isPreloadCompleted() {
  82. return preloadCompleted;
  83. }
  84. public void reload(String json) {
  85. Map<String, Map> map = JsonUtil.jsonToObj(json, Map.class);
  86. if (CollectionUtils.isEmpty(map)) {
  87. return;
  88. }
  89. // Load configModels
  90. Stream.of(CommandEnum.PRELOAD_SYSTEM, CommandEnum.PRELOAD_USER, CommandEnum.PRELOAD_CONNECTOR, CommandEnum.PRELOAD_MAPPING,
  91. CommandEnum.PRELOAD_META, CommandEnum.PRELOAD_PROJECT_GROUP).forEach(commandEnum -> reload(map, commandEnum));
  92. // Load connectorInstances
  93. loadConnectorInstance();
  94. // Launch drivers
  95. launch();
  96. }
  97. private void launch() {
  98. List<Meta> metas = profileComponent.getMetaAll();
  99. if (!CollectionUtils.isEmpty(metas)) {
  100. metas.forEach(m -> {
  101. // 恢复驱动状态
  102. if (MetaEnum.RUNNING.getCode() == m.getState()) {
  103. Mapping mapping = profileComponent.getMapping(m.getMappingId());
  104. managerFactory.start(mapping);
  105. } else if (MetaEnum.STOPPING.getCode() == m.getState()) {
  106. managerFactory.changeMetaState(m.getId(), MetaEnum.READY);
  107. }
  108. });
  109. }
  110. }
  111. private void execute(CommandEnum commandEnum) {
  112. Query query = new Query();
  113. query.setType(StorageEnum.CONFIG);
  114. String modelType = commandEnum.getModelType();
  115. query.addFilter(ConfigConstant.CONFIG_MODEL_TYPE, modelType);
  116. int pageNum = 1;
  117. int pageSize = 20;
  118. long total = 0;
  119. for (; ; ) {
  120. query.setPageNum(pageNum);
  121. query.setPageSize(pageSize);
  122. Paging paging = storageService.query(query);
  123. List<Map> data = (List<Map>) paging.getData();
  124. if (CollectionUtils.isEmpty(data)) {
  125. break;
  126. }
  127. data.forEach(map -> {
  128. String json = (String) map.get(ConfigConstant.CONFIG_MODEL_JSON);
  129. ConfigModel model = (ConfigModel) commandEnum.getCommandExecutor().execute(new PreloadCommand(profileComponent, json));
  130. if (null != model) {
  131. operationTemplate.cache(model, commandEnum.getGroupStrategyEnum());
  132. }
  133. });
  134. total += paging.getTotal();
  135. pageNum++;
  136. }
  137. logger.info("{}:{}", modelType, total);
  138. }
  139. private void reload(Map<String, Map> map, CommandEnum commandEnum) {
  140. reload(map, commandEnum, commandEnum.getModelType());
  141. }
  142. private void reload(Map<String, Map> map, CommandEnum commandEnum, String groupId) {
  143. Map config = map.get(groupId);
  144. if (null == config) {
  145. return;
  146. }
  147. OperationTemplate.Group group = JsonUtil.jsonToObj(config.toString(), OperationTemplate.Group.class);
  148. if (null == group) {
  149. return;
  150. }
  151. List<String> index = group.getIndex();
  152. if (CollectionUtils.isEmpty(index)) {
  153. return;
  154. }
  155. for (String e : index) {
  156. Map m = map.get(e);
  157. ConfigModel model = (ConfigModel) commandEnum.getCommandExecutor().execute(new PreloadCommand(profileComponent, m.toString()));
  158. operationTemplate.execute(new OperationConfig(model, CommandEnum.OPR_ADD, commandEnum.getGroupStrategyEnum()));
  159. // Load tableGroups
  160. if (CommandEnum.PRELOAD_MAPPING == commandEnum) {
  161. reload(map, CommandEnum.PRELOAD_TABLE_GROUP, operationTemplate.getGroupId(model, GroupStrategyEnum.PRELOAD_TABLE_GROUP));
  162. }
  163. }
  164. }
  165. private void loadConnectorInstance() {
  166. List<Connector> list = profileComponent.getConnectorAll();
  167. if (!CollectionUtils.isEmpty(list)) {
  168. list.forEach(connector -> {
  169. generalExecutor.execute(() -> {
  170. try {
  171. connectorFactory.disconnect(connector.getConfig());
  172. ConnectorInstance connectorInstance = connectorFactory.connect(connector.getConfig());
  173. logger.info("Completed connection {} {}", connector.getConfig().getConnectorType(), connectorInstance.getServiceUrl());
  174. } catch (Exception e) {
  175. logger.error("连接配置异常", e);
  176. logService.log(LogType.ConnectorLog.FAILED, e.getMessage());
  177. }
  178. });
  179. });
  180. }
  181. }
  182. }