MappingServiceImpl.java 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package org.dbsyncer.biz.impl;
  2. import org.dbsyncer.biz.BizException;
  3. import org.dbsyncer.biz.MappingService;
  4. import org.dbsyncer.biz.checker.Checker;
  5. import org.dbsyncer.biz.vo.ConnectorVo;
  6. import org.dbsyncer.biz.vo.MappingVo;
  7. import org.dbsyncer.biz.vo.MetaVo;
  8. import org.dbsyncer.common.util.CollectionUtils;
  9. import org.dbsyncer.parser.logger.LogService;
  10. import org.dbsyncer.monitor.Monitor;
  11. import org.dbsyncer.parser.enums.ModelEnum;
  12. import org.dbsyncer.parser.logger.LogType;
  13. import org.dbsyncer.parser.model.*;
  14. import org.dbsyncer.storage.constant.ConfigConstant;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import org.springframework.beans.BeanUtils;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.stereotype.Service;
  20. import org.springframework.util.Assert;
  21. import java.util.Comparator;
  22. import java.util.List;
  23. import java.util.Map;
  24. import java.util.stream.Collectors;
  25. /**
  26. * @author AE86
  27. * @version 1.0.0
  28. * @date 2019/10/17 23:20
  29. */
  30. @Service
  31. public class MappingServiceImpl extends BaseServiceImpl implements MappingService {
  32. private final Logger logger = LoggerFactory.getLogger(getClass());
  33. @Autowired
  34. private Monitor monitor;
  35. @Autowired
  36. private Checker mappingChecker;
  37. @Autowired
  38. private LogService logService;
  39. @Override
  40. public String add(Map<String, String> params) {
  41. ConfigModel model = mappingChecker.checkAddConfigModel(params);
  42. return manager.addMapping(model);
  43. }
  44. @Override
  45. public String edit(Map<String, String> params) {
  46. String id = params.get(ConfigConstant.CONFIG_MODEL_ID);
  47. Mapping mapping = assertMappingExist(id);
  48. synchronized (LOCK) {
  49. assertRunning(mapping.getMetaId());
  50. ConfigModel model = mappingChecker.checkEditConfigModel(params);
  51. return manager.editMapping(model);
  52. }
  53. }
  54. @Override
  55. public String remove(String id) {
  56. Mapping mapping = assertMappingExist(id);
  57. String metaId = mapping.getMetaId();
  58. synchronized (LOCK) {
  59. assertRunning(metaId);
  60. // 删除数据
  61. manager.clearData(metaId);
  62. // 删除meta
  63. manager.removeMeta(metaId);
  64. // 删除tableGroup
  65. List<TableGroup> groupList = manager.getTableGroupAll(id);
  66. if (!CollectionUtils.isEmpty(groupList)) {
  67. groupList.forEach(t -> manager.removeTableGroup(t.getId()));
  68. }
  69. // 删除驱动
  70. manager.removeMapping(id);
  71. log(mapping, LogType.MappingLog.DELETE);
  72. }
  73. return "驱动删除成功";
  74. }
  75. @Override
  76. public MappingVo getMapping(String id) {
  77. Mapping mapping = manager.getMapping(id);
  78. return convertMapping2Vo(mapping);
  79. }
  80. @Override
  81. public List<MappingVo> getMappingAll() {
  82. List<MappingVo> list = manager.getMappingAll()
  83. .stream()
  84. .map(m -> convertMapping2Vo(m))
  85. .sorted(Comparator.comparing(MappingVo::getUpdateTime).reversed())
  86. .collect(Collectors.toList());
  87. return list;
  88. }
  89. @Override
  90. public String start(String id) {
  91. Mapping mapping = assertMappingExist(id);
  92. final String metaId = mapping.getMetaId();
  93. synchronized (LOCK) {
  94. assertRunning(metaId);
  95. // 清空同步记录
  96. Meta meta = manager.getMeta(metaId);
  97. meta.getFail().set(0);
  98. meta.getSuccess().set(0);
  99. manager.editMeta(meta);
  100. manager.start(mapping);
  101. log(mapping, LogType.MappingLog.RUNNING);
  102. }
  103. return "驱动启动成功";
  104. }
  105. @Override
  106. public String stop(String id) {
  107. Mapping mapping = assertMappingExist(id);
  108. synchronized (LOCK) {
  109. if (!isRunning(mapping.getMetaId())) {
  110. throw new BizException("驱动已停止.");
  111. }
  112. manager.close(mapping);
  113. log(mapping, LogType.MappingLog.STOP);
  114. }
  115. return "驱动停止成功";
  116. }
  117. private void log(Mapping mapping, LogType logType) {
  118. String model = ModelEnum.getModelEnum(mapping.getModel()).getName();
  119. String msg = String.format("%s:%s(%s)", logType.getMessage(), mapping.getName(), model);
  120. logService.log(logType, msg);
  121. }
  122. private MappingVo convertMapping2Vo(Mapping mapping) {
  123. String model = mapping.getModel();
  124. Assert.notNull(mapping, "Mapping can not be null.");
  125. Connector s = manager.getConnector(mapping.getSourceConnectorId());
  126. Connector t = manager.getConnector(mapping.getTargetConnectorId());
  127. ConnectorVo sConn = new ConnectorVo(monitor.alive(s.getId()));
  128. BeanUtils.copyProperties(s, sConn);
  129. ConnectorVo tConn = new ConnectorVo(monitor.alive(t.getId()));
  130. BeanUtils.copyProperties(t, tConn);
  131. // 元信息
  132. Meta meta = manager.getMeta(mapping.getMetaId());
  133. Assert.notNull(meta, "Meta can not be null.");
  134. MetaVo metaVo = new MetaVo(ModelEnum.getModelEnum(model).getName(), mapping.getName());
  135. BeanUtils.copyProperties(meta, metaVo);
  136. MappingVo vo = new MappingVo(sConn, tConn, metaVo);
  137. BeanUtils.copyProperties(mapping, vo);
  138. return vo;
  139. }
  140. /**
  141. * 检查是否存在驱动
  142. *
  143. * @param mappingId
  144. * @return
  145. */
  146. private Mapping assertMappingExist(String mappingId) {
  147. Mapping mapping = manager.getMapping(mappingId);
  148. Assert.notNull(mapping, "驱动不存在.");
  149. return mapping;
  150. }
  151. }