package org.dbsyncer.manager; import org.dbsyncer.common.event.ClosedEvent; import org.dbsyncer.common.model.Paging; import org.dbsyncer.connector.config.ConnectorConfig; import org.dbsyncer.connector.config.MetaInfo; import org.dbsyncer.connector.enums.ConnectorEnum; import org.dbsyncer.connector.enums.FilterEnum; import org.dbsyncer.connector.enums.OperationEnum; import org.dbsyncer.listener.enums.QuartzFilterEnum; import org.dbsyncer.manager.config.OperationConfig; import org.dbsyncer.manager.config.QueryConfig; import org.dbsyncer.manager.enums.GroupStrategyEnum; import org.dbsyncer.manager.enums.HandlerEnum; import org.dbsyncer.manager.puller.Puller; import org.dbsyncer.manager.template.impl.DataTemplate; import org.dbsyncer.manager.template.impl.OperationTemplate; import org.dbsyncer.parser.Parser; import org.dbsyncer.parser.enums.ConvertEnum; import org.dbsyncer.parser.enums.MetaEnum; import org.dbsyncer.parser.model.*; import org.dbsyncer.plugin.PluginFactory; import org.dbsyncer.plugin.config.Plugin; import org.dbsyncer.storage.constant.ConfigConstant; import org.dbsyncer.storage.enums.StorageDataStatusEnum; import org.dbsyncer.storage.enums.StorageEnum; import org.dbsyncer.storage.query.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.util.Assert; import java.time.Instant; import java.util.List; import java.util.Map; /** * @author AE86 * @version 1.0.0 * @date 2019/9/16 23:59 */ @Component public class ManagerFactory implements Manager, ApplicationListener { private final Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private Parser parser; @Autowired private PluginFactory pluginFactory; @Autowired private OperationTemplate operationTemplate; @Autowired private DataTemplate dataTemplate; @Autowired private Map map; @Override public boolean alive(ConnectorConfig config) { return parser.alive(config); } @Override public List getTable(ConnectorConfig config) { return parser.getTable(config); } @Override public MetaInfo getMetaInfo(String connectorId, String tableName) { return parser.getMetaInfo(connectorId, tableName); } @Override public String addConnector(ConfigModel model) { return operationTemplate.execute(new OperationConfig(model, HandlerEnum.OPR_ADD.getHandler())); } @Override public String editConnector(ConfigModel model) { return operationTemplate.execute(new OperationConfig(model, HandlerEnum.OPR_EDIT.getHandler())); } @Override public void removeConnector(String connectorId) { operationTemplate.remove(new OperationConfig(connectorId)); } @Override public Connector getConnector(String connectorId) { return operationTemplate.queryObject(Connector.class, connectorId); } @Override public List getConnectorAll() { Connector connector = new Connector(); connector.setType(ConfigConstant.CONNECTOR); QueryConfig queryConfig = new QueryConfig<>(connector); List connectors = operationTemplate.queryAll(queryConfig); return connectors; } @Override public String addMapping(ConfigModel model) { return operationTemplate.execute(new OperationConfig(model, HandlerEnum.OPR_ADD.getHandler())); } @Override public String editMapping(ConfigModel model) { return operationTemplate.execute(new OperationConfig(model, HandlerEnum.OPR_EDIT.getHandler())); } @Override public void removeMapping(String mappingId) { operationTemplate.remove(new OperationConfig(mappingId)); } @Override public Mapping getMapping(String mappingId) { return operationTemplate.queryObject(Mapping.class, mappingId); } @Override public List getMappingAll() { Mapping mapping = new Mapping(); mapping.setType(ConfigConstant.MAPPING); QueryConfig queryConfig = new QueryConfig<>(mapping); List mappings = operationTemplate.queryAll(queryConfig); return mappings; } @Override public String addTableGroup(ConfigModel model) { return operationTemplate.execute(new OperationConfig(model, GroupStrategyEnum.TABLE, HandlerEnum.OPR_ADD.getHandler())); } @Override public String editTableGroup(ConfigModel model) { return operationTemplate.execute(new OperationConfig(model, GroupStrategyEnum.TABLE, HandlerEnum.OPR_EDIT.getHandler())); } @Override public void removeTableGroup(String tableGroupId) { operationTemplate.remove(new OperationConfig(tableGroupId, GroupStrategyEnum.TABLE)); } @Override public TableGroup getTableGroup(String tableGroupId) { return operationTemplate.queryObject(TableGroup.class, tableGroupId); } @Override public List getTableGroupAll(String mappingId) { TableGroup tableGroup = new TableGroup(); tableGroup.setType(ConfigConstant.TABLE_GROUP); tableGroup.setMappingId(mappingId); QueryConfig queryConfig = new QueryConfig<>(tableGroup, GroupStrategyEnum.TABLE); List tableGroups = operationTemplate.queryAll(queryConfig); return tableGroups; } @Override public Map getCommand(Mapping mapping, TableGroup tableGroup) { return parser.getCommand(mapping, tableGroup); } @Override public long getCount(String connectorId, Map command) { return parser.getCount(connectorId, command); } @Override public String addMeta(ConfigModel model) { return operationTemplate.execute(new OperationConfig(model, HandlerEnum.OPR_ADD.getHandler())); } @Override public String editMeta(ConfigModel model) { return operationTemplate.execute(new OperationConfig(model, HandlerEnum.OPR_EDIT.getHandler())); } @Override public Meta getMeta(String metaId) { return operationTemplate.queryObject(Meta.class, metaId); } @Override public void removeMeta(String metaId) { operationTemplate.remove(new OperationConfig(metaId)); } @Override public List getMetaAll() { Meta meta = new Meta(); meta.setType(ConfigConstant.META); QueryConfig queryConfig = new QueryConfig<>(meta); return operationTemplate.queryAll(queryConfig); } @Override public String addConfig(ConfigModel model) { return operationTemplate.execute(new OperationConfig(model, HandlerEnum.OPR_ADD.getHandler())); } @Override public String editConfig(ConfigModel model) { return operationTemplate.execute(new OperationConfig(model, HandlerEnum.OPR_EDIT.getHandler())); } @Override public Config getConfig(String configId) { return operationTemplate.queryObject(Config.class, configId); } @Override public void removeConfig(String configId) { operationTemplate.remove(new OperationConfig(configId)); } @Override public List getConfigAll() { Config config = new Config(); config.setType(ConfigConstant.CONFIG); QueryConfig queryConfig = new QueryConfig<>(config); return operationTemplate.queryAll(queryConfig); } @Override public Paging queryData(Query query, String collectionId) { query.setType(StorageEnum.DATA); query.setCollection(collectionId); return dataTemplate.query(query); } @Override public void clearData(String collectionId) { dataTemplate.clear(StorageEnum.DATA, collectionId); } @Override public Paging queryLog(Query query) { query.setType(StorageEnum.LOG); return dataTemplate.query(query); } @Override public void clearLog() { dataTemplate.clear(StorageEnum.LOG, null); } @Override public List getConnectorEnumAll() { return parser.getConnectorEnumAll(); } @Override public List getOperationEnumAll() { return parser.getOperationEnumAll(); } @Override public List getQuartzFilterEnumAll() { return parser.getQuartzFilterEnumAll(); } @Override public List getFilterEnumAll() { return parser.getFilterEnumAll(); } @Override public List getConvertEnumAll() { return parser.getConvertEnumAll(); } @Override public List getStorageDataStatusEnumAll() { return parser.getStorageDataStatusEnumAll(); } @Override public List getPluginAll() { return pluginFactory.getPluginAll(); } @Override public String getPluginPath() { return pluginFactory.getPluginPath(); } @Override public void loadPlugins() { pluginFactory.loadPlugins(); } @Override public void start(Mapping mapping) { Puller puller = getPuller(mapping); // 标记运行中 changeMetaState(mapping.getMetaId(), MetaEnum.RUNNING); puller.asyncStart(mapping); } @Override public void close(Mapping mapping) { Puller puller = getPuller(mapping); // 标记停止中 String metaId = mapping.getMetaId(); changeMetaState(metaId, MetaEnum.STOPPING); puller.close(metaId); } @Override public void changeMetaState(String metaId, MetaEnum metaEnum) { Meta meta = getMeta(metaId); int code = metaEnum.getCode(); if (null != meta && meta.getState() != code) { meta.setState(code); meta.setUpdateTime(Instant.now().toEpochMilli()); editMeta(meta); } } @Override public void onApplicationEvent(ClosedEvent event) { // 异步监听任务关闭事件 changeMetaState(event.getId(), MetaEnum.READY); } private Puller getPuller(Mapping mapping) { Assert.notNull(mapping, "驱动不能为空"); String model = mapping.getModel(); String metaId = mapping.getMetaId(); Assert.hasText(model, "同步方式不能为空"); Assert.hasText(metaId, "任务ID不能为空"); Puller puller = map.get(model.concat("Puller")); Assert.notNull(puller, String.format("未知的同步方式: %s", model)); return puller; } }