瀏覽代碼

add binlog service

AE86 2 年之前
父節點
當前提交
1f9ce653ca

+ 196 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogService.java

@@ -0,0 +1,196 @@
+package org.dbsyncer.storage.binlog;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.dbsyncer.common.config.BinlogRecorderConfig;
+import org.dbsyncer.common.model.Paging;
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.common.snowflake.SnowflakeIdWorker;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.storage.StorageService;
+import org.dbsyncer.storage.binlog.proto.BinlogMessage;
+import org.dbsyncer.storage.constant.BinlogConstant;
+import org.dbsyncer.storage.constant.ConfigConstant;
+import org.dbsyncer.storage.enums.StorageEnum;
+import org.dbsyncer.storage.query.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.PostConstruct;
+import java.time.Instant;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/11/25 0:53
+ */
+public abstract class AbstractBinlogService<Message> implements BinlogRecorder {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private StorageService storageService;
+
+    @Autowired
+    private ScheduledTaskService scheduledTaskService;
+
+    @Autowired
+    private SnowflakeIdWorker snowflakeIdWorker;
+
+    @Autowired
+    private BinlogRecorderConfig binlogRecorderConfig;
+
+    private Queue<BinlogMessage> queue;
+
+    private WriterTask writerTask = new WriterTask();
+
+    private ReaderTask readerTask = new ReaderTask();
+
+    @PostConstruct
+    private void init() {
+        queue = new LinkedBlockingQueue(binlogRecorderConfig.getQueueCapacity());
+        scheduledTaskService.start(binlogRecorderConfig.getWriterPeriodMillisecond(), writerTask);
+        scheduledTaskService.start(binlogRecorderConfig.getReaderPeriodMillisecond(), readerTask);
+    }
+
+    /**
+     * 反序列化消息
+     *
+     * @param message
+     * @return
+     */
+    protected abstract Message deserialize(String messageId, BinlogMessage message);
+
+    @Override
+    public void flush(BinlogMessage message) {
+        queue.offer(message);
+    }
+
+    @Override
+    public void complete(List<String> messageIds) {
+        if (!CollectionUtils.isEmpty(messageIds)) {
+            storageService.removeBatch(StorageEnum.BINLOG, messageIds);
+        }
+    }
+
+    /**
+     * 合并缓存队列任务到磁盘
+     */
+    final class WriterTask implements ScheduledTaskJob {
+
+        @Override
+        public void run() {
+            if (queue.isEmpty()) {
+                return;
+            }
+
+            List<Map> tasks = new ArrayList<>();
+            int count = 0;
+            long now = Instant.now().toEpochMilli();
+            Map task = null;
+            while (!queue.isEmpty() && count < binlogRecorderConfig.getBatchCount()) {
+                BinlogMessage message = queue.poll();
+                if (null != message) {
+                    task = new HashMap();
+                    task.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
+                    task.put(ConfigConstant.BINLOG_STATUS, BinlogConstant.READY);
+                    task.put(ConfigConstant.CONFIG_MODEL_JSON, message.toByteArray());
+                    task.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
+                    tasks.add(task);
+                }
+                count++;
+            }
+
+            if (!CollectionUtils.isEmpty(tasks)) {
+                storageService.addBatch(StorageEnum.BINLOG, tasks);
+            }
+            tasks = null;
+        }
+    }
+
+    /**
+     * 从磁盘读取日志到任务队列
+     */
+    final class ReaderTask implements ScheduledTaskJob {
+
+        private final Lock lock = new ReentrantLock(true);
+
+        private volatile boolean running;
+
+        @Override
+        public void run() {
+            // 读取任务数 >= 1/2缓存同步队列容量则继续等待
+            if (running || binlogRecorderConfig.getBatchCount() + getQueue().size() >= getQueueCapacity() / 2) {
+                return;
+            }
+
+            final Lock binlogLock = lock;
+            boolean locked = false;
+            try {
+                locked = binlogLock.tryLock();
+                if (locked) {
+                    running = true;
+                    doParse();
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage());
+            } finally {
+                if (locked) {
+                    running = false;
+                    binlogLock.unlock();
+                }
+            }
+        }
+
+        private void doParse() {
+            //  TODO 查询[待处理] 或 [处理中 & 处理超时]
+            Query query = new Query();
+            query.setType(StorageEnum.BINLOG);
+            query.addFilter(ConfigConstant.BINLOG_STATUS, String.valueOf(BinlogConstant.READY));
+            query.setPageNum(1);
+            query.setPageSize(binlogRecorderConfig.getBatchCount());
+            Paging paging = storageService.query(query);
+            if (CollectionUtils.isEmpty(paging.getData())) {
+                return;
+            }
+
+            List<Map> list = (List<Map>) paging.getData();
+            final int size = list.size();
+            final List<Message> messages = new ArrayList<>(size);
+            final List<Map> updateTasks = new ArrayList<>(size);
+            boolean existProcessing = false;
+            for (int i = 0; i < size; i++) {
+                Map row = list.get(i);
+                String id = (String) row.get(ConfigConstant.CONFIG_MODEL_ID);
+                Integer status = (Integer) row.get(ConfigConstant.BINLOG_STATUS);
+                byte[] bytes = (byte[]) row.get(ConfigConstant.CONFIG_MODEL_JSON);
+                if (BinlogConstant.PROCESSING == status) {
+                    existProcessing = true;
+                }
+                try {
+                    Message message = deserialize(id, BinlogMessage.parseFrom(bytes));
+                    if (null != message) {
+                        messages.add(message);
+                        row.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, Instant.now().toEpochMilli());
+                        updateTasks.add(row);
+                    }
+                } catch (InvalidProtocolBufferException e) {
+                    logger.error(e.getMessage());
+                }
+            }
+            if (existProcessing) {
+                logger.warn("存在超时未处理数据,正在重试,建议优化配置参数[max-processing-seconds={}].", binlogRecorderConfig.getMaxProcessingSeconds());
+            }
+
+            // 如果在更新消息状态的过程中服务被中止,为保证数据的安全性,重启后消息可能会同步2次)
+            storageService.editBatch(StorageEnum.BINLOG, updateTasks);
+            getQueue().addAll(messages);
+        }
+    }
+
+}

+ 5 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/constant/ConfigConstant.java

@@ -37,4 +37,9 @@ public class ConfigConstant {
     public static final String DATA_EVENT = "event";
     public static final String DATA_ERROR = "error";
 
+    /**
+     * Binlog
+     */
+    public static final String BINLOG_STATUS = "status";
+
 }

+ 21 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/strategy/impl/BinlogStrategy.java

@@ -0,0 +1,21 @@
+package org.dbsyncer.storage.strategy.impl;
+
+import org.dbsyncer.storage.enums.StorageEnum;
+import org.dbsyncer.storage.strategy.Strategy;
+import org.springframework.stereotype.Component;
+
+/**
+ * 缓存队列数据
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2019/11/15 22:39
+ */
+@Component
+public class BinlogStrategy implements Strategy {
+
+    @Override
+    public String createSharding(String separator, String collectionId) {
+        return StorageEnum.BINLOG.getType();
+    }
+}

+ 4 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -47,6 +47,7 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
         // 创建配置和日志索引shard
         getShard(getSharding(StorageEnum.CONFIG, null));
         getShard(getSharding(StorageEnum.LOG, null));
+        getShard(getSharding(StorageEnum.BINLOG, null));
     }
 
     @Override
@@ -152,6 +153,9 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
                 case CONFIG:
                     docs.add(DocumentUtil.convertConfig2Doc(r));
                     break;
+                case BINLOG:
+                    docs.add(DocumentUtil.convertBinlog2Doc(r));
+                    break;
                 default:
                     break;
             }

+ 21 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/util/DocumentUtil.java

@@ -116,6 +116,7 @@ public abstract class DocumentUtil {
         return doc;
     }
 
+    @Deprecated
     public static Document convertBinlog2Doc(String messageId, int status, BytesRef bytes, long updateTime) {
         Document doc = new Document();
         doc.add(new StringField(BinlogConstant.BINLOG_ID, messageId, Field.Store.YES));
@@ -132,4 +133,24 @@ public abstract class DocumentUtil {
         return doc;
     }
 
+    public static Document convertBinlog2Doc(Map params) {
+        Document doc = new Document();
+        String id = (String) params.get(ConfigConstant.CONFIG_MODEL_ID);
+        doc.add(new StringField(ConfigConstant.CONFIG_MODEL_ID, id, Field.Store.YES));
+
+        Integer status = (Integer) params.get(ConfigConstant.BINLOG_STATUS);
+        doc.add(new IntPoint(ConfigConstant.BINLOG_STATUS, status));
+        doc.add(new StoredField(ConfigConstant.BINLOG_STATUS, status));
+
+        byte[] bytes = (byte[]) params.get(ConfigConstant.CONFIG_MODEL_JSON);
+        doc.add(new BinaryDocValuesField(ConfigConstant.CONFIG_MODEL_JSON, new BytesRef(bytes)));
+        doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_JSON, bytes));
+
+        Long createTime = (Long) params.get(ConfigConstant.CONFIG_MODEL_CREATE_TIME);
+        doc.add(new LongPoint(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        doc.add(new StoredField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        doc.add(new NumericDocValuesField(ConfigConstant.CONFIG_MODEL_CREATE_TIME, createTime));
+        return doc;
+    }
+
 }