|
@@ -5,11 +5,22 @@ import org.apache.lucene.analysis.Analyzer;
|
|
|
import org.apache.lucene.analysis.TokenStream;
|
|
|
import org.apache.lucene.analysis.cn.smart.SmartChineseAnalyzer;
|
|
|
import org.apache.lucene.document.Document;
|
|
|
-import org.apache.lucene.index.*;
|
|
|
-import org.apache.lucene.search.*;
|
|
|
+import org.apache.lucene.index.DirectoryReader;
|
|
|
+import org.apache.lucene.index.IndexReader;
|
|
|
+import org.apache.lucene.index.IndexWriter;
|
|
|
+import org.apache.lucene.index.IndexWriterConfig;
|
|
|
+import org.apache.lucene.index.IndexableField;
|
|
|
+import org.apache.lucene.index.Term;
|
|
|
+import org.apache.lucene.search.IndexSearcher;
|
|
|
+import org.apache.lucene.search.Query;
|
|
|
+import org.apache.lucene.search.ScoreDoc;
|
|
|
+import org.apache.lucene.search.Sort;
|
|
|
+import org.apache.lucene.search.TopDocs;
|
|
|
import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
|
|
|
import org.apache.lucene.store.Directory;
|
|
|
import org.apache.lucene.store.FSDirectory;
|
|
|
+import org.apache.lucene.store.Lock;
|
|
|
+import org.apache.lucene.util.IOUtils;
|
|
|
import org.dbsyncer.common.model.Paging;
|
|
|
import org.dbsyncer.storage.StorageException;
|
|
|
import org.slf4j.Logger;
|
|
@@ -19,7 +30,11 @@ import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.nio.file.Path;
|
|
|
import java.nio.file.Paths;
|
|
|
-import java.util.*;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
|
|
|
/**
|
|
|
* @author AE86
|
|
@@ -54,40 +69,23 @@ public class Shard {
|
|
|
directory = FSDirectory.open(dir);
|
|
|
// 分词器
|
|
|
analyzer = new SmartChineseAnalyzer();
|
|
|
- // 创建索引写入配置
|
|
|
- config = new IndexWriterConfig(analyzer);
|
|
|
- // 默认32M, 减少合并次数
|
|
|
- config.setRAMBufferSizeMB(32);
|
|
|
- // 创建索引写入对象
|
|
|
- indexWriter = new IndexWriter(directory, config);
|
|
|
- // 创建索引的读取器
|
|
|
- indexReader = DirectoryReader.open(indexWriter);
|
|
|
+ reopen();
|
|
|
} catch (IOException e) {
|
|
|
throw new StorageException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void insert(Document doc) throws IOException {
|
|
|
- execute(doc, () -> indexWriter.addDocument(doc));
|
|
|
- }
|
|
|
-
|
|
|
- public void insertBatch(List<Document> docs) throws IOException {
|
|
|
+ public void insertBatch(List<Document> docs) {
|
|
|
execute(docs, () -> indexWriter.addDocuments(docs));
|
|
|
}
|
|
|
|
|
|
- public void update(Term term, Document doc) throws IOException {
|
|
|
+ public void update(Term term, Document doc) {
|
|
|
if (null != term) {
|
|
|
execute(doc, () -> indexWriter.updateDocument(term, doc));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void delete(Term term) throws IOException {
|
|
|
- if (null != term) {
|
|
|
- execute(term, () -> indexWriter.deleteDocuments(term));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void deleteBatch(Term... terms) throws IOException {
|
|
|
+ public void deleteBatch(Term... terms) {
|
|
|
if (null != terms) {
|
|
|
execute(terms, () -> indexWriter.deleteDocuments(terms));
|
|
|
}
|
|
@@ -203,14 +201,50 @@ public class Shard {
|
|
|
return list;
|
|
|
}
|
|
|
|
|
|
- private void execute(Object value, Callback callback) throws IOException {
|
|
|
- if (null != value && indexWriter.isOpen()) {
|
|
|
- callback.execute();
|
|
|
- indexWriter.flush();
|
|
|
- indexWriter.commit();
|
|
|
+ private void execute(Object value, Callback callback) {
|
|
|
+ if (value == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (indexWriter.isOpen()) {
|
|
|
+ try {
|
|
|
+ doExecute(callback);
|
|
|
+ } catch (IOException e) {
|
|
|
+ // 程序异常导致文件锁未关闭 java.nio.channels.ClosedChannelException
|
|
|
+ logger.error(String.format("索引异常:%s", indexPath.getAbsolutePath()), e);
|
|
|
+ }
|
|
|
return;
|
|
|
}
|
|
|
- logger.error(value.toString());
|
|
|
+
|
|
|
+ // 索引异常关闭
|
|
|
+ try {
|
|
|
+ reopen();
|
|
|
+ doExecute(callback);
|
|
|
+ } catch (IOException e) {
|
|
|
+ // 重试失败打印异常数据
|
|
|
+ logger.error(value.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void doExecute(Callback callback) throws IOException {
|
|
|
+ callback.execute();
|
|
|
+ indexWriter.flush();
|
|
|
+ indexWriter.commit();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void reopen() throws IOException {
|
|
|
+ Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME);
|
|
|
+ if (writeLock != null) {
|
|
|
+ IOUtils.close(writeLock); // release write lock
|
|
|
+ }
|
|
|
+ // 创建索引写入配置
|
|
|
+ config = new IndexWriterConfig(analyzer);
|
|
|
+ // 默认32M, 减少合并次数
|
|
|
+ config.setRAMBufferSizeMB(32);
|
|
|
+ // 创建索引写入对象
|
|
|
+ indexWriter = new IndexWriter(directory, config);
|
|
|
+ // 创建索引的读取器
|
|
|
+ indexReader = DirectoryReader.open(indexWriter);
|
|
|
}
|
|
|
|
|
|
interface Callback {
|