AE86 2 سال پیش
والد
کامیت
8313c2b76c
1فایلهای تغییر یافته به همراه25 افزوده شده و 29 حذف شده
  1. 25 29
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogContext.java

+ 25 - 29
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogContext.java

@@ -66,49 +66,48 @@ public class BinlogContext implements Closeable {
             FileUtils.forceMkdir(dir);
         }
 
+        // binlog.index
+        indexFile = new File(path + BINLOG_INDEX);
         // binlog.config
         configFile = new File(path + BINLOG_CONFIG);
         if (!configFile.exists()) {
             // binlog.000001
             binlog = new Binlog().setBinlog(createNewBinlogName(0));
             write(configFile, JsonUtil.objToJson(binlog), false);
-
             binlogFile = new File(path + binlog.getBinlog());
             write(binlogFile, "", false);
-        }
-        binlog = JsonUtil.jsonToObj(FileUtils.readFileToString(configFile, DEFAULT_CHARSET), Binlog.class);
-
-        // binlog.index
-        indexFile = new File(path + BINLOG_INDEX);
-        if (!indexFile.exists()) {
             write(indexFile, binlog.getBinlog() + LINE_SEPARATOR, false);
         }
 
-        // binlog.000001
+        // read index
+        Assert.isTrue(indexFile.exists(), String.format("The index file '%s' is not exist.", indexFile.getName()));
+        List<String> indexFileNames = FileUtils.readLines(indexFile, DEFAULT_CHARSET);
+        Assert.notEmpty(indexFileNames, String.format("The index file '%s' is not empty.", indexFile.getName()));
+        index.addAll(indexFileNames);
+
+        // expired file
+        deleteExpiredBinlogFile();
+
+        // {"binlog":"binlog.000001","pos":0}
+        binlog = JsonUtil.jsonToObj(FileUtils.readFileToString(configFile, DEFAULT_CHARSET), Binlog.class);
         binlogFile = new File(path + binlog.getBinlog());
+
         if (!binlogFile.exists()) {
-            // 过期/误删
-            logger.warn("The binlog file '{}' is expired or deleted.", binlog.getBinlog());
+            logger.warn("The binlog file '{}' is expired.", binlog.getBinlog());
 
             // binlog.000002
             binlog = new Binlog().setBinlog(createNewBinlogName(getBinlogIndex(binlog.getBinlog())));
+            write(configFile, JsonUtil.objToJson(binlog), false);
             binlogFile = new File(path + binlog.getBinlog());
             write(binlogFile, "", false);
             write(indexFile, binlog.getBinlog() + LINE_SEPARATOR, true);
+            index.add(binlog.getBinlog());
         }
 
-        // read index
-        List<String> indexFileNames = FileUtils.readLines(indexFile, DEFAULT_CHARSET);
-        Assert.notEmpty(indexFileNames, String.format("Could not read index from file '%s'", indexFile.getName()));
-        index.addAll(indexFileNames);
-
-        // expired file
-        discardExpiredFile();
-
         initPipeline();
     }
 
-    private void switchFile() throws IOException {
+    private void createNewBinlogFile() throws IOException {
         if (binlogFile.length() > BINLOG_MAX_SIZE) {
             int i = 1;
             if (!CollectionUtils.isEmpty(index)) {
@@ -116,14 +115,14 @@ public class BinlogContext implements Closeable {
             }
             String newBinlogName = createNewBinlogName(i);
             index.add(newBinlogName);
-            FileUtils.write(new File(path + BINLOG_INDEX), newBinlogName + LINE_SEPARATOR, DEFAULT_CHARSET, true);
+            write(indexFile, newBinlogName + LINE_SEPARATOR, true);
             // fixme 锁上下文
             pipeline.close();
             pipeline = new BinlogPipeline(binlogFile, binlog.getPos());
         }
     }
 
-    private void discardExpiredFile() throws IOException {
+    private void deleteExpiredBinlogFile() throws IOException {
         Set<String> shouldDelete = new HashSet<>();
         for (String name : index) {
             File file = new File(path + name);
@@ -137,13 +136,10 @@ public class BinlogContext implements Closeable {
             }
         }
         if (!CollectionUtils.isEmpty(shouldDelete)) {
-            StringBuilder indexBuffer = new StringBuilder();
-            index.forEach(name -> {
-                if (!shouldDelete.contains(name)) {
-                    indexBuffer.append(name).append(LINE_SEPARATOR);
-                }
-            });
-            write(indexFile, indexBuffer.toString(), false);
+            index.removeAll(shouldDelete);
+            StringBuilder indexBuilder = new StringBuilder();
+            index.forEach(name -> indexBuilder.append(name).append(LINE_SEPARATOR));
+            write(indexFile, indexBuilder.toString(), false);
         }
     }
 
@@ -154,7 +150,7 @@ public class BinlogContext implements Closeable {
     }
 
     private void initPipeline() throws IOException {
-        pipeline = new BinlogPipeline(binlogFile, binlog.getPos());
+        pipeline = new BinlogPipeline(new File(path + binlog.getBinlog()), binlog.getPos());
     }
 
     private String createNewBinlogName(int index) {