123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- /**
- * DBSyncer Copyright 2020-2023 All Rights Reserved.
- */
- import oracle.jdbc.OracleConnection;
- import org.dbsyncer.common.util.CollectionUtils;
- import org.dbsyncer.common.util.RandomUtil;
- import org.dbsyncer.common.util.StringUtil;
- import org.dbsyncer.sdk.config.DatabaseConfig;
- import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
- import org.dbsyncer.sdk.connector.database.ds.SimpleConnection;
- import org.dbsyncer.sdk.enums.TableTypeEnum;
- import org.dbsyncer.sdk.model.Field;
- import org.dbsyncer.sdk.model.Table;
- import org.junit.Test;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.jdbc.core.BatchPreparedStatementSetter;
- import java.nio.charset.Charset;
- import java.sql.Clob;
- import java.sql.Connection;
- import java.sql.DatabaseMetaData;
- import java.sql.PreparedStatement;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import java.sql.Timestamp;
- import java.time.Instant;
- import java.time.LocalDateTime;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.BrokenBarrierException;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.CyclicBarrier;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- /**
- * @Author AE86
- * @Version 1.0.0
- * @Date 2022-04-11 20:19
- */
- public class ConnectionTest {
- private final Logger logger = LoggerFactory.getLogger(getClass());
- @Test
- public void testByte() {
- final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createOracleConfig());
- String executeSql = "UPDATE \"my_user\" SET \"name\"=?,\"clo\"=? WHERE \"id\"=?";
- int[] execute = connectorInstance.execute(databaseTemplate ->
- databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
- @Override
- public void setValues(PreparedStatement ps, int i) {
- try {
- SimpleConnection connection = databaseTemplate.getSimpleConnection();
- OracleConnection conn = (OracleConnection) connection.getConnection();
- Clob clob = conn.createClob();
- clob.setString(1, new String("中文888".getBytes(Charset.defaultCharset())));
- ps.setString(1, "hello888");
- ps.setClob(2, clob);
- ps.setInt(3, 2);
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- @Override
- public int getBatchSize() {
- return 1;
- }
- })
- );
- logger.info("execute:{}", execute);
- }
- @Test
- public void testConnection() throws InterruptedException {
- final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createSqlServerConfig());
- // 模拟并发
- final int threadSize = 100;
- final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
- final CyclicBarrier barrier = new CyclicBarrier(threadSize);
- final CountDownLatch latch = new CountDownLatch(threadSize);
- for (int i = 0; i < threadSize; i++) {
- final int k = i + 3;
- pool.submit(() -> {
- try {
- barrier.await();
- // 模拟操作
- System.out.println(String.format("%s %s:%s", LocalDateTime.now(), Thread.currentThread().getName(), k));
- Object execute = connectorInstance.execute(tem -> tem.queryForObject("select 1", Integer.class));
- System.out.println(String.format("%s %s:%s execute=>%s", LocalDateTime.now(), Thread.currentThread().getName(), k, execute));
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
- } catch (BrokenBarrierException e) {
- logger.error(e.getMessage());
- } catch (Exception e) {
- logger.error(e.getMessage());
- } finally {
- latch.countDown();
- }
- });
- }
- try {
- latch.await();
- logger.info("try to shutdown");
- pool.shutdown();
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
- }
- TimeUnit.SECONDS.sleep(3);
- logger.info("test end");
- }
- @Test
- public void testBatchInsert() {
- final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
- long begin = Instant.now().toEpochMilli();
- final int threadSize = 10;
- final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
- final String sql = "INSERT INTO `vote_records` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
- // 模拟1000w条数据
- List<Object[]> dataList = new ArrayList<>();
- for (int i = 1; i <= 200001; i++) {
- // 442001, 'dA8LeJLtX9MgQgDe7H1O', 9620, 1, 2, '2022-11-17 16:35:21'
- Object[] args = new Object[6];
- args[0] = i;
- args[1] = randomUserId(20);
- args[2] = RandomUtil.nextInt(1, 9999);
- args[3] = RandomUtil.nextInt(0, 3);
- args[4] = RandomUtil.nextInt(1, 3);
- args[5] = Timestamp.valueOf(LocalDateTime.now());
- dataList.add(args);
- if (i % 10000 == 0) {
- System.out.println(i + "-----------------正在处理");
- batchUpdate(connectorInstance, pool, sql, dataList, 1000);
- dataList.clear();
- }
- }
- if (!CollectionUtils.isEmpty(dataList)) {
- System.out.println("-----------------正在处理剩余数据");
- batchUpdate(connectorInstance, pool, sql, dataList, 1000);
- }
- pool.shutdown();
- logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
- }
- @Test
- public void testBatchUpdate() {
- final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
- long begin = Instant.now().toEpochMilli();
- final int threadSize = 10;
- final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
- final String sql = "UPDATE `test`.`vote_records` SET `user_id` = ?, `create_time` = now() WHERE `id` = ?";
- // 模拟100w条数据
- int k = 10;
- while (k > 0) {
- List<Object[]> dataList = new ArrayList<>();
- for (int i = 1; i <= 100000; i++) {
- // 'dA8LeJLtX9MgQgDe7H1O', '2022-11-17 16:35:21', 1
- Object[] args = new Object[2];
- args[0] = randomUserId(20);
- args[1] = i;
- dataList.add(args);
- if (i % 10000 == 0) {
- System.out.println(i + "-----------------正在处理");
- batchUpdate(connectorInstance, pool, sql, dataList, 1000);
- dataList.clear();
- }
- }
- if (!CollectionUtils.isEmpty(dataList)) {
- System.out.println("-----------------正在处理剩余数据");
- batchUpdate(connectorInstance, pool, sql, dataList, 1000);
- }
- k--;
- }
- pool.shutdown();
- logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
- }
- @Test
- public void testBatchDelete() {
- final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
- long begin = Instant.now().toEpochMilli();
- final int threadSize = 10;
- final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
- final String sql = "delete from `test`.`vote_records` WHERE `id` = ?";
- List<Object[]> dataList = new ArrayList<>();
- for (int i = 1; i <= 3259000; i++) {
- // 'dA8LeJLtX9MgQgDe7H1O', '2022-11-17 16:35:21', 1
- Object[] args = new Object[1];
- args[0] = i;
- dataList.add(args);
- if (i % 10000 == 0) {
- System.out.println(i + "-----------------正在处理");
- batchUpdate(connectorInstance, pool, sql, dataList, 1000);
- dataList.clear();
- }
- }
- if (!CollectionUtils.isEmpty(dataList)) {
- System.out.println("-----------------正在处理剩余数据");
- batchUpdate(connectorInstance, pool, sql, dataList, 1000);
- }
- pool.shutdown();
- logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
- }
- @Test
- public void testBatchIUD() {
- final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
- long begin = Instant.now().toEpochMilli();
- final int threadSize = 10;
- final int num = 1000;
- final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
- final CountDownLatch latch = new CountDownLatch(threadSize);
- final String insert = "INSERT INTO `vote_records_test` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
- final String update = "UPDATE `test`.`vote_records_test` SET `user_id` = ?, `create_time` = now() WHERE `id` = ?";
- final String delete = "DELETE from `test`.`vote_records_test` WHERE `id` = ?";
- // 模拟单表增删改事件,每个事件间隔2条数据
- for (int i = 0; i < threadSize; i++) {
- final int offset = i;
- pool.submit(() -> {
- try {
- logger.info("{}-开始任务", Thread.currentThread().getName());
- // 增删改事件密集型
- mockData(connectorInstance, num, offset, insert, update, delete);
- // 增改事件密集型
- // mockData2(connectorInstance, num, offset, insert, update);
- logger.info("{}-结束任务", Thread.currentThread().getName());
- } catch (Exception e) {
- logger.error(e.getMessage());
- } finally {
- latch.countDown();
- }
- });
- }
- try {
- latch.await();
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
- }
- pool.shutdown();
- // logger.info("总数:{}, 耗时:{}秒", (threadSize * num * 3), (Instant.now().toEpochMilli() - begin) / 1000);
- logger.info("总数:{}, 耗时:{}秒", (threadSize * num), (Instant.now().toEpochMilli() - begin) / 1000);
- }
- private void mockData(DatabaseConnectorInstance connectorInstance, int num, int offset, String insert, String update, String delete) {
- int start = offset * num;
- logger.info("{}-offset:{}, start:{}", Thread.currentThread().getName(), offset, start);
- List<Object[]> insertData = new ArrayList<>();
- List<Object[]> updateData = new ArrayList<>();
- List<Object[]> deleteData = new ArrayList<>();
- for (int i = 0; i < num; i++) {
- // insert
- Object[] insertArgs = new Object[6];
- insertArgs[0] = i + start;
- insertArgs[1] = randomUserId(20);
- insertArgs[2] = RandomUtil.nextInt(1, 9999);
- insertArgs[3] = RandomUtil.nextInt(0, 3);
- insertArgs[4] = RandomUtil.nextInt(1, 3);
- insertArgs[5] = Timestamp.valueOf(LocalDateTime.now());
- insertData.add(insertArgs);
- // update
- Object[] updateArgs = new Object[2];
- updateArgs[0] = randomUserId(20);
- updateArgs[1] = i + start;
- updateData.add(updateArgs);
- // delete
- Object[] deleteArgs = new Object[1];
- deleteArgs[0] = i + start;
- deleteData.add(deleteArgs);
- connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(insert, insertData));
- connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(update, updateData));
- connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(delete, deleteData));
- insertData.clear();
- updateData.clear();
- deleteData.clear();
- logger.info("{}, 数据行[{}, {}], 已处理:{}", Thread.currentThread().getName(), start, start + num, i + start);
- }
- }
- private void mockData2(DatabaseConnectorInstance connectorInstance, int num, int offset, String insert, String update) {
- List<Object[]> insertData = new ArrayList<>();
- List<Object[]> updateData = new ArrayList<>();
- final int batch = 100;
- int start = offset * num;
- logger.info("{}-offset:{}, start:{}", Thread.currentThread().getName(), offset, start);
- for (int i = 1; i <= num; i++) {
- // insert
- Object[] insertArgs = new Object[6];
- insertArgs[0] = i + start;
- insertArgs[1] = randomUserId(20);
- insertArgs[2] = RandomUtil.nextInt(1, 9999);
- insertArgs[3] = RandomUtil.nextInt(0, 3);
- insertArgs[4] = RandomUtil.nextInt(1, 3);
- insertArgs[5] = Timestamp.valueOf(LocalDateTime.now());
- insertData.add(insertArgs);
- // update
- Object[] updateArgs = new Object[2];
- updateArgs[0] = randomUserId(20);
- updateArgs[1] = i + start;
- updateData.add(updateArgs);
- if (i % batch == 0) {
- connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(insert, insertData));
- connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(update, updateData));
- logger.info("{}, 数据行[{}, {}], 已处理:{}", Thread.currentThread().getName(), start, start + num, i + start);
- insertData.clear();
- updateData.clear();
- }
- }
- }
- private final static String STR = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
- private String randomUserId(int i) {
- StringBuilder s = new StringBuilder();
- for (int j = 0; j < i; j++) {
- int r = RandomUtil.nextInt(0, 62);
- s.append(StringUtil.substring(STR, r, r + 1));
- }
- return s.toString();
- }
- private void batchUpdate(DatabaseConnectorInstance connectorInstance, ExecutorService pool, String sql, List<Object[]> dataList, int batchSize) {
- int total = dataList.size();
- int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
- final CountDownLatch latch = new CountDownLatch(taskSize);
- int fromIndex = 0;
- int toIndex = batchSize;
- for (int i = 0; i < taskSize; i++) {
- final List<Object[]> data;
- if (toIndex > total) {
- toIndex = fromIndex + (total % batchSize);
- data = dataList.subList(fromIndex, toIndex);
- } else {
- data = dataList.subList(fromIndex, toIndex);
- fromIndex += batchSize;
- toIndex += batchSize;
- }
- pool.submit(() -> {
- try {
- connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, data));
- } catch (Exception e) {
- logger.error(e.getMessage());
- } finally {
- latch.countDown();
- }
- });
- }
- try {
- latch.await();
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
- }
- }
- @Test
- public void testReadSchema() {
- getTables(createOracleConfig(), "test", "AE86", "MY_ORG");
- getTables(createOracleConfig(), "test", "AE86", null);
- getTables(createMysqlConfig(), "test", "root", "MY_ORG");
- getTables(createMysqlConfig(), "test", "root", null);
- getTables(createSqlServerConfig(), "test", "dbo", "MY_ORG");
- getTables(createSqlServerConfig(), "test", "dbo", null);
- getTables(createPostgresConfig(), "postgres", "public", "MY_ORG");
- getTables(createPostgresConfig(), "postgres", "public", null);
- }
- @Test
- public void testGetColumnsDetails() {
- final String schema = "root";
- final String tableNamePattern = "sw_test";
- final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
- connectorInstance.execute(databaseTemplate -> {
- SimpleConnection connection = databaseTemplate.getSimpleConnection();
- Connection conn = connection.getConnection();
- String databaseCatalog = conn.getCatalog();
- String schemaNamePattern = null == schema ? conn.getSchema() : schema;
- List<Field> fields = new ArrayList<>();
- DatabaseMetaData metaData = conn.getMetaData();
- ResultSet columnMetadata = metaData.getColumns(databaseCatalog, schemaNamePattern, tableNamePattern, null);
- while (columnMetadata.next()) {
- String columnName = columnMetadata.getString(4);
- int columnType = columnMetadata.getInt(5);
- String typeName = columnMetadata.getString(6);
- fields.add(new Field(columnName, typeName, columnType));
- }
- return fields;
- });
- }
- private List<Table> getTables(DatabaseConfig config, final String catalog, final String schema, final String tableNamePattern) {
- final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(config);
- List<Table> tables = new ArrayList<>();
- connectorInstance.execute(databaseTemplate -> {
- SimpleConnection connection = databaseTemplate.getSimpleConnection();
- Connection conn = connection.getConnection();
- String databaseCatalog = null == catalog ? conn.getCatalog() : catalog;
- String schemaNamePattern = null == schema ? conn.getSchema() : schema;
- String[] types = {TableTypeEnum.TABLE.getCode(), TableTypeEnum.VIEW.getCode(), TableTypeEnum.MATERIALIZED_VIEW.getCode()};
- final ResultSet rs = conn.getMetaData().getTables(databaseCatalog, schemaNamePattern, tableNamePattern, types);
- while (rs.next()) {
- final String tableName = rs.getString("TABLE_NAME");
- final String tableType = rs.getString("TABLE_TYPE");
- tables.add(new Table(tableName, tableType));
- }
- return tables;
- });
- logger.info("\r 表总数{}", tables.size());
- tables.forEach(t -> logger.info("{} {}", t.getName(), t.getType()));
- return tables;
- }
- private DatabaseConfig createSqlServerConfig() {
- DatabaseConfig config = new DatabaseConfig();
- config.setUrl("jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test");
- config.setUsername("sa");
- config.setPassword("123");
- config.setDriverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
- return config;
- }
- private DatabaseConfig createOracleConfig() {
- DatabaseConfig config = new DatabaseConfig();
- config.setUrl("jdbc:oracle:thin:@127.0.0.1:1521:ORCL");
- config.setUsername("ae86");
- config.setPassword("123");
- config.setDriverClassName("oracle.jdbc.OracleDriver");
- return config;
- }
- private DatabaseConfig createMysqlConfig() {
- DatabaseConfig config = new DatabaseConfig();
- config.setUrl("jdbc:mysql://127.0.0.1:3305/test?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false&autoReconnect=true&failOverReadOnly=false");
- config.setUsername("root");
- config.setPassword("123");
- config.setDriverClassName("com.mysql.cj.jdbc.Driver");
- return config;
- }
- private DatabaseConfig createPostgresConfig() {
- DatabaseConfig config = new DatabaseConfig();
- config.setUrl("jdbc:postgresql://127.0.0.1:5432/postgres");
- config.setUsername("postgres");
- config.setPassword("123456");
- config.setDriverClassName("org.postgresql.Driver");
- return config;
- }
- }
|