|
@@ -3,11 +3,14 @@ package org.dbsyncer.storage.binlog;
|
|
|
import org.apache.commons.io.FileUtils;
|
|
|
import org.apache.commons.io.IOUtils;
|
|
|
import org.dbsyncer.common.file.BufferedRandomAccessFile;
|
|
|
+import org.dbsyncer.common.scheduled.ScheduledTaskJob;
|
|
|
+import org.dbsyncer.common.scheduled.ScheduledTaskService;
|
|
|
import org.dbsyncer.common.util.JsonUtil;
|
|
|
import org.dbsyncer.storage.binlog.proto.BinlogMessage;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.DisposableBean;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.util.Assert;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
@@ -15,17 +18,19 @@ import java.io.*;
|
|
|
import java.nio.charset.Charset;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Queue;
|
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
/**
|
|
|
* @author AE86
|
|
|
* @version 1.0.0
|
|
|
* @date 2022/6/8 0:53
|
|
|
*/
|
|
|
-public abstract class AbstractBinlogRecorder implements BinlogRecorder, DisposableBean {
|
|
|
+public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder, ScheduledTaskJob, DisposableBean {
|
|
|
|
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private ScheduledTaskService scheduledTaskService;
|
|
|
+
|
|
|
private static final long BINLOG_MAX_SIZE = 512 * 1024 * 1024;
|
|
|
|
|
|
private static final int BINLOG_EXPIRE_DAYS = 7;
|
|
@@ -40,9 +45,7 @@ public abstract class AbstractBinlogRecorder implements BinlogRecorder, Disposab
|
|
|
|
|
|
private static final String BINLOG_CONFIG = BINLOG + ".config";
|
|
|
|
|
|
- private static final int MAX_CYCLE = 100;
|
|
|
-
|
|
|
- private final AtomicInteger cycle = new AtomicInteger();
|
|
|
+ private static final long PERIOD = 3000;
|
|
|
|
|
|
private String path;
|
|
|
|
|
@@ -60,7 +63,7 @@ public abstract class AbstractBinlogRecorder implements BinlogRecorder, Disposab
|
|
|
path = new StringBuilder(System.getProperty("user.dir")).append(File.separatorChar)
|
|
|
.append("data").append(File.separatorChar)
|
|
|
.append("binlog").append(File.separatorChar)
|
|
|
- .append(getClass().getSimpleName()).append(File.separatorChar)
|
|
|
+ .append(getTaskName()).append(File.separatorChar)
|
|
|
.toString();
|
|
|
File dir = new File(path);
|
|
|
if (!dir.exists()) {
|
|
@@ -68,6 +71,16 @@ public abstract class AbstractBinlogRecorder implements BinlogRecorder, Disposab
|
|
|
}
|
|
|
|
|
|
initPipeline();
|
|
|
+ scheduledTaskService.start(PERIOD, this);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取任务名称
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ protected String getTaskName() {
|
|
|
+ return getClass().getSimpleName();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -78,28 +91,25 @@ public abstract class AbstractBinlogRecorder implements BinlogRecorder, Disposab
|
|
|
protected abstract Queue getQueue();
|
|
|
|
|
|
/**
|
|
|
- * 解析binlog
|
|
|
+ * 反序列化任务
|
|
|
+ *
|
|
|
+ * @param message
|
|
|
+ * @return
|
|
|
*/
|
|
|
- protected void parseBinlog() {
|
|
|
+ protected abstract Message deserialize(BinlogMessage message);
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
if (!getQueue().isEmpty()) {
|
|
|
return;
|
|
|
}
|
|
|
- if (getQueue().isEmpty()) {
|
|
|
- cycle.getAndAdd(1);
|
|
|
- if (cycle.get() < MAX_CYCLE) {
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- cycle.set(0);
|
|
|
|
|
|
try {
|
|
|
byte[] line;
|
|
|
boolean hasLine = false;
|
|
|
while (null != (line = pipeline.readLine())) {
|
|
|
- BinlogMessage message = BinlogMessage.parseFrom(line);
|
|
|
- logger.info("parse message:{}", message.toString());
|
|
|
-// getQueue().offer(message);
|
|
|
+ deserialize(BinlogMessage.parseFrom(line));
|
|
|
+// getQueue().offer(deserialize(message));
|
|
|
hasLine = true;
|
|
|
}
|
|
|
|
|
@@ -113,12 +123,14 @@ public abstract class AbstractBinlogRecorder implements BinlogRecorder, Disposab
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void flushBinlog(BinlogMessage message) {
|
|
|
- try {
|
|
|
- out.write(message.toByteArray());
|
|
|
- out.write(LINE_SEPARATOR.getBytes(DEFAULT_CHARSET));
|
|
|
- } catch (IOException e) {
|
|
|
- logger.error(e.getMessage());
|
|
|
+ public void flush(BinlogMessage message) {
|
|
|
+ if (null != message) {
|
|
|
+ try {
|
|
|
+ out.write(message.toByteArray());
|
|
|
+ out.write(LINE_SEPARATOR.getBytes(DEFAULT_CHARSET));
|
|
|
+ } catch (IOException e) {
|
|
|
+ logger.error(e.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|