|
@@ -4,13 +4,13 @@ import com.alibaba.fastjson.JSONException;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import org.dbsyncer.cache.CacheService;
|
|
|
import org.dbsyncer.common.event.RowChangedEvent;
|
|
|
+import org.dbsyncer.common.model.AbstractConnectorConfig;
|
|
|
import org.dbsyncer.common.model.Result;
|
|
|
+import org.dbsyncer.common.spi.ConnectorMapper;
|
|
|
import org.dbsyncer.common.util.CollectionUtils;
|
|
|
import org.dbsyncer.common.util.JsonUtil;
|
|
|
import org.dbsyncer.connector.ConnectorFactory;
|
|
|
-import org.dbsyncer.connector.ConnectorMapper;
|
|
|
import org.dbsyncer.connector.config.CommandConfig;
|
|
|
-import org.dbsyncer.connector.config.ConnectorConfig;
|
|
|
import org.dbsyncer.connector.config.ReaderConfig;
|
|
|
import org.dbsyncer.connector.config.WriterBatchConfig;
|
|
|
import org.dbsyncer.connector.constant.ConnectorConstant;
|
|
@@ -31,6 +31,7 @@ import org.dbsyncer.parser.strategy.ParserStrategy;
|
|
|
import org.dbsyncer.parser.util.ConvertUtil;
|
|
|
import org.dbsyncer.parser.util.PickerUtil;
|
|
|
import org.dbsyncer.plugin.PluginFactory;
|
|
|
+import org.dbsyncer.plugin.config.Plugin;
|
|
|
import org.dbsyncer.storage.enums.StorageDataStatusEnum;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -86,17 +87,17 @@ public class ParserFactory implements Parser {
|
|
|
private ParserStrategy parserStrategy;
|
|
|
|
|
|
@Override
|
|
|
- public ConnectorMapper connect(ConnectorConfig config) {
|
|
|
+ public ConnectorMapper connect(AbstractConnectorConfig config) {
|
|
|
return connectorFactory.connect(config);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public boolean refreshConnectorConfig(ConnectorConfig config) {
|
|
|
+ public boolean refreshConnectorConfig(AbstractConnectorConfig config) {
|
|
|
return connectorFactory.refresh(config);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public boolean isAliveConnectorConfig(ConnectorConfig config) {
|
|
|
+ public boolean isAliveConnectorConfig(AbstractConnectorConfig config) {
|
|
|
boolean alive = false;
|
|
|
try {
|
|
|
alive = connectorFactory.isAlive(config);
|
|
@@ -141,8 +142,8 @@ public class ParserFactory implements Parser {
|
|
|
|
|
|
@Override
|
|
|
public Map<String, String> getCommand(Mapping mapping, TableGroup tableGroup) {
|
|
|
- ConnectorConfig sConnConfig = getConnectorConfig(mapping.getSourceConnectorId());
|
|
|
- ConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
|
|
|
+ AbstractConnectorConfig sConnConfig = getConnectorConfig(mapping.getSourceConnectorId());
|
|
|
+ AbstractConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
|
|
|
Table sourceTable = tableGroup.getSourceTable();
|
|
|
Table targetTable = tableGroup.getTargetTable();
|
|
|
Table sTable = new Table(sourceTable.getName(), sourceTable.getType(), new ArrayList<>());
|
|
@@ -180,7 +181,7 @@ public class ParserFactory implements Parser {
|
|
|
Assert.notNull(connector, "Connector can not be null.");
|
|
|
String connectorType = config.getString("connectorType");
|
|
|
Class<?> configClass = ConnectorEnum.getConfigClass(connectorType);
|
|
|
- ConnectorConfig obj = (ConnectorConfig) JsonUtil.jsonToObj(config.toString(), configClass);
|
|
|
+ AbstractConnectorConfig obj = (AbstractConnectorConfig) JsonUtil.jsonToObj(config.toString(), configClass);
|
|
|
connector.setConfig(obj);
|
|
|
|
|
|
return connector;
|
|
@@ -232,9 +233,9 @@ public class ParserFactory implements Parser {
|
|
|
final String sourceConnectorId = mapping.getSourceConnectorId();
|
|
|
final String targetConnectorId = mapping.getTargetConnectorId();
|
|
|
|
|
|
- ConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
|
|
|
+ AbstractConnectorConfig sConfig = getConnectorConfig(sourceConnectorId);
|
|
|
Assert.notNull(sConfig, "数据源配置不能为空.");
|
|
|
- ConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
|
|
|
+ AbstractConnectorConfig tConfig = getConnectorConfig(targetConnectorId);
|
|
|
Assert.notNull(tConfig, "目标源配置不能为空.");
|
|
|
TableGroup group = PickerUtil.mergeTableGroupConfig(mapping, tableGroup);
|
|
|
Map<String, String> command = group.getCommand();
|
|
@@ -273,18 +274,22 @@ public class ParserFactory implements Parser {
|
|
|
ConvertUtil.convert(group.getConvert(), target);
|
|
|
|
|
|
// 4、插件转换
|
|
|
- pluginFactory.convert(group.getPlugin(), data, target);
|
|
|
+ Plugin plugin = group.getPlugin();
|
|
|
+ pluginFactory.convert(tConnectorMapper, plugin, tTableName, data, target);
|
|
|
|
|
|
// 5、写入目标源
|
|
|
BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, tTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
|
|
|
Result writer = writeBatch(batchWriter, executorService);
|
|
|
|
|
|
- // 6、更新结果
|
|
|
+ // 6、执行批量处理后的
|
|
|
+ pluginFactory.postProcessAfter(tConnectorMapper, plugin, tTableName, ConnectorConstant.OPERTION_INSERT, data, target);
|
|
|
+
|
|
|
+ // 7、更新结果
|
|
|
task.setPageIndex(task.getPageIndex() + 1);
|
|
|
task.setCursor(getLastCursor(data, pk));
|
|
|
flush(task, writer);
|
|
|
|
|
|
- // 7、判断尾页
|
|
|
+ // 8、判断尾页
|
|
|
if (data.size() < pageSize) {
|
|
|
logger.info("完成全量:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
|
|
|
break;
|
|
@@ -402,7 +407,7 @@ public class ParserFactory implements Parser {
|
|
|
* @param connectorId
|
|
|
* @return
|
|
|
*/
|
|
|
- private ConnectorConfig getConnectorConfig(String connectorId) {
|
|
|
+ private AbstractConnectorConfig getConnectorConfig(String connectorId) {
|
|
|
return getConnector(connectorId).getConfig();
|
|
|
}
|
|
|
|