AE86 2 лет назад
Родитель
Сommit
73a8428288

+ 2 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -94,7 +94,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
 
     @Override
     @Override
     public void flush(BinlogMessage message) throws IOException {
     public void flush(BinlogMessage message) throws IOException {
-        context.getBinlogPipeline().write(message);
+        context.write(message);
     }
     }
 
 
     @Override
     @Override
@@ -105,8 +105,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
     private void doParse() throws IOException {
     private void doParse() throws IOException {
         byte[] line;
         byte[] line;
         AtomicInteger batchCounter = new AtomicInteger();
         AtomicInteger batchCounter = new AtomicInteger();
-        final BinlogPipeline pipeline = context.getBinlogPipeline();
-        while (batchCounter.get() < MAX_BATCH_COUNT && null != (line = pipeline.readLine())) {
+        while (batchCounter.get() < MAX_BATCH_COUNT && null != (line = context.readLine())) {
             deserialize(BinlogMessage.parseFrom(line));
             deserialize(BinlogMessage.parseFrom(line));
             // getQueue().offer(deserialize(message));
             // getQueue().offer(deserialize(message));
             batchCounter.getAndAdd(1);
             batchCounter.getAndAdd(1);

+ 10 - 10
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/Binlog.java

@@ -6,24 +6,24 @@ package org.dbsyncer.storage.binlog;
  * @date 2022/6/19 23:03
  * @date 2022/6/19 23:03
  */
  */
 public final class Binlog {
 public final class Binlog {
-    private String binlog;
-    private long pos = 0;
+    private String fileName;
+    private long position = 0;
 
 
-    public String getBinlog() {
-        return binlog;
+    public String getFileName() {
+        return fileName;
     }
     }
 
 
-    public Binlog setBinlog(String binlog) {
-        this.binlog = binlog;
+    public Binlog setFileName(String fileName) {
+        this.fileName = fileName;
         return this;
         return this;
     }
     }
 
 
-    public long getPos() {
-        return pos;
+    public long getPosition() {
+        return position;
     }
     }
 
 
-    public Binlog setPos(long pos) {
-        this.pos = pos;
+    public Binlog setPosition(long position) {
+        this.position = position;
         return this;
         return this;
     }
     }
 }
 }

+ 44 - 53
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogContext.java

@@ -5,6 +5,7 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
@@ -49,9 +50,7 @@ public class BinlogContext implements Closeable {
 
 
     private File indexFile;
     private File indexFile;
 
 
-    private File binlogFile;
-
-    private Binlog binlog;
+    private Binlog config;
 
 
     private BinlogPipeline pipeline;
     private BinlogPipeline pipeline;
 
 
@@ -72,67 +71,64 @@ public class BinlogContext implements Closeable {
         configFile = new File(path + BINLOG_CONFIG);
         configFile = new File(path + BINLOG_CONFIG);
         if (!configFile.exists()) {
         if (!configFile.exists()) {
             // binlog.000001
             // binlog.000001
-            binlog = new Binlog().setBinlog(createNewBinlogName(0));
-            write(configFile, JsonUtil.objToJson(binlog), false);
-            binlogFile = new File(path + binlog.getBinlog());
-            write(binlogFile, "", false);
-            write(indexFile, binlog.getBinlog() + LINE_SEPARATOR, false);
+            config = initBinlogConfig(createNewBinlogName(0));
         }
         }
 
 
         // read index
         // read index
         Assert.isTrue(indexFile.exists(), String.format("The index file '%s' is not exist.", indexFile.getName()));
         Assert.isTrue(indexFile.exists(), String.format("The index file '%s' is not exist.", indexFile.getName()));
-        List<String> indexFileNames = FileUtils.readLines(indexFile, DEFAULT_CHARSET);
-        Assert.notEmpty(indexFileNames, String.format("The index file '%s' is not empty.", indexFile.getName()));
-        index.addAll(indexFileNames);
+        index.addAll(FileUtils.readLines(indexFile, DEFAULT_CHARSET));
 
 
-        // expired file
-        deleteExpiredBinlogFile();
+        // delete index file
+        deleteExpiredIndexFile();
 
 
         // {"binlog":"binlog.000001","pos":0}
         // {"binlog":"binlog.000001","pos":0}
-        binlog = JsonUtil.jsonToObj(FileUtils.readFileToString(configFile, DEFAULT_CHARSET), Binlog.class);
-        binlogFile = new File(path + binlog.getBinlog());
-
-        if (!binlogFile.exists()) {
-            logger.warn("The binlog file '{}' is expired.", binlog.getBinlog());
+        if (null == config) {
+            config = JsonUtil.jsonToObj(FileUtils.readFileToString(configFile, DEFAULT_CHARSET), Binlog.class);
+        }
 
 
+        // no index
+        if (CollectionUtils.isEmpty(index)) {
             // binlog.000002
             // binlog.000002
-            binlog = new Binlog().setBinlog(createNewBinlogName(getBinlogIndex(binlog.getBinlog())));
-            write(configFile, JsonUtil.objToJson(binlog), false);
-            binlogFile = new File(path + binlog.getBinlog());
-            write(binlogFile, "", false);
-            write(indexFile, binlog.getBinlog() + LINE_SEPARATOR, true);
-            index.add(binlog.getBinlog());
+            config = initBinlogConfig(createNewBinlogName(getBinlogIndex(config.getFileName())));
+            index.addAll(FileUtils.readLines(indexFile, DEFAULT_CHARSET));
         }
         }
 
 
-        initPipeline();
+        // 配置文件已失效,取最早的索引文件
+        int indexOf = index.indexOf(config.getFileName());
+        if (-1 == indexOf) {
+            logger.warn("The binlog file '{}' is expired.", config.getFileName());
+            config = new Binlog().setFileName(index.get(0));
+            write(configFile, JsonUtil.objToJson(config), false);
+        }
+
+        pipeline = new BinlogPipeline(new File(path + config.getFileName()), config.getPosition());
+        logger.info("BinlogContext initialized with config:{}", JsonUtil.objToJson(config));
     }
     }
 
 
-    private void createNewBinlogFile() throws IOException {
-        if (binlogFile.length() > BINLOG_MAX_SIZE) {
-            int i = 1;
-            if (!CollectionUtils.isEmpty(index)) {
-                i = getBinlogIndex(index.get(index.size() - 1));
-            }
-            String newBinlogName = createNewBinlogName(i);
-            index.add(newBinlogName);
-            write(indexFile, newBinlogName + LINE_SEPARATOR, true);
-            // fixme 锁上下文
-            pipeline.close();
-            pipeline = new BinlogPipeline(binlogFile, binlog.getPos());
-        }
+    private Binlog initBinlogConfig(String binlogName) throws IOException {
+        Binlog config = new Binlog().setFileName(binlogName);
+        write(configFile, JsonUtil.objToJson(config), false);
+        write(indexFile, binlogName + LINE_SEPARATOR, false);
+        write(new File(path + binlogName), "", false);
+        return config;
     }
     }
 
 
-    private void deleteExpiredBinlogFile() throws IOException {
+    private void deleteExpiredIndexFile() throws IOException {
+        if (CollectionUtils.isEmpty(index)) {
+            return;
+        }
         Set<String> shouldDelete = new HashSet<>();
         Set<String> shouldDelete = new HashSet<>();
         for (String name : index) {
         for (String name : index) {
             File file = new File(path + name);
             File file = new File(path + name);
             if (!file.exists()) {
             if (!file.exists()) {
                 shouldDelete.add(name);
                 shouldDelete.add(name);
+                logger.info("Delete invalid binlog file '{}'.", name);
                 continue;
                 continue;
             }
             }
             if (isExpiredFile(file)) {
             if (isExpiredFile(file)) {
                 FileUtils.forceDelete(file);
                 FileUtils.forceDelete(file);
                 shouldDelete.add(name);
                 shouldDelete.add(name);
+                logger.info("Delete expired binlog file '{}'.", name);
             }
             }
         }
         }
         if (!CollectionUtils.isEmpty(shouldDelete)) {
         if (!CollectionUtils.isEmpty(shouldDelete)) {
@@ -149,10 +145,6 @@ public class BinlogContext implements Closeable {
         return Timestamp.from(instant).getTime() < Timestamp.valueOf(LocalDateTime.now().minusDays(BINLOG_EXPIRE_DAYS)).getTime();
         return Timestamp.from(instant).getTime() < Timestamp.valueOf(LocalDateTime.now().minusDays(BINLOG_EXPIRE_DAYS)).getTime();
     }
     }
 
 
-    private void initPipeline() throws IOException {
-        pipeline = new BinlogPipeline(new File(path + binlog.getBinlog()), binlog.getPos());
-    }
-
     private String createNewBinlogName(int index) {
     private String createNewBinlogName(int index) {
         return String.format("%s.%06d", BINLOG, index <= 0 ? 1 : index + 1);
         return String.format("%s.%06d", BINLOG, index <= 0 ? 1 : index + 1);
     }
     }
@@ -161,19 +153,18 @@ public class BinlogContext implements Closeable {
         return NumberUtil.toInt(StringUtil.substring(binlogName, BINLOG.length() + 1));
         return NumberUtil.toInt(StringUtil.substring(binlogName, BINLOG.length() + 1));
     }
     }
 
 
-    /**
-     * 持久化增量点
-     *
-     * @throws IOException
-     */
     public void flush() throws IOException {
     public void flush() throws IOException {
-        binlog.setBinlog(pipeline.getBinlogName());
-        binlog.setPos(pipeline.getOffset());
-        FileUtils.writeStringToFile(configFile, JsonUtil.objToJson(binlog), DEFAULT_CHARSET);
+        config.setFileName(pipeline.getBinlogName());
+        config.setPosition(pipeline.getOffset());
+        write(configFile, JsonUtil.objToJson(config), false);
+    }
+
+    public byte[] readLine() throws IOException {
+        return pipeline.readLine();
     }
     }
 
 
-    public BinlogPipeline getBinlogPipeline() {
-        return pipeline;
+    public void write(BinlogMessage message) throws IOException {
+        pipeline.write(message);
     }
     }
 
 
     private void write(File file, String line, boolean append) throws IOException {
     private void write(File file, String line, boolean append) throws IOException {

+ 6 - 6
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogPipeline.java

@@ -16,13 +16,13 @@ public class BinlogPipeline implements Closeable {
     private final OutputStream out;
     private final OutputStream out;
     private final byte[] h = new byte[1];
     private final byte[] h = new byte[1];
     private byte[] b;
     private byte[] b;
+    private File file;
     private long offset;
     private long offset;
-    private File binlogFile;
 
 
-    public BinlogPipeline(File binlogFile, long pos) throws IOException {
-        this.binlogFile = binlogFile;
-        this.raf = new BufferedRandomAccessFile(binlogFile, "r");
-        this.out = new FileOutputStream(binlogFile, true);
+    public BinlogPipeline(File file, long pos) throws IOException {
+        this.file = file;
+        this.raf = new BufferedRandomAccessFile(file, "r");
+        this.out = new FileOutputStream(file, true);
         raf.seek(pos);
         raf.seek(pos);
     }
     }
 
 
@@ -49,7 +49,7 @@ public class BinlogPipeline implements Closeable {
     }
     }
 
 
     public String getBinlogName() {
     public String getBinlogName() {
-        return binlogFile.getName();
+        return file.getName();
     }
     }
 
 
     @Override
     @Override

+ 1 - 1
dbsyncer-storage/src/main/test/BinlogMessageTest.java

@@ -39,7 +39,7 @@ public class BinlogMessageTest {
         File configPath = new File(path + "binlog.config");
         File configPath = new File(path + "binlog.config");
         String configJson = FileUtils.readFileToString(configPath, Charset.defaultCharset());
         String configJson = FileUtils.readFileToString(configPath, Charset.defaultCharset());
         Binlog binlog = JsonUtil.jsonToObj(configJson, Binlog.class);
         Binlog binlog = JsonUtil.jsonToObj(configJson, Binlog.class);
-        pipeline = new BinlogPipeline(new File(path + binlog.getBinlog()), binlog.getPos());
+        pipeline = new BinlogPipeline(new File(path + binlog.getFileName()), binlog.getPosition());
     }
     }
 
 
     @After
     @After