瀏覽代碼

add recorder

AE86 3 年之前
父節點
當前提交
607f1c5ad7

+ 11 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -2,6 +2,7 @@ package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.parser.flush.binlog.BinlogRecorder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -18,6 +19,11 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
+ * 任务缓存执行器
+ * <p>1. 任务优先进入缓存队列
+ * <p>2. 任务数超过队列阈值80%时,序列化写入磁盘
+ * <p>3. 内置定时同步线程,在队列空闲时,将磁盘数据刷入缓存
+ *
  * @author AE86
  * @version 1.0.0
  * @date 2022/3/27 17:36
@@ -39,10 +45,13 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
     private volatile boolean running;
 
-    private final static long MAX_BATCH_COUNT = 1000L;
+    private static final long MAX_BATCH_COUNT = 1000L;
+
+    private BinlogRecorder recorder;
 
     @PostConstruct
     private void init() {
+        recorder = new BinlogRecorder(getClass().getSimpleName(), buffer, scheduledTaskService);
         scheduledTaskService.start(getPeriod(), this);
     }
 
@@ -87,6 +96,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     public void offer(BufferRequest request) {
         if (running) {
             temp.offer((Request) request);
+            recorder.offer(request);
         } else {
             buffer.offer((Request) request);
         }

+ 54 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/binlog/BinlogRecorder.java

@@ -0,0 +1,54 @@
+package org.dbsyncer.parser.flush.binlog;
+
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.parser.flush.BufferRequest;
+
+import java.io.File;
+import java.util.Queue;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/6/8 0:53
+ */
+public class BinlogRecorder implements ScheduledTaskJob {
+
+    private static final long PERIOD = 1000;
+
+    private static final long BINLOG_MAX_SIZE = 512 * 1024 * 1024;
+
+    private static final int BINLOG_EXPIRE_DAYS = 7;
+
+    /**
+     * 相对路径/data/binlog
+     */
+    private final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar)
+            .append("data").append(File.separatorChar)
+            .append("binlog").append(File.separatorChar)
+            .toString();
+
+    private String currentPath;
+
+    private Queue buffer;
+
+    private ScheduledTaskService scheduledTaskService;
+
+    public BinlogRecorder(String fileName, Queue buffer, ScheduledTaskService scheduledTaskService) {
+        currentPath = PATH + fileName;
+        this.buffer = buffer;
+        this.scheduledTaskService = scheduledTaskService;
+        scheduledTaskService.start(PERIOD, this);
+
+        //
+    }
+
+    @Override
+    public void run() {
+        // TODO 同步消息到缓存队列
+    }
+
+    public void offer(BufferRequest request) {
+        // TODO 序列化消息
+    }
+}

+ 5 - 2
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/DiskStorageServiceImpl.java

@@ -41,8 +41,11 @@ public class DiskStorageServiceImpl extends AbstractStorageService {
 
     private Map<String, Shard> map = new ConcurrentHashMap();
 
-    // 相对路径:./data/
-    private static final String PATH = "data" + File.separator;
+    /**
+     * 相对路径/data/
+     */
+    private static final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar).append("data")
+            .append(File.separatorChar).toString();
 
     @PostConstruct
     private void init() {