package org.dbsyncer.parser;
import org.dbsyncer.cache.CacheService;
import org.dbsyncer.common.event.RowChangedEvent;
import org.dbsyncer.common.model.AbstractConnectorConfig;
import org.dbsyncer.common.model.FullConvertContext;
import org.dbsyncer.common.model.Result;
import org.dbsyncer.common.spi.ConnectorMapper;
import org.dbsyncer.common.spi.ConvertContext;
import org.dbsyncer.common.util.CollectionUtils;
import org.dbsyncer.common.util.JsonUtil;
import org.dbsyncer.common.util.StringUtil;
import org.dbsyncer.connector.ConnectorFactory;
import org.dbsyncer.connector.config.CommandConfig;
import org.dbsyncer.connector.config.ReaderConfig;
import org.dbsyncer.connector.config.WriterBatchConfig;
import org.dbsyncer.connector.constant.ConnectorConstant;
import org.dbsyncer.connector.enums.ConnectorEnum;
import org.dbsyncer.connector.enums.FilterEnum;
import org.dbsyncer.connector.enums.OperationEnum;
import org.dbsyncer.connector.model.Field;
import org.dbsyncer.connector.model.MetaInfo;
import org.dbsyncer.connector.model.Table;
import org.dbsyncer.connector.util.PrimaryKeyUtil;
import org.dbsyncer.listener.enums.QuartzFilterEnum;
import org.dbsyncer.parser.enums.ConvertEnum;
import org.dbsyncer.parser.event.FullRefreshEvent;
import org.dbsyncer.parser.logger.LogService;
import org.dbsyncer.parser.logger.LogType;
import org.dbsyncer.parser.model.BatchWriter;
import org.dbsyncer.parser.model.Connector;
import org.dbsyncer.parser.model.FieldMapping;
import org.dbsyncer.parser.model.Mapping;
import org.dbsyncer.parser.model.Picker;
import org.dbsyncer.parser.model.TableGroup;
import org.dbsyncer.parser.model.Task;
import org.dbsyncer.parser.strategy.FlushStrategy;
import org.dbsyncer.parser.strategy.ParserStrategy;
import org.dbsyncer.parser.util.ConvertUtil;
import org.dbsyncer.parser.util.PickerUtil;
import org.dbsyncer.plugin.PluginFactory;
import org.dbsyncer.storage.enums.StorageDataStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
* @author AE86
* @version 1.0.0
* @date 2019/9/29 22:38
*/
@Component
public class ParserFactory implements Parser {
// SLF4J logger bound to the runtime class of this instance.
private final Logger logger = LoggerFactory.getLogger(getClass());
// Connector operations in this class (connect, refresh, isAlive, getTable, getMetaInfo,
// getCommand, getCount, reader) are delegated to this factory.
@Autowired
private ConnectorFactory connectorFactory;
@Autowired
private PluginFactory pluginFactory;
@Autowired
private CacheService cacheService;
// Receives the connector failure entry written by isAliveConnectorConfig.
@Autowired
private LogService logService;
@Autowired
private FlushStrategy flushStrategy;
// Injected from the bean named "taskExecutor".
@Autowired
@Qualifier("taskExecutor")
private Executor taskExecutor;
// Injected from the bean named "webApplicationContext".
@Qualifier("webApplicationContext")
@Autowired
private ApplicationContext applicationContext;
@Autowired
private ParserStrategy parserStrategy;
/**
 * Delegates to {@code connectorFactory.connect(config)} and returns its result.
 *
 * @param config connector configuration to connect with
 * @return the connector mapper produced by the connector factory
 */
@Override
public ConnectorMapper connect(AbstractConnectorConfig config) {
    final ConnectorMapper mapper = connectorFactory.connect(config);
    return mapper;
}
/**
 * Delegates to {@code connectorFactory.refresh(config)} and returns its result.
 *
 * @param config connector configuration to refresh
 * @return whatever the connector factory reports for the refresh
 */
@Override
public boolean refreshConnectorConfig(AbstractConnectorConfig config) {
    final boolean refreshed = connectorFactory.refresh(config);
    return refreshed;
}
/**
 * Reports whether the connector behind {@code config} is usable, attempting one
 * refresh when the liveness probe does not succeed.
 *
 * <p>An exception thrown by the probe is recorded through {@code logService} and
 * treated as "not alive". An exception thrown by the refresh is logged (message
 * only) and also treated as "not alive".
 *
 * @param config connector configuration to probe
 * @return {@code true} if the probe succeeded, or if the follow-up refresh returned
 *         {@code true}; {@code false} otherwise
 */
@Override
public boolean isAliveConnectorConfig(AbstractConnectorConfig config) {
    boolean alive = false;
    try {
        alive = connectorFactory.isAlive(config);
    } catch (Exception e) {
        final LogType.ConnectorLog failed = LogType.ConnectorLog.FAILED;
        logService.log(failed, "%s%s", failed.getName(), e.getMessage());
    }
    if (alive) {
        return true;
    }

    // Connection is down: try to reconnect once.
    boolean reconnected = false;
    try {
        reconnected = connectorFactory.refresh(config);
    } catch (Exception e) {
        logger.error(e.getMessage());
    }
    if (reconnected) {
        logger.info(LogType.ConnectorLog.RECONNECT_SUCCESS.getMessage());
    }
    return reconnected;
}
/**
 * Delegates to {@code connectorFactory.getTable(config)} and returns its result.
 *
 * <p>NOTE(review): the return type is a raw {@code List} here; the element type
 * is not visible in this method - confirm against the {@code Parser} interface.
 *
 * @param config connector mapper to list tables for
 * @return the list returned by the connector factory
 */
@Override
public List getTable(ConnectorMapper config) {
    final List tables = connectorFactory.getTable(config);
    return tables;
}
/**
 * Fetches metadata for {@code tableName} through the connector identified by
 * {@code connectorId}, then copies the table type and SQL from the first table
 * configured on the connector whose name equals {@code tableName}.
 *
 * @param connectorId id used to look up the connector
 * @param tableName   name of the table to describe
 * @return the metadata returned by the connector factory, with type/SQL filled in
 *         when a configured table of the same name exists
 */
@Override
public MetaInfo getMetaInfo(String connectorId, String tableName) {
    Connector connector = getConnector(connectorId);
    ConnectorMapper connectorMapper = connectorFactory.connect(connector.getConfig());
    MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableName);

    // Nothing configured on the connector: return the metadata untouched.
    if (CollectionUtils.isEmpty(connector.getTable())) {
        return metaInfo;
    }

    for (Table candidate : connector.getTable()) {
        if (!candidate.getName().equals(tableName)) {
            continue;
        }
        // Only the first match is applied.
        metaInfo.setTableType(candidate.getType());
        metaInfo.setSql(candidate.getSql());
        break;
    }
    return metaInfo;
}
/**
 * Builds the source and target command configurations for a table group and asks
 * the connector factory for the resulting command map.
 *
 * <p>Fresh {@link Table} instances are created with empty column lists; the columns
 * are then filled from the non-null source/target fields of each field mapping.
 * The table group's filter is passed to the source side only.
 *
 * @param mapping    supplies the source and target connector ids
 * @param tableGroup supplies the source/target tables, field mappings and filter
 * @return the command map returned by {@code connectorFactory.getCommand}
 */
@Override
public Map getCommand(Mapping mapping, TableGroup tableGroup) {
    AbstractConnectorConfig sConnConfig = getConnectorConfig(mapping.getSourceConnectorId());
    AbstractConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
    Table sourceTable = tableGroup.getSourceTable();
    Table targetTable = tableGroup.getTargetTable();
    Table sTable = new Table(sourceTable.getName(), sourceTable.getType(), new ArrayList<>(), sourceTable.getSql());
    // NOTE(review): the target table is built with the SOURCE table's SQL. This looks like
    // a copy-paste slip (targetTable.getSql() expected) - confirm before changing.
    Table tTable = new Table(targetTable.getName(), targetTable.getType(), new ArrayList<>(), sourceTable.getSql());

    // Typed list: the element type is required for getSource()/getTarget() to resolve.
    List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
    if (!CollectionUtils.isEmpty(fieldMapping)) {
        for (FieldMapping m : fieldMapping) {
            if (null != m.getSource()) {
                sTable.getColumn().add(m.getSource());
            }
            if (null != m.getTarget()) {
                tTable.getColumn().add(m.getTarget());
            }
        }
    }

    final CommandConfig sourceConfig = new CommandConfig(sConnConfig.getConnectorType(), sTable, sConnConfig, tableGroup.getFilter());
    final CommandConfig targetConfig = new CommandConfig(tConnConfig.getConnectorType(), tTable, tConnConfig, null);
    // Ask the connector layer for the synchronization commands.
    return connectorFactory.getCommand(sourceConfig, targetConfig);
}
/**
 * Connects with the configuration of the given connector and returns the count
 * reported by {@code connectorFactory.getCount} for {@code command}.
 *
 * @param connectorId id of the connector whose configuration is used to connect
 * @param command     command map forwarded unchanged to the connector factory
 * @return the count returned by the connector factory
 */
@Override
public long getCount(String connectorId, Map command) {
    final AbstractConnectorConfig connectorConfig = getConnectorConfig(connectorId);
    final ConnectorMapper connectorMapper = connectorFactory.connect(connectorConfig);
    return connectorFactory.getCount(connectorMapper, command);
}
/**
 * Parses a connector from JSON. The {@code "config"} entry is removed from the
 * parsed map and converted separately into the configuration class that
 * {@link ConnectorEnum#getConfigClass} returns for its {@code "connectorType"}.
 *
 * <p>NOTE(review): both conversions go through {@code toString()} of the parsed map;
 * this presumably relies on {@code JsonUtil.parseMap} returning a map whose
 * {@code toString()} is valid JSON - confirm.
 *
 * @param json connector JSON containing a {@code "config"} object
 * @return the parsed connector with its typed configuration set
 * @throws IllegalArgumentException if the connector cannot be parsed or the
 *         {@code "config"} entry is missing
 */
@Override
public Connector parseConnector(String json) {
    Map conn = JsonUtil.parseMap(json);
    Map config = (Map) conn.remove("config");
    Connector connector = JsonUtil.jsonToObj(conn.toString(), Connector.class);
    Assert.notNull(connector, "Connector can not be null.");
    // Fail with a clear message instead of a bare NullPointerException below.
    Assert.notNull(config, "Connector config can not be null.");
    String connectorType = (String) config.get("connectorType");
    Class<?> configClass = ConnectorEnum.getConfigClass(connectorType);
    AbstractConnectorConfig obj = (AbstractConnectorConfig) JsonUtil.jsonToObj(config.toString(), configClass);
    connector.setConfig(obj);
    return connector;
}
/**
 * Converts JSON into an instance of {@code clazz} via {@code JsonUtil.jsonToObj}.
 *
 * @param json  JSON text to convert
 * @param clazz target type
 * @param <T>   type of the returned object
 * @return the object produced by {@code JsonUtil.jsonToObj}
 */
@Override
public <T> T parseObject(String json, Class<T> clazz) {
    return JsonUtil.jsonToObj(json, clazz);
}
/**
 * @return a fixed-size list view over all {@link ConnectorEnum} constants
 */
@Override
public List getConnectorEnumAll() {
    final ConnectorEnum[] all = ConnectorEnum.values();
    return Arrays.asList(all);
}
/**
 * @return a fixed-size list view over all {@link OperationEnum} constants
 */
@Override
public List getOperationEnumAll() {
    final OperationEnum[] all = OperationEnum.values();
    return Arrays.asList(all);
}
/**
 * @return a fixed-size list view over all {@link QuartzFilterEnum} constants
 */
@Override
public List getQuartzFilterEnumAll() {
    final QuartzFilterEnum[] all = QuartzFilterEnum.values();
    return Arrays.asList(all);
}
/**
 * @return a fixed-size list view over all {@link FilterEnum} constants
 */
@Override
public List getFilterEnumAll() {
    final FilterEnum[] all = FilterEnum.values();
    return Arrays.asList(all);
}
/**
 * @return a fixed-size list view over all {@link ConvertEnum} constants
 */
@Override
public List getConvertEnumAll() {
    final ConvertEnum[] all = ConvertEnum.values();
    return Arrays.asList(all);
}
/**
 * @return a fixed-size list view over all {@link StorageDataStatusEnum} constants
 */
@Override
public List getStorageDataStatusEnumAll() {
    final StorageDataStatusEnum[] all = StorageDataStatusEnum.values();
    return Arrays.asList(all);
}
@Override
public void execute(Task task, Mapping mapping, TableGroup tableGroup, ExecutorService executorService) {
final String metaId = task.getId();
final String sourceConnectorId = mapping.getSourceConnectorId();
final String targetConnectorId = mapping.getTargetConnectorId();
AbstractConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
Assert.notNull(sConfig, "数据源配置不能为空.");
AbstractConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
Assert.notNull(tConfig, "目标源配置不能为空.");
TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
Map command = group.getCommand();
Assert.notEmpty(command, "执行命令不能为空.");
List fieldMapping = group.getFieldMapping();
Table sourceTable = group.getSourceTable();
String sTableName = sourceTable.getName();
String tTableName = group.getTargetTable().getName();
Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
// 获取同步字段
Picker picker = new Picker(fieldMapping);
List primaryKeys = PrimaryKeyUtil.findTablePrimaryKeys(sourceTable);
boolean supportedCursor = StringUtil.isNotBlank(command.get(ConnectorConstant.OPERTION_QUERY_CURSOR));
int pageSize = mapping.getReadNum();
int batchSize = mapping.getBatchNum();
final ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
final ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
final String event = ConnectorConstant.OPERTION_INSERT;
final FullConvertContext context = new FullConvertContext(sConnectorMapper, tConnectorMapper, sTableName, tTableName, event);
for (; ; ) {
if (!task.isRunning()) {
logger.warn("任务被中止:{}", metaId);
break;
}
// 1、获取数据源数据
ReaderConfig readerConfig = new ReaderConfig(sourceTable, command, new ArrayList<>(), supportedCursor, task.getCursors(), task.getPageIndex(), pageSize);
Result reader = connectorFactory.reader(sConnectorMapper, readerConfig);
List