ConnectionTest.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. /**
  2. * DBSyncer Copyright 2020-2023 All Rights Reserved.
  3. */
  4. import oracle.jdbc.OracleConnection;
  5. import org.dbsyncer.common.util.CollectionUtils;
  6. import org.dbsyncer.common.util.RandomUtil;
  7. import org.dbsyncer.common.util.StringUtil;
  8. import org.dbsyncer.sdk.config.DatabaseConfig;
  9. import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance;
  10. import org.dbsyncer.sdk.connector.database.ds.SimpleConnection;
  11. import org.dbsyncer.sdk.enums.TableTypeEnum;
  12. import org.dbsyncer.sdk.model.Field;
  13. import org.dbsyncer.sdk.model.Table;
  14. import org.junit.Test;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import org.springframework.jdbc.core.BatchPreparedStatementSetter;
  18. import java.nio.charset.Charset;
  19. import java.sql.*;
  20. import java.time.Instant;
  21. import java.time.LocalDateTime;
  22. import java.util.ArrayList;
  23. import java.util.List;
  24. import java.util.concurrent.*;
  25. /**
  26. * @Author AE86
  27. * @Version 1.0.0
  28. * @Date 2022-04-11 20:19
  29. */
  30. public class ConnectionTest {
  31. private final Logger logger = LoggerFactory.getLogger(getClass());
  32. @Test
  33. public void testByte() {
  34. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createOracleConfig());
  35. String executeSql = "UPDATE \"my_user\" SET \"name\"=?,\"clo\"=? WHERE \"id\"=?";
  36. int[] execute = connectorInstance.execute(databaseTemplate ->
  37. databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
  38. @Override
  39. public void setValues(PreparedStatement ps, int i) {
  40. try {
  41. SimpleConnection connection = databaseTemplate.getSimpleConnection();
  42. OracleConnection conn = (OracleConnection) connection.getConnection();
  43. Clob clob = conn.createClob();
  44. clob.setString(1, new String("中文888".getBytes(Charset.defaultCharset())));
  45. ps.setString(1, "hello888");
  46. ps.setClob(2, clob);
  47. ps.setInt(3, 2);
  48. } catch (SQLException e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. @Override
  53. public int getBatchSize() {
  54. return 1;
  55. }
  56. })
  57. );
  58. logger.info("execute:{}", execute);
  59. }
  60. @Test
  61. public void testConnection() throws InterruptedException {
  62. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createSqlServerConfig());
  63. // 模拟并发
  64. final int threadSize = 100;
  65. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  66. final CyclicBarrier barrier = new CyclicBarrier(threadSize);
  67. final CountDownLatch latch = new CountDownLatch(threadSize);
  68. for (int i = 0; i < threadSize; i++) {
  69. final int k = i + 3;
  70. pool.submit(() -> {
  71. try {
  72. barrier.await();
  73. // 模拟操作
  74. System.out.println(String.format("%s %s:%s", LocalDateTime.now(), Thread.currentThread().getName(), k));
  75. Object execute = connectorInstance.execute(tem -> tem.queryForObject("select 1", Integer.class));
  76. System.out.println(String.format("%s %s:%s execute=>%s", LocalDateTime.now(), Thread.currentThread().getName(), k, execute));
  77. } catch (InterruptedException e) {
  78. logger.error(e.getMessage());
  79. } catch (BrokenBarrierException e) {
  80. logger.error(e.getMessage());
  81. } catch (Exception e) {
  82. logger.error(e.getMessage());
  83. } finally {
  84. latch.countDown();
  85. }
  86. });
  87. }
  88. try {
  89. latch.await();
  90. logger.info("try to shutdown");
  91. pool.shutdown();
  92. } catch (InterruptedException e) {
  93. logger.error(e.getMessage());
  94. }
  95. TimeUnit.SECONDS.sleep(3);
  96. logger.info("test end");
  97. }
  98. @Test
  99. public void testBatchInsert() {
  100. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  101. long begin = Instant.now().toEpochMilli();
  102. final int threadSize = 10;
  103. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  104. final String sql = "INSERT INTO `vote_records_copy` (`id`, `user_id`, `vote_num`, `group_id`, `status`, `create_time`) VALUES (?, ?, ?, ?, ?, ?)";
  105. // 模拟1000w条数据
  106. List<Object[]> dataList = new ArrayList<>();
  107. for (int i = 1; i <= 200001; i++) {
  108. // 442001, 'dA8LeJLtX9MgQgDe7H1O', 9620, 1, 2, '2022-11-17 16:35:21'
  109. Object[] args = new Object[6];
  110. args[0] = i;
  111. args[1] = randomUserId(20);
  112. args[2] = RandomUtil.nextInt(1, 9999);
  113. args[3] = RandomUtil.nextInt(0, 3);
  114. args[4] = RandomUtil.nextInt(1, 3);
  115. args[5] = Timestamp.valueOf(LocalDateTime.now());
  116. dataList.add(args);
  117. if (i % 10000 == 0) {
  118. System.out.println(i + "-----------------正在处理");
  119. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  120. dataList.clear();
  121. }
  122. }
  123. if(!CollectionUtils.isEmpty(dataList)){
  124. System.out.println("-----------------正在处理剩余数据");
  125. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  126. }
  127. pool.shutdown();
  128. logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
  129. }
  130. @Test
  131. public void testBatchUpdate() {
  132. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  133. long begin = Instant.now().toEpochMilli();
  134. final int threadSize = 10;
  135. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  136. final String sql = "UPDATE `test`.`vote_records` SET `user_id` = ?, `create_time` = now() WHERE `id` = ?";
  137. // 模拟100w条数据
  138. int k = 10;
  139. while (k > 0) {
  140. List<Object[]> dataList = new ArrayList<>();
  141. for (int i = 1; i <= 100000; i++) {
  142. // 'dA8LeJLtX9MgQgDe7H1O', '2022-11-17 16:35:21', 1
  143. Object[] args = new Object[2];
  144. args[0] = randomUserId(20);
  145. args[1] = i;
  146. dataList.add(args);
  147. if (i % 10000 == 0) {
  148. System.out.println(i + "-----------------正在处理");
  149. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  150. dataList.clear();
  151. }
  152. }
  153. if (!CollectionUtils.isEmpty(dataList)) {
  154. System.out.println("-----------------正在处理剩余数据");
  155. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  156. }
  157. k--;
  158. }
  159. pool.shutdown();
  160. logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
  161. }
  162. @Test
  163. public void testBatchDelete() {
  164. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  165. long begin = Instant.now().toEpochMilli();
  166. final int threadSize = 10;
  167. final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
  168. final String sql = "delete from `test`.`vote_records` WHERE `id` = ?";
  169. List<Object[]> dataList = new ArrayList<>();
  170. for (int i = 1; i <= 3259000; i++) {
  171. // 'dA8LeJLtX9MgQgDe7H1O', '2022-11-17 16:35:21', 1
  172. Object[] args = new Object[1];
  173. args[0] = i;
  174. dataList.add(args);
  175. if (i % 10000 == 0) {
  176. System.out.println(i + "-----------------正在处理");
  177. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  178. dataList.clear();
  179. }
  180. }
  181. if (!CollectionUtils.isEmpty(dataList)) {
  182. System.out.println("-----------------正在处理剩余数据");
  183. batchUpdate(connectorInstance, pool, sql, dataList, 1000);
  184. }
  185. pool.shutdown();
  186. logger.info("总共耗时:{}秒", (Instant.now().toEpochMilli() - begin) / 1000);
  187. }
  188. private final static String STR = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
  189. private String randomUserId(int i) {
  190. StringBuilder s = new StringBuilder();
  191. for (int j = 0; j < i; j++) {
  192. int r = RandomUtil.nextInt(0, 62);
  193. s.append(StringUtil.substring(STR, r, r + 1));
  194. }
  195. return s.toString();
  196. }
  197. private void batchUpdate(DatabaseConnectorInstance connectorInstance, ExecutorService pool, String sql, List<Object[]> dataList, int batchSize) {
  198. int total = dataList.size();
  199. int taskSize = total % batchSize == 0 ? total / batchSize : total / batchSize + 1;
  200. final CountDownLatch latch = new CountDownLatch(taskSize);
  201. int fromIndex = 0;
  202. int toIndex = batchSize;
  203. for (int i = 0; i < taskSize; i++) {
  204. final List<Object[]> data;
  205. if (toIndex > total) {
  206. toIndex = fromIndex + (total % batchSize);
  207. data = dataList.subList(fromIndex, toIndex);
  208. } else {
  209. data = dataList.subList(fromIndex, toIndex);
  210. fromIndex += batchSize;
  211. toIndex += batchSize;
  212. }
  213. pool.submit(() -> {
  214. try {
  215. connectorInstance.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, data));
  216. } catch (Exception e) {
  217. logger.error(e.getMessage());
  218. } finally {
  219. latch.countDown();
  220. }
  221. });
  222. }
  223. try {
  224. latch.await();
  225. } catch (InterruptedException e) {
  226. logger.error(e.getMessage());
  227. }
  228. }
  229. @Test
  230. public void testReadSchema() {
  231. getTables(createOracleConfig(), "test", "AE86", "MY_ORG");
  232. getTables(createOracleConfig(), "test", "AE86", null);
  233. getTables(createMysqlConfig(), "test", "root", "MY_ORG");
  234. getTables(createMysqlConfig(), "test", "root", null);
  235. getTables(createSqlServerConfig(), "test", "dbo", "MY_ORG");
  236. getTables(createSqlServerConfig(), "test", "dbo", null);
  237. getTables(createPostgresConfig(), "postgres", "public", "MY_ORG");
  238. getTables(createPostgresConfig(), "postgres", "public", null);
  239. }
  240. @Test
  241. public void testGetColumnsDetails() {
  242. final String schema = "root";
  243. final String tableNamePattern = "sw_test";
  244. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(createMysqlConfig());
  245. connectorInstance.execute(databaseTemplate -> {
  246. SimpleConnection connection = databaseTemplate.getSimpleConnection();
  247. Connection conn = connection.getConnection();
  248. String databaseCatalog = conn.getCatalog();
  249. String schemaNamePattern = null == schema ? conn.getSchema() : schema;
  250. List<Field> fields = new ArrayList<>();
  251. DatabaseMetaData metaData = conn.getMetaData();
  252. ResultSet columnMetadata = metaData.getColumns(databaseCatalog, schemaNamePattern, tableNamePattern, null);
  253. while (columnMetadata.next()) {
  254. String columnName = columnMetadata.getString(4);
  255. int columnType = columnMetadata.getInt(5);
  256. String typeName = columnMetadata.getString(6);
  257. fields.add(new Field(columnName, typeName, columnType));
  258. }
  259. return fields;
  260. });
  261. }
  262. private List<Table> getTables(DatabaseConfig config, final String catalog, final String schema, final String tableNamePattern) {
  263. final DatabaseConnectorInstance connectorInstance = new DatabaseConnectorInstance(config);
  264. List<Table> tables = new ArrayList<>();
  265. connectorInstance.execute(databaseTemplate -> {
  266. SimpleConnection connection = databaseTemplate.getSimpleConnection();
  267. Connection conn = connection.getConnection();
  268. String databaseCatalog = null == catalog ? conn.getCatalog() : catalog;
  269. String schemaNamePattern = null == schema ? conn.getSchema() : schema;
  270. String[] types = {TableTypeEnum.TABLE.getCode(), TableTypeEnum.VIEW.getCode(), TableTypeEnum.MATERIALIZED_VIEW.getCode()};
  271. final ResultSet rs = conn.getMetaData().getTables(databaseCatalog, schemaNamePattern, tableNamePattern, types);
  272. while (rs.next()) {
  273. final String tableName = rs.getString("TABLE_NAME");
  274. final String tableType = rs.getString("TABLE_TYPE");
  275. tables.add(new Table(tableName, tableType));
  276. }
  277. return tables;
  278. });
  279. logger.info("\r 表总数{}", tables.size());
  280. tables.forEach(t -> logger.info("{} {}", t.getName(), t.getType()));
  281. return tables;
  282. }
  283. private DatabaseConfig createSqlServerConfig() {
  284. DatabaseConfig config = new DatabaseConfig();
  285. config.setUrl("jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test");
  286. config.setUsername("sa");
  287. config.setPassword("123");
  288. config.setDriverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
  289. return config;
  290. }
  291. private DatabaseConfig createOracleConfig() {
  292. DatabaseConfig config = new DatabaseConfig();
  293. config.setUrl("jdbc:oracle:thin:@127.0.0.1:1521:ORCL");
  294. config.setUsername("ae86");
  295. config.setPassword("123");
  296. config.setDriverClassName("oracle.jdbc.OracleDriver");
  297. return config;
  298. }
  299. private DatabaseConfig createMysqlConfig() {
  300. DatabaseConfig config = new DatabaseConfig();
  301. config.setUrl("jdbc:mysql://127.0.0.1:3305/test?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&useSSL=false&verifyServerCertificate=false&autoReconnect=true&failOverReadOnly=false");
  302. config.setUsername("root");
  303. config.setPassword("123");
  304. config.setDriverClassName("com.mysql.cj.jdbc.Driver");
  305. return config;
  306. }
  307. private DatabaseConfig createPostgresConfig() {
  308. DatabaseConfig config = new DatabaseConfig();
  309. config.setUrl("jdbc:postgresql://127.0.0.1:5432/postgres");
  310. config.setUsername("postgres");
  311. config.setPassword("123456");
  312. config.setDriverClassName("org.postgresql.Driver");
  313. return config;
  314. }
  315. }