ConnectionTest.java 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. /**
  2. * DBSyncer Copyright 2020-2023 All Rights Reserved.
  3. */
  4. import oracle.jdbc.OracleConnection;
  5. import org.dbsyncer.common.util.CollectionUtils;
  6. import org.dbsyncer.common.util.RandomUtil;
  7. import org.dbsyncer.common.util.StringUtil;
  8. import org.dbsyncer.sdk.config.DatabaseConfig;
  9. import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
  10. import org.dbsyncer.sdk.connector.database.ds.SimpleConnection;
  11. import org.dbsyncer.sdk.enums.TableTypeEnum;
  12. import org.dbsyncer.sdk.model.Field;
  13. import org.dbsyncer.sdk.model.Table;
  14. import org.junit.Test;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import org.springframework.jdbc.core.BatchPreparedStatementSetter;
  18. import java.nio.charset.Charset;
  19. import java.sql.Clob;
  20. import java.sql.Connection;
  21. import java.sql.DatabaseMetaData;
  22. import java.sql.PreparedStatement;
  23. import java.sql.ResultSet;
  24. import java.sql.SQLException;
  25. import java.sql.Timestamp;
  26. import java.time.Instant;
  27. import java.time.LocalDateTime;
  28. import java.util.ArrayList;
  29. import java.util.List;
  30. import java.util.Map;
  31. import java.util.concurrent.BrokenBarrierException;
  32. import java.util.concurrent.CountDownLatch;
  33. import java.util.concurrent.CyclicBarrier;
  34. import java.util.concurrent.ExecutorService;
  35. import java.util.concurrent.Executors;
  36. import java.util.concurrent.TimeUnit;
  37. import java.util.stream.Collectors;
  38. /**
  39. * @Author AE86
  40. * @Version 1.0.0
  41. * @Date 2022-04-11 20:19
  42. */
  43. public class ConnectionTest {
  44. private final Logger logger = LoggerFactory.getLogger(getClass());
  45. @Test
  46. public void testByte() {
  47. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createOracleConfig());
  48. String executeSql = "UPDATE \"my_user\" SET \"name\"=?,\"clo\"=? WHERE \"id\"=?";
  49. int[] execute = connectorInstance.execute(databaseTemplate ->
  50. databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
  51. @Override
  52. public void setValues(PreparedStatement ps, int i) {
  53. try {
  54. SimpleConnection connection = databaseTemplate.getSimpleConnection();
  55. OracleConnection conn = (OracleConnection) connection.getConnection();
  56. Clob clob = conn.createClob();
  57. clob.setString(1, new String("中文888".getBytes(Charset.defaultCharset())));
  58. ps.setString(1, "hello888");
  59. ps.setClob(2, clob);
  60. ps.setInt(3, 2);
  61. } catch (SQLException e) {
  62. e.printStackTrace();
  63. }
  64. }
  65. @Override
  66. public int getBatchSize() {
  67. return 1;
  68. }
  69. })
  70. );
  71. logger.info("execute:{}", execute);
  72. }
  73. @Test
  74. public void testConnection() throws InterruptedException {
  75. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createSqlServerConfig());
  76. // 模拟并发
  77. final int threadSize = 100;
  78. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  79. final CyclicBarrier barrier = new CyclicBarrier(threadSize);
  80. final CountDownLatch latch = new CountDownLatch(threadSize);
  81. for (int i = 0; i < threadSize; i++) {
  82. final int k = i + 3;
  83. pool.submit(() -> {
  84. try {
  85. barrier.await();
  86. // 模拟操作
  87. System.out.println(String.format("%s %s:%s", LocalDateTime.now(), Thread.currentThread().getName(), k));
  88. Object execute = connectorInstance.execute(tem -> tem.queryForObject("select 1", Integer.class));
  89. System.out.println(String.format("%s %s:%s execute=>%s", LocalDateTime.now(), Thread.currentThread().getName(), k, execute));
  90. } catch (InterruptedException e) {
  91. logger.error(e.getMessage());
  92. } catch (BrokenBarrierException e) {
  93. logger.error(e.getMessage());
  94. } catch (Exception e) {
  95. logger.error(e.getMessage());
  96. } finally {
  97. latch.countDown();
  98. }
  99. });
  100. }
  101. try {
  102. latch.await();
  103. logger.info("try to shutdown");
  104. pool.shutdown();
  105. } catch (InterruptedException e) {
  106. logger.error(e.getMessage());
  107. }
  108. TimeUnit.SECONDS.sleep(3);
  109. logger.info("test end");
  110. }
  111. @Test
  112. public void testQuery() {
  113. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  114. // 3、执行SQL
  115. String querySql = "SELECT * from test_schema where id = ?";
  116. Object[] args = new Object[1];
  117. args[0] = 9999999;
  118. List<Map<String, Object>> list = connectorInstance.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, args));
  119. logger.info("test list={}", list);
  120. }
  121. @Test
  122. public void testBatchInsert() {
  123. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  124. long begin = Instant.now().toEpochMilli();
  125. final int threadSize = 10;
  126. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  127. final String sql = "INSERT INTO `vote_records` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
  128. // 模拟1000w条数据
  129. List<Object[]> dataList = new ArrayList<>();
  130. for (int i = 1; i <= 200001; i++) {
  131. // 442001, 'dA8LeJLtX9MgQgDe7H1O', 9620, 1, 2, '2022-11-17 16:35:21'
  132. Object[] args = new Object[6];
  133. args[0] = i;
  134. args[1] = randomUserId(20);
  135. args[2] = RandomUtil.nextInt(1, 9999);
  136. args[3] = RandomUtil.nextInt(0, 3);
  137. args[4] = RandomUtil.nextInt(1, 3);
  138. args[5] = Timestamp.valueOf(LocalDateTime.now());
  139. dataList.add(args);
  140. if (i % 10000 == 0) {
  141. System.out.println(i + "-----------------正在处理");
  142. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  143. dataList.clear();
  144. }
  145. }
  146. if (!CollectionUtils.isEmpty(dataList)) {
  147. System.out.println("-----------------正在处理剩余数据");
  148. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  149. }
  150. pool.shutdown();
  151. logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
  152. }
  153. @Test
  154. public void testBatchUpdate() {
  155. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  156. long begin = Instant.now().toEpochMilli();
  157. final int threadSize = 10;
  158. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  159. final String sql = "UPDATE `test`.`vote_records` SET `user_id` = ?, `create_time` = now() WHERE `id` = ?";
  160. // 模拟100w条数据
  161. int k = 10;
  162. while (k > 0) {
  163. List<Object[]> dataList = new ArrayList<>();
  164. for (int i = 1; i <= 100000; i++) {
  165. // 'dA8LeJLtX9MgQgDe7H1O', '2022-11-17 16:35:21', 1
  166. Object[] args = new Object[2];
  167. args[0] = randomUserId(20);
  168. args[1] = i;
  169. dataList.add(args);
  170. if (i % 10000 == 0) {
  171. System.out.println(i + "-----------------正在处理");
  172. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  173. dataList.clear();
  174. }
  175. }
  176. if (!CollectionUtils.isEmpty(dataList)) {
  177. System.out.println("-----------------正在处理剩余数据");
  178. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  179. }
  180. k--;
  181. }
  182. pool.shutdown();
  183. logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
  184. }
  185. @Test
  186. public void testBatchDelete() {
  187. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  188. long begin = Instant.now().toEpochMilli();
  189. final int threadSize = 10;
  190. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  191. final String sql = "delete from `test`.`vote_records` WHERE `id` = ?";
  192. List<Object[]> dataList = new ArrayList<>();
  193. for (int i = 1; i <= 3259000; i++) {
  194. // 'dA8LeJLtX9MgQgDe7H1O', '2022-11-17 16:35:21', 1
  195. Object[] args = new Object[1];
  196. args[0] = i;
  197. dataList.add(args);
  198. if (i % 10000 == 0) {
  199. System.out.println(i + "-----------------正在处理");
  200. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  201. dataList.clear();
  202. }
  203. }
  204. if (!CollectionUtils.isEmpty(dataList)) {
  205. System.out.println("-----------------正在处理剩余数据");
  206. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  207. }
  208. pool.shutdown();
  209. logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
  210. }
  211. @Test
  212. public void testBatchIUD() {
  213. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  214. long begin = Instant.now().toEpochMilli();
  215. final int threadSize = 1000;
  216. final int num = 300;
  217. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  218. final CountDownLatch latch = new CountDownLatch(threadSize);
  219. final String insert = "INSERT INTO `vote_records_test` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
  220. final String update = "UPDATE `test`.`vote_records_test` SET `user_id` = ?, `create_time` = now() WHERE `id` = ?";
  221. final String delete = "DELETE from `test`.`vote_records_test` WHERE `id` = ?";
  222. // 模拟单表增删改事件
  223. for (int i = 0; i < threadSize; i++) {
  224. final int offset = i;
  225. pool.submit(() -> {
  226. try {
  227. logger.info("{}-开始任务", Thread.currentThread().getName());
  228. // 增删改事件密集型
  229. mockData(connectorInstance, num, offset, insert, update, delete);
  230. logger.info("{}-结束任务", Thread.currentThread().getName());
  231. } catch (Exception e) {
  232. logger.error(e.getMessage());
  233. } finally {
  234. latch.countDown();
  235. }
  236. });
  237. }
  238. try {
  239. latch.await();
  240. } catch (InterruptedException e) {
  241. logger.error(e.getMessage());
  242. }
  243. pool.shutdown();
  244. logger.info("总数:{}, 耗时:{}秒", (threadSize * num), (Instant.now().toEpochMilli() - begin) / 1000);
  245. }
  246. private void mockData(DatabaseConnectorInstance connectorInstance, int num, int offset, String insert, String update, String delete) {
  247. int start = offset * num;
  248. logger.info("{}-offset:{}, start:{}", Thread.currentThread().getName(), offset, start);
  249. List<Object[]> insertData = new ArrayList<>();
  250. List<Object[]> updateData = new ArrayList<>();
  251. List<Object[]> deleteData = new ArrayList<>();
  252. for (int i = 0; i < num; i++) {
  253. // insert
  254. Object[] insertArgs = new Object[6];
  255. insertArgs[0] = i + start;
  256. insertArgs[1] = randomUserId(20);
  257. insertArgs[2] = RandomUtil.nextInt(1, 9999);
  258. insertArgs[3] = RandomUtil.nextInt(0, 3);
  259. insertArgs[4] = RandomUtil.nextInt(1, 3);
  260. insertArgs[5] = Timestamp.valueOf(LocalDateTime.now());
  261. insertData.add(insertArgs);
  262. // update
  263. Object[] updateArgs = new Object[2];
  264. updateArgs[0] = randomUserId(20);
  265. updateArgs[1] = i + start;
  266. updateData.add(updateArgs);
  267. // delete
  268. Object[] deleteArgs = new Object[1];
  269. deleteArgs[0] = i + start;
  270. deleteData.add(deleteArgs);
  271. connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(insert, insertData));
  272. connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(update, updateData));
  273. connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(delete, deleteData));
  274. insertData.clear();
  275. updateData.clear();
  276. deleteData.clear();
  277. logger.info("{}, 数据行[{}, {}], 已处理:{}", Thread.currentThread().getName(), start, start + num, i + start);
  278. }
  279. }
  280. private final static String STR = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
  281. private String randomUserId(int i) {
  282. StringBuilder s = new StringBuilder();
  283. for (int j = 0; j < i; j++) {
  284. int r = RandomUtil.nextInt(0, 62);
  285. s.append(StringUtil.substring(STR, r, r + 1));
  286. }
  287. return s.toString();
  288. }
  289. private void batchUpdate(DatabaseConnectorInstance connectorInstance, ExecutorService pool, String sql, List<Object[]> dataList, int batchSize) {
  290. int total = dataList.size();
  291. int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
  292. final CountDownLatch latch = new CountDownLatch(taskSize);
  293. int offset = 0;
  294. for (int i = 0; i < taskSize; i++) {
  295. List<Object[]> slice = dataList.stream().skip(offset).limit(batchSize).collect(Collectors.toList());
  296. offset += batchSize;
  297. pool.submit(() -> {
  298. try {
  299. connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, slice));
  300. } catch (Exception e) {
  301. logger.error(e.getMessage());
  302. } finally {
  303. latch.countDown();
  304. }
  305. });
  306. }
  307. try {
  308. latch.await();
  309. } catch (InterruptedException e) {
  310. logger.error(e.getMessage());
  311. }
  312. }
  313. @Test
  314. public void testReadSchema() {
  315. getTables(createOracleConfig(), "test", "AE86", "MY_ORG");
  316. getTables(createOracleConfig(), "test", "AE86", null);
  317. getTables(createMysqlConfig(), "test", "root", "MY_ORG");
  318. getTables(createMysqlConfig(), "test", "root", null);
  319. getTables(createSqlServerConfig(), "test", "dbo", "MY_ORG");
  320. getTables(createSqlServerConfig(), "test", "dbo", null);
  321. getTables(createPostgresConfig(), "postgres", "public", "MY_ORG");
  322. getTables(createPostgresConfig(), "postgres", "public", null);
  323. }
  324. @Test
  325. public void testGetColumnsDetails() {
  326. final String schema = "root";
  327. final String tableNamePattern = "sw_test";
  328. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  329. connectorInstance.execute(databaseTemplate -> {
  330. SimpleConnection connection = databaseTemplate.getSimpleConnection();
  331. Connection conn = connection.getConnection();
  332. String databaseCatalog = conn.getCatalog();
  333. String schemaNamePattern = null == schema ? conn.getSchema() : schema;
  334. List<Field> fields = new ArrayList<>();
  335. DatabaseMetaData metaData = conn.getMetaData();
  336. ResultSet columnMetadata = metaData.getColumns(databaseCatalog, schemaNamePattern, tableNamePattern, null);
  337. while (columnMetadata.next()) {
  338. String columnName = columnMetadata.getString(4);
  339. int columnType = columnMetadata.getInt(5);
  340. String typeName = columnMetadata.getString(6);
  341. fields.add(new Field(columnName, typeName, columnType));
  342. }
  343. return fields;
  344. });
  345. }
  346. private List<Table> getTables(DatabaseConfig config, final String catalog, final String schema, final String tableNamePattern) {
  347. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(config);
  348. List<Table> tables = new ArrayList<>();
  349. connectorInstance.execute(databaseTemplate -> {
  350. SimpleConnection connection = databaseTemplate.getSimpleConnection();
  351. Connection conn = connection.getConnection();
  352. String databaseCatalog = null == catalog ? conn.getCatalog() : catalog;
  353. String schemaNamePattern = null == schema ? conn.getSchema() : schema;
  354. String[] types = {TableTypeEnum.TABLE.getCode(), TableTypeEnum.VIEW.getCode(), TableTypeEnum.MATERIALIZED_VIEW.getCode()};
  355. final ResultSet rs = conn.getMetaData().getTables(databaseCatalog, schemaNamePattern, tableNamePattern, types);
  356. while (rs.next()) {
  357. final String tableName = rs.getString("TABLE_NAME");
  358. final String tableType = rs.getString("TABLE_TYPE");
  359. tables.add(new Table(tableName, tableType));
  360. }
  361. return tables;
  362. });
  363. logger.info("\r 表总数{}", tables.size());
  364. tables.forEach(t -> logger.info("{} {}", t.getName(), t.getType()));
  365. return tables;
  366. }
  367. private DatabaseConfig createSqlServerConfig() {
  368. DatabaseConfig config = new DatabaseConfig();
  369. config.setUrl("jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test");
  370. config.setUsername("sa");
  371. config.setPassword("123");
  372. config.setDriverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
  373. return config;
  374. }
  375. private DatabaseConfig createOracleConfig() {
  376. DatabaseConfig config = new DatabaseConfig();
  377. config.setUrl("jdbc:oracle:thin:@127.0.0.1:1521:ORCL");
  378. config.setUsername("ae86");
  379. config.setPassword("123");
  380. config.setDriverClassName("oracle.jdbc.OracleDriver");
  381. return config;
  382. }
  383. private DatabaseConfig createMysqlConfig() {
  384. DatabaseConfig config = new DatabaseConfig();
  385. config.setUrl("jdbc:mysql://127.0.0.1:3305/test?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false&autoReconnect=true&failOverReadOnly=false&tinyInt1isBit=false");
  386. config.setUsername("root");
  387. config.setPassword("123");
  388. config.setDriverClassName("com.mysql.cj.jdbc.Driver");
  389. return config;
  390. }
  391. private DatabaseConfig createPostgresConfig() {
  392. DatabaseConfig config = new DatabaseConfig();
  393. config.setUrl("jdbc:postgresql://127.0.0.1:5432/postgres");
  394. config.setUsername("postgres");
  395. config.setPassword("123456");
  396. config.setDriverClassName("org.postgresql.Driver");
  397. return config;
  398. }
  399. }