AE86 2 years ago
parent
commit
5171ec3883

+ 3 - 0
dbsyncer-storage/src/main/test/BinlogMessageTest.java

@@ -7,6 +7,7 @@ import org.dbsyncer.storage.binlog.impl.BinlogColumnValue;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.binlog.proto.EventEnum;
+import org.dbsyncer.storage.util.BinlogMessageUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -24,6 +25,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 
+import static org.dbsyncer.storage.util.BinlogMessageUtil.serializeValue;
+
 /**
  * @author AE86
  * @version 1.0.0

+ 50 - 41
dbsyncer-storage/src/main/test/ShardBinlogTest.java

@@ -1,7 +1,10 @@
 import com.google.protobuf.ByteString;
-import org.apache.lucene.document.*;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.IntPoint;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.util.BytesRef;
@@ -10,9 +13,12 @@ import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.binlog.proto.EventEnum;
+import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.lucene.Shard;
 import org.dbsyncer.storage.query.Option;
+import org.dbsyncer.storage.util.BinlogMessageUtil;
+import org.dbsyncer.storage.util.ParamsUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -24,6 +30,7 @@ import java.math.BigDecimal;
 import java.nio.charset.Charset;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.time.Instant;
 import java.time.LocalDateTime;
 import java.util.*;
 
@@ -51,21 +58,11 @@ public class ShardBinlogTest extends AbstractBinlogRecorder {
     @Test
     public void testBinlogMessage() throws IOException {
         List<Document> list = new ArrayList<>();
+        long now = Instant.now().toEpochMilli();
         for (int i = 1; i <= 10; i++) {
             BinlogMessage message = genMessage("123456", i + "");
-            Document doc = new Document();
-            doc.add(new LongPoint("id", i));
-            doc.add(new StoredField("id", i));
-            doc.add(new NumericDocValuesField("id", i));
-
-            doc.add(new IntPoint("s", 0));
-            doc.add(new StoredField("s", 0));
-            doc.add(new NumericDocValuesField("s", 0));
-
-            BytesRef ref = new BytesRef(message.toByteArray());
-            doc.add(new BinaryDocValuesField("content", ref));
-            doc.add(new StoredField("content", ref));
-            list.add(doc);
+            BytesRef bytes = new BytesRef(message.toByteArray());
+            list.add(ParamsUtil.convertBinlog2Doc(String.valueOf(i), BinlogConstant.READY, bytes, now));
 
             if (i % 1000 == 0) {
                 shard.insertBatch(list);
@@ -77,33 +74,49 @@ public class ShardBinlogTest extends AbstractBinlogRecorder {
             shard.insertBatch(list);
         }
         check();
-        Document doc = new Document();
-        doc.add(new LongPoint("id", 1));
-        doc.add(new StoredField("id", 1));
-        doc.add(new NumericDocValuesField("id", 1));
-
-        doc.add(new IntPoint("s", 1));
-        doc.add(new StoredField("s", 1));
-        doc.add(new NumericDocValuesField("s", 1));
-        shard.update(new Term("id", "1"), doc);
-
-        Option option = new Option(new MatchAllDocsQuery());
-        option.addIndexFieldResolverEnum("id", IndexFieldResolverEnum.LONG);
-        option.addIndexFieldResolverEnum("s", IndexFieldResolverEnum.INT);
-        option.addIndexFieldResolverEnum("content", IndexFieldResolverEnum.BINARY);
+
+        BooleanQuery query = new BooleanQuery.Builder().add(IntPoint.newSetQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST).build();
+        List<Map> maps = query(new Option(query));
+        // 更新状态为处理中
+        maps.forEach(row -> {
+            String id = (String) row.get(BinlogConstant.BINLOG_ID);
+            BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
+            try {
+                shard.update(new Term(BinlogConstant.BINLOG_ID, String.valueOf(id)), ParamsUtil.convertBinlog2Doc(id, BinlogConstant.PROCESSING, ref, now));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        // 查询【待处理】
+        query = new BooleanQuery.Builder()
+                .add(IntPoint.newSetQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST)
+                .build();
+        maps = query(new Option(query));
+        logger.info("【待处理】总条数:{}", maps.size());
+
+        // 查询【待处理】和【处理中】
+        query = new BooleanQuery.Builder()
+                .add(IntPoint.newSetQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY, BinlogConstant.PROCESSING), BooleanClause.Occur.MUST)
+                .build();
+        maps = query(new Option(query));
+        logger.info("【待处理】和【处理中】总条数:{}", maps.size());
+    }
+
+    private List<Map> query(Option option) throws IOException {
+        option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_STATUS, IndexFieldResolverEnum.INT);
+        option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
         Paging paging = shard.query(option, 1, 10001, null);
         List<Map> maps = (List<Map>) paging.getData();
         for (Map m : maps) {
-            Long id = (Long) m.get("id");
-            Long tid = (Long) m.get("tid");
-            Integer s = (Integer) m.get("s");
-            BytesRef ref = (BytesRef) m.get("content");
+            String id = (String) m.get(BinlogConstant.BINLOG_ID);
+            Integer s = (Integer) m.get(BinlogConstant.BINLOG_STATUS);
+            BytesRef ref = (BytesRef) m.get(BinlogConstant.BINLOG_CONTENT);
             BinlogMessage message = BinlogMessage.parseFrom(ref.bytes);
             Map<String, ByteString> rowMap = message.getData().getRowMap();
-            logger.info("id:{}, tid:{}, s:{}, message:{}", id, tid, s, rowMap.get("name").toStringUtf8());
+            logger.info("id:{}, s:{}, message:{}", id, s, rowMap.get("name").toStringUtf8());
         }
-
-        logger.info("总条数:{}", paging.getTotal());
+        return maps;
     }
 
     private BinlogMessage genMessage(String tableGroupId, String key) {
@@ -125,18 +138,14 @@ public class ShardBinlogTest extends AbstractBinlogRecorder {
         BinlogMap.Builder builder = BinlogMap.newBuilder();
         data.forEach((k, v) -> {
             if (null != v) {
-                ByteString bytes = serializeValue(v);
+                ByteString bytes = BinlogMessageUtil.serializeValue(v);
                 if (null != bytes) {
                     builder.putRow(k, bytes);
                 }
             }
         });
 
-        BinlogMessage build = BinlogMessage.newBuilder()
-                .setTableGroupId(tableGroupId)
-                .setEvent(EventEnum.UPDATE)
-                .setData(builder.build())
-                .build();
+        BinlogMessage build = BinlogMessage.newBuilder().setTableGroupId(tableGroupId).setEvent(EventEnum.UPDATE).setData(builder.build()).build();
         return build;
     }