|
@@ -218,7 +218,7 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
|
|
|
write(configFile, JsonUtil.objToJson(config), false);
|
|
|
pipeline.setBinlogReader(new BinlogReader(path, startBinlogIndex, config.getPosition()));
|
|
|
binlogReader.stop();
|
|
|
- logger.info("Switch to new file {}.", newBinlogName);
|
|
|
+ logger.info("Switch to new index file '{}' for binlog reader.", newBinlogName);
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
@@ -241,9 +241,11 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
|
|
|
if (file.length() > BINLOG_MAX_SIZE || isExpiredFile(file)) {
|
|
|
final BinlogWriter binlogWriter = pipeline.getBinlogWriter();
|
|
|
String newBinlogName = createNewBinlogName(writerFileName);
|
|
|
- logger.info("The file size has reached {}MB, exceeding the limit of {}MB, switching to a new file {}.", getMB(file.length()), getMB(BINLOG_MAX_SIZE), newBinlogName);
|
|
|
+ logger.info("The size of index file '{}' has reached {}MB, exceeding the limit of {}MB, switching to a new index file '{}' for index writer.", writerFileName, getMB(file.length()), getMB(BINLOG_MAX_SIZE), newBinlogName);
|
|
|
write(indexFile, newBinlogName + LINE_SEPARATOR, true);
|
|
|
- BinlogIndex newBinlogIndex = new BinlogIndex(newBinlogName, getFileCreateDateTime(new File(path + newBinlogName)));
|
|
|
+ File binlogIndexFile = new File(path + newBinlogName);
|
|
|
+ write(binlogIndexFile, "", false);
|
|
|
+ BinlogIndex newBinlogIndex = new BinlogIndex(newBinlogName, getFileCreateDateTime(binlogIndexFile));
|
|
|
indexList.add(newBinlogIndex);
|
|
|
pipeline.setBinlogWriter(new BinlogWriter(path, newBinlogIndex));
|
|
|
binlogWriter.stop();
|
|
@@ -257,6 +259,7 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
|
|
|
// 有锁 & 读写状态关闭 & 30s未用
|
|
|
if (!next.isFreeLock() && !next.isRunning() && next.getUpdateTime().isBefore(LocalDateTime.now().minusSeconds(BINLOG_ACTUATOR_CLOSE_DELAYED_SECONDS))) {
|
|
|
next.removeAllLock();
|
|
|
+ logger.info("Close free index file '{}', the last update time is {}", next.getFileName(), next.getUpdateTime());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -266,8 +269,12 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
|
|
|
while (iterator.hasNext()) {
|
|
|
BinlogIndex next = iterator.next();
|
|
|
// 无锁 & 过期
|
|
|
- if (next.isFreeLock() && isExpiredFile(new File(next.getFileName()))) {
|
|
|
- iterator.remove();
|
|
|
+ if (next.isFreeLock()) {
|
|
|
+ File file = new File(path + next.getFileName());
|
|
|
+ if(isExpiredFile(file)){
|
|
|
+ FileUtils.forceDelete(file);
|
|
|
+ iterator.remove();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -281,7 +288,10 @@ public class BinlogContext implements ScheduledTaskJob, Closeable {
|
|
|
List<String> indexNames = FileUtils.readLines(indexFile, DEFAULT_CHARSET);
|
|
|
if (!CollectionUtils.isEmpty(indexNames)) {
|
|
|
for (String indexName : indexNames) {
|
|
|
- indexList.add(new BinlogIndex(indexName, getFileCreateDateTime(new File(path + indexName))));
|
|
|
+ File file = new File(path + indexName);
|
|
|
+ if(file.exists()){
|
|
|
+ indexList.add(new BinlogIndex(indexName, getFileCreateDateTime(file)));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|