|
@@ -2,26 +2,47 @@ import org.apache.lucene.analysis.Analyzer;
|
|
|
import org.apache.lucene.analysis.TokenStream;
|
|
|
import org.apache.lucene.analysis.cn.smart.SmartChineseAnalyzer;
|
|
|
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
|
|
|
-import org.apache.lucene.document.*;
|
|
|
+import org.apache.lucene.document.Document;
|
|
|
+import org.apache.lucene.document.Field;
|
|
|
+import org.apache.lucene.document.IntPoint;
|
|
|
+import org.apache.lucene.document.LongPoint;
|
|
|
+import org.apache.lucene.document.NumericDocValuesField;
|
|
|
+import org.apache.lucene.document.StoredField;
|
|
|
+import org.apache.lucene.document.StringField;
|
|
|
+import org.apache.lucene.document.TextField;
|
|
|
import org.apache.lucene.index.IndexReader;
|
|
|
import org.apache.lucene.index.Term;
|
|
|
import org.apache.lucene.queryparser.classic.ParseException;
|
|
|
import org.apache.lucene.queryparser.classic.QueryParser;
|
|
|
-import org.apache.lucene.search.*;
|
|
|
+import org.apache.lucene.search.BooleanClause;
|
|
|
+import org.apache.lucene.search.BooleanQuery;
|
|
|
+import org.apache.lucene.search.FuzzyQuery;
|
|
|
+import org.apache.lucene.search.IndexSearcher;
|
|
|
+import org.apache.lucene.search.MatchAllDocsQuery;
|
|
|
+import org.apache.lucene.search.PhraseQuery;
|
|
|
+import org.apache.lucene.search.PrefixQuery;
|
|
|
+import org.apache.lucene.search.Query;
|
|
|
+import org.apache.lucene.search.ScoreDoc;
|
|
|
+import org.apache.lucene.search.Sort;
|
|
|
+import org.apache.lucene.search.SortField;
|
|
|
+import org.apache.lucene.search.TermQuery;
|
|
|
+import org.apache.lucene.search.TopDocs;
|
|
|
+import org.apache.lucene.search.WildcardQuery;
|
|
|
import org.apache.lucene.search.highlight.Highlighter;
|
|
|
import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
|
|
|
import org.apache.lucene.search.highlight.QueryScorer;
|
|
|
import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
|
|
|
import org.apache.lucene.util.BytesRef;
|
|
|
import org.dbsyncer.common.model.Paging;
|
|
|
+import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
|
|
|
import org.dbsyncer.common.util.CollectionUtils;
|
|
|
-import org.dbsyncer.common.util.JsonUtil;
|
|
|
import org.dbsyncer.common.util.RandomUtil;
|
|
|
import org.dbsyncer.connector.constant.ConnectorConstant;
|
|
|
import org.dbsyncer.storage.constant.ConfigConstant;
|
|
|
import org.dbsyncer.storage.enums.StorageDataStatusEnum;
|
|
|
+import org.dbsyncer.storage.lucene.Option;
|
|
|
import org.dbsyncer.storage.lucene.Shard;
|
|
|
-import org.dbsyncer.storage.query.Option;
|
|
|
+import org.dbsyncer.storage.util.BinlogMessageUtil;
|
|
|
import org.dbsyncer.storage.util.DocumentUtil;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Before;
|
|
@@ -32,15 +53,24 @@ import org.slf4j.LoggerFactory;
|
|
|
import java.io.IOException;
|
|
|
import java.io.StringReader;
|
|
|
import java.time.Instant;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
-import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.BrokenBarrierException;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.CyclicBarrier;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
public class LuceneFactoryTest {
|
|
|
|
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
- private Shard shard;
|
|
|
+
|
|
|
+ private Shard shard;
|
|
|
+
|
|
|
+ private SnowflakeIdWorker snowflakeIdWorker = new SnowflakeIdWorker();
|
|
|
|
|
|
@Before
|
|
|
public void setUp() throws IOException {
|
|
@@ -60,7 +90,7 @@ public class LuceneFactoryTest {
|
|
|
final CyclicBarrier barrier = new CyclicBarrier(threadSize);
|
|
|
final CountDownLatch latch = new CountDownLatch(threadSize);
|
|
|
for (int i = 0; i < threadSize; i++) {
|
|
|
- final int k = i + 3;
|
|
|
+ final int k = i + 1;
|
|
|
pool.submit(() -> {
|
|
|
try {
|
|
|
barrier.await();
|
|
@@ -68,10 +98,9 @@ public class LuceneFactoryTest {
|
|
|
// 模拟操作
|
|
|
System.out.println(String.format("%s:%s", Thread.currentThread().getName(), k));
|
|
|
|
|
|
- Document data = DocumentUtil.convertData2Doc(createMap(k));
|
|
|
- //IndexableField field = data.getField(ConfigConstant.CONFIG_MODEL_ID);
|
|
|
- //shard.update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), data);
|
|
|
- shard.insert(data);
|
|
|
+ List<Document> docs = new ArrayList<>();
|
|
|
+ docs.add(DocumentUtil.convertData2Doc(createMap(k)));
|
|
|
+ shard.insertBatch(docs);
|
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
logger.error(e.getMessage());
|
|
@@ -92,7 +121,9 @@ public class LuceneFactoryTest {
|
|
|
check();
|
|
|
Sort sort = new Sort(new SortField(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, SortField.Type.LONG, true),
|
|
|
new SortField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, SortField.Type.LONG, true));
|
|
|
- Paging paging = shard.query(new Option(new MatchAllDocsQuery()), 1, 20, sort);
|
|
|
+ Option option = new Option();
|
|
|
+ option.setQuery(new MatchAllDocsQuery());
|
|
|
+ Paging paging = shard.query(option, 1, 20, sort);
|
|
|
if (!CollectionUtils.isEmpty(paging.getData())) {
|
|
|
List<Map> data = (List<Map>) paging.getData();
|
|
|
data.stream().forEach(r -> System.out.println(r));
|
|
@@ -109,17 +140,19 @@ public class LuceneFactoryTest {
|
|
|
|
|
|
private Map<String, Object> createMap(int i) {
|
|
|
Map<String, Object> params = new HashMap();
|
|
|
- params.put(ConfigConstant.CONFIG_MODEL_ID, "953402886828589057");
|
|
|
+ params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
|
|
|
params.put(ConfigConstant.DATA_SUCCESS, StorageDataStatusEnum.SUCCESS.getValue());
|
|
|
params.put(ConfigConstant.DATA_EVENT, ConnectorConstant.OPERTION_UPDATE);
|
|
|
+ params.put(ConfigConstant.DATA_TABLE_GROUP_ID, "" + i);
|
|
|
+ params.put(ConfigConstant.DATA_TARGET_TABLE_NAME, "MY_USER");
|
|
|
params.put(ConfigConstant.DATA_ERROR, "");
|
|
|
Map<String, Object> row = new HashMap<>();
|
|
|
row.put("id", i);
|
|
|
row.put("name", "中文");
|
|
|
row.put("tel", "15800001234");
|
|
|
- row.put("update_time", System.currentTimeMillis());
|
|
|
+ row.put("update_time", Instant.now().toEpochMilli());
|
|
|
row.put("remark", "test" + i);
|
|
|
- params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(row));
|
|
|
+ params.put(ConfigConstant.BINLOG_DATA, BinlogMessageUtil.toBinlogMap(row).toByteArray());
|
|
|
params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
|
|
|
return params;
|
|
|
}
|
|
@@ -127,6 +160,7 @@ public class LuceneFactoryTest {
|
|
|
@Test
|
|
|
public void testQuery() throws IOException {
|
|
|
int size = 3;
|
|
|
+ List<Document> docs = new ArrayList<>();
|
|
|
for (int i = size; i > 0; i--) {
|
|
|
Document doc = new Document();
|
|
|
doc.add(new StringField("id", String.valueOf(i), Field.Store.YES));
|
|
@@ -147,16 +181,18 @@ public class LuceneFactoryTest {
|
|
|
doc.add(new LongPoint("createTime", createTime));
|
|
|
doc.add(new StoredField("createTime", createTime));
|
|
|
doc.add(new NumericDocValuesField("createTime", createTime));
|
|
|
-
|
|
|
- shard.insert(doc);
|
|
|
+ docs.add(doc);
|
|
|
}
|
|
|
+ shard.insertBatch(docs);
|
|
|
// 范围查询 IntPoint.newRangeQuery("id", 1, 100)
|
|
|
// 集合查询 IntPoint.newSetQuery("id", 2, 3)
|
|
|
// 单个查询 IntPoint.newExactQuery("id", 3)
|
|
|
BooleanQuery query = new BooleanQuery.Builder()
|
|
|
.add(IntPoint.newRangeQuery("age", 1, 100), BooleanClause.Occur.MUST)
|
|
|
.build();
|
|
|
- Paging paging = shard.query(new Option(query), 1, 20, new Sort(new SortField("createTime", SortField.Type.LONG, true)));
|
|
|
+ Option option = new Option();
|
|
|
+ option.setQuery(query);
|
|
|
+ Paging paging = shard.query(option, 1, 20, new Sort(new SortField("createTime", SortField.Type.LONG, true)));
|
|
|
paging.getData().forEach(m -> System.out.println(m));
|
|
|
|
|
|
// 清空
|
|
@@ -175,7 +211,9 @@ public class LuceneFactoryTest {
|
|
|
String id = "100";
|
|
|
doc.add(new StringField("id", id, Field.Store.YES));
|
|
|
doc.add(new TextField("content", "这是一款大规模数据处理软件,名字叫做Apache Spark", Field.Store.YES));
|
|
|
- shard.insert(doc);
|
|
|
+ List<Document> docs = new ArrayList<>();
|
|
|
+ docs.add(doc);
|
|
|
+ shard.insertBatch(docs);
|
|
|
System.out.println("新增后:");
|
|
|
maps = query(new MatchAllDocsQuery());
|
|
|
maps.forEach(m -> System.out.println(m));
|
|
@@ -190,7 +228,7 @@ public class LuceneFactoryTest {
|
|
|
check();
|
|
|
|
|
|
// 删除
|
|
|
- shard.delete(new Term("id", id));
|
|
|
+ shard.deleteBatch(new Term("id", id));
|
|
|
System.out.println("删除后:");
|
|
|
maps = query(new MatchAllDocsQuery());
|
|
|
maps.forEach(m -> System.out.println(m));
|
|
@@ -213,7 +251,9 @@ public class LuceneFactoryTest {
|
|
|
doc.add(new StringField("id", id, Field.Store.YES));
|
|
|
BytesRef bytesRef = new BytesRef("中文".getBytes());
|
|
|
doc.add(new StoredField("content", bytesRef));
|
|
|
- shard.insert(doc);
|
|
|
+ List<Document> docs = new ArrayList<>();
|
|
|
+ docs.add(doc);
|
|
|
+ shard.insertBatch(docs);
|
|
|
System.out.println("新增后:");
|
|
|
maps = query(new MatchAllDocsQuery());
|
|
|
maps.forEach(m -> {
|
|
@@ -444,7 +484,9 @@ public class LuceneFactoryTest {
|
|
|
}
|
|
|
|
|
|
private List<Map> query(Query query) throws IOException {
|
|
|
- Paging paging = shard.query(new Option(query), 1, 20, null);
|
|
|
+ Option option = new Option();
|
|
|
+ option.setQuery(query);
|
|
|
+ Paging paging = shard.query(option, 1, 20, null);
|
|
|
return (List<Map>) paging.getData();
|
|
|
}
|
|
|
|