AE86 2 years ago
parent
commit
69b6409f4d

+ 22 - 20
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogContext.java

@@ -49,22 +49,22 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
 
     private static final String BINLOG_CONFIG = BINLOG + ".config";
 
-    private List<BinlogIndex> indexList = new LinkedList<>();
+    private final List<BinlogIndex> indexList = new LinkedList<>();
 
-    private String path;
+    private final String path;
 
-    private File configFile;
+    private final File configFile;
 
-    private File indexFile;
+    private final File indexFile;
 
-    private BinlogConfig config;
-
-    private BinlogPipeline pipeline;
+    private final BinlogPipeline pipeline;
 
     private final Lock lock = new ReentrantLock(true);
 
     private volatile boolean running;
 
+    private BinlogConfig config;
+
     public BinlogContext(String taskName) throws IOException {
         path = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar)
                 .append("data").append(File.separatorChar)
@@ -108,7 +108,7 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
         BinlogIndex binlogIndex = getBinlogIndexByName(config.getFileName());
         if (null == binlogIndex) {
             logger.warn("The binlog file '{}' is expired.", config.getFileName());
-            config = new BinlogConfig().setFileName(binlogIndex.getFileName());
+            config = new BinlogConfig().setFileName(config.getFileName());
             write(configFile, JsonUtil.objToJson(config), false);
         }
 
@@ -146,8 +146,8 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
     }
 
     public void flush() throws IOException {
-        config.setFileName(pipeline.getFileName());
-        config.setPosition(pipeline.getOffset());
+        config.setFileName(pipeline.getReaderFileName());
+        config.setPosition(pipeline.getReaderOffset());
         write(configFile, JsonUtil.objToJson(config), false);
     }
 
@@ -165,28 +165,30 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
      * <p>3. 删除旧索引(状态关闭 & 过期)</p>
      */
     private void doCheck() throws IOException {
-        // TODO Try to save the world ..
-        logger.info("test");
         createNewBinlogIndex();
     }
 
     private void createNewBinlogIndex() throws IOException {
-        BinlogWriter binlogWriter = pipeline.getBinlogWriter();
-        BinlogIndex lastIndex = binlogWriter.getBinlogIndex();
-        File file = new File(path + lastIndex.getFileName());
-        long length = file.length();
-        if(length > BINLOG_MAX_SIZE || isExpiredFile(file)){
-            
+        final String writerFileName = pipeline.getWriterFileName();
+        File file = new File(path + writerFileName);
+        if (file.length() > BINLOG_MAX_SIZE || isExpiredFile(file)) {
+            String newBinlogName = createNewBinlogName(getBinlogIndex(writerFileName));
+            logger.info("文件大小已达到{}MB, 超过限制{}MB, 准备切换新文件{}.", getMB(file.length()), getMB(BINLOG_MAX_SIZE), newBinlogName);
+            indexList.add(new BinlogIndex(newBinlogName, getFileCreateDateTime(new File(path + newBinlogName))));
+            pipeline.setBinlogWriter(new BinlogWriter(path, getLastBinlogIndex()));
         }
     }
 
+    private long getMB(long size) {
+        return size / 1024 / 1024;
+    }
+
     private void readIndex() throws IOException {
         indexList.clear();
         List<String> indexNames = FileUtils.readLines(indexFile, DEFAULT_CHARSET);
         if (!CollectionUtils.isEmpty(indexNames)) {
             for (String indexName : indexNames) {
-                LocalDateTime createTime = getFileCreateDateTime(new File(path + indexName)).atZone(ZoneId.systemDefault()).toLocalDateTime();
-                indexList.add(new BinlogIndex(indexName, createTime));
+                indexList.add(new BinlogIndex(indexName, getFileCreateDateTime(new File(path + indexName))));
             }
         }
     }

+ 8 - 12
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogPipeline.java

@@ -34,12 +34,16 @@ public class BinlogPipeline implements Closeable {
         return binlogReader.readLine();
     }
 
-    public String getFileName() {
-        return binlogReader.getFileName();
+    public long getReaderOffset() {
+        return binlogReader.getOffset();
     }
 
-    public long getOffset() {
-        return binlogReader.getOffset();
+    public String getReaderFileName() {
+        return binlogReader.getBinlogIndex().getFileName();
+    }
+
+    public String getWriterFileName() {
+        return binlogWriter.getBinlogIndex().getFileName();
     }
 
     @Override
@@ -48,18 +52,10 @@ public class BinlogPipeline implements Closeable {
         binlogReader.close();
     }
 
-    public BinlogWriter getBinlogWriter() {
-        return binlogWriter;
-    }
-
     public void setBinlogWriter(BinlogWriter binlogWriter) {
         this.binlogWriter = binlogWriter;
     }
 
-    public BinlogReader getBinlogReader() {
-        return binlogReader;
-    }
-
     public void setBinlogReader(BinlogReader binlogReader) {
         this.binlogReader = binlogReader;
     }

+ 0 - 4
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/impl/BinlogReader.java

@@ -43,10 +43,6 @@ public class BinlogReader extends AbstractBinlogActuator {
         return offset;
     }
 
-    public String getFileName() {
-        return getBinlogIndex().getFileName();
-    }
-
     @Override
     public void close() {
         IOUtils.closeQuietly(raf);