|
@@ -146,6 +146,44 @@ public class ConnectionTest {
|
|
|
logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testBatchUpdate() {
|
|
|
+ final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(createMysqlConfig());
|
|
|
+
|
|
|
+ long begin = Instant.now().toEpochMilli();
|
|
|
+ final int threadSize = 10;
|
|
|
+ final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
|
|
|
+ final String sql = "UPDATE `test`.`my_big_table` SET `name` = ?, `last_time` = now() WHERE `id` = ?";
|
|
|
+
|
|
|
+ // 模拟100w条数据
|
|
|
+ int k = 10;
|
|
|
+ while (k > 0) {
|
|
|
+ List<Object[]> dataList = new ArrayList<>();
|
|
|
+ for (int i = 1; i <= 100000; i++) {
|
|
|
+ // 'dA8LeJLtX9MgQgDe7H1O', '2022-11-17 16:35:21', 1
|
|
|
+ Object[] args = new Object[2];
|
|
|
+ args[0] = randomUserId(20);
|
|
|
+ args[1] = i;
|
|
|
+ dataList.add(args);
|
|
|
+
|
|
|
+ if (i % 10000 == 0) {
|
|
|
+ System.out.println(i + "-----------------正在处理");
|
|
|
+ batchUpdateBlocking(connectorMapper, pool, sql, dataList, 1000);
|
|
|
+ dataList.clear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!CollectionUtils.isEmpty(dataList)) {
|
|
|
+ System.out.println("-----------------正在处理剩余数据");
|
|
|
+ batchUpdateBlocking(connectorMapper, pool, sql, dataList, 1000);
|
|
|
+ }
|
|
|
+ k--;
|
|
|
+ }
|
|
|
+
|
|
|
+ pool.shutdown();
|
|
|
+ logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
|
|
|
+ }
|
|
|
+
|
|
|
private final static String STR = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
|
|
|
|
|
|
private String randomUserId(int i) {
|
|
@@ -191,6 +229,42 @@ public class ConnectionTest {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void batchUpdateBlocking(DatabaseConnectorMapper connectorMapper, ExecutorService pool, String sql, List<Object[]> dataList, int batchSize) {
|
|
|
+ int total = dataList.size();
|
|
|
+ int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
|
|
|
+ final CyclicBarrier barrier = new CyclicBarrier(taskSize);
|
|
|
+ final CountDownLatch latch = new CountDownLatch(taskSize);
|
|
|
+ int fromIndex = 0;
|
|
|
+ int toIndex = batchSize;
|
|
|
+ for (int i = 0; i < taskSize; i++) {
|
|
|
+ final List<Object[]> data;
|
|
|
+ if (toIndex > total) {
|
|
|
+ toIndex = fromIndex + (total % batchSize);
|
|
|
+ data = dataList.subList(fromIndex, toIndex);
|
|
|
+ } else {
|
|
|
+ data = dataList.subList(fromIndex, toIndex);
|
|
|
+ fromIndex += batchSize;
|
|
|
+ toIndex += batchSize;
|
|
|
+ }
|
|
|
+
|
|
|
+ pool.submit(() -> {
|
|
|
+ try {
|
|
|
+ barrier.await();
|
|
|
+ connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, data));
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getMessage());
|
|
|
+ } finally {
|
|
|
+ latch.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ latch.await();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ logger.error(e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testReadSchema() {
|
|
|
getTables(createOracleConfig(), "test", "AE86", "MY_ORG");
|