1
0
Эх сурвалжийг харах

实现binlog索引文件读取

AE86 2 жил өмнө
parent
commit
4db554147e

+ 17 - 6
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogActuator.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.storage.binlog;
 
+import org.dbsyncer.storage.enums.BinlogStatusEnum;
 import org.dbsyncer.storage.model.BinlogIndex;
 
 import java.time.LocalDateTime;
@@ -8,20 +9,30 @@ public abstract class AbstractBinlogActuator implements BinlogActuator {
 
     private BinlogIndex binlogIndex;
 
-    @Override
-    public void initBinlogIndex(BinlogIndex binlogIndex) {
+    private BinlogStatusEnum status = BinlogStatusEnum.RUNNING;
+
+    protected void initBinlogIndex(BinlogIndex binlogIndex) {
         binlogIndex.addLock(this);
         this.binlogIndex = binlogIndex;
     }
 
-    @Override
-    public void refreshBinlogIndexUpdateTime() {
+    protected void refreshBinlogIndexUpdateTime() {
         binlogIndex.setUpdateTime(LocalDateTime.now());
     }
 
     @Override
-    public BinlogIndex getBinlogIndex() {
-        return binlogIndex;
+    public String getFileName() {
+        return binlogIndex.getFileName();
+    }
+
+    @Override
+    public boolean isRunning() {
+        return status == BinlogStatusEnum.RUNNING;
+    }
+
+    @Override
+    public void stop() {
+        this.status = BinlogStatusEnum.STOP;
     }
 
 }

+ 16 - 5
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogActuator.java

@@ -1,7 +1,5 @@
 package org.dbsyncer.storage.binlog;
 
-import org.dbsyncer.storage.model.BinlogIndex;
-
 import java.io.Closeable;
 
 /**
@@ -11,9 +9,22 @@ import java.io.Closeable;
  */
 public interface BinlogActuator extends Closeable {
 
-    void initBinlogIndex(BinlogIndex binlogIndex);
+    /**
+     * 获取索引文件名
+     *
+     * @return
+     */
+    String getFileName();
 
-    void refreshBinlogIndexUpdateTime();
+    /**
+     * 状态是否为运行中
+     *
+     * @return
+     */
+    boolean isRunning();
 
-    BinlogIndex getBinlogIndex();
+    /**
+     * 标记为停止状态
+     */
+    void stop();
 }

+ 120 - 45
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogContext.java

@@ -7,6 +7,7 @@ import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.storage.binlog.impl.BinlogPipeline;
+import org.dbsyncer.storage.binlog.impl.BinlogReader;
 import org.dbsyncer.storage.binlog.impl.BinlogWriter;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.model.BinlogConfig;
@@ -26,11 +27,27 @@ import java.time.ZoneId;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
+/**
+ * <p>组件介绍</p>
+ * <ol>
+ *     <li>BinlogPipeline(提供文件流读写)</li>
+ *     <li>定时器(维护索引文件状态,回收文件流)</li>
+ *     <li>索引</li>
+ * </ol>
+ * <p>定时器</p>
+ * <ol>
+ *     <li>生成新索引(超过限制大小|过期)</li>
+ *     <li>关闭索引流(有锁 & 读写状态关闭 & 30s未用)</li>
+ *     <li>删除旧索引(无锁 & 过期)</li>
+ * </ol>
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/6/29 1:28
+ */
 public class BinlogContext implements ScheduledTaskJob, Closeable {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -39,6 +56,8 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
 
     private static final int BINLOG_EXPIRE_DAYS = 7;
 
+    private static final int BINLOG_ACTUATOR_CLOSE_DELAYED_SECONDS = 30;
+
     private static final String LINE_SEPARATOR = System.lineSeparator();
 
     private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();
@@ -59,6 +78,8 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
 
     private final BinlogPipeline pipeline;
 
+    private final Lock readerLock = new ReentrantLock(true);
+
     private final Lock lock = new ReentrantLock(true);
 
     private volatile boolean running;
@@ -82,12 +103,12 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
         configFile = new File(path + BINLOG_CONFIG);
         if (!configFile.exists()) {
             // binlog.000001
-            config = initBinlogConfig(createNewBinlogName(0));
+            config = initBinlogConfigAndIndex(createNewBinlogName(0));
         }
 
         // read index
         Assert.isTrue(indexFile.exists(), String.format("The index file '%s' is not exist.", indexFile.getName()));
-        readIndex();
+        readIndexFromDisk();
 
         // delete index file
         deleteExpiredIndexFile();
@@ -100,19 +121,22 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
         // no index
         if (CollectionUtils.isEmpty(indexList)) {
             // binlog.000002
-            config = initBinlogConfig(createNewBinlogName(getBinlogIndex(config.getFileName())));
-            readIndex();
+            config = initBinlogConfigAndIndex(createNewBinlogName(config.getFileName()));
+            readIndexFromDisk();
         }
 
         // 配置文件已失效,取最早的索引文件
-        BinlogIndex binlogIndex = getBinlogIndexByName(config.getFileName());
-        if (null == binlogIndex) {
+        BinlogIndex startBinlogIndex = getBinlogIndexByFileName(config.getFileName());
+        if (null == startBinlogIndex) {
             logger.warn("The binlog file '{}' is expired.", config.getFileName());
-            config = new BinlogConfig().setFileName(config.getFileName());
+            startBinlogIndex = indexList.get(0);
+            config = new BinlogConfig().setFileName(startBinlogIndex.getFileName());
             write(configFile, JsonUtil.objToJson(config), false);
         }
 
-        pipeline = new BinlogPipeline(this);
+        final BinlogWriter binlogWriter = new BinlogWriter(path, indexList.get(indexList.size() - 1));
+        final BinlogReader binlogReader = new BinlogReader(path, startBinlogIndex, config.getPosition());
+        pipeline = new BinlogPipeline(binlogWriter, binlogReader);
         logger.info("BinlogContext initialized with config:{}", JsonUtil.objToJson(config));
     }
 
@@ -128,7 +152,9 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
             locked = binlogLock.tryLock();
             if (locked) {
                 running = true;
-                doCheck();
+                createNewBinlogIndex();
+                closeFreeBinlogIndex();
+                deleteOldBinlogIndex();
             }
         } catch (Exception e) {
             logger.error(e.getMessage());
@@ -152,30 +178,97 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
     }
 
     public byte[] readLine() throws IOException {
-        return pipeline.readLine();
+        byte[] line = pipeline.readLine();
+        if(null == line){
+            switchNextBinlogIndex();
+        }
+        return line;
     }
 
     public void write(BinlogMessage message) throws IOException {
         pipeline.write(message);
     }
 
-    /**
-     * <p>1. 生成新索引(超过限制大小 | 过期)</p>
-     * <p>2. 关闭索引流(状态运行 & 无锁 & 30s未用)</p>
-     * <p>3. 删除旧索引(状态关闭 & 过期)</p>
-     */
-    private void doCheck() throws IOException {
-        createNewBinlogIndex();
+    public BinlogIndex getBinlogIndexByFileName(String fileName) {
+        BinlogIndex index = null;
+        for (BinlogIndex binlogIndex : indexList) {
+            if (StringUtil.equals(binlogIndex.getFileName(), fileName)) {
+                index = binlogIndex;
+                break;
+            }
+        }
+        return index;
+    }
+
+    private void switchNextBinlogIndex() {
+        // 有新索引文件
+        if(!isCreatedNewBinlogIndex()){
+            return;
+        }
+        boolean locked = false;
+        try {
+            locked = readerLock.tryLock();
+            if (locked) {
+                // 有新索引文件
+                if(isCreatedNewBinlogIndex()){
+                    String newBinlogName = createNewBinlogName(pipeline.getReaderFileName());
+                    BinlogIndex startBinlogIndex = getBinlogIndexByFileName(newBinlogName);
+                    final BinlogReader binlogReader = pipeline.getBinlogReader();
+                    config = new BinlogConfig().setFileName(startBinlogIndex.getFileName());
+                    write(configFile, JsonUtil.objToJson(config), false);
+                    pipeline.setBinlogReader(new BinlogReader(path, startBinlogIndex, config.getPosition()));
+                    binlogReader.stop();
+                    logger.info("Switch to new file {}.", newBinlogName);
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        } finally {
+            if (locked) {
+                readerLock.unlock();
+            }
+        }
+    }
+
+    private boolean isCreatedNewBinlogIndex() {
+        return !StringUtil.equals(pipeline.getReaderFileName(), pipeline.getWriterFileName());
     }
 
     private void createNewBinlogIndex() throws IOException {
         final String writerFileName = pipeline.getWriterFileName();
         File file = new File(path + writerFileName);
+        // 超过限制大小|过期
         if (file.length() > BINLOG_MAX_SIZE || isExpiredFile(file)) {
-            String newBinlogName = createNewBinlogName(getBinlogIndex(writerFileName));
-            logger.info("文件大小已达到{}MB, 超过限制{}MB, 准备切换新文件{}.", getMB(file.length()), getMB(BINLOG_MAX_SIZE), newBinlogName);
-            indexList.add(new BinlogIndex(newBinlogName, getFileCreateDateTime(new File(path + newBinlogName))));
-            pipeline.setBinlogWriter(new BinlogWriter(path, getLastBinlogIndex()));
+            final BinlogWriter binlogWriter = pipeline.getBinlogWriter();
+            String newBinlogName = createNewBinlogName(writerFileName);
+            logger.info("The file size has reached {}MB, exceeding the limit of {}MB, switching to a new file {}.", getMB(file.length()), getMB(BINLOG_MAX_SIZE), newBinlogName);
+            write(indexFile, newBinlogName + LINE_SEPARATOR, true);
+            BinlogIndex newBinlogIndex = new BinlogIndex(newBinlogName, getFileCreateDateTime(new File(path + newBinlogName)));
+            indexList.add(newBinlogIndex);
+            pipeline.setBinlogWriter(new BinlogWriter(path, newBinlogIndex));
+            binlogWriter.stop();
+        }
+    }
+
+    private void closeFreeBinlogIndex() throws IOException {
+        Iterator<BinlogIndex> iterator = indexList.iterator();
+        while (iterator.hasNext()) {
+            BinlogIndex next = iterator.next();
+            // 有锁 & 读写状态关闭 & 30s未用
+            if (!next.isFreeLock() && !next.isRunning() && next.getUpdateTime().isBefore(LocalDateTime.now().minusSeconds(BINLOG_ACTUATOR_CLOSE_DELAYED_SECONDS))) {
+                next.removeAllLock();
+            }
+        }
+    }
+
+    private void deleteOldBinlogIndex() throws IOException {
+        Iterator<BinlogIndex> iterator = indexList.iterator();
+        while (iterator.hasNext()) {
+            BinlogIndex next = iterator.next();
+            // 无锁 & 过期
+            if (next.isFreeLock() && isExpiredFile(new File(next.getFileName()))) {
+                iterator.remove();
+            }
         }
     }
 
@@ -183,7 +276,7 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
         return size / 1024 / 1024;
     }
 
-    private void readIndex() throws IOException {
+    private void readIndexFromDisk() throws IOException {
         indexList.clear();
         List<String> indexNames = FileUtils.readLines(indexFile, DEFAULT_CHARSET);
         if (!CollectionUtils.isEmpty(indexNames)) {
@@ -193,7 +286,7 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
         }
     }
 
-    private BinlogConfig initBinlogConfig(String binlogName) throws IOException {
+    private BinlogConfig initBinlogConfigAndIndex(String binlogName) throws IOException {
         BinlogConfig config = new BinlogConfig().setFileName(binlogName);
         write(configFile, JsonUtil.objToJson(config), false);
         write(indexFile, binlogName + LINE_SEPARATOR, false);
@@ -248,29 +341,11 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
         return String.format("%s.%06d", BINLOG, index % 999999 + 1);
     }
 
-    private int getBinlogIndex(String binlogName) {
-        return NumberUtil.toInt(StringUtil.substring(binlogName, BINLOG.length() + 1));
+    private String createNewBinlogName(String binlogName) {
+        return createNewBinlogName(NumberUtil.toInt(StringUtil.substring(binlogName, BINLOG.length() + 1)));
     }
 
     private void write(File file, String line, boolean append) throws IOException {
         FileUtils.write(file, line, DEFAULT_CHARSET, append);
     }
-
-    public BinlogIndex getLastBinlogIndex() {
-        return indexList.get(indexList.size() - 1);
-    }
-
-    public BinlogIndex getBinlogIndexByName(String fileName) {
-        Map<String, BinlogIndex> binlogIndex = indexList.stream().collect(Collectors.toMap(BinlogIndex::getFileName, i -> i, (k1, k2) -> k1));
-        return binlogIndex.get(fileName);
-    }
-
-    public String getPath() {
-        return path;
-    }
-
-    public BinlogConfig getConfig() {
-        return config;
-    }
-
 }

+ 14 - 13
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogPipeline.java

@@ -1,9 +1,6 @@
 package org.dbsyncer.storage.binlog.impl;
 
-import org.dbsyncer.storage.binlog.BinlogContext;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
-import org.dbsyncer.storage.model.BinlogConfig;
-import org.dbsyncer.storage.model.BinlogIndex;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -14,23 +11,19 @@ import java.io.IOException;
  * @date 2022/6/19 23:36
  */
 public class BinlogPipeline implements Closeable {
-    private final BinlogContext context;
     private BinlogWriter binlogWriter;
     private BinlogReader binlogReader;
 
-    public BinlogPipeline(BinlogContext context) throws IOException {
-        this.context = context;
-        this.binlogWriter = new BinlogWriter(context.getPath(), context.getLastBinlogIndex());
-        final BinlogConfig config = context.getConfig();
-        final BinlogIndex startIndex = context.getBinlogIndexByName(config.getFileName());
-        this.binlogReader = new BinlogReader(context.getPath(), startIndex, config.getPosition());
+    public BinlogPipeline(BinlogWriter binlogWriter, BinlogReader binlogReader) {
+        this.binlogWriter = binlogWriter;
+        this.binlogReader = binlogReader;
     }
 
     public void write(BinlogMessage message) throws IOException {
         binlogWriter.write(message);
     }
 
-    public byte[] readLine() throws IOException{
+    public byte[] readLine() throws IOException {
         return binlogReader.readLine();
     }
 
@@ -39,11 +32,11 @@ public class BinlogPipeline implements Closeable {
     }
 
     public String getReaderFileName() {
-        return binlogReader.getBinlogIndex().getFileName();
+        return binlogReader.getFileName();
     }
 
     public String getWriterFileName() {
-        return binlogWriter.getBinlogIndex().getFileName();
+        return binlogWriter.getFileName();
     }
 
     @Override
@@ -52,10 +45,18 @@ public class BinlogPipeline implements Closeable {
         binlogReader.close();
     }
 
+    public BinlogWriter getBinlogWriter() {
+        return binlogWriter;
+    }
+
     public void setBinlogWriter(BinlogWriter binlogWriter) {
         this.binlogWriter = binlogWriter;
     }
 
+    public BinlogReader getBinlogReader() {
+        return binlogReader;
+    }
+
     public void setBinlogReader(BinlogReader binlogReader) {
         this.binlogReader = binlogReader;
     }

+ 3 - 3
dbsyncer-storage/src/main/java/org/dbsyncer/storage/enums/BinlogStatusEnum.java

@@ -3,7 +3,7 @@ package org.dbsyncer.storage.enums;
 /**
  * @author AE86
  * @version 1.0.0
- * @date 2019/11/16 20:31
+ * @date 2022/6/29 20:31
  */
 public enum BinlogStatusEnum {
 
@@ -12,7 +12,7 @@ public enum BinlogStatusEnum {
      */
     RUNNING,
     /**
-     * 关闭
+     * 停止
      */
-    CLOSED;
+    STOP
 }

+ 20 - 29
dbsyncer-storage/src/main/java/org/dbsyncer/storage/model/BinlogIndex.java

@@ -1,10 +1,11 @@
 package org.dbsyncer.storage.model;
 
 import org.dbsyncer.storage.binlog.BinlogActuator;
-import org.dbsyncer.storage.enums.BinlogStatusEnum;
 
+import java.io.IOException;
 import java.time.LocalDateTime;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 
 /**
@@ -14,61 +15,51 @@ import java.util.Set;
  */
 public class BinlogIndex {
     private String fileName;
-    private BinlogStatusEnum status;
     private Set<BinlogActuator> lock;
     private LocalDateTime createTime;
     private LocalDateTime updateTime;
 
     public BinlogIndex(String fileName, LocalDateTime createTime) {
         this.fileName = fileName;
-        this.status = BinlogStatusEnum.CLOSED;
         this.lock = new HashSet<>();
         this.createTime = createTime;
         this.updateTime = LocalDateTime.now();
     }
 
     public void addLock(BinlogActuator binlogActuator) {
-        synchronized (lock){
-            this.lock.add(binlogActuator);
-            this.status = BinlogStatusEnum.RUNNING;
-        }
+        this.lock.add(binlogActuator);
     }
 
-    public void removeLock(BinlogActuator binlogActuator) {
-        synchronized (lock) {
-            this.lock.remove(binlogActuator);
-            this.status = lock.isEmpty() ? BinlogStatusEnum.CLOSED : status;
+    public void removeAllLock() throws IOException {
+        Iterator<BinlogActuator> iterator = lock.iterator();
+        while (iterator.hasNext()){
+            BinlogActuator next = iterator.next();
+            next.close();
+            iterator.remove();
         }
     }
 
-    public String getFileName() {
-        return fileName;
-    }
-
-    public void setFileName(String fileName) {
-        this.fileName = fileName;
-    }
-
-    public BinlogStatusEnum getStatus() {
-        return status;
+    public boolean isRunning() {
+        for (BinlogActuator actuator : lock){
+            if(actuator.isRunning()){
+                return true;
+            }
+        }
+        return false;
     }
 
-    public void setStatus(BinlogStatusEnum status) {
-        this.status = status;
+    public boolean isFreeLock() {
+        return lock.isEmpty();
     }
 
-    public Set<BinlogActuator> getLock() {
-        return lock;
+    public String getFileName() {
+        return fileName;
     }
 
     public LocalDateTime getCreateTime() {
         return createTime;
     }
 
-    public void setCreateTime(LocalDateTime createTime) {
-        this.createTime = createTime;
-    }
-
     public LocalDateTime getUpdateTime() {
         return updateTime;
     }