1
0
AE86 1 жил өмнө
parent
commit
0561d1e436

+ 6 - 0
dbsyncer-connector/dbsyncer-connector-base/pom.xml

@@ -76,6 +76,12 @@
             <version>${project.parent.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-log4j2</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>

+ 70 - 2
dbsyncer-connector/dbsyncer-connector-base/src/test/java/ConnectionTest.java

@@ -117,7 +117,7 @@ public class ConnectionTest {
         long begin = Instant.now().toEpochMilli();
         final int threadSize = 10;
         final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
-        final String sql = "INSERT INTO `vote_records_copy` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
+        final String sql = "INSERT INTO `vote_records` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
 
         // 模拟1000w条数据
         List<Object[]> dataList = new ArrayList<>();
@@ -139,7 +139,7 @@ public class ConnectionTest {
             }
         }
 
-        if(!CollectionUtils.isEmpty(dataList)){
+        if (!CollectionUtils.isEmpty(dataList)) {
             System.out.println("-----------------正在处理剩余数据");
             batchUpdate(connectorInstance, pool, sql, dataList, 1000);
         }
@@ -218,6 +218,74 @@ public class ConnectionTest {
         logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
     }
 
+    @Test
+    public void testBatchIUD() {
+        final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
+
+        long begin = Instant.now().toEpochMilli();
+        final int threadSize = 10;
+        final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
+        final CountDownLatch latch = new CountDownLatch(threadSize);
+        final int num = 100;
+        final String insert = "INSERT INTO `vote_records_test` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
+        final String update = "UPDATE `test`.`vote_records` SET `user_id` = ?, `create_time` = now() WHERE `id` = ?";
+        final String delete = "delete from `test`.`vote_records` WHERE `id` = ?";
+
+        System.out.println("-----------------模拟总数:" + (threadSize * threadSize * num));
+        for (int i = 0; i < threadSize; i++) {
+            final int offset = i == 0 ? 0 : i * threadSize;
+            pool.submit(() -> {
+                try {
+                    mockData(connectorInstance, num, threadSize, offset, insert, update, delete);
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        }
+        pool.shutdown();
+        logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
+    }
+
+    private void mockData(DatabaseConnectorInstance connectorInstance, int num, int batch, int offset, String insert, String update, String delete) {
+        List<Object[]> insertData = new ArrayList<>();
+        List<Object[]> updateData = new ArrayList<>();
+        List<Object[]> deleteData = new ArrayList<>();
+        for (int j = 1; j <= batch; j++) {
+            // insert
+            Object[] insertArgs = new Object[6];
+            insertArgs[0] = j + offset;
+            insertArgs[1] = randomUserId(20);
+            insertArgs[2] = RandomUtil.nextInt(1, 9999);
+            insertArgs[3] = RandomUtil.nextInt(0, 3);
+            insertArgs[4] = RandomUtil.nextInt(1, 3);
+            insertArgs[5] = Timestamp.valueOf(LocalDateTime.now());
+            insertData.add(insertArgs);
+
+            // update
+            Object[] updateArgs = new Object[2];
+            updateArgs[0] = randomUserId(20);
+            updateArgs[1] = j + offset;
+            updateData.add(updateArgs);
+
+            // delete
+            Object[] deleteArgs = new Object[1];
+            deleteArgs[0] = j + offset;
+            deleteData.add(deleteArgs);
+        }
+        connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(insert, insertData));
+        connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(update, updateData));
+        connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(delete, deleteData));
+        System.out.println(Thread.currentThread().getName() + "-----------------正在已处理:" + offset + " - " + (offset + batch));
+    }
+
     private final static String STR = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
 
     private String randomUserId(int i) {