AE86 2 年 前
コミット
8034881ba7

+ 11 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -2,6 +2,7 @@ package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
@@ -9,6 +10,7 @@ import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
 import org.dbsyncer.parser.model.*;
 import org.dbsyncer.parser.strategy.FlushStrategy;
+import org.dbsyncer.parser.strategy.ParserStrategy;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
@@ -30,6 +32,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Autowired
     private FlushStrategy flushStrategy;
 
+    @Autowired
+    private ParserStrategy parserStrategy;
+
     @Autowired
     private CacheService cacheService;
 
@@ -43,6 +48,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     @Override
     protected void partition(WriterRequest request, WriterResponse response) {
         response.getDataList().add(request.getRow());
+        if(StringUtil.isNotBlank(request.getMessageId())){
+            response.getMessageIds().add(request.getMessageId());
+        }
         if (response.isMerged()) {
             return;
         }
@@ -66,6 +74,9 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
 
         // 3、持久化同步结果
         flushStrategy.flushIncrementData(mapping.getMetaId(), result, response.getEvent());
+
+        // 4、消息处理完成
+        parserStrategy.complete(response.getMessageIds());
     }
 
     /**

+ 11 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java

@@ -11,14 +11,25 @@ import java.util.Map;
  */
 public class WriterRequest extends AbstractWriter implements BufferRequest {
 
+    private String messageId;
+
     private Map row;
 
     public WriterRequest(String tableGroupId, String event, Map row) {
+        this(null, tableGroupId, event, row);
+    }
+
+    public WriterRequest(String messageId, String tableGroupId, String event, Map row) {
         setTableGroupId(tableGroupId);
         setEvent(event);
+        this.messageId = messageId;
         this.row = row;
     }
 
+    public String getMessageId() {
+        return messageId;
+    }
+
     public Map getRow() {
         return row;
     }

+ 3 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterResponse.java

@@ -14,6 +14,7 @@ import java.util.Map;
 public class WriterResponse extends AbstractWriter implements BufferResponse {
 
     private List<Map> dataList = new LinkedList<>();
+    private List<String> messageIds = new LinkedList<>();
 
     private boolean isMerged;
 
@@ -26,8 +27,8 @@ public class WriterResponse extends AbstractWriter implements BufferResponse {
         return dataList;
     }
 
-    public void setDataList(List<Map> dataList) {
-        this.dataList = dataList;
+    public List<String> getMessageIds() {
+        return messageIds;
     }
 
     public boolean isMerged() {

+ 2 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/ParserStrategy.java

@@ -1,9 +1,11 @@
 package org.dbsyncer.parser.strategy;
 
+import java.util.List;
 import java.util.Map;
 
 public interface ParserStrategy {
 
     void execute(String tableGroupId, String event, Map<String, Object> data);
 
+    default void complete(List<String> messageIds) {}
 }

+ 8 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableWriterBufferActuatorStrategy.java

@@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
@@ -55,6 +56,11 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
         }
     }
 
+    @Override
+    public void complete(List<String> messageIds) {
+        super.completeMessage(messageIds);
+    }
+
     @Override
     public Queue getQueue() {
         return writerBufferActuator.getQueue();
@@ -71,7 +77,7 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
     }
 
     @Override
-    protected WriterRequest deserialize(BinlogMessage message) {
+    protected WriterRequest deserialize(String messageId, BinlogMessage message) {
         if (CollectionUtils.isEmpty(message.getData().getRowMap())) {
             return null;
         }
@@ -90,7 +96,7 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
                     data.put(k, resolveValue(fieldMap.get(k).getType(), v));
                 }
             });
-            return new WriterRequest(message.getTableGroupId(), message.getEvent().name(), data);
+            return new WriterRequest(messageId, message.getTableGroupId(), message.getEvent().name(), data);
         } catch (Exception e) {
             logger.error(e.getMessage());
         }

+ 68 - 50
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -7,11 +7,10 @@ import oracle.sql.CLOB;
 import oracle.sql.TIMESTAMP;
 import org.apache.commons.io.IOUtils;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.StoredField;
-import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.IntPoint;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.util.BytesRef;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
@@ -20,9 +19,11 @@ import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.storage.binlog.impl.BinlogColumnValue;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
+import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.enums.IndexFieldResolverEnum;
 import org.dbsyncer.storage.lucene.Shard;
 import org.dbsyncer.storage.query.Option;
+import org.dbsyncer.storage.util.ParamsUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
@@ -37,6 +38,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.sql.Date;
 import java.sql.*;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.Lock;
@@ -57,10 +59,6 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
     @Autowired
     private SnowflakeIdWorker snowflakeIdWorker;
 
-    private static final String BINLOG_ID = "id";
-
-    private static final String BINLOG_CONTENT = "c";
-
     private static final int SUBMIT_COUNT = 1000;
 
     private static final Queue<BinlogMessage> queue = new LinkedBlockingQueue(10000);
@@ -101,7 +99,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
      * @param message
      * @return
      */
-    protected abstract Message deserialize(BinlogMessage message);
+    protected abstract Message deserialize(String messageId, BinlogMessage message);
 
     @Override
     public void flush(BinlogMessage message) {
@@ -114,8 +112,12 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
     }
 
     private void doParse() throws IOException {
-        Option option = new Option(new MatchAllDocsQuery());
-        option.addIndexFieldResolverEnum(BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
+        BooleanQuery query = new BooleanQuery.Builder()
+                .add(IntPoint.newSetQuery(BinlogConstant.BINLOG_STATUS, BinlogConstant.READY), BooleanClause.Occur.MUST)
+                .build();
+        Option option = new Option(query);
+        option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_ID, IndexFieldResolverEnum.STRING);
+        option.addIndexFieldResolverEnum(BinlogConstant.BINLOG_CONTENT, IndexFieldResolverEnum.BINARY);
         Paging paging = shard.query(option, 1, SUBMIT_COUNT, null);
         if (CollectionUtils.isEmpty(paging.getData())) {
             return;
@@ -123,20 +125,38 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
         List<Map> list = (List<Map>) paging.getData();
         int size = list.size();
-        Term[] terms = new Term[size];
+        List<Message> messageList = new ArrayList<>();
+        long now = Instant.now().toEpochMilli();
         for (int i = 0; i < size; i++) {
             try {
-                BytesRef ref = (BytesRef) list.get(i).get(BINLOG_CONTENT);
-                Message message = deserialize(BinlogMessage.parseFrom(ref.bytes));
+                String id = (String) list.get(i).get(BinlogConstant.BINLOG_ID);
+                BytesRef ref = (BytesRef) list.get(i).get(BinlogConstant.BINLOG_CONTENT);
+                Message message = deserialize(id, BinlogMessage.parseFrom(ref.bytes));
                 if (null != message) {
-                    getQueue().offer(message);
+                    messageList.add(message);
                 }
-                terms[i] = new Term(BINLOG_ID, (String) list.get(i).get(BINLOG_ID));
+                shard.update(new Term(BinlogConstant.BINLOG_ID, String.valueOf(id)), ParamsUtil.convertBinlog2Doc(id, BinlogConstant.PROCESSING, ref, now));
             } catch (InvalidProtocolBufferException e) {
                 logger.error(e.getMessage());
             }
         }
-        shard.deleteBatch(terms);
+
+        getQueue().addAll(messageList);
+    }
+
+    protected void completeMessage(List<String> messageIds) {
+        if (!CollectionUtils.isEmpty(messageIds)) {
+            try {
+                int size = messageIds.size();
+                Term[] terms = new Term[size];
+                for (int i = 0; i < size; i++) {
+                    terms[i] = new Term(BinlogConstant.BINLOG_ID, messageIds.get(i));
+                }
+                shard.deleteBatch(terms);
+            } catch (IOException e) {
+                logger.error(e.getMessage());
+            }
+        }
     }
 
     /**
@@ -331,6 +351,34 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         }
     }
 
+    private byte[] getBytes(BLOB blob) {
+        InputStream is = null;
+        byte[] b = null;
+        try {
+            is = blob.getBinaryStream();
+            b = new byte[(int) blob.length()];
+            is.read(b);
+            return b;
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        } finally {
+            IOUtils.closeQuietly(is);
+        }
+        return b;
+    }
+
+    private byte[] getBytes(CLOB clob) {
+        try {
+            long length = clob.length();
+            if (length > 0) {
+                return clob.getSubString(1, (int) length).getBytes(Charset.defaultCharset());
+            }
+        } catch (SQLException e) {
+            logger.error(e.getMessage());
+        }
+        return null;
+    }
+
     /**
      * 合并缓存队列任务到磁盘
      */
@@ -344,14 +392,12 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
             List<Document> tasks = new ArrayList<>();
             int count = 0;
-            Document doc;
+            long now = Instant.now().toEpochMilli();
             while (!queue.isEmpty() && count < SUBMIT_COUNT) {
                 BinlogMessage message = queue.poll();
                 if (null != message) {
-                    doc = new Document();
-                    doc.add(new StringField(BINLOG_ID, String.valueOf(snowflakeIdWorker.nextId()), Field.Store.YES));
-                    doc.add(new StoredField(BINLOG_CONTENT, new BytesRef(message.toByteArray())));
-                    tasks.add(doc);
+                    tasks.add(ParamsUtil.convertBinlog2Doc(String.valueOf(snowflakeIdWorker.nextId()), BinlogConstant.READY,
+                            new BytesRef(message.toByteArray()), now));
                 }
                 count++;
             }
@@ -400,32 +446,4 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         }
     }
 
-    private byte[] getBytes(BLOB blob) {
-        InputStream is = null;
-        byte[] b = null;
-        try {
-            is = blob.getBinaryStream();
-            b = new byte[(int) blob.length()];
-            is.read(b);
-            return b;
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-        } finally {
-            IOUtils.closeQuietly(is);
-        }
-        return b;
-    }
-
-    private byte[] getBytes(CLOB clob) {
-        try {
-            long length = clob.length();
-            if (length > 0) {
-                return clob.getSubString(1, (int) length).getBytes(Charset.defaultCharset());
-            }
-        } catch (SQLException e) {
-            logger.error(e.getMessage());
-        }
-        return null;
-    }
-
 }

+ 24 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/BinlogConstant.java

@@ -0,0 +1,24 @@
+package org.dbsyncer.storage.constant;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/7/13 22:14
+ */
+public class BinlogConstant {
+
+    /**
+     * 属性
+     */
+    public static final String BINLOG_ID = "id";
+    public static final String BINLOG_STATUS = "s";
+    public static final String BINLOG_CONTENT = "c";
+    public static final String BINLOG_TIME = "t";
+
+    /**
+     * 状态类型
+     */
+    public static final int READY = 0;
+    public static final int PROCESSING = 1;
+
+}

+ 18 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/ParamsUtil.java

@@ -1,6 +1,8 @@
 package org.dbsyncer.storage.util;
 
 import org.apache.lucene.document.*;
+import org.apache.lucene.util.BytesRef;
+import org.dbsyncer.storage.constant.BinlogConstant;
 import org.dbsyncer.storage.constant.ConfigConstant;
 import org.springframework.util.Assert;
 
@@ -109,4 +111,20 @@ public abstract class ParamsUtil {
         return doc;
     }
 
+    public static Document convertBinlog2Doc(String messageId, int status, BytesRef bytes, long updateTime) {
+        Document doc = new Document();
+        doc.add(new StringField(BinlogConstant.BINLOG_ID, messageId, Field.Store.YES));
+
+        doc.add(new IntPoint(BinlogConstant.BINLOG_STATUS, status));
+        doc.add(new StoredField(BinlogConstant.BINLOG_STATUS, status));
+
+        doc.add(new BinaryDocValuesField(BinlogConstant.BINLOG_CONTENT, bytes));
+        doc.add(new StoredField(BinlogConstant.BINLOG_CONTENT, bytes));
+
+        doc.add(new LongPoint(BinlogConstant.BINLOG_TIME, updateTime));
+        doc.add(new StoredField(BinlogConstant.BINLOG_TIME, updateTime));
+        doc.add(new NumericDocValuesField(BinlogConstant.BINLOG_TIME, updateTime));
+        return doc;
+    }
+
 }

+ 1 - 1
dbsyncer-storage/src/main/test/BinlogMessageTest.java

@@ -217,7 +217,7 @@ public class BinlogMessageTest extends AbstractBinlogRecorder {
     }
 
     @Override
-    protected Object deserialize(BinlogMessage message) {
+    protected Object deserialize(String messageId, BinlogMessage message) {
         return null;
     }
 

+ 1 - 1
dbsyncer-storage/src/main/test/ShardBinlogTest.java

@@ -151,7 +151,7 @@ public class ShardBinlogTest extends AbstractBinlogRecorder {
     }
 
     @Override
-    protected Object deserialize(BinlogMessage message) {
+    protected Object deserialize(String messageId, BinlogMessage message) {
         return null;
     }