|
@@ -251,8 +251,6 @@ public class ParserFactory implements Parser {
|
|
|
int batchSize = mapping.getBatchNum();
|
|
|
ConnectorMapper sConnectorMapper = connectorFactory.connect(sConfig);
|
|
|
ConnectorMapper tConnectorMapper = connectorFactory.connect(tConfig);
|
|
|
- String cursor = task.getCursor();
|
|
|
- int pageIndex = task.getPageIndex();
|
|
|
|
|
|
for (; ; ) {
|
|
|
if (!task.isRunning()) {
|
|
@@ -261,7 +259,7 @@ public class ParserFactory implements Parser {
|
|
|
}
|
|
|
|
|
|
// 1、获取数据源数据
|
|
|
- Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), cursor, pageIndex, pageSize));
|
|
|
+ Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), task.getCursor(), task.getPageIndex(), pageSize));
|
|
|
List<Map> data = reader.getSuccessData();
|
|
|
if (CollectionUtils.isEmpty(data)) {
|
|
|
logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
|
|
@@ -281,17 +279,16 @@ public class ParserFactory implements Parser {
|
|
|
BatchWriter batchWriter = new BatchWriter(tConnectorMapper, command, sTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
|
|
|
Result writer = writeBatch(batchWriter, executorService);
|
|
|
|
|
|
- // 6、更新结果
|
|
|
- cursor = getLastCursor(data, pk);
|
|
|
- task.setPageIndex(pageIndex + 1);
|
|
|
- task.setCursor(cursor);
|
|
|
- flush(task, writer);
|
|
|
-
|
|
|
- // 7、判断尾页
|
|
|
+ // 6、判断尾页
|
|
|
if (data.size() < pageSize) {
|
|
|
logger.info("完成全量:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
|
|
|
break;
|
|
|
}
|
|
|
+
|
|
|
+ // 7、更新结果
|
|
|
+ task.setPageIndex(task.getPageIndex() + 1);
|
|
|
+ task.setCursor(getLastCursor(data, pk));
|
|
|
+ flush(task, writer);
|
|
|
}
|
|
|
}
|
|
|
|