AE86 1 år sedan
förälder
incheckning
935a30d4de

+ 72 - 32
dbsyncer-connector/dbsyncer-connector-base/src/test/java/ConnectionTest.java

@@ -235,8 +235,7 @@ public class ConnectionTest {
 
         long begin = Instant.now().toEpochMilli();
         final int threadSize = 10;
-        final int num = 100;
-        final int batch = 2;
+        final int num = 1000;
         final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
         final CountDownLatch latch = new CountDownLatch(threadSize);
         final String insert = "INSERT INTO `vote_records_test` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
@@ -245,11 +244,14 @@ public class ConnectionTest {
 
         // 模拟单表增删改事件,每个事件间隔2条数据
         for (int i = 0; i < threadSize; i++) {
-            final int offset = i == 0 ? 0 : i * threadSize;
+            final int offset = i;
             pool.submit(() -> {
                 try {
                     logger.info("{}-开始任务", Thread.currentThread().getName());
-                    mockData(connectorInstance, num, batch, offset, insert, update, delete);
+                    // 增删改事件密集型
+                    mockData(connectorInstance, num, offset, insert, update, delete);
+                    // 增改事件密集型
+//                    mockData2(connectorInstance, num, offset, insert, update);
                     logger.info("{}-结束任务", Thread.currentThread().getName());
                 } catch (Exception e) {
                     logger.error(e.getMessage());
@@ -265,40 +267,78 @@ public class ConnectionTest {
             logger.error(e.getMessage());
         }
         pool.shutdown();
-        logger.info("总数:{}, 耗时:{}秒", (threadSize * batch * num * 3), (Instant.now().toEpochMilli() - begin) / 1000);
+//        logger.info("总数:{}, 耗时:{}秒", (threadSize * num * 3), (Instant.now().toEpochMilli() - begin) / 1000);
+        logger.info("总数:{}, 耗时:{}秒", (threadSize * num), (Instant.now().toEpochMilli() - begin) / 1000);
     }
 
-    private void mockData(DatabaseConnectorInstance connectorInstance, int num, int batch, int offset, String insert, String update, String delete) {
+    private void mockData(DatabaseConnectorInstance connectorInstance, int num, int offset, String insert, String update, String delete) {
+        int start = offset * num;
+        logger.info("{}-offset:{}, start:{}", Thread.currentThread().getName(), offset, start);
+        List<Object[]> insertData = new ArrayList<>();
+        List<Object[]> updateData = new ArrayList<>();
+        List<Object[]> deleteData = new ArrayList<>();
         for (int i = 0; i < num; i++) {
-            List<Object[]> insertData = new ArrayList<>();
-            List<Object[]> updateData = new ArrayList<>();
-            List<Object[]> deleteData = new ArrayList<>();
-            for (int j = 1; j <= batch; j++) {
-                // insert
-                Object[] insertArgs = new Object[6];
-                insertArgs[0] = j + offset;
-                insertArgs[1] = randomUserId(20);
-                insertArgs[2] = RandomUtil.nextInt(1, 9999);
-                insertArgs[3] = RandomUtil.nextInt(0, 3);
-                insertArgs[4] = RandomUtil.nextInt(1, 3);
-                insertArgs[5] = Timestamp.valueOf(LocalDateTime.now());
-                insertData.add(insertArgs);
-
-                // update
-                Object[] updateArgs = new Object[2];
-                updateArgs[0] = randomUserId(20);
-                updateArgs[1] = j + offset;
-                updateData.add(updateArgs);
-
-                // delete
-                Object[] deleteArgs = new Object[1];
-                deleteArgs[0] = j + offset;
-                deleteData.add(deleteArgs);
-            }
+            // insert
+            Object[] insertArgs = new Object[6];
+            insertArgs[0] = i + start;
+            insertArgs[1] = randomUserId(20);
+            insertArgs[2] = RandomUtil.nextInt(1, 9999);
+            insertArgs[3] = RandomUtil.nextInt(0, 3);
+            insertArgs[4] = RandomUtil.nextInt(1, 3);
+            insertArgs[5] = Timestamp.valueOf(LocalDateTime.now());
+            insertData.add(insertArgs);
+
+            // update
+            Object[] updateArgs = new Object[2];
+            updateArgs[0] = randomUserId(20);
+            updateArgs[1] = i + start;
+            updateData.add(updateArgs);
+
+            // delete
+            Object[] deleteArgs = new Object[1];
+            deleteArgs[0] = i + start;
+            deleteData.add(deleteArgs);
+
             connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(insert, insertData));
             connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(update, updateData));
             connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(delete, deleteData));
-            logger.info("{}-已处理, 数据行[{}, {}], 批次:{}", Thread.currentThread().getName(), offset, offset + batch, i + 1);
+            insertData.clear();
+            updateData.clear();
+            deleteData.clear();
+            logger.info("{}, 数据行[{}, {}], 已处理:{}", Thread.currentThread().getName(), start, start + num, i + start);
+        }
+    }
+
+    private void mockData2(DatabaseConnectorInstance connectorInstance, int num, int offset, String insert, String update) {
+        List<Object[]> insertData = new ArrayList<>();
+        List<Object[]> updateData = new ArrayList<>();
+        final int batch = 100;
+        int start = offset * num;
+        logger.info("{}-offset:{}, start:{}", Thread.currentThread().getName(), offset, start);
+        for (int i = 1; i <= num; i++) {
+            // insert
+            Object[] insertArgs = new Object[6];
+            insertArgs[0] = i + start;
+            insertArgs[1] = randomUserId(20);
+            insertArgs[2] = RandomUtil.nextInt(1, 9999);
+            insertArgs[3] = RandomUtil.nextInt(0, 3);
+            insertArgs[4] = RandomUtil.nextInt(1, 3);
+            insertArgs[5] = Timestamp.valueOf(LocalDateTime.now());
+            insertData.add(insertArgs);
+
+            // update
+            Object[] updateArgs = new Object[2];
+            updateArgs[0] = randomUserId(20);
+            updateArgs[1] = i + start;
+            updateData.add(updateArgs);
+
+            if (i % batch == 0) {
+                connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(insert, insertData));
+                connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(update, updateData));
+                logger.info("{}, 数据行[{}, {}], 已处理:{}", Thread.currentThread().getName(), start, start + num, i + start);
+                insertData.clear();
+                updateData.clear();
+            }
         }
     }
 

+ 0 - 7
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/DatabaseTemplate.java

@@ -510,9 +510,6 @@ public class DatabaseTemplate implements JdbcOperations {
     @Override
     public int[] batchUpdate(final String... sql) throws DataAccessException {
         Assert.notEmpty(sql, "SQL array must not be empty");
-        if (logger.isDebugEnabled()) {
-            logger.debug("Executing SQL batch update of " + sql.length + " statements");
-        }
 
         /**
          * Callback to execute the batch update.
@@ -584,10 +581,6 @@ public class DatabaseTemplate implements JdbcOperations {
 
         Assert.notNull(psc, "PreparedStatementCreator must not be null");
         Assert.notNull(action, "Callback object must not be null");
-        if (logger.isDebugEnabled()) {
-            String sql = getSql(psc);
-            logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
-        }
 
         PreparedStatement ps = null;
         try {