|
@@ -3,7 +3,10 @@ package org.dbsyncer.storage.support;
|
|
import org.apache.lucene.document.Document;
|
|
import org.apache.lucene.document.Document;
|
|
import org.apache.lucene.index.IndexableField;
|
|
import org.apache.lucene.index.IndexableField;
|
|
import org.apache.lucene.index.Term;
|
|
import org.apache.lucene.index.Term;
|
|
-import org.apache.lucene.search.PrefixQuery;
|
|
|
|
|
|
+import org.apache.lucene.search.BooleanClause;
|
|
|
|
+import org.apache.lucene.search.BooleanQuery;
|
|
|
|
+import org.apache.lucene.search.MatchAllDocsQuery;
|
|
|
|
+import org.apache.lucene.search.TermQuery;
|
|
import org.dbsyncer.common.util.CollectionUtils;
|
|
import org.dbsyncer.common.util.CollectionUtils;
|
|
import org.dbsyncer.storage.AbstractStorageService;
|
|
import org.dbsyncer.storage.AbstractStorageService;
|
|
import org.dbsyncer.storage.StorageException;
|
|
import org.dbsyncer.storage.StorageException;
|
|
@@ -60,15 +63,28 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
|
|
@Override
|
|
@Override
|
|
public List<Map> select(String collectionId, Query query) {
|
|
public List<Map> select(String collectionId, Query query) {
|
|
Shard shard = map.get(collectionId);
|
|
Shard shard = map.get(collectionId);
|
|
- if(null != shard){
|
|
|
|
|
|
+
|
|
|
|
+ // 检查是否存在历史
|
|
|
|
+ if (null == shard) {
|
|
|
|
+ shard = cacheShardIfExist(collectionId);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (null != shard) {
|
|
try {
|
|
try {
|
|
|
|
+ int pageNum = query.getPageNum() <= 0 ? 1 : query.getPageNum();
|
|
|
|
+ int pageSize = query.getPageSize() <= 0 ? 20 : query.getPageSize();
|
|
|
|
+ // 设置参数
|
|
List<Param> params = query.getParams();
|
|
List<Param> params = query.getParams();
|
|
if (!CollectionUtils.isEmpty(params)) {
|
|
if (!CollectionUtils.isEmpty(params)) {
|
|
- Param p = params.get(0);
|
|
|
|
- Term term = new Term(p.getKey(), (String) p.getValue());
|
|
|
|
- PrefixQuery q = new PrefixQuery(term);
|
|
|
|
- return shard.prefixQuery(q);
|
|
|
|
|
|
+ BooleanQuery.Builder builder = new BooleanQuery.Builder();
|
|
|
|
+ params.forEach(p ->
|
|
|
|
+ builder.add(new TermQuery(new Term(p.getKey(), p.getValue())), BooleanClause.Occur.MUST)
|
|
|
|
+ );
|
|
|
|
+ BooleanQuery q = builder.build();
|
|
|
|
+ return shard.query(q, pageNum, pageSize);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ return shard.query(new MatchAllDocsQuery(), pageNum, pageSize);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
logger.error(e.getMessage());
|
|
logger.error(e.getMessage());
|
|
}
|
|
}
|
|
@@ -97,27 +113,34 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
|
|
map.get(collectionId).delete(new Term(ConfigConstant.CONFIG_MODEL_ID, id));
|
|
map.get(collectionId).delete(new Term(ConfigConstant.CONFIG_MODEL_ID, id));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public void deleteAll(String collectionId) throws IOException {
|
|
|
|
+ Shard shard = map.get(collectionId);
|
|
|
|
+ if (null != shard) {
|
|
|
|
+ shard.deleteAll();
|
|
|
|
+ map.remove(collectionId);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public void insertLog(String collectionId, Map<String, Object> params) throws IOException {
|
|
public void insertLog(String collectionId, Map<String, Object> params) throws IOException {
|
|
-// createShardIfNotExist(collectionId);
|
|
|
|
- // TODO 实现日志写入
|
|
|
|
- logger.info(params.toString());
|
|
|
|
|
|
+ createShardIfNotExist(collectionId);
|
|
|
|
+ Document doc = ParamsUtil.convertLog2Doc(params);
|
|
|
|
+ map.get(collectionId).insert(doc);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void insertData(String collectionId, List<Map> list) throws IOException {
|
|
public void insertData(String collectionId, List<Map> list) throws IOException {
|
|
-// createShardIfNotExist(collectionId);
|
|
|
|
- // TODO 实现数据写入
|
|
|
|
- logger.info(list.toString());
|
|
|
|
-// List<Document> docs = list.parallelStream().map(r -> ParamsUtil.convertData2Doc(r)).collect(Collectors.toList());
|
|
|
|
-// map.get(collectionId).insertBatch(docs);
|
|
|
|
|
|
+ createShardIfNotExist(collectionId);
|
|
|
|
+ List<Document> docs = list.parallelStream().map(r -> ParamsUtil.convertData2Doc(r)).collect(Collectors.toList());
|
|
|
|
+ map.get(collectionId).insertBatch(docs);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* 如果不存在分片则创建(线程安全)
|
|
* 如果不存在分片则创建(线程安全)
|
|
- *<p>/data/config</p>
|
|
|
|
- *<p>/data/log</p>
|
|
|
|
- *<p>/data/data/123</p>
|
|
|
|
|
|
+ * <p>/data/config</p>
|
|
|
|
+ * <p>/data/log</p>
|
|
|
|
+ * <p>/data/data/123</p>
|
|
*
|
|
*
|
|
* @param collectionId
|
|
* @param collectionId
|
|
* @throws IOException
|
|
* @throws IOException
|
|
@@ -128,4 +151,16 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private Shard cacheShardIfExist(String collectionId) {
|
|
|
|
+ String path = PATH + collectionId;
|
|
|
|
+ if (new File(path).exists()) {
|
|
|
|
+ try {
|
|
|
|
+ map.putIfAbsent(collectionId, new Shard(path));
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ logger.error(e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return map.get(collectionId);
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|