123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- import com.google.protobuf.ByteString;
- import org.apache.lucene.document.Document;
- import org.apache.lucene.document.IntPoint;
- import org.apache.lucene.document.LongPoint;
- import org.apache.lucene.index.IndexReader;
- import org.apache.lucene.index.Term;
- import org.apache.lucene.search.*;
- import org.apache.lucene.util.BytesRef;
- import org.dbsyncer.common.model.Paging;
- import org.dbsyncer.common.util.DateFormatUtil;
- import org.dbsyncer.storage.binlog.proto.BinlogMap;
- import org.dbsyncer.storage.binlog.proto.BinlogMessage;
- import org.dbsyncer.storage.binlog.proto.EventEnum;
- import org.dbsyncer.storage.constant.BinlogConstant;
- import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
- import org.dbsyncer.storage.lucene.Shard;
- import org.dbsyncer.storage.query.Option;
- import org.dbsyncer.storage.util.BinlogMessageUtil;
- import org.dbsyncer.storage.util.DocumentUtil;
- import org.junit.After;
- import org.junit.Before;
- import org.junit.Test;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.IOException;
- import java.math.BigDecimal;
- import java.math.BigInteger;
- import java.nio.charset.Charset;
- import java.sql.Date;
- import java.sql.Timestamp;
- import java.time.Instant;
- import java.time.LocalDateTime;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.TimeUnit;
- /**
- * @author AE86
- * @version 1.0.0
- * @date 2022/6/18 23:46
- */
- public class ShardBinlogTest {
- private final Logger logger = LoggerFactory.getLogger(getClass());
- private Shard shard;
- @Before
- public void init() throws IOException {
- shard = new Shard("target/indexDir/");
- }
- @After
- public void close() throws IOException {
- shard.deleteAll();
- }
- @Test
- public void testBinlogMessage() throws IOException, InterruptedException {
- mockData(1);
- // 查询[待处理] 或 [处理中 且 处理超时]
- List<Map> maps = queryReadyAndProcess();
- logger.info("总条数:{}", maps.size());
- TimeUnit.SECONDS.sleep(1);
- markProcessing(maps);
- logger.info("标记处理中");
- // 模拟新记录
- TimeUnit.SECONDS.sleep(1);
- mockData(6);
- maps = queryReadyAndProcess();
- logger.info("【待处理】和【处理中 且 处理超时】总条数:{}", maps.size());
- logger.info("模拟处理超时,等待10s");
- TimeUnit.SECONDS.sleep(10);
- maps = queryReadyAndProcess();
- logger.info("【待处理】和【处理中 且 处理超时】总条数:{}", maps.size());
- }
- private void markProcessing(List<Map> maps) {
- long updateTime = Instant.now().toEpochMilli();
- maps.forEach(row -> {
- String id = (String) row.get(BinlogConstant.BINLOG_ID);
- BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
- try {
- shard.update(new Term(BinlogConstant.BINLOG_ID, String.valueOf(id)), DocumentUtil.convertBinlog2Doc(id, BinlogConstant.PROCESSING, ref, updateTime));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- }
- private void mockData(int i) throws IOException {
- List<Document> list = new ArrayList<>();
- long now = Instant.now().toEpochMilli();
- int size = i + 5;
- while (i < size) {
- BinlogMessage message = genMessage("123456", i + "");
- BytesRef bytes = new BytesRef(message.toByteArray());
- list.add(DocumentUtil.convertBinlog2Doc(String.valueOf(i), BinlogConstant.READY, bytes, now));
- if (i % 1000 == 0) {
- shard.insertBatch(list);
- list.clear();
- }
- i++;
- }
- if (!list.isEmpty()) {
- shard.insertBatch(list);
- }
- check();
- }
- private List<Map> queryReadyAndProcess() throws IOException {
- long lastTime = Timestamp.valueOf(LocalDateTime.now().minusSeconds(5)).getTime();
- BooleanQuery filter1 = new BooleanQuery.Builder()
- .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST)
- .build();
- BooleanQuery filter2 = new BooleanQuery.Builder()
- .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.PROCESSING), BooleanClause.Occur.MUST)
- .add(LongPoint.newRangeQuery(BinlogConstant.BINLOG_TIME, Long.MIN_VALUE, lastTime), BooleanClause.Occur.MUST)
- .build();
- BooleanQuery query = new BooleanQuery.Builder()
- .add(filter1, BooleanClause.Occur.SHOULD)
- .add(filter2, BooleanClause.Occur.SHOULD)
- .build();
- return query(new Option(query));
- }
- private List<Map> query(Option option) throws IOException {
- option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_ID, IndexFieldResolverEnum.STRING);
- option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_STATUS, IndexFieldResolverEnum.INT);
- option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
- option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_TIME, IndexFieldResolverEnum.LONG);
- Sort sort = new Sort(new SortField(BinlogConstant.BINLOG_TIME, SortField.Type.LONG));
- Paging paging = shard.query(option, 1, 10001, sort);
- List<Map> maps = (List<Map>) paging.getData();
- for (Map m : maps) {
- String id = (String) m.get(BinlogConstant.BINLOG_ID);
- Integer s = (Integer) m.get(BinlogConstant.BINLOG_STATUS);
- BytesRef ref = (BytesRef) m.get(BinlogConstant.BINLOG_CONTENT);
- Long t = (Long) m.get(BinlogConstant.BINLOG_TIME);
- BinlogMessage message = BinlogMessage.parseFrom(ref.bytes);
- Map<String, ByteString> rowMap = message.getData().getRowMap();
- String timestamp = DateFormatUtil.timestampToString(new Timestamp(t));
- logger.info("t:{}, id:{}, s:{}, message:{}", timestamp, id, s, rowMap.get("name").toStringUtf8());
- }
- return maps;
- }
- private BinlogMessage genMessage(String tableGroupId, String key) {
- Map<String, Object> data = new HashMap<>();
- data.put("id", 1L);
- data.put("name", key + "中文");
- data.put("age", 88);
- data.put("bd", new BigDecimal(88));
- data.put("bigInt", new BigInteger("123456789876543210"));
- data.put("localTime", LocalDateTime.now());
- data.put("sex", 1);
- data.put("f", 88.88f);
- data.put("d", 999.99d);
- data.put("b", true);
- short ss = 32767;
- data.put("ss", ss);
- data.put("bytes", "中文666".getBytes(Charset.defaultCharset()));
- data.put("create_date", new Date(Timestamp.valueOf(LocalDateTime.now()).getTime()));
- data.put("update_time", Timestamp.valueOf(LocalDateTime.now()).getTime());
- BinlogMap.Builder builder = BinlogMap.newBuilder();
- data.forEach((k, v) -> {
- if (null != v) {
- ByteString bytes = BinlogMessageUtil.serializeValue(v);
- if (null != bytes) {
- builder.putRow(k, bytes);
- }
- }
- });
- BinlogMessage build = BinlogMessage.newBuilder().setTableGroupId(tableGroupId).setEvent(EventEnum.UPDATE).setData(builder.build()).build();
- return build;
- }
- private void check() throws IOException {
- final IndexSearcher searcher = shard.getSearcher();
- IndexReader reader = searcher.getIndexReader();
- // 通过reader可以有效的获取到文档的数量
- // 有效的索引文档
- System.out.println("有效的索引文档:" + reader.numDocs());
- // 总共的索引文档
- System.out.println("总共的索引文档:" + reader.maxDoc());
- // 删掉的索引文档,其实不恰当,应该是在回收站里的索引文档
- System.out.println("删掉的索引文档:" + reader.numDeletedDocs());
- }
- }
|