|
@@ -43,6 +43,7 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.Executor;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* @author AE86
|
|
@@ -214,35 +215,19 @@ public class ParserComponentImpl implements ParserComponent {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
- List<Map> dataList = context.getTargetList();
|
|
|
int batchSize = context.getBatchSize();
|
|
|
// 总数
|
|
|
- int total = dataList.size();
|
|
|
- // 单次任务
|
|
|
- if (total <= batchSize) {
|
|
|
- return connectorFactory.writer(context);
|
|
|
- }
|
|
|
-
|
|
|
+ int total = context.getTargetList().size();
|
|
|
// 批量任务, 拆分
|
|
|
int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
|
|
|
-
|
|
|
- final CountDownLatch latch = new CountDownLatch(taskSize);
|
|
|
- int fromIndex = 0;
|
|
|
- int toIndex = batchSize;
|
|
|
+ CountDownLatch latch = new CountDownLatch(taskSize);
|
|
|
+ int offset = 0;
|
|
|
for (int i = 0; i < taskSize; i++) {
|
|
|
- final List<Map> data;
|
|
|
- if (toIndex > total) {
|
|
|
- toIndex = fromIndex + (total % batchSize);
|
|
|
- data = dataList.subList(fromIndex, toIndex);
|
|
|
- } else {
|
|
|
- data = dataList.subList(fromIndex, toIndex);
|
|
|
- fromIndex += batchSize;
|
|
|
- toIndex += batchSize;
|
|
|
- }
|
|
|
-
|
|
|
try {
|
|
|
PluginContext tmpContext = (PluginContext) context.clone();
|
|
|
- tmpContext.setTargetList(data);
|
|
|
+ List<Map> slice = context.getTargetList().stream().skip(offset).limit(batchSize).collect(Collectors.toList());
|
|
|
+ offset += batchSize;
|
|
|
+ tmpContext.setTargetList(slice);
|
|
|
executor.execute(() -> {
|
|
|
try {
|
|
|
Result w = connectorFactory.writer(tmpContext);
|