AE86 2 yıl önce
ebeveyn
işleme
e72e7344db

+ 72 - 0
dbsyncer-connector/src/main/test/ConnectionTest.java

@@ -1,4 +1,5 @@
 import oracle.jdbc.OracleConnection;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.database.ds.SimpleConnection;
@@ -12,6 +13,7 @@ import org.springframework.jdbc.core.BatchPreparedStatementSetter;
 
 import java.nio.charset.Charset;
 import java.sql.*;
+import java.time.Instant;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
@@ -103,6 +105,76 @@ public class ConnectionTest {
         logger.info("test end");
     }
 
+    @Test
+    public void testBatchInsert() {
+        final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(createPostgresConfig());
+
+        long begin = Instant.now().toEpochMilli();
+        final int threadSize = 10;
+        final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
+
+        // 模拟1000w条数据
+        List<Object[]> dataList = new ArrayList<>();
+        for (int i = 1; i <= 10000001; i++) {
+            Object[] args = new Object[5];
+            args[0] = i;
+            args[1] = "mathaaaaaaaaaaaaaaaaaa";
+            args[2] = 9999;
+            args[3] = 8888;
+            args[4] = "888899999999999999999999999999蓉儿UN是代付款房间里的解放东路来得及分类的恢复力度菲欧";
+            dataList.add(args);
+
+            if (i % 10000 == 0) {
+                System.out.println(i + "-----------------正在处理");
+                batchInsert(connectorMapper, pool, dataList, 1000);
+                dataList.clear();
+            }
+        }
+
+        if(!CollectionUtils.isEmpty(dataList)){
+            System.out.println("-----------------正在处理剩余数据");
+            batchInsert(connectorMapper, pool, dataList, 1000);
+        }
+
+        pool.shutdown();
+        logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
+    }
+
+    private void batchInsert(DatabaseConnectorMapper connectorMapper, ExecutorService pool, List<Object[]> dataList, int batchSize) {
+        int total = dataList.size();
+        int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
+        final String sql = "insert into t_course2 (id,course,score,user_id,memo) VALUES (?,?,?,?,?)";
+        final CountDownLatch latch = new CountDownLatch(taskSize);
+        int fromIndex = 0;
+        int toIndex = batchSize;
+        for (int i = 0; i < taskSize; i++) {
+            final List<Object[]> data;
+            if (toIndex > total) {
+                toIndex = fromIndex + (total % batchSize);
+                data = dataList.subList(fromIndex, toIndex);
+            } else {
+                data = dataList.subList(fromIndex, toIndex);
+                fromIndex += batchSize;
+                toIndex += batchSize;
+            }
+
+            pool.submit(() -> {
+                try {
+                    connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, data));
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        }
+    }
+
     @Test
     public void testReadSchema() {
         getTables(createOracleConfig(), "test", "AE86", "MY_ORG");