AE86 2 gadi atpakaļ
vecāks
revīzija
add9db0621

+ 11 - 168
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -1,16 +1,7 @@
 package org.dbsyncer.storage.binlog;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.filefilter.FileFilterUtils;
-import org.apache.commons.io.filefilter.OrFileFilter;
-import org.dbsyncer.common.file.BufferedRandomAccessFile;
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
-import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.common.util.JsonUtil;
-import org.dbsyncer.common.util.NumberUtil;
-import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,14 +9,8 @@ import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import javax.annotation.PostConstruct;
-import java.io.*;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.util.*;
+import java.io.IOException;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -42,74 +27,20 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
     @Autowired
     private ScheduledTaskService scheduledTaskService;
 
-    private static final long BINLOG_MAX_SIZE = 256 * 1024 * 1024;
-
-    private static final int BINLOG_EXPIRE_DAYS = 7;
-
     private static final long MAX_BATCH_COUNT = 100L;
 
-    private static final String LINE_SEPARATOR = System.lineSeparator();
-
-    private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();
-
-    private static final String BINLOG = "binlog";
-
-    private static final String BINLOG_INDEX = BINLOG + ".index";
-
-    private static final String BINLOG_CONFIG = BINLOG + ".config";
-
     private static final long PERIOD = 3000;
 
     private final Lock lock = new ReentrantLock(true);
 
     private volatile boolean running;
 
-    private String path;
-
-    private File configFile;
-
-    private File binlogFile;
-
-    private File indexFile;
-
-    private Binlog binlog;
-
-    private BinlogPipeline pipeline;
+    private BinlogContext context;
 
     @PostConstruct
     private void init() throws IOException {
-        // /data/binlog/{BufferActuator}/
-        path = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar)
-                .append("data").append(File.separatorChar)
-                .append("binlog").append(File.separatorChar)
-                .append(getTaskName()).append(File.separatorChar)
-                .toString();
-        File dir = new File(path);
-        if (!dir.exists()) {
-            FileUtils.forceMkdir(dir);
-        }
-
-        // binlog.config
-        configFile = new File(path + BINLOG_CONFIG);
-        // binlog.index
-        indexFile = new File(path + BINLOG_INDEX);
-        if (!configFile.exists()) {
-            initBinlog(createBinlogName(0));
-            initBinlogIndex();
-        }
-        discardExpiredFile();
-
-        if (null == binlog) {
-            binlog = JsonUtil.jsonToObj(FileUtils.readFileToString(configFile, Charset.defaultCharset()), Binlog.class);
-        }
-        binlogFile = new File(path + binlog.getBinlog());
-        if (!binlogFile.exists()) {
-            logger.warn("The binlogFile '{}' is not exist.", binlogFile.getAbsolutePath());
-            initBinlog(createBinlogName(getBinlogIndex(binlog.getBinlog())));
-            initBinlogIndex();
-        }
-
-        initPipeline();
+        // /data/binlog/WriterBinlog/
+        context = new BinlogContext(getTaskName());
         scheduledTaskService.start(PERIOD, this);
     }
 
@@ -150,8 +81,6 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
             if (locked) {
                 running = true;
                 doParse();
-                discardExpiredFile();
-                switchFile();
             }
         } catch (Exception e) {
             logger.error(e.getMessage());
@@ -164,113 +93,27 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
     }
 
     @Override
-    public void flush(BinlogMessage message) {
-        if (null != message) {
-            try {
-                pipeline.write(message.toByteArray());
-            } catch (IOException e) {
-                logger.error(e.getMessage());
-            }
-        }
+    public void flush(BinlogMessage message) throws IOException {
+        context.getBinlogPipeline().write(message);
     }
 
     @Override
     public void destroy() {
-        IOUtils.closeQuietly(pipeline);
+        context.close();
     }
 
     private void doParse() throws IOException {
         byte[] line;
         AtomicInteger batchCounter = new AtomicInteger();
+        final BinlogPipeline pipeline = context.getBinlogPipeline();
         while (batchCounter.get() < MAX_BATCH_COUNT && null != (line = pipeline.readLine())) {
             deserialize(BinlogMessage.parseFrom(line));
-            //                getQueue().offer(deserialize(message));
+            // getQueue().offer(deserialize(message));
             batchCounter.getAndAdd(1);
         }
 
         if (batchCounter.get() > 0) {
-            binlog.setPos(pipeline.getFilePointer());
-            FileUtils.writeStringToFile(configFile, JsonUtil.objToJson(binlog), DEFAULT_CHARSET);
+            context.flush();
         }
     }
-
-    private void switchFile() throws IOException {
-        if(binlogFile.length() > BINLOG_MAX_SIZE){
-            // fixme 锁上下文
-
-            List<String> list = FileUtils.readLines(indexFile, DEFAULT_CHARSET);
-            int index = 1;
-            if (!CollectionUtils.isEmpty(list)) {
-                index = getBinlogIndex(list.get(list.size() - 1));
-            }
-            String newBinlogName = createBinlogName(index);
-            FileUtils.write(indexFile, newBinlogName + LINE_SEPARATOR, DEFAULT_CHARSET, true);
-            IOUtils.closeQuietly(pipeline);
-            initBinlog(newBinlogName);
-            initPipeline();
-        }
-    }
-
-    private void discardExpiredFile() throws IOException {
-        List<String> index = FileUtils.readLines(indexFile, DEFAULT_CHARSET);
-        if (!CollectionUtils.isEmpty(index)) {
-            Set<String> shouldDelete = new HashSet<>();
-            for (String i : index) {
-                File file = new File(path + i);
-                if (!file.exists()) {
-                    shouldDelete.add(i);
-                    continue;
-                }
-                if (isExpiredFile(file)) {
-                    FileUtils.forceDelete(file);
-                    shouldDelete.add(i);
-                }
-            }
-            if (!CollectionUtils.isEmpty(shouldDelete)) {
-                StringBuilder indexBuffer = new StringBuilder();
-                index.forEach(i -> {
-                    if (!shouldDelete.contains(i)) {
-                        indexBuffer.append(i).append(LINE_SEPARATOR);
-                    }
-                });
-                FileUtils.writeStringToFile(indexFile, indexBuffer.toString(), DEFAULT_CHARSET);
-            }
-        }
-    }
-
-    private boolean isExpiredFile(File file) throws IOException {
-        BasicFileAttributes attr = Files.readAttributes(file.toPath(), BasicFileAttributes.class);
-        Instant instant = attr.creationTime().toInstant();
-        return Timestamp.from(instant).getTime() < Timestamp.valueOf(LocalDateTime.now().minusDays(BINLOG_EXPIRE_DAYS)).getTime();
-    }
-
-    private void initBinlog(String binlogName) throws IOException {
-        // binlog.config
-        binlog = new Binlog().setBinlog(binlogName);
-        FileUtils.writeStringToFile(configFile, JsonUtil.objToJson(binlog), DEFAULT_CHARSET);
-
-        // binlog.000001
-        binlogFile = new File(path + binlogName);
-        FileUtils.writeStringToFile(binlogFile, "", DEFAULT_CHARSET);
-    }
-
-    private void initBinlogIndex() throws IOException {
-        // binlog.index
-        FileUtils.writeStringToFile(indexFile, binlog.getBinlog() + LINE_SEPARATOR, DEFAULT_CHARSET);
-    }
-
-    private void initPipeline() throws IOException {
-        final RandomAccessFile raf = new BufferedRandomAccessFile(binlogFile, "rwd");
-        raf.seek(binlog.getPos());
-        pipeline = new BinlogPipeline(raf);
-    }
-
-    private String createBinlogName(int index) {
-        return String.format("%s.%06d", BINLOG, index <= 0 ? 1 : index + 1);
-    }
-
-    private int getBinlogIndex(String binlogName) {
-        return NumberUtil.toInt(StringUtil.substring(binlogName, BINLOG.length() + 1));
-    }
-
 }

+ 188 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogContext.java

@@ -0,0 +1,188 @@
+package org.dbsyncer.storage.binlog;
+
+import org.apache.commons.io.FileUtils;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.common.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+public class BinlogContext implements Closeable {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private static final long BINLOG_MAX_SIZE = 256 * 1024 * 1024;
+
+    private static final int BINLOG_EXPIRE_DAYS = 7;
+
+    private static final String LINE_SEPARATOR = System.lineSeparator();
+
+    private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();
+
+    private static final String BINLOG = "binlog";
+
+    private static final String BINLOG_INDEX = BINLOG + ".index";
+
+    private static final String BINLOG_CONFIG = BINLOG + ".config";
+
+    private List<String> index = new LinkedList<>();
+
+    private String path;
+
+    private File configFile;
+
+    private File indexFile;
+
+    private File binlogFile;
+
+    private Binlog binlog;
+
+    private BinlogPipeline pipeline;
+
+    public BinlogContext(String taskName) throws IOException {
+        path = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar)
+                .append("data").append(File.separatorChar)
+                .append("binlog").append(File.separatorChar)
+                .append(taskName).append(File.separatorChar)
+                .toString();
+        File dir = new File(path);
+        if (!dir.exists()) {
+            FileUtils.forceMkdir(dir);
+        }
+
+        // binlog.config
+        configFile = new File(path + BINLOG_CONFIG);
+        if (!configFile.exists()) {
+            // binlog.000001
+            binlog = new Binlog().setBinlog(createNewBinlogName(0));
+            write(configFile, JsonUtil.objToJson(binlog), false);
+
+            binlogFile = new File(path + binlog.getBinlog());
+            write(binlogFile, "", false);
+        }
+        binlog = JsonUtil.jsonToObj(FileUtils.readFileToString(configFile, DEFAULT_CHARSET), Binlog.class);
+
+        // binlog.index
+        indexFile = new File(path + BINLOG_INDEX);
+        if (!indexFile.exists()) {
+            write(indexFile, binlog.getBinlog() + LINE_SEPARATOR, false);
+        }
+
+        // binlog.000001
+        binlogFile = new File(path + binlog.getBinlog());
+        if (!binlogFile.exists()) {
+            // 过期/误删
+            logger.warn("The binlog file '{}' is expired or deleted.", binlog.getBinlog());
+            write(binlogFile, "", false);
+
+            binlog = new Binlog().setBinlog(createNewBinlogName(getBinlogIndex(binlog.getBinlog())));
+            write(indexFile, binlog.getBinlog() + LINE_SEPARATOR, true);
+            binlogFile = new File(path + binlog.getBinlog());
+        }
+
+        // read index
+        List<String> indexFileNames = FileUtils.readLines(indexFile, DEFAULT_CHARSET);
+        Assert.notEmpty(indexFileNames, String.format("Could not read index from file '%s'", indexFile.getName()));
+        index.addAll(indexFileNames);
+
+        // expired file
+        discardExpiredFile();
+
+        initPipeline();
+    }
+
+    private void switchFile() throws IOException {
+        if (binlogFile.length() > BINLOG_MAX_SIZE) {
+            int i = 1;
+            if (!CollectionUtils.isEmpty(index)) {
+                i = getBinlogIndex(index.get(index.size() - 1));
+            }
+            String newBinlogName = createNewBinlogName(i);
+            index.add(newBinlogName);
+            FileUtils.write(new File(path + BINLOG_INDEX), newBinlogName + LINE_SEPARATOR, DEFAULT_CHARSET, true);
+            // fixme 锁上下文
+            pipeline.close();
+            pipeline = new BinlogPipeline(binlogFile, binlog.getPos());
+        }
+    }
+
+    private void discardExpiredFile() throws IOException {
+        Set<String> shouldDelete = new HashSet<>();
+        for (String name : index) {
+            File file = new File(path + name);
+            if (!file.exists()) {
+                shouldDelete.add(name);
+                continue;
+            }
+            if (isExpiredFile(file)) {
+                FileUtils.forceDelete(file);
+                shouldDelete.add(name);
+            }
+        }
+        if (!CollectionUtils.isEmpty(shouldDelete)) {
+            StringBuilder indexBuffer = new StringBuilder();
+            index.forEach(name -> {
+                if (!shouldDelete.contains(name)) {
+                    indexBuffer.append(name).append(LINE_SEPARATOR);
+                }
+            });
+            write(indexFile, indexBuffer.toString(), false);
+        }
+    }
+
+    private boolean isExpiredFile(File file) throws IOException {
+        BasicFileAttributes attr = Files.readAttributes(file.toPath(), BasicFileAttributes.class);
+        Instant instant = attr.creationTime().toInstant();
+        return Timestamp.from(instant).getTime() < Timestamp.valueOf(LocalDateTime.now().minusDays(BINLOG_EXPIRE_DAYS)).getTime();
+    }
+
+    private void initPipeline() throws IOException {
+        pipeline = new BinlogPipeline(binlogFile, binlog.getPos());
+    }
+
+    private String createNewBinlogName(int index) {
+        return String.format("%s.%06d", BINLOG, index <= 0 ? 1 : index + 1);
+    }
+
+    private int getBinlogIndex(String binlogName) {
+        return NumberUtil.toInt(StringUtil.substring(binlogName, BINLOG.length() + 1));
+    }
+
+    /**
+     * 持久化增量点
+     *
+     * @throws IOException
+     */
+    public void flush() throws IOException {
+        FileUtils.writeStringToFile(configFile, JsonUtil.objToJson(binlog), DEFAULT_CHARSET);
+    }
+
+    public BinlogPipeline getBinlogPipeline() {
+        return pipeline;
+    }
+
+    private void write(File file, String line, boolean append) throws IOException {
+        FileUtils.write(file, line, DEFAULT_CHARSET, append);
+    }
+
+    @Override
+    public void close() {
+        pipeline.close();
+    }
+}

+ 23 - 21
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogPipeline.java

@@ -1,8 +1,10 @@
 package org.dbsyncer.storage.binlog;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.RandomAccessFile;
+import org.apache.commons.io.IOUtils;
+import org.dbsyncer.common.file.BufferedRandomAccessFile;
+import org.dbsyncer.storage.binlog.proto.BinlogMessage;
+
+import java.io.*;
 
 /**
  * @author AE86
@@ -11,42 +13,42 @@ import java.io.RandomAccessFile;
  */
 public class BinlogPipeline implements Closeable {
     private final RandomAccessFile raf;
+    private final OutputStream out;
     private final byte[] h = new byte[1];
     private byte[] b;
-    private long filePointer;
+    private long offset;
 
-    public BinlogPipeline(RandomAccessFile raf) {
-        this.raf = raf;
+    public BinlogPipeline(File binlogFile, long pos) throws IOException {
+        this.raf = new BufferedRandomAccessFile(binlogFile, "r");
+        this.out = new FileOutputStream(binlogFile, true);
+        raf.seek(pos);
     }
 
     public byte[] readLine() throws IOException {
-        this.filePointer = raf.getFilePointer();
-        if (filePointer >= raf.length()) {
+        this.offset = raf.getFilePointer();
+        if (offset >= raf.length()) {
             return null;
         }
         raf.read(h);
         b = new byte[Byte.toUnsignedInt(h[0])];
         raf.read(b);
-        raf.seek(this.filePointer + (h.length + b.length));
+        raf.seek(this.offset + (h.length + b.length));
         return b;
     }
 
-    public RandomAccessFile getRaf() {
-        return raf;
+    public void write(BinlogMessage message) throws IOException {
+        if(null != message){
+            message.writeDelimitedTo(out);
+        }
     }
 
-    public long getFilePointer() {
-        return filePointer;
+    public long getOffset() {
+        return offset;
     }
 
     @Override
-    public void close() throws IOException {
-        if(null != raf){
-            raf.close();
-        }
-    }
-
-    public void write(byte[] bytes) throws IOException {
-        raf.write(bytes);
+    public void close() {
+        IOUtils.closeQuietly(out);
+        IOUtils.closeQuietly(raf);
     }
 }

+ 3 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogRecorder.java

@@ -2,6 +2,8 @@ package org.dbsyncer.storage.binlog;
 
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 
+import java.io.IOException;
+
 /**
  * @author AE86
  * @version 1.0.0
@@ -14,6 +16,6 @@ public interface BinlogRecorder {
      *
      * @param message
      */
-    void flush(BinlogMessage message);
+    void flush(BinlogMessage message) throws IOException;
 
 }