Selaa lähdekoodia

add BinlogRecorder

AE86 3 vuotta sitten
vanhempi
säilyke
be6a25a135

+ 3 - 1
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskService.java

@@ -17,12 +17,14 @@ public interface ScheduledTaskService {
      */
     void start(String key, String cron, ScheduledTaskJob job);
 
-    void start(String key, long period, ScheduledTaskJob job);
+    void start(String key, long period, ScheduledTaskJob job, String suffix);
 
     void start(String cron, ScheduledTaskJob job);
 
     void start(long period, ScheduledTaskJob job);
 
+    void start(long period, ScheduledTaskJob job, String suffix);
+
     void stop(String key);
 
 }

+ 8 - 3
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java

@@ -39,8 +39,8 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService, Disposabl
     }
 
     @Override
-    public void start(String key, long period, ScheduledTaskJob job) {
-        logger.info("[period={}], Started task [{}]", period, job.getClass().getName());
+    public void start(String key, long period, ScheduledTaskJob job, String suffix) {
+        logger.info("[period={}], Started task [{}{}]", period, job.getClass().getName(), suffix);
         apply(key, () -> taskScheduler.scheduleAtFixedRate(job, period));
     }
 
@@ -51,7 +51,12 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService, Disposabl
 
     @Override
     public void start(long period, ScheduledTaskJob job) {
-        start(UUIDUtil.getUUID(), period, job);
+        start(UUIDUtil.getUUID(), period, job, "");
+    }
+
+    @Override
+    public void start(long period, ScheduledTaskJob job, String suffix) {
+        start(UUIDUtil.getUUID(), period, job, suffix);
     }
 
     @Override

+ 115 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBinlogRecorder.java

@@ -0,0 +1,115 @@
+package org.dbsyncer.parser.flush;
+
+import org.apache.commons.io.FileUtils;
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.listener.file.BufferedRandomAccessFile;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.util.Assert;
+
+import javax.annotation.PostConstruct;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.charset.Charset;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/6/8 0:53
+ */
+public abstract class AbstractBinlogRecorder implements BinlogRecorder, ScheduledTaskJob {
+
+    @Autowired
+    private ScheduledTaskService scheduledTaskService;
+
+    private static final long PERIOD = 1000;
+
+    private static final long BINLOG_MAX_SIZE = 512 * 1024 * 1024;
+
+    private static final int BINLOG_EXPIRE_DAYS = 7;
+
+    private static final String BINLOG = "binlog";
+
+    private static final String BINLOG_CONFIG = BINLOG + ".config";
+
+    private static final String BINLOG_INDEX = BINLOG + ".index";
+
+    private static final String BINLOG_TEMP = BINLOG + ".temp";
+
+    private String path;
+
+    private Binlog binlog;
+
+    private RandomAccessFile raf;
+
+    @PostConstruct
+    private void init() throws IOException {
+        scheduledTaskService.start(PERIOD, this, String.format(".%s", BinlogRecorder.class.getSimpleName()));
+        initPipeline(getClass().getSimpleName());
+    }
+
+    protected abstract BufferActuator getBufferActuator();
+
+    @Override
+    public void run() {
+        // TODO 同步消息到缓存队列
+    }
+
+    @Override
+    public void flush(BufferRequest request) {
+        // TODO 序列化消息
+    }
+
+    private void initPipeline(String fileName) throws IOException {
+        // /data/binlog/{BufferActuator}/
+        path = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar).append("data").append(File.separatorChar)
+                .append("binlog").append(File.separatorChar)
+                .append(fileName).append(File.separatorChar)
+                .toString();
+
+        File dir = new File(path);
+        File binlogConfigFile = new File(path.concat(BINLOG_CONFIG));
+        if (!dir.exists()) {
+            FileUtils.forceMkdir(dir);
+            binlog = new Binlog().setBinlog(String.format("%s.%06d", BINLOG, 1));
+            FileUtils.writeStringToFile(binlogConfigFile, JsonUtil.objToJson(binlog), Charset.defaultCharset());
+            return;
+        }
+
+        Assert.isTrue(binlogConfigFile.exists(), String.format("The binlogConfigFile '%s' is null.", binlogConfigFile.getAbsolutePath()));
+
+        String binlogJsonStr = FileUtils.readFileToString(binlogConfigFile, Charset.defaultCharset());
+        binlog = JsonUtil.jsonToObj(binlogJsonStr, Binlog.class);
+        File binlogFile = new File(path.concat(binlog.getBinlog()));
+        raf = new BufferedRandomAccessFile(binlogFile, "rw");
+    }
+
+    public static void main(String[] args) {
+        System.out.println(String.format("%s.%06d", BINLOG, 1));
+    }
+
+    static class Binlog {
+        private String binlog;
+        private long pos = 0;
+
+        public String getBinlog() {
+            return binlog;
+        }
+
+        public Binlog setBinlog(String binlog) {
+            this.binlog = binlog;
+            return this;
+        }
+
+        public long getPos() {
+            return pos;
+        }
+
+        public Binlog setPos(long pos) {
+            this.pos = pos;
+            return this;
+        }
+    }
+}

+ 12 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -2,7 +2,6 @@ package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
-import org.dbsyncer.parser.flush.binlog.BinlogRecorder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -28,7 +27,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * @version 1.0.0
  * @date 2022/3/27 17:36
  */
-public abstract class AbstractBufferActuator<Request, Response> implements BufferActuator, ScheduledTaskJob {
+public abstract class AbstractBufferActuator<Request, Response> extends AbstractBinlogRecorder implements BufferActuator, ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -47,11 +46,8 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
     private static final long MAX_BATCH_COUNT = 1000L;
 
-    private BinlogRecorder recorder;
-
     @PostConstruct
     private void init() {
-        recorder = new BinlogRecorder(getClass().getSimpleName(), buffer, scheduledTaskService);
         scheduledTaskService.start(getPeriod(), this);
     }
 
@@ -92,11 +88,21 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
      */
     protected abstract void pull(Response response);
 
+    @Override
+    protected BufferActuator getBufferActuator() {
+        return this;
+    }
+
+    @Override
+    public Queue getBuffer() {
+        return buffer;
+    }
+
     @Override
     public void offer(BufferRequest request) {
         if (running) {
             temp.offer((Request) request);
-            recorder.offer(request);
+            super.flush(request);
         } else {
             buffer.offer((Request) request);
         }

+ 17 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BinlogRecorder.java

@@ -0,0 +1,17 @@
+package org.dbsyncer.parser.flush;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/6/8 23:34
+ */
+public interface BinlogRecorder {
+
+    /**
+     * 将任务序列化刷入磁盘
+     *
+     * @param request
+     */
+    void flush(BufferRequest request);
+
+}

+ 4 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.parser.flush;
 
+import java.util.Queue;
+
 /**
  * @author AE86
  * @version 1.0.0
@@ -7,6 +9,8 @@ package org.dbsyncer.parser.flush;
  */
 public interface BufferActuator {
 
+    Queue getBuffer();
+
     void offer(BufferRequest request);
 
 }

+ 0 - 54
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/binlog/BinlogRecorder.java

@@ -1,54 +0,0 @@
-package org.dbsyncer.parser.flush.binlog;
-
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
-import org.dbsyncer.common.scheduled.ScheduledTaskService;
-import org.dbsyncer.parser.flush.BufferRequest;
-
-import java.io.File;
-import java.util.Queue;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/6/8 0:53
- */
-public class BinlogRecorder implements ScheduledTaskJob {
-
-    private static final long PERIOD = 1000;
-
-    private static final long BINLOG_MAX_SIZE = 512 * 1024 * 1024;
-
-    private static final int BINLOG_EXPIRE_DAYS = 7;
-
-    /**
-     * 相对路径/data/binlog
-     */
-    private final String PATH = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar)
-            .append("data").append(File.separatorChar)
-            .append("binlog").append(File.separatorChar)
-            .toString();
-
-    private String currentPath;
-
-    private Queue buffer;
-
-    private ScheduledTaskService scheduledTaskService;
-
-    public BinlogRecorder(String fileName, Queue buffer, ScheduledTaskService scheduledTaskService) {
-        currentPath = PATH + fileName;
-        this.buffer = buffer;
-        this.scheduledTaskService = scheduledTaskService;
-        scheduledTaskService.start(PERIOD, this);
-
-        //
-    }
-
-    @Override
-    public void run() {
-        // TODO 同步消息到缓存队列
-    }
-
-    public void offer(BufferRequest request) {
-        // TODO 序列化消息
-    }
-}