Răsfoiți Sursa

Fix "this IndexWriter is closed" error: guard index writes against a closed IndexWriter

AE86 3 ani în urmă
părinte
comite
3f4f5edacb

+ 5 - 0
dbsyncer-storage/pom.xml

@@ -32,6 +32,11 @@
             <version>${project.parent.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-log4j2</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>

+ 31 - 15
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -12,6 +12,8 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.storage.query.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -26,6 +28,8 @@ import java.util.*;
  */
 public class Shard {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
     private File indexPath;
 
     private Directory directory;
@@ -60,36 +64,27 @@ public class Shard {
     }
 
     public void insert(Document doc) throws IOException {
-        if (null != doc) {
-            indexWriter.addDocument(doc);
-            indexWriter.commit();
-        }
+        execute(doc, () -> indexWriter.addDocument(doc));
     }
 
     public void insertBatch(List<Document> docs) throws IOException {
-        if (null != docs) {
-            indexWriter.addDocuments(docs);
-            indexWriter.commit();
-        }
+        execute(docs, () -> indexWriter.addDocuments(docs));
     }
 
     public void update(Term term, Document doc) throws IOException {
-        if (null != term && null != doc) {
-            indexWriter.updateDocument(term, doc);
-            indexWriter.commit();
+        if (null != term) {
+            execute(doc, () -> indexWriter.updateDocument(term, doc));
         }
     }
 
     public void delete(Term term) throws IOException {
         if (null != term) {
-            indexWriter.deleteDocuments(term);
-            indexWriter.commit();
+            execute(term, () -> indexWriter.deleteDocuments(term));
         }
     }
 
     public void deleteAll() throws IOException {
-        indexWriter.deleteAll();
-        indexWriter.commit();
+        // Fix Bug: this IndexReader is closed. Delete the index files directly instead.
         close();
         directory.close();
         FileUtils.deleteDirectory(indexPath);
@@ -195,4 +190,25 @@ public class Shard {
         return list;
     }
 
+    private void execute(Object value, Callback callback) throws IOException {
+        if (null != value) {
+            if (indexWriter.isOpen()) {
+                callback.execute();
+                indexWriter.commit();
+                return;
+            }
+            logger.error(value.toString());
+        }
+    }
+
+    interface Callback {
+
+        /**
+         * Executes the index write operation.
+         *
+         * @throws IOException
+         */
+        void execute() throws IOException;
+    }
+
 }

+ 104 - 37
dbsyncer-storage/src/main/test/LuceneFactoryTest.java

@@ -1,10 +1,10 @@
-import org.apache.commons.lang.math.RandomUtils;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.analysis.cn.smart.SmartChineseAnalyzer;
 import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
 import org.apache.lucene.document.*;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.lucene.queryparser.classic.QueryParser;
@@ -14,22 +14,33 @@ import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
 import org.apache.lucene.search.highlight.QueryScorer;
 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
 import org.dbsyncer.common.model.Paging;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.RandomUtil;
+import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 import org.dbsyncer.storage.lucene.Shard;
+import org.dbsyncer.storage.query.Option;
+import org.dbsyncer.storage.util.ParamsUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.StringReader;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
+import java.time.Instant;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.*;
 
 public class LuceneFactoryTest {
 
-    private Shard shard;
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+    private       Shard  shard;
 
     @Before
     public void setUp() throws IOException {
@@ -39,6 +50,78 @@ public class LuceneFactoryTest {
     @After
     public void tearDown() throws IOException {
         shard.close();
+        shard.deleteAll();
+    }
+
+    @Test
+    public void testConcurrentUpdate() throws InterruptedException {
+        // 模拟并发
+        final int threadSize = 100;
+        final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
+        final CyclicBarrier barrier = new CyclicBarrier(threadSize);
+        final CountDownLatch latch = new CountDownLatch(threadSize);
+        for (int i = 0; i < threadSize; i++) {
+            final int k = i + 3;
+            pool.submit(() -> {
+                try {
+                    barrier.await();
+
+                    // 模拟操作
+                    System.out.println(String.format("%s:%s", Thread.currentThread().getName(), k));
+
+                    Document update = ParamsUtil.convertData2Doc(createMap(k));
+                    IndexableField field = update.getField(ConfigConstant.CONFIG_MODEL_ID);
+                    shard.update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), update);
+
+                } catch (InterruptedException e) {
+                    logger.error(e.getMessage());
+                } catch (BrokenBarrierException e) {
+                    logger.error(e.getMessage());
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        try {
+            latch.await();
+            logger.info("try to shutdown");
+            pool.shutdown();
+            check();
+            Sort sort = new Sort(new SortField(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, SortField.Type.LONG, true),
+                    new SortField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, SortField.Type.LONG, true));
+            Paging paging = shard.query(new Option(new MatchAllDocsQuery()), 1, 20, sort);
+            if (!CollectionUtils.isEmpty(paging.getData())) {
+                List<Map> data = (List<Map>) paging.getData();
+                data.stream().forEach(r -> System.out.println(r));
+            }
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        TimeUnit.SECONDS.sleep(3);
+        logger.info("test end");
+    }
+
+    private Map<String, Object> createMap(int i) {
+        Map<String, Object> params = new HashMap();
+        params.put(ConfigConstant.CONFIG_MODEL_ID, "953402886828589057");
+        params.put(ConfigConstant.DATA_SUCCESS, StorageDataStatusEnum.SUCCESS.getValue());
+        params.put(ConfigConstant.DATA_EVENT, ConnectorConstant.OPERTION_UPDATE);
+        params.put(ConfigConstant.DATA_ERROR, "");
+        Map<String, Object> row = new HashMap<>();
+        row.put("id", "1");
+        row.put("name", "中文");
+        row.put("tel", "15800001234");
+        row.put("update_time", System.currentTimeMillis());
+        row.put("remark", "test" + i);
+        params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(row));
+        params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
+        return params;
     }
 
     @Test
@@ -51,7 +134,7 @@ public class LuceneFactoryTest {
             doc.add(new TextField("content", "这是一串很长长长长长长长的文本", Field.Store.YES));
 
             // 创建索引
-            int age = RandomUtils.nextInt(50);
+            int age = RandomUtil.nextInt(0, 50);
             doc.add(new IntPoint("age", age));
             // 需要存储内容
             doc.add(new StoredField("age", age));
@@ -117,26 +200,16 @@ public class LuceneFactoryTest {
         shard.deleteAll();
     }
 
-    @Test
-    public void fmtDate() {
-        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-        LocalDateTime localDateTime = LocalDateTime.parse("2020-05-23 12:00:00", formatter);
-        long timeStamp = localDateTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
-        System.out.println(timeStamp);
-    }
-
     /**
      * 按词条搜索
      * <p>
-     * TermQuery是最简单、也是最常用的Query。TermQuery可以理解成为“词条搜索”,
-     * 在搜索引擎中最基本的搜索就是在索引中搜索某一词条,而TermQuery就是用来完成这项工作的。
-     * 在Lucene中词条是最基本的搜索单位,从本质上来讲一个词条其实就是一个名/值对。
-     * 只不过这个“名”是字段名,而“值”则表示字段中所包含的某个关键字。
+     * TermQuery是最简单、也是最常用的Query。TermQuery可以理解成为“词条搜索”, 在搜索引擎中最基本的搜索就是在索引中搜索某一词条,而TermQuery就是用来完成这项工作的。
+     * 在Lucene中词条是最基本的搜索单位,从本质上来讲一个词条其实就是一个名/值对。 只不过这个“名”是字段名,而“值”则表示字段中所包含的某个关键字。
      *
      * @throws IOException
      */
     @Test
-    public void termQueryTest() throws IOException {
+    public void testQueryTerm() throws IOException {
         String searchField = "title";
         //这是一个条件查询的api,用于添加条件
         TermQuery query = new TermQuery(new Term(searchField, "Spark"));
@@ -147,16 +220,14 @@ public class LuceneFactoryTest {
 
     /**
      * 多条件查询
-     *
-     * BooleanQuery也是实际开发过程中经常使用的一种Query。
-     * 它其实是一个组合的Query,在使用时可以把各种Query对象添加进去并标明它们之间的逻辑关系。
-     * BooleanQuery本身来讲是一个布尔子句的容器,它提供了专门的API方法往其中添加子句,
+     * <p>
+     * BooleanQuery也是实际开发过程中经常使用的一种Query。 它其实是一个组合的Query,在使用时可以把各种Query对象添加进去并标明它们之间的逻辑关系。 BooleanQuery本身来讲是一个布尔子句的容器,它提供了专门的API方法往其中添加子句,
      * 并标明它们之间的关系,以下代码为BooleanQuery提供的用于添加子句的API接口:
      *
      * @throws IOException
      */
     @Test
-    public void BooleanQueryTest() throws IOException {
+    public void testQueryBoolean() throws IOException {
 
         String searchField1 = "title";
         String searchField2 = "content";
@@ -196,7 +267,7 @@ public class LuceneFactoryTest {
      * @throws IOException
      */
     @Test
-    public void prefixQueryTest() throws IOException {
+    public void testQueryPrefix() throws IOException {
         String searchField = "title";
         Term term = new Term(searchField, "Spar");
         Query query = new PrefixQuery(term);
@@ -208,16 +279,13 @@ public class LuceneFactoryTest {
     /**
      * 短语搜索
      * <p>
-     * 所谓PhraseQuery,就是通过短语来检索,比如我想查“big car”这个短语,
-     * 那么如果待匹配的document的指定项里包含了"big car"这个短语,
-     * 这个document就算匹配成功。可如果待匹配的句子里包含的是“big black car”,
-     * 那么就无法匹配成功了,如果也想让这个匹配,就需要设定slop,
-     * 先给出slop的概念:slop是指两个项的位置之间允许的最大间隔距离
+     * 所谓PhraseQuery,就是通过短语来检索,比如我想查“big car”这个短语, 那么如果待匹配的document的指定项里包含了"big car"这个短语, 这个document就算匹配成功。可如果待匹配的句子里包含的是“big black car”,
+     * 那么就无法匹配成功了,如果也想让这个匹配,就需要设定slop, 先给出slop的概念:slop是指两个项的位置之间允许的最大间隔距离
      *
      * @throws IOException
      */
     @Test
-    public void phraseQueryTest() throws IOException {
+    public void testQueryPhrase() throws IOException {
 
         String searchField = "content";
         String query1 = "apache";
@@ -241,7 +309,7 @@ public class LuceneFactoryTest {
      * @throws IOException
      */
     @Test
-    public void fuzzyQueryTest() throws IOException {
+    public void testQueryFuzzy() throws IOException {
 
         String searchField = "content";
         Term t = new Term(searchField, "大规模");
@@ -254,13 +322,12 @@ public class LuceneFactoryTest {
     /**
      * 通配符搜索(IO影响较大,不建议使用)
      * <p>
-     * Lucene也提供了通配符的查询,这就是WildcardQuery。
-     * 通配符“?”代表1个字符,而“*”则代表0至多个字符。
+     * Lucene也提供了通配符的查询,这就是WildcardQuery。 通配符“?”代表1个字符,而“*”则代表0至多个字符。
      *
      * @throws IOException
      */
     @Test
-    public void wildcardQueryTest() throws IOException {
+    public void testQueryWildcard() throws IOException {
         String searchField = "content";
         Term term = new Term(searchField, "大*规模");
         Query query = new WildcardQuery(term);
@@ -276,7 +343,7 @@ public class LuceneFactoryTest {
      * @throws ParseException
      */
     @Test
-    public void queryParserTest() throws IOException, ParseException {
+    public void testQueryParser() throws IOException, ParseException {
         final Analyzer analyzer = shard.getAnalyzer();
         String searchField = "content";
 
@@ -297,7 +364,7 @@ public class LuceneFactoryTest {
      * @throws IOException
      */
     @Test
-    public void HighlighterTest() throws IOException, ParseException, InvalidTokenOffsetsException {
+    public void testHighlighter() throws IOException, ParseException, InvalidTokenOffsetsException {
         final Analyzer analyzer = shard.getAnalyzer();
         final IndexSearcher searcher = shard.getSearcher();