LuceneFactoryTest.java

/**
 * DBSyncer Copyright 2020-2023 All Rights Reserved.
 */
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.cn.smart.SmartChineseAnalyzer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.FuzzyQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.PhraseQuery;
import org.apache.lucene.search.PrefixQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.WildcardQuery;
import org.apache.lucene.search.highlight.Highlighter;
import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
import org.apache.lucene.search.highlight.QueryScorer;
import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
import org.apache.lucene.util.BytesRef;
import org.dbsyncer.common.model.Paging;
import org.dbsyncer.common.util.CollectionUtils;
import org.dbsyncer.common.util.RandomUtil;
import org.dbsyncer.sdk.constant.ConfigConstant;
import org.dbsyncer.sdk.constant.ConnectorConstant;
import org.dbsyncer.storage.enums.StorageDataStatusEnum;
import org.dbsyncer.storage.impl.SnowflakeIdWorker;
import org.dbsyncer.storage.lucene.Option;
import org.dbsyncer.storage.lucene.Shard;
import org.dbsyncer.storage.util.BinlogMessageUtil;
import org.dbsyncer.storage.util.DocumentUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class LuceneFactoryTest {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private Shard shard;

    private SnowflakeIdWorker snowflakeIdWorker = new SnowflakeIdWorker();

    @Before
    public void setUp() throws IOException {
        shard = new Shard("target/indexDir/");
    }

    @After
    public void tearDown() throws IOException {
        shard.deleteAll();
    }

    @Test
    public void testConcurrentUpdate() throws InterruptedException {
        // Simulate concurrent writes
        final int threadSize = 100;
        final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
        final CyclicBarrier barrier = new CyclicBarrier(threadSize);
        final CountDownLatch latch = new CountDownLatch(threadSize);
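        // The barrier releases all workers at the same instant to maximize contention on the
        // index, while the latch lets the main thread wait until every worker has finished.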
        for (int i = 0; i < threadSize; i++) {
            final int k = i + 1;
            pool.submit(() -> {
                try {
                    barrier.await();
                    // Simulate a write
                    System.out.println(String.format("%s:%s", Thread.currentThread().getName(), k));
                    List<Document> docs = new ArrayList<>();
                    docs.add(DocumentUtil.convertData2Doc(createMap(k)));
                    shard.insertBatch(docs);
                } catch (InterruptedException | BrokenBarrierException e) {
                    logger.error(e.getMessage());
                } catch (Exception e) {
                    logger.error(e.getMessage());
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
            logger.info("try to shutdown");
            pool.shutdown();
            check();

            Sort sort = new Sort(new SortField(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, SortField.Type.LONG, true),
                    new SortField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, SortField.Type.LONG, true));
            Option option = new Option();
            option.setQuery(new MatchAllDocsQuery());
            Paging paging = shard.query(option, 1, 20, sort);
            if (!CollectionUtils.isEmpty(paging.getData())) {
                List<Map> data = (List<Map>) paging.getData();
                data.forEach(r -> System.out.println(r));
            }
        } catch (InterruptedException | IOException e) {
            logger.error(e.getMessage());
        }
        TimeUnit.SECONDS.sleep(3);
        logger.info("test end");
    }

    private Map<String, Object> createMap(int i) {
        Map<String, Object> params = new HashMap<>();
        params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
        params.put(ConfigConstant.DATA_SUCCESS, StorageDataStatusEnum.SUCCESS.getValue());
        params.put(ConfigConstant.DATA_EVENT, ConnectorConstant.OPERTION_UPDATE);
        params.put(ConfigConstant.DATA_TABLE_GROUP_ID, "" + i);
        params.put(ConfigConstant.DATA_TARGET_TABLE_NAME, "MY_USER");
        params.put(ConfigConstant.DATA_ERROR, "");
        Map<String, Object> row = new HashMap<>();
        row.put("id", i);
        row.put("name", "中文");
        row.put("tel", "15800001234");
        row.put("update_time", Instant.now().toEpochMilli());
        row.put("remark", "test" + i);
        params.put(ConfigConstant.BINLOG_DATA, BinlogMessageUtil.toBinlogMap(row).toByteArray());
        params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
        return params;
    }

    @Test
    public void testQuery() throws IOException {
        int size = 3;
        List<Document> docs = new ArrayList<>();
        for (int i = size; i > 0; i--) {
            Document doc = new Document();
            doc.add(new StringField("id", String.valueOf(i), Field.Store.YES));
            doc.add(new StringField("name", "中文" + i, Field.Store.YES));
            doc.add(new TextField("content", "这是一串很长长长长长长长的文本", Field.Store.YES));
            int age = RandomUtil.nextInt(0, 50);
            // Index the value for searching
            doc.add(new IntPoint("age", age));
            // Store the value so it can be read back
            doc.add(new StoredField("age", age));
            // Doc values so the field can be sorted on
            doc.add(new NumericDocValuesField("age", age));
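            // The same value is added three times on purpose: IntPoint for exact/range
            // queries, StoredField for retrieval, and NumericDocValuesField for sorting.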
            System.out.println(String.format("id=%s, age=%s", i, age));
            // 2020-05-23 12:00:00
            long createTime = 1590206400000L + i;
            doc.add(new LongPoint("createTime", createTime));
            doc.add(new StoredField("createTime", createTime));
            doc.add(new NumericDocValuesField("createTime", createTime));
            docs.add(doc);
        }
        shard.insertBatch(docs);

        // Range query:  IntPoint.newRangeQuery("id", 1, 100)
        // Set query:    IntPoint.newSetQuery("id", 2, 3)
        // Exact query:  IntPoint.newExactQuery("id", 3)
        BooleanQuery query = new BooleanQuery.Builder()
                .add(IntPoint.newRangeQuery("age", 1, 100), BooleanClause.Occur.MUST)
                .build();
        Option option = new Option();
        option.setQuery(query);
        Paging paging = shard.query(option, 1, 20, new Sort(new SortField("createTime", SortField.Type.LONG, true)));
        paging.getData().forEach(m -> System.out.println(m));
        // Clear the index
        shard.deleteAll();
    }

    @Test
    public void testCRUD() throws IOException {
        System.out.println("Before:");
        List<Map> maps = query(new MatchAllDocsQuery());
        maps.forEach(m -> System.out.println(m));
        check();

        // Insert
        Document doc = new Document();
        String id = "100";
        doc.add(new StringField("id", id, Field.Store.YES));
        doc.add(new TextField("content", "这是一款大规模数据处理软件,名字叫做Apache Spark", Field.Store.YES));
        List<Document> docs = new ArrayList<>();
        docs.add(doc);
        shard.insertBatch(docs);
        System.out.println("After insert:");
        maps = query(new MatchAllDocsQuery());
        maps.forEach(m -> System.out.println(m));
        check();

        // Update: remove the old content field first, otherwise the document keeps both values
        doc.removeField("content");
        doc.add(new TextField("content", "这是一款大规模数据处理软件,名字叫做Apache Spark[已修改]", Field.Store.YES));
        shard.update(new Term("id", id), doc);
        System.out.println("After update:");
        maps = query(new MatchAllDocsQuery());
        maps.forEach(m -> System.out.println(m));
        check();

        // Delete
        shard.deleteBatch(new Term("id", id));
        System.out.println("After delete:");
        maps = query(new MatchAllDocsQuery());
        maps.forEach(m -> System.out.println(m));
        check();

        // Clear the index
        shard.deleteAll();
    }

    @Test
    public void testBinary() throws IOException {
        System.out.println("Before:");
        List<Map> maps = query(new MatchAllDocsQuery());
        maps.forEach(m -> System.out.println(m));
        check();

        // Insert a binary field; an explicit charset keeps the bytes platform-independent
        Document doc = new Document();
        String id = "100";
        doc.add(new StringField("id", id, Field.Store.YES));
        BytesRef bytesRef = new BytesRef("中文".getBytes(StandardCharsets.UTF_8));
        doc.add(new StoredField("content", bytesRef));
        List<Document> docs = new ArrayList<>();
        docs.add(doc);
        shard.insertBatch(docs);
        System.out.println("After insert:");
        maps = query(new MatchAllDocsQuery());
        maps.forEach(m -> System.out.println(m));
        check();

        // Clear the index
        shard.deleteAll();
    }

    /**
     * Term search.
     * <p>
     * TermQuery is the simplest and most commonly used query. It can be understood as a "term
     * search": the most basic lookup a search engine performs is finding a term in the index,
     * and that is exactly what TermQuery does. In Lucene the term is the basic unit of search;
     * essentially it is a name/value pair, where the "name" is the field name and the "value"
     * is a keyword contained in that field.
     *
     * @throws IOException
     */
    @Test
    public void testQueryTerm() throws IOException {
        String searchField = "title";
        // A conditional query matching documents whose field contains the exact term
        TermQuery query = new TermQuery(new Term(searchField, "Spark"));
        // Run the query and print the matching records
        query(query);
    }

    /**
     * Boolean (multi-clause) search.
     * <p>
     * BooleanQuery is another query used constantly in practice. It is a composite query: other
     * Query objects are added to it together with the logical relationship between them.
     * BooleanQuery itself is a container of boolean clauses, offering a dedicated API for adding
     * clauses and declaring how they relate, as shown below.
     *
     * @throws IOException
     */
    @Test
    public void testQueryBoolean() throws IOException {
        String searchField1 = "title";
        String searchField2 = "content";
        Query query1 = new TermQuery(new Term(searchField1, "Spark"));
        Query query2 = new TermQuery(new Term(searchField2, "Apache"));
        BooleanQuery.Builder builder = new BooleanQuery.Builder();
        // BooleanClause expresses how a clause relates to the rest of the boolean query.
        // The three occurrence types are:
        //   BooleanClause.Occur.MUST      - the clause must match
        //   BooleanClause.Occur.MUST_NOT  - the clause must not match
        //   BooleanClause.Occur.SHOULD    - the clause may match
        // This gives six meaningful combinations:
        //   1. MUST and MUST:         the intersection of the two clauses.
        //   2. MUST and MUST_NOT:     results must not contain matches of the MUST_NOT clause.
        //   3. SHOULD and MUST_NOT:   behaves like MUST and MUST_NOT.
        //   4. SHOULD and MUST:       results are those of the MUST clause; SHOULD only affects scoring.
        //   5. SHOULD and SHOULD:     an "or" relationship; the union of all clauses.
        //   6. MUST_NOT and MUST_NOT: meaningless; matches nothing.
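        // A hedged sketch (not exercised by this test): combination 2 above, requiring "Spark"
        // while excluding "Apache", would read:
        //   builder.add(query1, BooleanClause.Occur.MUST);
        //   builder.add(query2, BooleanClause.Occur.MUST_NOT);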
        builder.add(query1, BooleanClause.Occur.SHOULD);
        builder.add(query2, BooleanClause.Occur.SHOULD);
        BooleanQuery query = builder.build();
        // Run the query and print the matching records
        query(query);
    }

    /**
     * Prefix search.
     * <p>
     * PrefixQuery matches documents whose indexed terms start with the given string,
     * i.e. the equivalent of the SQL pattern "xxx%".
     *
     * @throws IOException
     */
    @Test
    public void testQueryPrefix() throws IOException {
        String searchField = "title";
        Term term = new Term(searchField, "Spar");
        Query query = new PrefixQuery(term);
        // Run the query and print the matching records
        query(query);
    }

    /**
     * Phrase search.
     * <p>
     * A PhraseQuery searches for a phrase. For example, when searching for the phrase "big car",
     * a document matches if the given field contains "big car" verbatim. A document containing
     * "big black car" would not match unless a slop is set: the slop is the maximum positional
     * distance allowed between the terms of the phrase.
     *
     * @throws IOException
     */
    @Test
    public void testQueryPhrase() throws IOException {
        String searchField = "content";
        String query1 = "apache";
        String query2 = "spark";
        PhraseQuery.Builder builder = new PhraseQuery.Builder();
        builder.add(new Term(searchField, query1));
        builder.add(new Term(searchField, query2));
        builder.setSlop(0);
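        // A hedged note: slop 0 requires the terms to be adjacent and in order ("apache spark").
        // Raising it tolerates gaps, e.g. builder.setSlop(1) would also match one intervening
        // token, as in "apache open spark", if such text were indexed.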
        PhraseQuery phraseQuery = builder.build();
        // Run the query and print the matching records
        query(phraseQuery);
    }

    /**
     * Fuzzy search.
     * <p>
     * FuzzyQuery is a fuzzy query that matches terms similar to the given one.
     *
     * @throws IOException
     */
    @Test
    public void testQueryFuzzy() throws IOException {
        String searchField = "content";
        Term t = new Term(searchField, "大规模");
        Query query = new FuzzyQuery(t);
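        // A hedged note: the single-argument constructor defaults to an edit distance of 2,
        // Lucene's maximum supported Levenshtein distance; a tighter bound can be passed
        // explicitly, e.g. new FuzzyQuery(t, 1).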
        // Run the query and print the matching records
        query(query);
    }

    /**
     * Wildcard search (heavy on IO; not recommended).
     * <p>
     * Lucene also supports wildcard queries via WildcardQuery. The wildcard "?" stands for
     * exactly one character, while "*" stands for zero or more characters.
     *
     * @throws IOException
     */
    @Test
    public void testQueryWildcard() throws IOException {
        String searchField = "content";
        Term term = new Term(searchField, "大*规模");
        Query query = new WildcardQuery(term);
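        // A hedged note: patterns with a leading wildcard (e.g. "*规模") are the most expensive,
        // because they cannot use the term index and effectively scan the whole term dictionary.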
        // Run the query and print the matching records
        query(query);
    }

    /**
     * Analyzed query via the query parser.
     *
     * @throws IOException
     * @throws ParseException
     */
    @Test
    public void testQueryParser() throws IOException, ParseException {
        final Analyzer analyzer = shard.getAnalyzer();
        String searchField = "content";
        // Specify the search field and the analyzer
        QueryParser parser = new QueryParser(searchField, analyzer);
        //QueryParser queryParser = new MultiFieldQueryParser(new String[]{"title", "content"}, analyzer);
        // User input
        Query query = parser.parse("Spark");
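        // A hedged note: QueryParser understands the classic Lucene query syntax, so input such
        // as "title:Spark AND content:Apache" or a quoted phrase would also parse here.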
        // Run the query and print the matching records
        query(query);
    }

    /**
     * Highlighting.
     *
     * @throws IOException
     */
    @Test
    public void testHighlighter() throws IOException, ParseException, InvalidTokenOffsetsException {
        final Analyzer analyzer = shard.getAnalyzer();
        final IndexSearcher searcher = shard.getSearcher();
        String searchField = "content";
        String text = "大规模";
        // Specify the search field and the analyzer
        QueryParser parser = new QueryParser(searchField, analyzer);
        // User input
        Query query = parser.parse(text);
        TopDocs topDocs = searcher.search(query, 100);
        // HTML tags wrapped around the highlighted keywords; requires lucene-highlighter-xxx.jar
        SimpleHTMLFormatter simpleHTMLFormatter = new SimpleHTMLFormatter("<span style='color:red'>", "</span>");
        Highlighter highlighter = new Highlighter(simpleHTMLFormatter, new QueryScorer(query));
        for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
            // Fetch the matching document
            Document document = searcher.doc(scoreDoc.doc);
            // Add highlighting to the content
            TokenStream tokenStream = analyzer.tokenStream("content", new StringReader(document.get("content")));
            String content = highlighter.getBestFragment(tokenStream, document.get("content"));
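            // A hedged note: getBestFragment returns null when no query term occurs in the
            // text, so a null check may be needed before using the fragment.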
            System.out.println(content);
        }
    }

    @Test
    public void testAnalyzerDoc() throws IOException {
        // SmartChineseAnalyzer is the smartcn tokenizer; it requires the smartcn analyzer
        // dependency, which must match the Lucene version in use
        Analyzer analyzer = new SmartChineseAnalyzer();
        String text = "Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎";
        TokenStream tokenStream = analyzer.tokenStream("content", new StringReader(text));
        CharTermAttribute charTermAttribute = tokenStream.addAttribute(CharTermAttribute.class);
        try {
            tokenStream.reset();
            while (tokenStream.incrementToken()) {
                System.out.println(charTermAttribute.toString());
            }
            tokenStream.end();
        } finally {
            tokenStream.close();
            analyzer.close();
        }
    }

    private List<Map> query(Query query) throws IOException {
        Option option = new Option();
        option.setQuery(query);
        Paging paging = shard.query(option, 1, 20, null);
        return (List<Map>) paging.getData();
    }

    private void check() throws IOException {
        final IndexSearcher searcher = shard.getSearcher();
        IndexReader reader = searcher.getIndexReader();
        // The reader exposes the document counts of the index.
        // Live (searchable) documents:
        System.out.println("Live documents: " + reader.numDocs());
        // Total documents, including deleted ones not yet merged away:
        System.out.println("Total documents: " + reader.maxDoc());
        // Deleted documents; strictly speaking these sit in a "recycle bin" until the segments
        // are merged, so maxDoc() == numDocs() + numDeletedDocs():
        System.out.println("Deleted documents: " + reader.numDeletedDocs());
    }
}