AE86 2 năm trước cách đây
mục cha
commit
cd5483ca82

+ 3 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -42,6 +42,8 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
     private static final long CONTEXT_PERIOD = 10_000;
 
+    private static final ByteBuffer buffer = ByteBuffer.allocate(8);
+
     private static final BinlogColumnValue value = new BinlogColumnValue();
 
     private final Lock lock = new ReentrantLock(true);
@@ -84,7 +86,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
     @Override
     public void run() {
-        if (running || !getQueue().isEmpty()) {
+        if (running || getQueue().size() > 500) {
             return;
         }
 
@@ -132,8 +134,6 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         }
     }
 
-    private final ByteBuffer buffer = ByteBuffer.allocate(8);
-
     /**
      * Java语言提供了八种基本类型,六种数字类型(四个整数型,两个浮点型),一种字符类型,一种布尔型。
      * <p>

+ 8 - 11
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -115,19 +115,9 @@ public class Shard {
         return analyzer;
     }
 
-    public List<Map> query(Query query) throws IOException {
-        final IndexSearcher searcher = getSearcher();
-        final TopDocs topDocs = searcher.search(query, MAX_SIZE);
-        return search(searcher, topDocs, new Option(), 1, 20);
-    }
-
-    public Paging query(Query query, Sort sort) throws IOException {
-        return query(new Option(query), 1, 20, sort);
-    }
-
     public Paging query(Option option, int pageNum, int pageSize, Sort sort) throws IOException {
         final IndexSearcher searcher = getSearcher();
-        final TopDocs topDocs = searcher.search(option.getQuery(), MAX_SIZE, sort);
+        final TopDocs topDocs = getTopDocs(searcher, option.getQuery(), MAX_SIZE, sort);
         Paging paging = new Paging(pageNum, pageSize);
         List<Map> data = search(searcher, topDocs, option, pageNum, pageSize);
         paging.setTotal(topDocs.totalHits);
@@ -135,6 +125,13 @@ public class Shard {
         return paging;
     }
 
+    private TopDocs getTopDocs(IndexSearcher searcher, Query query, int maxSize, Sort sort) throws IOException {
+        if (null != sort) {
+            return searcher.search(query, maxSize, sort);
+        }
+        return searcher.search(query, maxSize);
+    }
+
     /**
      * 执行查询
      *

+ 21 - 9
dbsyncer-storage/src/main/test/BinlogMessageTest.java

@@ -1,4 +1,5 @@
 import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.io.IOUtils;
 import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
 import org.dbsyncer.storage.binlog.BinlogContext;
@@ -50,16 +51,27 @@ public class BinlogMessageTest {
 
     @Test
     public void testBinlogMessage() throws IOException {
-        write("123456", "abc");
-        write("000111", "xyz");
-        write("888999", "jkl");
+        for (int i = 0; i < 10000; i++) {
+            write("123456", i+"");
+        }
+        //write("000111", "xyz");
+        //write("888999", "jkl");
 
         byte[] line;
+        int count = 0;
         while (null != (line = context.readLine())) {
-            logger.info("size:{}, {}", line.length, line);
-            BinlogMessage message = BinlogMessage.parseFrom(line);
-            logger.info(message.toString());
+            //logger.info("size:{}, {}", line.length, line);
+            try {
+                BinlogMessage message = BinlogMessage.parseFrom(line);
+                if(null != message){
+                    count ++;
+                    message.getData();
+                }
+            } catch (InvalidProtocolBufferException e) {
+                logger.info("{} : {}", line.length, line);
+            }
         }
+        logger.info("总条数:{}", count);
         context.flush();
     }
 
@@ -94,9 +106,9 @@ public class BinlogMessageTest {
                 .setEvent(EventEnum.UPDATE)
                 .setData(builder.build())
                 .build();
-        byte[] bytes = build.toByteArray();
-        logger.info("序列化长度:{}", bytes.length);
-        logger.info("{}", bytes);
+        //byte[] bytes = build.toByteArray();
+        //logger.info("序列化长度:{}", bytes.length);
+        //logger.info("{}", bytes);
         context.write(build);
     }
 

+ 44 - 12
dbsyncer-storage/src/main/test/LuceneFactoryTest.java

@@ -12,6 +12,7 @@ import org.apache.lucene.search.highlight.Highlighter;
 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
 import org.apache.lucene.search.highlight.QueryScorer;
 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
+import org.apache.lucene.util.BytesRef;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
@@ -155,7 +156,7 @@ public class LuceneFactoryTest {
         BooleanQuery query = new BooleanQuery.Builder()
                 .add(IntPoint.newRangeQuery("age", 1, 100), BooleanClause.Occur.MUST)
                 .build();
-        Paging paging = shard.query(query, new Sort(new SortField("createTime", SortField.Type.LONG, true)));
+        Paging paging = shard.query(new Option(query), 1, 20, new Sort(new SortField("createTime", SortField.Type.LONG, true)));
         paging.getData().forEach(m -> System.out.println(m));
 
         // 清空
@@ -165,7 +166,7 @@ public class LuceneFactoryTest {
     @Test
     public void testCURD() throws IOException {
         System.out.println("测试前:");
-        List<Map> maps = shard.query(new MatchAllDocsQuery());
+        List<Map> maps = query(new MatchAllDocsQuery());
         maps.forEach(m -> System.out.println(m));
         check();
 
@@ -176,7 +177,7 @@ public class LuceneFactoryTest {
         doc.add(new TextField("content", "这是一款大规模数据处理软件,名字叫做Apache Spark", Field.Store.YES));
         shard.insert(doc);
         System.out.println("新增后:");
-        maps = shard.query(new MatchAllDocsQuery());
+        maps = query(new MatchAllDocsQuery());
         maps.forEach(m -> System.out.println(m));
         check();
 
@@ -184,14 +185,14 @@ public class LuceneFactoryTest {
         doc.add(new TextField("content", "这是一款大规模数据处理软件,名字叫做Apache Spark[已修改]", Field.Store.YES));
         shard.update(new Term("id", id), doc);
         System.out.println("修改后:");
-        maps = shard.query(new MatchAllDocsQuery());
+        maps = query(new MatchAllDocsQuery());
         maps.forEach(m -> System.out.println(m));
         check();
 
         // 删除
         shard.delete(new Term("id", id));
         System.out.println("删除后:");
-        maps = shard.query(new MatchAllDocsQuery());
+        maps = query(new MatchAllDocsQuery());
         maps.forEach(m -> System.out.println(m));
         check();
 
@@ -199,6 +200,32 @@ public class LuceneFactoryTest {
         shard.deleteAll();
     }
 
+    @Test
+    public void testBinary() throws IOException {
+        System.out.println("测试前:");
+        List<Map> maps = query(new MatchAllDocsQuery());
+        maps.forEach(m -> System.out.println(m));
+        check();
+
+        // 新增
+        Document doc = new Document();
+        String id = "100";
+        doc.add(new StringField("id", id, Field.Store.YES));
+        BytesRef bytesRef = new BytesRef("中文".getBytes());
+        doc.add(new StoredField("content", bytesRef));
+        shard.insert(doc);
+        System.out.println("新增后:");
+        maps = query(new MatchAllDocsQuery());
+        maps.forEach(m -> {
+            m.get("content");
+            System.out.println(m);
+        });
+        check();
+
+        // 清空
+        shard.deleteAll();
+    }
+
     /**
      * 按词条搜索
      * <p>
@@ -214,7 +241,7 @@ public class LuceneFactoryTest {
         TermQuery query = new TermQuery(new Term(searchField, "Spark"));
 
         //执行查询,并打印查询到的记录数
-        shard.query(query);
+        query(query);
     }
 
     /**
@@ -254,7 +281,7 @@ public class LuceneFactoryTest {
         BooleanQuery query = builder.build();
 
         //执行查询,并打印查询到的记录数
-        shard.query(query);
+        query(query);
     }
 
     /**
@@ -272,7 +299,7 @@ public class LuceneFactoryTest {
         Query query = new PrefixQuery(term);
 
         //执行查询,并打印查询到的记录数
-        shard.query(query);
+        query(query);
     }
 
     /**
@@ -297,7 +324,7 @@ public class LuceneFactoryTest {
         PhraseQuery phraseQuery = builder.build();
 
         //执行查询,并打印查询到的记录数
-        shard.query(phraseQuery);
+        query(phraseQuery);
     }
 
     /**
@@ -315,7 +342,7 @@ public class LuceneFactoryTest {
         Query query = new FuzzyQuery(t);
 
         //执行查询,并打印查询到的记录数
-        shard.query(query);
+        query(query);
     }
 
     /**
@@ -332,7 +359,7 @@ public class LuceneFactoryTest {
         Query query = new WildcardQuery(term);
 
         //执行查询,并打印查询到的记录数
-        shard.query(query);
+        query(query);
     }
 
     /**
@@ -354,7 +381,7 @@ public class LuceneFactoryTest {
         Query query = parser.parse("Spark");
 
         //执行查询,并打印查询到的记录数
-        shard.query(query);
+        query(query);
     }
 
     /**
@@ -416,6 +443,11 @@ public class LuceneFactoryTest {
         }
     }
 
+    private List<Map> query(Query query) throws IOException {
+        Paging paging = shard.query(new Option(query), 1, 20, null);
+        return (List<Map>) paging.getData();
+    }
+
     private void check() throws IOException {
         final IndexSearcher searcher = shard.getSearcher();
         IndexReader reader = searcher.getIndexReader();