Browse Source

优化binlog记录

AE86 2 years ago
parent
commit
b7358f7e31

+ 29 - 6
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -3,16 +3,18 @@ package org.dbsyncer.storage.binlog;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
 import org.apache.lucene.util.BytesRef;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.storage.binlog.impl.BinlogColumnValue;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
@@ -27,7 +29,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import javax.annotation.PostConstruct;
 import java.io.File;
 import java.io.IOException;
+import java.sql.Timestamp;
 import java.time.Instant;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -53,9 +57,12 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
     private static final int SUBMIT_COUNT = 1000;
 
+    private static final int MAX_PROCESSING_SECONDS = 60;
+
     private static final Queue<BinlogMessage> queue = new LinkedBlockingQueue(10000);
 
-    private static final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar).append("data").append(File.separatorChar).append("data").append(File.separatorChar).toString();
+    private static final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar).append("data").append(
+            File.separatorChar).append("data").append(File.separatorChar).toString();
 
     private Shard shard;
 
@@ -135,7 +142,8 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
             while (!queue.isEmpty() && count < SUBMIT_COUNT) {
                 BinlogMessage message = queue.poll();
                 if (null != message) {
-                    tasks.add(ParamsUtil.convertBinlog2Doc(String.valueOf(snowflakeIdWorker.nextId()), BinlogConstant.READY, new BytesRef(message.toByteArray()), now));
+                    tasks.add(ParamsUtil.convertBinlog2Doc(String.valueOf(snowflakeIdWorker.nextId()), BinlogConstant.READY,
+                            new BytesRef(message.toByteArray()), now));
                 }
                 count++;
             }
@@ -184,10 +192,23 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         }
 
         private void doParse() throws IOException {
-            BooleanQuery query = new BooleanQuery.Builder().add(IntPoint.newSetQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST).build();
+            //  查询[待处理] 或 [处理中 & 处理超时]
+            long maxProcessingSeconds = Timestamp.valueOf(LocalDateTime.now().minusSeconds(MAX_PROCESSING_SECONDS)).getTime();
+            BooleanQuery query = new BooleanQuery.Builder()
+                    .add(new BooleanQuery.Builder()
+                            .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST)
+                            .build(), BooleanClause.Occur.SHOULD)
+                    .add(new BooleanQuery.Builder()
+                            .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.PROCESSING), BooleanClause.Occur.MUST)
+                            .add(LongPoint.newRangeQuery(BinlogConstant.BINLOG_TIME, Long.MIN_VALUE, maxProcessingSeconds), BooleanClause.Occur.MUST)
+                            .build(), BooleanClause.Occur.SHOULD)
+                    .build();
             Option option = new Option(query);
             option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
-            Paging paging = shard.query(option, 1, SUBMIT_COUNT, null);
+
+            // 优先处理最早记录
+            Sort sort = new Sort(new SortField(BinlogConstant.BINLOG_TIME, SortField.Type.LONG));
+            Paging paging = shard.query(option, 1, SUBMIT_COUNT, sort);
             if (CollectionUtils.isEmpty(paging.getData())) {
                 return;
             }
@@ -195,6 +216,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
             List<Map> list = (List<Map>) paging.getData();
             int size = list.size();
             List<Message> messageList = new ArrayList<>();
+            List<Document> docs = new ArrayList<>();
             long now = Instant.now().toEpochMilli();
             Map row = null;
             for (int i = 0; i < size; i++) {
@@ -206,12 +228,13 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
                     if (null != message) {
                         messageList.add(message);
                     }
-                    shard.update(new Term(BinlogConstant.BINLOG_ID, String.valueOf(id)), ParamsUtil.convertBinlog2Doc(id, BinlogConstant.PROCESSING, ref, now));
+                    docs.add(ParamsUtil.convertBinlog2Doc(id, BinlogConstant.PROCESSING, ref, now));
                 } catch (InvalidProtocolBufferException e) {
                     logger.error(e.getMessage());
                 }
             }
 
+            shard.insertBatch(docs);
             getQueue().addAll(messageList);
         }
     }

+ 65 - 32
dbsyncer-storage/src/main/test/ShardBinlogTest.java

@@ -1,14 +1,13 @@
 import com.google.protobuf.ByteString;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.search.BooleanClause;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.*;
 import org.apache.lucene.util.BytesRef;
 import org.dbsyncer.common.model.Paging;
+import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.storage.binlog.AbstractBinlogRecorder;
 import org.dbsyncer.storage.binlog.proto.BinlogMap;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
@@ -33,6 +32,7 @@ import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author AE86
@@ -56,10 +56,48 @@ public class ShardBinlogTest extends AbstractBinlogRecorder {
     }
 
     @Test
-    public void testBinlogMessage() throws IOException {
+    public void testBinlogMessage() throws IOException, InterruptedException {
+        mockData(1);
+
+        // 查询[待处理] 或 [处理中 且 处理超时]
+        List<Map> maps = queryReadyAndProcess();
+        logger.info("总条数:{}", maps.size());
+        TimeUnit.SECONDS.sleep(1);
+        markProcessing(maps);
+        logger.info("标记处理中");
+
+        // 模拟新记录
+        TimeUnit.SECONDS.sleep(1);
+        mockData(6);
+
+        maps = queryReadyAndProcess();
+        logger.info("【待处理】和【处理中 且 处理超时】总条数:{}", maps.size());
+
+        logger.info("模拟处理超时,等待10s");
+        TimeUnit.SECONDS.sleep(10);
+
+        maps = queryReadyAndProcess();
+        logger.info("【待处理】和【处理中 且 处理超时】总条数:{}", maps.size());
+    }
+
+    private void markProcessing(List<Map> maps) {
+        long updateTime = Instant.now().toEpochMilli();
+        maps.forEach(row -> {
+            String id = (String) row.get(BinlogConstant.BINLOG_ID);
+            BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
+            try {
+                shard.update(new Term(BinlogConstant.BINLOG_ID, String.valueOf(id)), ParamsUtil.convertBinlog2Doc(id, BinlogConstant.PROCESSING, ref, updateTime));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    private void mockData(int i) throws IOException {
         List<Document> list = new ArrayList<>();
         long now = Instant.now().toEpochMilli();
-        for (int i = 1; i <= 10; i++) {
+        int size = i + 5;
+        while (i < size){
             BinlogMessage message = genMessage("123456", i + "");
             BytesRef bytes = new BytesRef(message.toByteArray());
             list.add(ParamsUtil.convertBinlog2Doc(String.valueOf(i), BinlogConstant.READY, bytes, now));
@@ -68,53 +106,48 @@ public class ShardBinlogTest extends AbstractBinlogRecorder {
                 shard.insertBatch(list);
                 list.clear();
             }
+            i++;
         }
 
         if (!list.isEmpty()) {
             shard.insertBatch(list);
         }
         check();
+    }
 
-        BooleanQuery query = new BooleanQuery.Builder().add(IntPoint.newSetQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST).build();
-        List<Map> maps = query(new Option(query));
-        // 更新状态为处理中
-        maps.forEach(row -> {
-            String id = (String) row.get(BinlogConstant.BINLOG_ID);
-            BytesRef ref = (BytesRef) row.get(BinlogConstant.BINLOG_CONTENT);
-            try {
-                shard.update(new Term(BinlogConstant.BINLOG_ID, String.valueOf(id)), ParamsUtil.convertBinlog2Doc(id, BinlogConstant.PROCESSING, ref, now));
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        });
-
-        // 查询【待处理】
-        query = new BooleanQuery.Builder()
-                .add(IntPoint.newSetQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST)
+    private List<Map> queryReadyAndProcess() throws IOException {
+        long lastTime = Timestamp.valueOf(LocalDateTime.now().minusSeconds(5)).getTime();
+        BooleanQuery filter1 = new BooleanQuery.Builder()
+                .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST)
                 .build();
-        maps = query(new Option(query));
-        logger.info("【待处理】总条数:{}", maps.size());
-
-        // 查询【待处理】和【处理中】
-        query = new BooleanQuery.Builder()
-                .add(IntPoint.newSetQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY, BinlogConstant.PROCESSING), BooleanClause.Occur.MUST)
+        BooleanQuery filter2 = new BooleanQuery.Builder()
+                .add(IntPoint.newExactQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.PROCESSING), BooleanClause.Occur.MUST)
+                .add(LongPoint.newRangeQuery(BinlogConstant.BINLOG_TIME, Long.MIN_VALUE, lastTime), BooleanClause.Occur.MUST)
+                .build();
+        BooleanQuery query = new BooleanQuery.Builder()
+                .add(filter1, BooleanClause.Occur.SHOULD)
+                .add(filter2, BooleanClause.Occur.SHOULD)
                 .build();
-        maps = query(new Option(query));
-        logger.info("【待处理】和【处理中】总条数:{}", maps.size());
+        return query(new Option(query));
     }
 
     private List<Map> query(Option option) throws IOException {
+        option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_ID, IndexFieldResolverEnum.STRING);
         option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_STATUS, IndexFieldResolverEnum.INT);
         option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
-        Paging paging = shard.query(option, 1, 10001, null);
+        option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_TIME, IndexFieldResolverEnum.LONG);
+        Sort sort = new Sort(new SortField(BinlogConstant.BINLOG_TIME, SortField.Type.LONG));
+        Paging paging = shard.query(option, 1, 10001, sort);
         List<Map> maps = (List<Map>) paging.getData();
         for (Map m : maps) {
             String id = (String) m.get(BinlogConstant.BINLOG_ID);
             Integer s = (Integer) m.get(BinlogConstant.BINLOG_STATUS);
             BytesRef ref = (BytesRef) m.get(BinlogConstant.BINLOG_CONTENT);
+            Long t = (Long) m.get(BinlogConstant.BINLOG_TIME);
             BinlogMessage message = BinlogMessage.parseFrom(ref.bytes);
             Map<String, ByteString> rowMap = message.getData().getRowMap();
-            logger.info("id:{}, s:{}, message:{}", id, s, rowMap.get("name").toStringUtf8());
+            String timestamp = DateFormatUtil.timestampToString(new Timestamp(t));
+            logger.info("t:{}, id:{}, s:{}, message:{}", timestamp, id, s, rowMap.get("name").toStringUtf8());
         }
         return maps;
     }