ConnectionTest.java 20 KB


  1. /**
  2. * DBSyncer Copyright 2020-2023 All Rights Reserved.
  3. */
  4. import oracle.jdbc.OracleConnection;
  5. import org.dbsyncer.common.util.CollectionUtils;
  6. import org.dbsyncer.common.util.RandomUtil;
  7. import org.dbsyncer.common.util.StringUtil;
  8. import org.dbsyncer.sdk.config.DatabaseConfig;
  9. import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
  10. import org.dbsyncer.sdk.connector.database.ds.SimpleConnection;
  11. import org.dbsyncer.sdk.enums.TableTypeEnum;
  12. import org.dbsyncer.sdk.model.Field;
  13. import org.dbsyncer.sdk.model.Table;
  14. import org.junit.Test;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import org.springframework.jdbc.core.BatchPreparedStatementSetter;
  18. import java.nio.charset.Charset;
  19. import java.sql.Clob;
  20. import java.sql.Connection;
  21. import java.sql.DatabaseMetaData;
  22. import java.sql.PreparedStatement;
  23. import java.sql.ResultSet;
  24. import java.sql.SQLException;
  25. import java.sql.Timestamp;
  26. import java.time.Instant;
  27. import java.time.LocalDateTime;
  28. import java.util.ArrayList;
  29. import java.util.List;
  30. import java.util.concurrent.BrokenBarrierException;
  31. import java.util.concurrent.CountDownLatch;
  32. import java.util.concurrent.CyclicBarrier;
  33. import java.util.concurrent.ExecutorService;
  34. import java.util.concurrent.Executors;
  35. import java.util.concurrent.TimeUnit;
  36. /**
  37. * @Author AE86
  38. * @Version 1.0.0
  39. * @Date 2022-04-11 20:19
  40. */
  41. public class ConnectionTest {
  42. private final Logger logger = LoggerFactory.getLogger(getClass());
  43. @Test
  44. public void testByte() {
  45. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createOracleConfig());
  46. String executeSql = "UPDATE \"my_user\" SET \"name\"=?,\"clo\"=? WHERE \"id\"=?";
  47. int[] execute = connectorInstance.execute(databaseTemplate ->
  48. databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
  49. @Override
  50. public void setValues(PreparedStatement ps, int i) {
  51. try {
  52. SimpleConnection connection = databaseTemplate.getSimpleConnection();
  53. OracleConnection conn = (OracleConnection) connection.getConnection();
  54. Clob clob = conn.createClob();
  55. clob.setString(1, new String("中文888".getBytes(Charset.defaultCharset())));
  56. ps.setString(1, "hello888");
  57. ps.setClob(2, clob);
  58. ps.setInt(3, 2);
  59. } catch (SQLException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. @Override
  64. public int getBatchSize() {
  65. return 1;
  66. }
  67. })
  68. );
  69. logger.info("execute:{}", execute);
  70. }
  71. @Test
  72. public void testConnection() throws InterruptedException {
  73. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createSqlServerConfig());
  74. // 模拟并发
  75. final int threadSize = 100;
  76. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  77. final CyclicBarrier barrier = new CyclicBarrier(threadSize);
  78. final CountDownLatch latch = new CountDownLatch(threadSize);
  79. for (int i = 0; i < threadSize; i++) {
  80. final int k = i + 3;
  81. pool.submit(() -> {
  82. try {
  83. barrier.await();
  84. // 模拟操作
  85. System.out.println(String.format("%s %s:%s", LocalDateTime.now(), Thread.currentThread().getName(), k));
  86. Object execute = connectorInstance.execute(tem -> tem.queryForObject("select 1", Integer.class));
  87. System.out.println(String.format("%s %s:%s execute=>%s", LocalDateTime.now(), Thread.currentThread().getName(), k, execute));
  88. } catch (InterruptedException e) {
  89. logger.error(e.getMessage());
  90. } catch (BrokenBarrierException e) {
  91. logger.error(e.getMessage());
  92. } catch (Exception e) {
  93. logger.error(e.getMessage());
  94. } finally {
  95. latch.countDown();
  96. }
  97. });
  98. }
  99. try {
  100. latch.await();
  101. logger.info("try to shutdown");
  102. pool.shutdown();
  103. } catch (InterruptedException e) {
  104. logger.error(e.getMessage());
  105. }
  106. TimeUnit.SECONDS.sleep(3);
  107. logger.info("test end");
  108. }
  109. @Test
  110. public void testBatchInsert() {
  111. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  112. long begin = Instant.now().toEpochMilli();
  113. final int threadSize = 10;
  114. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  115. final String sql = "INSERT INTO `vote_records` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
  116. // 模拟1000w条数据
  117. List<Object[]> dataList = new ArrayList<>();
  118. for (int i = 1; i <= 200001; i++) {
  119. // 442001, 'dA8LeJLtX9MgQgDe7H1O', 9620, 1, 2, '2022-11-17 16:35:21'
  120. Object[] args = new Object[6];
  121. args[0] = i;
  122. args[1] = randomUserId(20);
  123. args[2] = RandomUtil.nextInt(1, 9999);
  124. args[3] = RandomUtil.nextInt(0, 3);
  125. args[4] = RandomUtil.nextInt(1, 3);
  126. args[5] = Timestamp.valueOf(LocalDateTime.now());
  127. dataList.add(args);
  128. if (i % 10000 == 0) {
  129. System.out.println(i + "-----------------正在处理");
  130. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  131. dataList.clear();
  132. }
  133. }
  134. if (!CollectionUtils.isEmpty(dataList)) {
  135. System.out.println("-----------------正在处理剩余数据");
  136. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  137. }
  138. pool.shutdown();
  139. logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
  140. }
  141. @Test
  142. public void testBatchUpdate() {
  143. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  144. long begin = Instant.now().toEpochMilli();
  145. final int threadSize = 10;
  146. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  147. final String sql = "UPDATE `test`.`vote_records` SET `user_id` = ?, `create_time` = now() WHERE `id` = ?";
  148. // 模拟100w条数据
  149. int k = 10;
  150. while (k > 0) {
  151. List<Object[]> dataList = new ArrayList<>();
  152. for (int i = 1; i <= 100000; i++) {
  153. // 'dA8LeJLtX9MgQgDe7H1O', '2022-11-17 16:35:21', 1
  154. Object[] args = new Object[2];
  155. args[0] = randomUserId(20);
  156. args[1] = i;
  157. dataList.add(args);
  158. if (i % 10000 == 0) {
  159. System.out.println(i + "-----------------正在处理");
  160. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  161. dataList.clear();
  162. }
  163. }
  164. if (!CollectionUtils.isEmpty(dataList)) {
  165. System.out.println("-----------------正在处理剩余数据");
  166. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  167. }
  168. k--;
  169. }
  170. pool.shutdown();
  171. logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
  172. }
  173. @Test
  174. public void testBatchDelete() {
  175. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  176. long begin = Instant.now().toEpochMilli();
  177. final int threadSize = 10;
  178. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  179. final String sql = "delete from `test`.`vote_records` WHERE `id` = ?";
  180. List<Object[]> dataList = new ArrayList<>();
  181. for (int i = 1; i <= 3259000; i++) {
  182. // 'dA8LeJLtX9MgQgDe7H1O', '2022-11-17 16:35:21', 1
  183. Object[] args = new Object[1];
  184. args[0] = i;
  185. dataList.add(args);
  186. if (i % 10000 == 0) {
  187. System.out.println(i + "-----------------正在处理");
  188. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  189. dataList.clear();
  190. }
  191. }
  192. if (!CollectionUtils.isEmpty(dataList)) {
  193. System.out.println("-----------------正在处理剩余数据");
  194. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  195. }
  196. pool.shutdown();
  197. logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
  198. }
  199. @Test
  200. public void testBatchIUD() {
  201. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  202. long begin = Instant.now().toEpochMilli();
  203. final int threadSize = 10;
  204. final int num = 1000;
  205. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  206. final CountDownLatch latch = new CountDownLatch(threadSize);
  207. final String insert = "INSERT INTO `vote_records_test` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
  208. final String update = "UPDATE `test`.`vote_records_test` SET `user_id` = ?, `create_time` = now() WHERE `id` = ?";
  209. final String delete = "DELETE from `test`.`vote_records_test` WHERE `id` = ?";
  210. // 模拟单表增删改事件,每个事件间隔2条数据
  211. for (int i = 0; i < threadSize; i++) {
  212. final int offset = i;
  213. pool.submit(() -> {
  214. try {
  215. logger.info("{}-开始任务", Thread.currentThread().getName());
  216. // 增删改事件密集型
  217. mockData(connectorInstance, num, offset, insert, update, delete);
  218. // 增改事件密集型
  219. // mockData2(connectorInstance, num, offset, insert, update);
  220. logger.info("{}-结束任务", Thread.currentThread().getName());
  221. } catch (Exception e) {
  222. logger.error(e.getMessage());
  223. } finally {
  224. latch.countDown();
  225. }
  226. });
  227. }
  228. try {
  229. latch.await();
  230. } catch (InterruptedException e) {
  231. logger.error(e.getMessage());
  232. }
  233. pool.shutdown();
  234. // logger.info("总数:{}, 耗时:{}秒", (threadSize * num * 3), (Instant.now().toEpochMilli() - begin) / 1000);
  235. logger.info("总数:{}, 耗时:{}秒", (threadSize * num), (Instant.now().toEpochMilli() - begin) / 1000);
  236. }
  237. private void mockData(DatabaseConnectorInstance connectorInstance, int num, int offset, String insert, String update, String delete) {
  238. int start = offset * num;
  239. logger.info("{}-offset:{}, start:{}", Thread.currentThread().getName(), offset, start);
  240. List<Object[]> insertData = new ArrayList<>();
  241. List<Object[]> updateData = new ArrayList<>();
  242. List<Object[]> deleteData = new ArrayList<>();
  243. for (int i = 0; i < num; i++) {
  244. // insert
  245. Object[] insertArgs = new Object[6];
  246. insertArgs[0] = i + start;
  247. insertArgs[1] = randomUserId(20);
  248. insertArgs[2] = RandomUtil.nextInt(1, 9999);
  249. insertArgs[3] = RandomUtil.nextInt(0, 3);
  250. insertArgs[4] = RandomUtil.nextInt(1, 3);
  251. insertArgs[5] = Timestamp.valueOf(LocalDateTime.now());
  252. insertData.add(insertArgs);
  253. // update
  254. Object[] updateArgs = new Object[2];
  255. updateArgs[0] = randomUserId(20);
  256. updateArgs[1] = i + start;
  257. updateData.add(updateArgs);
  258. // delete
  259. Object[] deleteArgs = new Object[1];
  260. deleteArgs[0] = i + start;
  261. deleteData.add(deleteArgs);
  262. connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(insert, insertData));
  263. connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(update, updateData));
  264. connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(delete, deleteData));
  265. insertData.clear();
  266. updateData.clear();
  267. deleteData.clear();
  268. logger.info("{}, 数据行[{}, {}], 已处理:{}", Thread.currentThread().getName(), start, start + num, i + start);
  269. }
  270. }
  271. private void mockData2(DatabaseConnectorInstance connectorInstance, int num, int offset, String insert, String update) {
  272. List<Object[]> insertData = new ArrayList<>();
  273. List<Object[]> updateData = new ArrayList<>();
  274. final int batch = 100;
  275. int start = offset * num;
  276. logger.info("{}-offset:{}, start:{}", Thread.currentThread().getName(), offset, start);
  277. for (int i = 1; i <= num; i++) {
  278. // insert
  279. Object[] insertArgs = new Object[6];
  280. insertArgs[0] = i + start;
  281. insertArgs[1] = randomUserId(20);
  282. insertArgs[2] = RandomUtil.nextInt(1, 9999);
  283. insertArgs[3] = RandomUtil.nextInt(0, 3);
  284. insertArgs[4] = RandomUtil.nextInt(1, 3);
  285. insertArgs[5] = Timestamp.valueOf(LocalDateTime.now());
  286. insertData.add(insertArgs);
  287. // update
  288. Object[] updateArgs = new Object[2];
  289. updateArgs[0] = randomUserId(20);
  290. updateArgs[1] = i + start;
  291. updateData.add(updateArgs);
  292. if (i % batch == 0) {
  293. connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(insert, insertData));
  294. connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(update, updateData));
  295. logger.info("{}, 数据行[{}, {}], 已处理:{}", Thread.currentThread().getName(), start, start + num, i + start);
  296. insertData.clear();
  297. updateData.clear();
  298. }
  299. }
  300. }
  301. private final static String STR = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
  302. private String randomUserId(int i) {
  303. StringBuilder s = new StringBuilder();
  304. for (int j = 0; j < i; j++) {
  305. int r = RandomUtil.nextInt(0, 62);
  306. s.append(StringUtil.substring(STR, r, r + 1));
  307. }
  308. return s.toString();
  309. }
  310. private void batchUpdate(DatabaseConnectorInstance connectorInstance, ExecutorService pool, String sql, List<Object[]> dataList, int batchSize) {
  311. int total = dataList.size();
  312. int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
  313. final CountDownLatch latch = new CountDownLatch(taskSize);
  314. int fromIndex = 0;
  315. int toIndex = batchSize;
  316. for (int i = 0; i < taskSize; i++) {
  317. final List<Object[]> data;
  318. if (toIndex > total) {
  319. toIndex = fromIndex + (total % batchSize);
  320. data = dataList.subList(fromIndex, toIndex);
  321. } else {
  322. data = dataList.subList(fromIndex, toIndex);
  323. fromIndex += batchSize;
  324. toIndex += batchSize;
  325. }
  326. pool.submit(() -> {
  327. try {
  328. connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, data));
  329. } catch (Exception e) {
  330. logger.error(e.getMessage());
  331. } finally {
  332. latch.countDown();
  333. }
  334. });
  335. }
  336. try {
  337. latch.await();
  338. } catch (InterruptedException e) {
  339. logger.error(e.getMessage());
  340. }
  341. }
  342. @Test
  343. public void testReadSchema() {
  344. getTables(createOracleConfig(), "test", "AE86", "MY_ORG");
  345. getTables(createOracleConfig(), "test", "AE86", null);
  346. getTables(createMysqlConfig(), "test", "root", "MY_ORG");
  347. getTables(createMysqlConfig(), "test", "root", null);
  348. getTables(createSqlServerConfig(), "test", "dbo", "MY_ORG");
  349. getTables(createSqlServerConfig(), "test", "dbo", null);
  350. getTables(createPostgresConfig(), "postgres", "public", "MY_ORG");
  351. getTables(createPostgresConfig(), "postgres", "public", null);
  352. }
  353. @Test
  354. public void testGetColumnsDetails() {
  355. final String schema = "root";
  356. final String tableNamePattern = "sw_test";
  357. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  358. connectorInstance.execute(databaseTemplate -> {
  359. SimpleConnection connection = databaseTemplate.getSimpleConnection();
  360. Connection conn = connection.getConnection();
  361. String databaseCatalog = conn.getCatalog();
  362. String schemaNamePattern = null == schema ? conn.getSchema() : schema;
  363. List<Field> fields = new ArrayList<>();
  364. DatabaseMetaData metaData = conn.getMetaData();
  365. ResultSet columnMetadata = metaData.getColumns(databaseCatalog, schemaNamePattern, tableNamePattern, null);
  366. while (columnMetadata.next()) {
  367. String columnName = columnMetadata.getString(4);
  368. int columnType = columnMetadata.getInt(5);
  369. String typeName = columnMetadata.getString(6);
  370. fields.add(new Field(columnName, typeName, columnType));
  371. }
  372. return fields;
  373. });
  374. }
  375. private List<Table> getTables(DatabaseConfig config, final String catalog, final String schema, final String tableNamePattern) {
  376. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(config);
  377. List<Table> tables = new ArrayList<>();
  378. connectorInstance.execute(databaseTemplate -> {
  379. SimpleConnection connection = databaseTemplate.getSimpleConnection();
  380. Connection conn = connection.getConnection();
  381. String databaseCatalog = null == catalog ? conn.getCatalog() : catalog;
  382. String schemaNamePattern = null == schema ? conn.getSchema() : schema;
  383. String[] types = {TableTypeEnum.TABLE.getCode(), TableTypeEnum.VIEW.getCode(), TableTypeEnum.MATERIALIZED_VIEW.getCode()};
  384. final ResultSet rs = conn.getMetaData().getTables(databaseCatalog, schemaNamePattern, tableNamePattern, types);
  385. while (rs.next()) {
  386. final String tableName = rs.getString("TABLE_NAME");
  387. final String tableType = rs.getString("TABLE_TYPE");
  388. tables.add(new Table(tableName, tableType));
  389. }
  390. return tables;
  391. });
  392. logger.info("\r 表总数{}", tables.size());
  393. tables.forEach(t -> logger.info("{} {}", t.getName(), t.getType()));
  394. return tables;
  395. }
  396. private DatabaseConfig createSqlServerConfig() {
  397. DatabaseConfig config = new DatabaseConfig();
  398. config.setUrl("jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test");
  399. config.setUsername("sa");
  400. config.setPassword("123");
  401. config.setDriverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
  402. return config;
  403. }
  404. private DatabaseConfig createOracleConfig() {
  405. DatabaseConfig config = new DatabaseConfig();
  406. config.setUrl("jdbc:oracle:thin:@127.0.0.1:1521:ORCL");
  407. config.setUsername("ae86");
  408. config.setPassword("123");
  409. config.setDriverClassName("oracle.jdbc.OracleDriver");
  410. return config;
  411. }
  412. private DatabaseConfig createMysqlConfig() {
  413. DatabaseConfig config = new DatabaseConfig();
  414. config.setUrl("jdbc:mysql://127.0.0.1:3305/test?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false&autoReconnect=true&failOverReadOnly=false");
  415. config.setUsername("root");
  416. config.setPassword("123");
  417. config.setDriverClassName("com.mysql.cj.jdbc.Driver");
  418. return config;
  419. }
  420. private DatabaseConfig createPostgresConfig() {
  421. DatabaseConfig config = new DatabaseConfig();
  422. config.setUrl("jdbc:postgresql://127.0.0.1:5432/postgres");
  423. config.setUsername("postgres");
  424. config.setPassword("123456");
  425. config.setDriverClassName("org.postgresql.Driver");
  426. return config;
  427. }
  428. }