package org.dbsyncer.parser;
import org.dbsyncer.cache.CacheService;
import org.dbsyncer.common.event.RowChangedEvent;
import org.dbsyncer.common.model.Result;
import org.dbsyncer.common.util.CollectionUtils;
import org.dbsyncer.common.util.JsonUtil;
import org.dbsyncer.common.util.StringUtil;
import org.dbsyncer.connector.ConnectorFactory;
import org.dbsyncer.connector.ConnectorMapper;
import org.dbsyncer.connector.config.*;
import org.dbsyncer.connector.constant.ConnectorConstant;
import org.dbsyncer.connector.enums.ConnectorEnum;
import org.dbsyncer.connector.enums.FilterEnum;
import org.dbsyncer.connector.enums.OperationEnum;
import org.dbsyncer.listener.enums.QuartzFilterEnum;
import org.dbsyncer.parser.enums.ConvertEnum;
import org.dbsyncer.parser.enums.ParserEnum;
import org.dbsyncer.parser.event.FullRefreshEvent;
import org.dbsyncer.parser.logger.LogService;
import org.dbsyncer.parser.logger.LogType;
import org.dbsyncer.parser.model.*;
import org.dbsyncer.parser.strategy.FlushStrategy;
import org.dbsyncer.parser.util.ConvertUtil;
import org.dbsyncer.parser.util.PickerUtil;
import org.dbsyncer.plugin.PluginFactory;
import org.dbsyncer.storage.enums.StorageDataStatusEnum;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
/**
* @author AE86
* @version 1.0.0
* @date 2019/9/29 22:38
*/
@Component
public class ParserFactory implements Parser {
// Per-instance SLF4J logger named after the runtime class of this object.
private final Logger logger = LoggerFactory.getLogger(getClass());
// Connector gateway: the methods below delegate connect / refresh / isAlive / getTable /
// getMetaInfo / getCommand / getCount / reader calls to it.
@Autowired
private ConnectorFactory connectorFactory;
// Injected plugin factory.
@Autowired
private PluginFactory pluginFactory;
// Injected cache service.
@Autowired
private CacheService cacheService;
// Log service; isAliveConnectorConfig uses it to record failed liveness checks.
@Autowired
private LogService logService;
// Injected flush strategy.
@Autowired
private FlushStrategy flushStrategy;
// Executor bean selected by the qualifier name "taskExecutor".
@Autowired
@Qualifier("taskExecutor")
private Executor taskExecutor;
// Injected Spring application context.
@Autowired
private ApplicationContext applicationContext;
/**
 * Obtains a connector mapper for the given configuration by delegating to
 * {@code connectorFactory.connect(config)}.
 *
 * @param config connector configuration handed to the connector factory
 * @return the mapper returned by the connector factory
 */
@Override
public ConnectorMapper connect(ConnectorConfig config) {
    final ConnectorMapper mapper = connectorFactory.connect(config);
    return mapper;
}
/**
 * Refreshes the connector for the given configuration by delegating to
 * {@code connectorFactory.refresh(config)}.
 *
 * @param config connector configuration handed to the connector factory
 * @return the flag returned by the connector factory
 */
@Override
public boolean refreshConnectorConfig(ConnectorConfig config) {
    final boolean refreshed = connectorFactory.refresh(config);
    return refreshed;
}
/**
 * Checks whether the connector behind the given configuration is alive and, when it is
 * not, tries once to re-establish it through {@code connectorFactory.refresh(config)}.
 *
 * <p>Exceptions raised by either step are logged and never propagated; a failed step is
 * simply treated as "not alive".
 *
 * @param config connector configuration to check
 * @return {@code true} when the liveness check succeeded or the refresh reported success,
 *         {@code false} otherwise
 */
@Override
public boolean isAliveConnectorConfig(ConnectorConfig config) {
    boolean alive = false;
    try {
        alive = connectorFactory.isAlive(config);
    } catch (Exception e) {
        LogType.ConnectorLog logType = LogType.ConnectorLog.FAILED;
        logService.log(logType, "%s%s", logType.getName(), e.getMessage());
    }
    if (alive) {
        return true;
    }

    // Reconnect after a dropped connection.
    try {
        alive = connectorFactory.refresh(config);
    } catch (Exception e) {
        // Pass the throwable so the stack trace is kept; the message alone may be null.
        logger.error(e.getMessage(), e);
    }
    if (alive) {
        logger.info(LogType.ConnectorLog.RECONNECT_SUCCESS.getMessage());
    }
    return alive;
}
/**
 * Returns the result of {@code connectorFactory.getTable(config)} for the given
 * connector mapper.
 *
 * @param config connector mapper handed to the connector factory
 * @return the list produced by the connector factory
 */
@Override
public List
getTable(ConnectorMapper config) {
return connectorFactory.getTable(config);
}
/**
 * Loads the meta information of {@code tableName} through the connector identified by
 * {@code connectorId}.
 *
 * <p>When the connector's own table list contains an entry with the same name, the type
 * of the first such entry is copied onto the returned meta information.
 *
 * @param connectorId id used to look up the connector
 * @param tableName   name of the table whose meta information is requested
 * @return the meta information returned by the connector factory, possibly with its
 *         table type set from the connector's table list
 */
@Override
public MetaInfo getMetaInfo(String connectorId, String tableName) {
    final Connector owner = getConnector(connectorId);
    final ConnectorMapper mapper = connectorFactory.connect(owner.getConfig());
    final MetaInfo info = connectorFactory.getMetaInfo(mapper, tableName);

    // Nothing registered on the connector: return the meta information untouched.
    if (CollectionUtils.isEmpty(owner.getTable())) {
        return info;
    }

    // Only the first table with a matching name contributes its type.
    owner.getTable().stream()
            .filter(candidate -> candidate.getName().equals(tableName))
            .findFirst()
            .ifPresent(match -> info.setTableType(match.getType()));
    return info;
}
/**
 * Builds the command map for one table group of a mapping.
 *
 * <p>A source and a target {@code CommandConfig} are assembled from the connector types,
 * the tables of the table group and the mapped fields, then passed to
 * {@code connectorFactory.getCommand}.
 *
 * @param mapping    supplies the source and target connector ids
 * @param tableGroup supplies the source/target tables, the field mapping and the filter
 * @return the map returned by the connector factory
 */
@Override
public Map getCommand(Mapping mapping, TableGroup tableGroup) {
ConnectorConfig connectorConfig = getConnectorConfig(mapping.getSourceConnectorId());
String sType = connectorConfig.getConnectorType();
String tType = getConnectorConfig(mapping.getTargetConnectorId()).getConnectorType();
Table sourceTable = tableGroup.getSourceTable();
Table targetTable = tableGroup.getTargetTable();
// Fresh tables with the same name and type; their column lists start empty and are
// filled below from the field mapping only.
Table sTable = new Table(sourceTable.getName(), sourceTable.getType(), new ArrayList<>());
Table tTable = new Table(targetTable.getName(), targetTable.getType(), new ArrayList<>());
List fieldMapping = tableGroup.getFieldMapping();
if (!CollectionUtils.isEmpty(fieldMapping)) {
fieldMapping.forEach(m -> {
// A mapping entry may lack either side; only non-null fields are collected.
if (null != m.getSource()) {
sTable.getColumn().add(m.getSource());
}
if (null != m.getTarget()) {
tTable.getColumn().add(m.getTarget());
}
});
}
// Only the source side receives the filter and the connector configuration.
final CommandConfig sourceConfig = new CommandConfig(sType, sTable, sourceTable, tableGroup.getFilter(), connectorConfig);
final CommandConfig targetConfig = new CommandConfig(tType, tTable, targetTable);
// Fetch the connector synchronization parameters.
Map command = connectorFactory.getCommand(sourceConfig, targetConfig);
return command;
}
/**
 * Asks the connector factory for a count, using the connector identified by
 * {@code connectorId} and the given command map.
 *
 * @param connectorId id used to resolve the connector configuration
 * @param command     command map forwarded to {@code connectorFactory.getCount}
 * @return the count returned by the connector factory
 */
@Override
public long getCount(String connectorId, Map command) {
    final ConnectorConfig config = getConnectorConfig(connectorId);
    final ConnectorMapper mapper = connectorFactory.connect(config);
    return connectorFactory.getCount(mapper, command);
}
/**
 * Builds a {@code Connector} from its JSON representation.
 *
 * <p>The {@code "config"} and {@code "table"} members are detached from the document
 * first; the remainder is mapped onto {@code Connector}. The configuration is then
 * deserialized into the class that {@code ConnectorEnum.getConfigClass} reports for its
 * {@code "connectorType"}. The {@code "table"} array is accepted in two shapes: plain
 * table-name strings, or table objects. If at least one entry is a string, only the
 * string entries are kept; otherwise the whole array is mapped to {@code Table} objects.
 *
 * @param json JSON document describing the connector
 * @return the populated connector
 * @throws IllegalArgumentException if the connector, its {@code "config"} or its
 *                                  {@code "table"} member is missing
 * @throws ParserException          if the JSON cannot be parsed
 */
@Override
public Connector parseConnector(String json) {
    try {
        JSONObject conn = new JSONObject(json);
        // remove() yields null for an absent key; both members are validated below.
        JSONObject config = (JSONObject) conn.remove("config");
        JSONArray table = (JSONArray) conn.remove("table");

        Connector connector = JsonUtil.jsonToObj(conn.toString(), Connector.class);
        Assert.notNull(connector, "Connector can not be null.");

        // Fail with a clear message instead of a bare NullPointerException.
        Assert.notNull(config, "Connector config can not be null.");
        String connectorType = config.getString("connectorType");
        ConnectorConfig obj = (ConnectorConfig) JsonUtil.jsonToObj(config.toString(),
                ConnectorEnum.getConfigClass(connectorType));
        connector.setConfig(obj);

        Assert.notNull(table, "Connector table can not be null.");
        List<Table> tableList = new ArrayList<>();
        boolean exist = false;
        for (int i = 0; i < table.length(); i++) {
            // Entries given as bare strings are treated as table names.
            if (table.get(i) instanceof String) {
                tableList.add(new Table(table.getString(i)));
                exist = true;
            }
        }
        // No string entry found: the array holds table objects.
        if (!exist) {
            tableList = JsonUtil.jsonToArray(table.toString(), Table.class);
        }
        connector.setTable(tableList);
        return connector;
    } catch (JSONException e) {
        // Keep the stack trace in the log; the rethrown exception carries the message only.
        logger.error(e.getMessage(), e);
        throw new ParserException(e.getMessage());
    }
}
/**
 * Deserializes a JSON document into an instance of {@code clazz}.
 *
 * <p>The text is parsed into a {@code JSONObject} first and then re-serialized before it
 * is handed to {@code JsonUtil.jsonToObj}, so a {@code JSONException} raised by that
 * first parse is reported as a {@code ParserException}.
 *
 * @param json  JSON document to deserialize
 * @param clazz target class
 * @return the deserialized object, never {@code null}
 * @throws IllegalArgumentException if deserialization yields {@code null}
 * @throws ParserException          if a {@code JSONException} is raised while parsing
 */
@Override
public T parseObject(String json, Class clazz) {
try {
JSONObject obj = new JSONObject(json);
T t = JsonUtil.jsonToObj(obj.toString(), clazz);
// The failure message names the requested type.
String format = String.format("%s can not be null.", clazz.getSimpleName());
Assert.notNull(t, format);
return t;
} catch (JSONException e) {
// NOTE(review): only the message is logged and rethrown; the original stack trace is lost.
logger.error(e.getMessage());
throw new ParserException(e.getMessage());
}
}
/**
 * Returns all values reported by {@code ConnectorEnum.values()}.
 *
 * @return a fixed-size list backed by the values array
 */
@Override
public List getConnectorEnumAll() {
    final ConnectorEnum[] all = ConnectorEnum.values();
    return Arrays.asList(all);
}
/**
 * Returns all values reported by {@code OperationEnum.values()}.
 *
 * @return a fixed-size list backed by the values array
 */
@Override
public List getOperationEnumAll() {
    final OperationEnum[] all = OperationEnum.values();
    return Arrays.asList(all);
}
/**
 * Returns all values reported by {@code QuartzFilterEnum.values()}.
 *
 * @return a fixed-size list backed by the values array
 */
@Override
public List getQuartzFilterEnumAll() {
    final QuartzFilterEnum[] all = QuartzFilterEnum.values();
    return Arrays.asList(all);
}
/**
 * Returns all values reported by {@code FilterEnum.values()}.
 *
 * @return a fixed-size list backed by the values array
 */
@Override
public List getFilterEnumAll() {
    final FilterEnum[] all = FilterEnum.values();
    return Arrays.asList(all);
}
/**
 * Returns all values reported by {@code ConvertEnum.values()}.
 *
 * @return a fixed-size list backed by the values array
 */
@Override
public List getConvertEnumAll() {
    final ConvertEnum[] all = ConvertEnum.values();
    return Arrays.asList(all);
}
/**
 * Returns all values reported by {@code StorageDataStatusEnum.values()}.
 *
 * @return a fixed-size list backed by the values array
 */
@Override
public List getStorageDataStatusEnumAll() {
    final StorageDataStatusEnum[] all = StorageDataStatusEnum.values();
    return Arrays.asList(all);
}
@Override
public void execute(Task task, Mapping mapping, TableGroup tableGroup) {
final String metaId = task.getId();
final String sourceConnectorId = mapping.getSourceConnectorId();
final String targetConnectorId = mapping.getTargetConnectorId();
ConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
Assert.notNull(sConfig, "数据源配置不能为空.");
ConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
Assert.notNull(tConfig, "目标源配置不能为空.");
TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
Map command = group.getCommand();
Assert.notEmpty(command, "执行命令不能为空.");
List fieldMapping = group.getFieldMapping();
String sTableName = group.getSourceTable().getName();
String tTableName = group.getTargetTable().getName();
Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
// 获取同步字段
Picker picker = new Picker(fieldMapping);
// 检查分页参数
Map params = getMeta(metaId).getMap();
params.putIfAbsent(ParserEnum.PAGE_INDEX.getCode(), ParserEnum.PAGE_INDEX.getDefaultValue());
int pageSize = mapping.getReadNum();
int batchSize = mapping.getBatchNum();
ConnectorMapper sConnectionMapper = connectorFactory.connect(sConfig);
ConnectorMapper tConnectionMapper = connectorFactory.connect(tConfig);
for (; ; ) {
if (!task.isRunning()) {
logger.warn("任务被中止:{}", metaId);
break;
}
// 1、获取数据源数据
int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
Result reader = connectorFactory.reader(sConnectionMapper, new ReaderConfig(command, new ArrayList<>(), pageIndex, pageSize));
List