AE86 2 سال پیش
والد
کامیت
c165b262a1
1فایلهای تغییر یافته به همراه61 افزوده شده و 52 حذف شده
  1. 61 52
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

+ 61 - 52
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -25,8 +25,7 @@ import java.nio.file.attribute.BasicFileAttributes;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDateTime;
-import java.util.Collection;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -43,7 +42,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
     @Autowired
     private ScheduledTaskService scheduledTaskService;
 
-    private static final long BINLOG_MAX_SIZE = 512 * 1024 * 1024;
+    private static final long BINLOG_MAX_SIZE = 256 * 1024 * 1024;
 
     private static final int BINLOG_EXPIRE_DAYS = 7;
 
@@ -67,10 +66,12 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
     private String path;
 
-    private File configPath;
+    private File configFile;
 
     private File binlogFile;
 
+    private File indexFile;
+
     private Binlog binlog;
 
     private BinlogPipeline pipeline;
@@ -89,7 +90,24 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         }
 
         // binlog.config
-        configPath = new File(path + BINLOG_CONFIG);
+        configFile = new File(path + BINLOG_CONFIG);
+        // binlog.index
+        indexFile = new File(path + BINLOG_INDEX);
+        if (!configFile.exists()) {
+            initBinlog(createBinlogName(0));
+            initBinlogIndex();
+        }
+        discardExpiredFile();
+
+        if (null == binlog) {
+            binlog = JsonUtil.jsonToObj(FileUtils.readFileToString(configFile, Charset.defaultCharset()), Binlog.class);
+        }
+        binlogFile = new File(path + binlog.getBinlog());
+        if (!binlogFile.exists()) {
+            logger.warn("The binlogFile '{}' is not exist.", binlogFile.getAbsolutePath());
+            initBinlog(createBinlogName(getBinlogIndex(binlog.getBinlog())));
+            initBinlogIndex();
+        }
 
         initPipeline();
         scheduledTaskService.start(PERIOD, this);
@@ -172,34 +190,51 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
         if (batchCounter.get() > 0) {
             binlog.setPos(pipeline.getFilePointer());
-            FileUtils.writeStringToFile(configPath, JsonUtil.objToJson(binlog), DEFAULT_CHARSET);
+            FileUtils.writeStringToFile(configFile, JsonUtil.objToJson(binlog), DEFAULT_CHARSET);
         }
     }
 
-    private void switchFile() {
-        // fixme 切换新文件
-    }
+    private void switchFile() throws IOException {
+        if(binlogFile.length() > BINLOG_MAX_SIZE){
+            // fixme 锁上下文
 
-    private void createNewBinlog() {
-        Collection<File> files = getBinlogIndexFileList();
-        int index = 1;
-        if (CollectionUtils.isEmpty(files)) {
-            for (File file : files) {
-                String binlogName = file.getName();
-                int i = getBinlogNameIndex(binlogName);
-                index = i > index ? i : index;
+            List<String> list = FileUtils.readLines(indexFile, DEFAULT_CHARSET);
+            int index = 1;
+            if (!CollectionUtils.isEmpty(list)) {
+                index = getBinlogIndex(list.get(list.size() - 1));
             }
+            String newBinlogName = createBinlogName(index);
+            FileUtils.write(indexFile, newBinlogName + LINE_SEPARATOR, DEFAULT_CHARSET, true);
+            IOUtils.closeQuietly(pipeline);
+            initBinlog(newBinlogName);
+            initPipeline();
         }
     }
 
     private void discardExpiredFile() throws IOException {
-        Collection<File> files = getBinlogIndexFileList();
-        if (CollectionUtils.isEmpty(files)) {
-            for (File file : files) {
+        List<String> index = FileUtils.readLines(indexFile, DEFAULT_CHARSET);
+        if (!CollectionUtils.isEmpty(index)) {
+            Set<String> shouldDelete = new HashSet<>();
+            for (String i : index) {
+                File file = new File(path + i);
+                if (!file.exists()) {
+                    shouldDelete.add(i);
+                    continue;
+                }
                 if (isExpiredFile(file)) {
                     FileUtils.forceDelete(file);
+                    shouldDelete.add(i);
                 }
             }
+            if (!CollectionUtils.isEmpty(shouldDelete)) {
+                StringBuilder indexBuffer = new StringBuilder();
+                index.forEach(i -> {
+                    if (!shouldDelete.contains(i)) {
+                        indexBuffer.append(i).append(LINE_SEPARATOR);
+                    }
+                });
+                FileUtils.writeStringToFile(indexFile, indexBuffer.toString(), DEFAULT_CHARSET);
+            }
         }
     }
 
@@ -209,58 +244,32 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         return Timestamp.from(instant).getTime() < Timestamp.valueOf(LocalDateTime.now().minusDays(BINLOG_EXPIRE_DAYS)).getTime();
     }
 
-    private Collection<File> getBinlogIndexFileList() {
-        File dir = new File(path + BINLOG_CONFIG);
-        OrFileFilter filter = new OrFileFilter();
-        filter.addFileFilter(FileFilterUtils.nameFileFilter(BINLOG_INDEX));
-        filter.addFileFilter(FileFilterUtils.nameFileFilter(BINLOG_CONFIG));
-        Collection<File> files = FileUtils.listFiles(dir, FileFilterUtils.notFileFilter(filter), null);
-        return files;
-    }
-
     private void initBinlog(String binlogName) throws IOException {
         // binlog.config
         binlog = new Binlog().setBinlog(binlogName);
-        FileUtils.writeStringToFile(configPath, JsonUtil.objToJson(binlog), DEFAULT_CHARSET);
+        FileUtils.writeStringToFile(configFile, JsonUtil.objToJson(binlog), DEFAULT_CHARSET);
 
         // binlog.000001
         binlogFile = new File(path + binlogName);
         FileUtils.writeStringToFile(binlogFile, "", DEFAULT_CHARSET);
+    }
 
+    private void initBinlogIndex() throws IOException {
         // binlog.index
-        FileUtils.writeStringToFile(new File(path + BINLOG_INDEX), binlogName + LINE_SEPARATOR, DEFAULT_CHARSET);
+        FileUtils.writeStringToFile(indexFile, binlog.getBinlog() + LINE_SEPARATOR, DEFAULT_CHARSET);
     }
 
     private void initPipeline() throws IOException {
-        if (!configPath.exists()) {
-            initBinlog(createBinlogName(1));
-        }
-        discardExpiredFile();
-        createNewBinlog();
-
-        if (null == binlog) {
-            binlog = JsonUtil.jsonToObj(FileUtils.readFileToString(configPath, Charset.defaultCharset()), Binlog.class);
-        }
-        binlogFile = new File(path + binlog.getBinlog());
-        if (!binlogFile.exists()) {
-            logger.warn("The binlogFile '{}' is not exist.", binlogFile.getAbsolutePath());
-            initBinlog(createBinlogName(binlog.getBinlog()));
-        }
-
         final RandomAccessFile raf = new BufferedRandomAccessFile(binlogFile, "rwd");
         raf.seek(binlog.getPos());
         pipeline = new BinlogPipeline(raf);
     }
 
     private String createBinlogName(int index) {
-        return String.format("%s.%06d", BINLOG, index <= 0 ? 1 : index);
-    }
-
-    private String createBinlogName(String binlogName) {
-        return createBinlogName(getBinlogNameIndex(binlogName) + 1);
+        return String.format("%s.%06d", BINLOG, index <= 0 ? 1 : index + 1);
     }
 
-    private int getBinlogNameIndex(String binlogName) {
+    private int getBinlogIndex(String binlogName) {
         return NumberUtil.toInt(StringUtil.substring(binlogName, BINLOG.length() + 1));
     }