Browse Source

add BinlogRecorder

AE86 3 năm trước cách đây
mục cha
commit
10e742b367

+ 3 - 5
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskService.java

@@ -11,20 +11,18 @@ public interface ScheduledTaskService {
      * 第六位,星期,取值1-7
      * [秒 分 时 日 月 星期]
      *
-     * @param key 任务唯一key
+     * @param key  任务唯一key
      * @param cron 任务表达式
-     * @param job 任务实现
+     * @param job  任务实现
      */
     void start(String key, String cron, ScheduledTaskJob job);
 
-    void start(String key, long period, ScheduledTaskJob job, String suffix);
+    void start(String key, long period, ScheduledTaskJob job);
 
     void start(String cron, ScheduledTaskJob job);
 
     void start(long period, ScheduledTaskJob job);
 
-    void start(long period, ScheduledTaskJob job, String suffix);
-
     void stop(String key);
 
 }

+ 3 - 8
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java

@@ -39,8 +39,8 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService, Disposabl
     }
 
     @Override
-    public void start(String key, long period, ScheduledTaskJob job, String suffix) {
-        logger.info("[period={}], Started task [{}{}]", period, job.getClass().getName(), suffix);
+    public void start(String key, long period, ScheduledTaskJob job) {
+        logger.info("[period={}], Started task [{}]", period, job.getClass().getName());
         apply(key, () -> taskScheduler.scheduleAtFixedRate(job, period));
     }
 
@@ -51,12 +51,7 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService, Disposabl
 
     @Override
     public void start(long period, ScheduledTaskJob job) {
-        start(UUIDUtil.getUUID(), period, job, "");
-    }
-
-    @Override
-    public void start(long period, ScheduledTaskJob job, String suffix) {
-        start(UUIDUtil.getUUID(), period, job, suffix);
+        start(UUIDUtil.getUUID(), period, job);
     }
 
     @Override

+ 137 - 39
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBinlogRecorder.java

@@ -1,28 +1,29 @@
 package org.dbsyncer.parser.flush;
 
 import org.apache.commons.io.FileUtils;
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
-import org.dbsyncer.common.scheduled.ScheduledTaskService;
+import org.apache.commons.io.IOUtils;
 import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.listener.file.BufferedRandomAccessFile;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
 import org.springframework.util.Assert;
 
 import javax.annotation.PostConstruct;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Queue;
 
 /**
  * @author AE86
  * @version 1.0.0
  * @date 2022/6/8 0:53
  */
-public abstract class AbstractBinlogRecorder implements BinlogRecorder, ScheduledTaskJob {
+public abstract class AbstractBinlogRecorder implements BinlogRecorder, DisposableBean {
 
-    @Autowired
-    private ScheduledTaskService scheduledTaskService;
+    private final Logger logger = LoggerFactory.getLogger(getClass());
 
     private static final long PERIOD = 1000;
 
@@ -32,62 +33,159 @@ public abstract class AbstractBinlogRecorder implements BinlogRecorder, Schedule
 
     private static final String BINLOG = "binlog";
 
-    private static final String BINLOG_CONFIG = BINLOG + ".config";
-
     private static final String BINLOG_INDEX = BINLOG + ".index";
 
-    private static final String BINLOG_TEMP = BINLOG + ".temp";
+    private static final String BINLOG_CONFIG = BINLOG + ".config";
 
     private String path;
 
+    private File configPath;
+
     private Binlog binlog;
 
-    private RandomAccessFile raf;
+    private Pipeline pipeline;
+
+    private OutputStream writer;
+
+    private static final String LINE_SEPARATOR = System.lineSeparator();
+
+    private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();
 
     @PostConstruct
     private void init() throws IOException {
-        scheduledTaskService.start(PERIOD, this, String.format(".%s", BinlogRecorder.class.getSimpleName()));
-        initPipeline(getClass().getSimpleName());
+        // /data/binlog/{BufferActuator}/
+        path = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar)
+                .append("data").append(File.separatorChar)
+                .append("binlog").append(File.separatorChar)
+                .append(getClass().getSimpleName()).append(File.separatorChar)
+                .toString();
+        File dir = new File(path);
+        if (!dir.exists()) {
+            FileUtils.forceMkdir(dir);
+        }
+
+        initPipeline();
     }
 
-    protected abstract BufferActuator getBufferActuator();
+    protected abstract Queue getBuffer();
 
-    @Override
     public void run() {
-        // TODO 同步消息到缓存队列
+        // 如果缓冲繁忙,继续等待
+        if (getBuffer().size() > 500) {
+            return;
+        }
+
+        try {
+            String line;
+            boolean hasLine = false;
+            while (null != (line = pipeline.readLine())) {
+                if (StringUtil.isNotBlank(line)) {
+                    logger.info(line);
+                    hasLine = true;
+                }
+            }
+
+            if (hasLine) {
+                binlog.setPos(pipeline.filePointer);
+                FileUtils.writeStringToFile(configPath, JsonUtil.objToJson(binlog), DEFAULT_CHARSET);
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+        }
     }
 
     @Override
     public void flush(BufferRequest request) {
-        // TODO 序列化消息
+        try {
+            writeLine(writer, JsonUtil.objToJson(request));
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+        }
     }
 
-    private void initPipeline(String fileName) throws IOException {
-        // /data/binlog/{BufferActuator}/
-        path = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar).append("data").append(File.separatorChar)
-                .append("binlog").append(File.separatorChar)
-                .append(fileName).append(File.separatorChar)
-                .toString();
+    @Override
+    public void destroy() {
+        IOUtils.closeQuietly(writer);
+        IOUtils.closeQuietly(pipeline.raf);
+    }
 
-        File dir = new File(path);
-        File binlogConfigFile = new File(path.concat(BINLOG_CONFIG));
-        if (!dir.exists()) {
-            FileUtils.forceMkdir(dir);
-            binlog = new Binlog().setBinlog(String.format("%s.%06d", BINLOG, 1));
-            FileUtils.writeStringToFile(binlogConfigFile, JsonUtil.objToJson(binlog), Charset.defaultCharset());
-            return;
+    private void initPipeline() throws IOException {
+        // binlog.config
+        configPath = new File(path + BINLOG_CONFIG);
+        if (!configPath.exists()) {
+            final String binlogName = createBinlogName(1);
+            binlog = new Binlog().setBinlog(binlogName);
+            FileUtils.writeStringToFile(configPath, JsonUtil.objToJson(binlog), DEFAULT_CHARSET);
+
+            // binlog.000001
+            FileUtils.writeStringToFile(new File(path + binlog.getBinlog()), "", DEFAULT_CHARSET);
+
+            // binlog.index
+            FileUtils.writeStringToFile(new File(path + BINLOG_INDEX), binlogName + LINE_SEPARATOR, DEFAULT_CHARSET);
         }
 
-        Assert.isTrue(binlogConfigFile.exists(), String.format("The binlogConfigFile '%s' is null.", binlogConfigFile.getAbsolutePath()));
+        String configJson = FileUtils.readFileToString(configPath, Charset.defaultCharset());
+        binlog = JsonUtil.jsonToObj(configJson, Binlog.class);
+        File binlogFile = new File(path + binlog.getBinlog());
+        Assert.isTrue(binlogFile.exists(), String.format("The binlogFile '%s' is not exist.", binlogFile.getAbsolutePath()));
 
-        String binlogJsonStr = FileUtils.readFileToString(binlogConfigFile, Charset.defaultCharset());
-        binlog = JsonUtil.jsonToObj(binlogJsonStr, Binlog.class);
-        File binlogFile = new File(path.concat(binlog.getBinlog()));
-        raf = new BufferedRandomAccessFile(binlogFile, "rw");
+        final RandomAccessFile raf = new BufferedRandomAccessFile(binlogFile, "r");
+        raf.seek(binlog.getPos());
+        pipeline = new Pipeline(raf);
+        writer = new FileOutputStream(binlogFile, true);
     }
 
-    public static void main(String[] args) {
-        System.out.println(String.format("%s.%06d", BINLOG, 1));
+    private void writeLine(final OutputStream output, final String line) throws IOException {
+        if (line == null) {
+            return;
+        }
+        if (line != null) {
+            output.write(line.getBytes(DEFAULT_CHARSET));
+        }
+        output.write(LINE_SEPARATOR.getBytes(DEFAULT_CHARSET));
+    }
+
+    private String createBinlogName(int index) {
+        return String.format("%s.%06d", BINLOG, index <= 0 ? 1 : index);
+    }
+
+    final class Pipeline {
+        RandomAccessFile raf;
+        byte[] b;
+        long filePointer;
+
+        public Pipeline(RandomAccessFile raf) {
+            this.raf = raf;
+        }
+
+        public String readLine() throws IOException {
+            this.filePointer = raf.getFilePointer();
+            if (filePointer >= raf.length()) {
+                b = new byte[0];
+                return null;
+            }
+            if (b == null || b.length == 0) {
+                b = new byte[(int) (raf.length() - filePointer)];
+            }
+            raf.read(b);
+
+            ByteArrayOutputStream stream = new ByteArrayOutputStream();
+            int read = 0;
+            for (int i = 0; i < b.length; i++) {
+                read++;
+                if (b[i] == '\n' || b[i] == '\r') {
+                    break;
+                }
+                stream.write(b[i]);
+            }
+            b = Arrays.copyOfRange(b, read, b.length);
+
+            raf.seek(this.filePointer + read);
+            byte[] _b = stream.toByteArray();
+            stream.close();
+            stream = null;
+            return new String(_b, DEFAULT_CHARSET);
+        }
     }
 
     static class Binlog {

+ 2 - 13
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -27,7 +27,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * @version 1.0.0
  * @date 2022/3/27 17:36
  */
-public abstract class AbstractBufferActuator<Request, Response> extends AbstractBinlogRecorder implements BufferActuator, ScheduledTaskJob {
+public abstract class AbstractBufferActuator<Request, Response> implements BufferActuator, ScheduledTaskJob {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -36,7 +36,7 @@ public abstract class AbstractBufferActuator<Request, Response> extends Abstract
 
     private static final int CAPACITY = 10_0000;
 
-    private Queue<Request> buffer = new LinkedBlockingQueue(CAPACITY);
+    protected Queue<Request> buffer = new LinkedBlockingQueue(CAPACITY);
 
     private Queue<Request> temp = new LinkedBlockingQueue(CAPACITY);
 
@@ -88,21 +88,10 @@ public abstract class AbstractBufferActuator<Request, Response> extends Abstract
      */
     protected abstract void pull(Response response);
 
-    @Override
-    protected BufferActuator getBufferActuator() {
-        return this;
-    }
-
-    @Override
-    public Queue getBuffer() {
-        return buffer;
-    }
-
     @Override
     public void offer(BufferRequest request) {
         if (running) {
             temp.offer((Request) request);
-            super.flush(request);
         } else {
             buffer.offer((Request) request);
         }

+ 0 - 4
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -1,7 +1,5 @@
 package org.dbsyncer.parser.flush;
 
-import java.util.Queue;
-
 /**
  * @author AE86
  * @version 1.0.0
@@ -9,8 +7,6 @@ import java.util.Queue;
  */
 public interface BufferActuator {
 
-    Queue getBuffer();
-
     void offer(BufferRequest request);
 
 }