ShardBinlogTest.java 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. import com.google.protobuf.ByteString;
  2. import org.apache.lucene.document.Document;
  3. import org.apache.lucene.document.IntPoint;
  4. import org.apache.lucene.document.LongPoint;
  5. import org.apache.lucene.index.IndexReader;
  6. import org.apache.lucene.index.Term;
  7. import org.apache.lucene.search.*;
  8. import org.apache.lucene.util.BytesRef;
  9. import org.dbsyncer.common.model.Paging;
  10. import org.dbsyncer.common.util.DateFormatUtil;
  11. import org.dbsyncer.storage.binlog.proto.BinlogMap;
  12. import org.dbsyncer.storage.binlog.proto.BinlogMessage;
  13. import org.dbsyncer.storage.binlog.proto.EventEnum;
  14. import org.dbsyncer.storage.constant.BinlogConstant;
  15. import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
  16. import org.dbsyncer.storage.lucene.Shard;
  17. import org.dbsyncer.storage.query.Option;
  18. import org.dbsyncer.storage.util.BinlogMessageUtil;
  19. import org.dbsyncer.storage.util.DocumentUtil;
  20. import org.junit.After;
  21. import org.junit.Before;
  22. import org.junit.Test;
  23. import org.slf4j.Logger;
  24. import org.slf4j.LoggerFactory;
  25. import java.io.IOException;
  26. import java.math.BigDecimal;
  27. import java.math.BigInteger;
  28. import java.nio.charset.Charset;
  29. import java.sql.Date;
  30. import java.sql.Timestamp;
  31. import java.time.Instant;
  32. import java.time.LocalDateTime;
  33. import java.util.ArrayList;
  34. import java.util.HashMap;
  35. import java.util.List;
  36. import java.util.Map;
  37. import java.util.concurrent.TimeUnit;
  38. /**
  39. * @author AE86
  40. * @version 1.0.0
  41. * @date 2022/6/18 23:46
  42. */
  43. public class ShardBinlogTest {
  44. private final Logger logger = LoggerFactory.getLogger(getClass());
  45. private Shard shard;
  46. @Before
  47. public void init() throws IOException {
  48. shard = new Shard("target/indexDir/");
  49. }
  50. @After
  51. public void close() throws IOException {
  52. shard.deleteAll();
  53. }
  54. @Test
  55. public void testBinlogMessage() throws IOException, InterruptedException {
  56. mockData(1);
  57. // 查询[待处理] 或 [处理中 且 处理超时]
  58. List<Map> maps = queryReadyAndProcess();
  59. logger.info("总条数:{}", maps.size());
  60. TimeUnit.SECONDS.sleep(1);
  61. markProcessing(maps);
  62. logger.info("标记处理中");
  63. // 模拟新记录
  64. TimeUnit.SECONDS.sleep(1);
  65. mockData(6);
  66. maps = queryReadyAndProcess();
  67. logger.info("【待处理】和【处理中 且 处理超时】总条数:{}", maps.size());
  68. logger.info("模拟处理超时,等待10s");
  69. TimeUnit.SECONDS.sleep(10);
  70. maps = queryReadyAndProcess();
  71. logger.info("【待处理】和【处理中 且 处理超时】总条数:{}", maps.size());
  72. }
  73. private void markProcessing(List<Map> maps) {
  74. long updateTime = Instant.now().toEpochMilli();
  75. maps.forEach(row -> {
  76. String id = (String) row.get(BinlogConstant.BINLOG_ID);
  77. BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
  78. try {
  79. shard.update(new Term(BinlogConstant.BINLOG_ID, String.valueOf(id)), DocumentUtil.convertBinlog2Doc(id, BinlogConstant.PROCESSING, ref, updateTime));
  80. } catch (IOException e) {
  81. throw new RuntimeException(e);
  82. }
  83. });
  84. }
  85. private void mockData(int i) throws IOException {
  86. List<Document> list = new ArrayList<>();
  87. long now = Instant.now().toEpochMilli();
  88. int size = i + 5;
  89. while (i < size) {
  90. BinlogMessage message = genMessage("123456", i + "");
  91. BytesRef bytes = new BytesRef(message.toByteArray());
  92. list.add(DocumentUtil.convertBinlog2Doc(String.valueOf(i), BinlogConstant.READY, bytes, now));
  93. if (i % 1000 == 0) {
  94. shard.insertBatch(list);
  95. list.clear();
  96. }
  97. i++;
  98. }
  99. if (!list.isEmpty()) {
  100. shard.insertBatch(list);
  101. }
  102. check();
  103. }
  104. private List<Map> queryReadyAndProcess() throws IOException {
  105. long lastTime = Timestamp.valueOf(LocalDateTime.now().minusSeconds(5)).getTime();
  106. BooleanQuery filter1 = new BooleanQuery.Builder()
  107. .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST)
  108. .build();
  109. BooleanQuery filter2 = new BooleanQuery.Builder()
  110. .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.PROCESSING), BooleanClause.Occur.MUST)
  111. .add(LongPoint.newRangeQuery(BinlogConstant.BINLOG_TIME, Long.MIN_VALUE, lastTime), BooleanClause.Occur.MUST)
  112. .build();
  113. BooleanQuery query = new BooleanQuery.Builder()
  114. .add(filter1, BooleanClause.Occur.SHOULD)
  115. .add(filter2, BooleanClause.Occur.SHOULD)
  116. .build();
  117. return query(new Option(query));
  118. }
  119. private List<Map> query(Option option) throws IOException {
  120. option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_ID, IndexFieldResolverEnum.STRING);
  121. option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_STATUS, IndexFieldResolverEnum.INT);
  122. option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
  123. option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_TIME, IndexFieldResolverEnum.LONG);
  124. Sort sort = new Sort(new SortField(BinlogConstant.BINLOG_TIME, SortField.Type.LONG));
  125. Paging paging = shard.query(option, 1, 10001, sort);
  126. List<Map> maps = (List<Map>) paging.getData();
  127. for (Map m : maps) {
  128. String id = (String) m.get(BinlogConstant.BINLOG_ID);
  129. Integer s = (Integer) m.get(BinlogConstant.BINLOG_STATUS);
  130. BytesRef ref = (BytesRef) m.get(BinlogConstant.BINLOG_CONTENT);
  131. Long t = (Long) m.get(BinlogConstant.BINLOG_TIME);
  132. BinlogMessage message = BinlogMessage.parseFrom(ref.bytes);
  133. Map<String, ByteString> rowMap = message.getData().getRowMap();
  134. String timestamp = DateFormatUtil.timestampToString(new Timestamp(t));
  135. logger.info("t:{}, id:{}, s:{}, message:{}", timestamp, id, s, rowMap.get("name").toStringUtf8());
  136. }
  137. return maps;
  138. }
  139. private BinlogMessage genMessage(String tableGroupId, String key) {
  140. Map<String, Object> data = new HashMap<>();
  141. data.put("id", 1L);
  142. data.put("name", key + "中文");
  143. data.put("age", 88);
  144. data.put("bd", new BigDecimal(88));
  145. data.put("bigInt", new BigInteger("123456789876543210"));
  146. data.put("localTime", LocalDateTime.now());
  147. data.put("sex", 1);
  148. data.put("f", 88.88f);
  149. data.put("d", 999.99d);
  150. data.put("b", true);
  151. short ss = 32767;
  152. data.put("ss", ss);
  153. data.put("bytes", "中文666".getBytes(Charset.defaultCharset()));
  154. data.put("create_date", new Date(Timestamp.valueOf(LocalDateTime.now()).getTime()));
  155. data.put("update_time", Timestamp.valueOf(LocalDateTime.now()).getTime());
  156. BinlogMap.Builder builder = BinlogMap.newBuilder();
  157. data.forEach((k, v) -> {
  158. if (null != v) {
  159. ByteString bytes = BinlogMessageUtil.serializeValue(v);
  160. if (null != bytes) {
  161. builder.putRow(k, bytes);
  162. }
  163. }
  164. });
  165. BinlogMessage build = BinlogMessage.newBuilder().setTableGroupId(tableGroupId).setEvent(EventEnum.UPDATE).setData(builder.build()).build();
  166. return build;
  167. }
  168. private void check() throws IOException {
  169. final IndexSearcher searcher = shard.getSearcher();
  170. IndexReader reader = searcher.getIndexReader();
  171. // 通过reader可以有效的获取到文档的数量
  172. // 有效的索引文档
  173. System.out.println("有效的索引文档:" + reader.numDocs());
  174. // 总共的索引文档
  175. System.out.println("总共的索引文档:" + reader.maxDoc());
  176. // 删掉的索引文档,其实不恰当,应该是在回收站里的索引文档
  177. System.out.println("删掉的索引文档:" + reader.numDeletedDocs());
  178. }
  179. }