AE86 пре 2 година
родитељ
комит
b9d0811b2f

+ 2 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/ParserStrategy.java

@@ -7,4 +7,5 @@ import org.dbsyncer.parser.model.TableGroup;
 public interface ParserStrategy {
 
     void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event);
-}
+
+}

+ 4 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/strategy/impl/DisableWriterBufferActuatorStrategy.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.parser.strategy.impl;
 
 import com.google.protobuf.ByteString;
+import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.constant.ConnectorConstant;
@@ -27,6 +28,9 @@ public final class DisableWriterBufferActuatorStrategy extends AbstractBinlogRec
     @Autowired
     private BufferActuator writerBufferActuator;
 
+    @Autowired
+    private CacheService cacheService;
+
     @Override
     public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
         try {

+ 5 - 67
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/AbstractBinlogRecorder.java

@@ -16,7 +16,6 @@ import org.springframework.util.Assert;
 import javax.annotation.PostConstruct;
 import java.io.*;
 import java.nio.charset.Charset;
-import java.util.Arrays;
 import java.util.Queue;
 
 /**
@@ -53,7 +52,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
     private Binlog binlog;
 
-    private Pipeline pipeline;
+    private BinlogPipeline pipeline;
 
     private OutputStream out;
 
@@ -114,7 +113,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
             }
 
             if (hasLine) {
-                binlog.setPos(pipeline.filePointer);
+                binlog.setPos(pipeline.getFilePointer());
                 FileUtils.writeStringToFile(configPath, JsonUtil.objToJson(binlog), DEFAULT_CHARSET);
             }
         } catch (IOException e) {
@@ -126,7 +125,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
     public void flush(BinlogMessage message) {
         if (null != message) {
             try {
-                out.write(message.toByteArray());
+                message.writeDelimitedTo(out);
                 out.write(LINE_SEPARATOR.getBytes(DEFAULT_CHARSET));
             } catch (IOException e) {
                 logger.error(e.getMessage());
@@ -137,7 +136,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
     @Override
     public void destroy() {
         IOUtils.closeQuietly(out);
-        IOUtils.closeQuietly(pipeline.raf);
+        IOUtils.closeQuietly(pipeline.getRaf());
     }
 
     private void initPipeline() throws IOException {
@@ -162,7 +161,7 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
 
         final RandomAccessFile raf = new BufferedRandomAccessFile(binlogFile, "r");
         raf.seek(binlog.getPos());
-        pipeline = new Pipeline(raf);
+        pipeline = new BinlogPipeline(raf);
         out = new FileOutputStream(binlogFile, true);
     }
 
@@ -170,65 +169,4 @@ public abstract class AbstractBinlogRecorder<Message> implements BinlogRecorder,
         return String.format("%s.%06d", BINLOG, index <= 0 ? 1 : index);
     }
 
-    final class Pipeline {
-        RandomAccessFile raf;
-        byte[] b;
-        long filePointer;
-
-        public Pipeline(RandomAccessFile raf) {
-            this.raf = raf;
-        }
-
-        public byte[] readLine() throws IOException {
-            this.filePointer = raf.getFilePointer();
-            if (filePointer >= raf.length()) {
-                b = new byte[0];
-                return null;
-            }
-            if (b == null || b.length == 0) {
-                b = new byte[(int) (raf.length() - filePointer)];
-            }
-            raf.read(b);
-
-            ByteArrayOutputStream stream = new ByteArrayOutputStream();
-            int read = 0;
-            for (int i = 0; i < b.length; i++) {
-                read++;
-                if (b[i] == '\n' || b[i] == '\r') {
-                    break;
-                }
-                stream.write(b[i]);
-            }
-            b = Arrays.copyOfRange(b, read, b.length);
-
-            raf.seek(this.filePointer + read);
-            byte[] _b = stream.toByteArray();
-            stream.close();
-            stream = null;
-            return _b;
-        }
-    }
-
-    static class Binlog {
-        private String binlog;
-        private long pos = 0;
-
-        public String getBinlog() {
-            return binlog;
-        }
-
-        public Binlog setBinlog(String binlog) {
-            this.binlog = binlog;
-            return this;
-        }
-
-        public long getPos() {
-            return pos;
-        }
-
-        public Binlog setPos(long pos) {
-            this.pos = pos;
-            return this;
-        }
-    }
 }

+ 29 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/Binlog.java

@@ -0,0 +1,29 @@
+package org.dbsyncer.storage.binlog;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/6/19 23:03
+ */
+public final class Binlog {
+    private String binlog;
+    private long pos = 0;
+
+    public String getBinlog() {
+        return binlog;
+    }
+
+    public Binlog setBinlog(String binlog) {
+        this.binlog = binlog;
+        return this;
+    }
+
+    public long getPos() {
+        return pos;
+    }
+
+    public Binlog setPos(long pos) {
+        this.pos = pos;
+        return this;
+    }
+}

+ 67 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/binlog/BinlogPipeline.java

@@ -0,0 +1,67 @@
+package org.dbsyncer.storage.binlog;
+
+import com.google.protobuf.AbstractMessageLite;
+import com.google.protobuf.CodedInputStream;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/6/19 23:36
+ */
+public class BinlogPipeline {
+    private RandomAccessFile raf;
+    private byte[] b;
+    private long filePointer;
+
+    public BinlogPipeline(RandomAccessFile raf) {
+        this.raf = raf;
+    }
+
+    public byte[] readLine() throws IOException {
+        this.filePointer = raf.getFilePointer();
+        if (filePointer >= raf.length()) {
+            b = new byte[0];
+            return null;
+        }
+        if (b == null || b.length == 0) {
+            b = new byte[(int) (raf.length() - filePointer)];
+        }
+
+        int firstByte = raf.read(b);
+        if ((firstByte & 0x80) != 0) {
+            firstByte = firstByte & 0x7f;
+        }
+
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        int read = 1;
+        for (int i = 1; i < firstByte; i++) {
+            read++;
+            stream.write(b[i]);
+        }
+        b = Arrays.copyOfRange(b, read, b.length);
+
+        raf.seek(this.filePointer + read);
+        byte[] _b = stream.toByteArray();
+        stream.close();
+        stream = null;
+        return _b;
+    }
+
+    public RandomAccessFile getRaf() {
+        return raf;
+    }
+
+    public byte[] getBytes() {
+        return b;
+    }
+
+    public long getFilePointer() {
+        return filePointer;
+    }
+}

+ 50 - 8
dbsyncer-storage/src/main/test/BinlogMessageTest.java

@@ -1,13 +1,22 @@
 import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.dbsyncer.common.file.BufferedRandomAccessFile;
 import org.dbsyncer.common.util.DateFormatUtil;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.storage.binlog.Binlog;
+import org.dbsyncer.storage.binlog.BinlogPipeline;
 import org.dbsyncer.storage.binlog.proto.BinlogMessage;
 import org.dbsyncer.storage.binlog.proto.Data;
 import org.dbsyncer.storage.binlog.proto.EventEnum;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.*;
+import java.nio.charset.Charset;
 import java.time.LocalTime;
 
 /**
@@ -19,25 +28,58 @@ public class BinlogMessageTest {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private String path;
+
+    private OutputStream out;
+
+    private BinlogPipeline pipeline;
+
+    @Before
+    public void init() throws IOException {
+        File dir = new File(System.getProperty("user.dir")).getParentFile();
+        path = new StringBuilder(dir.getAbsolutePath()).append(File.separatorChar)
+                .append("data").append(File.separatorChar)
+                .append("binlog").append(File.separatorChar)
+                .append("WriterBinlog").append(File.separatorChar)
+                .toString();
+        File configPath = new File(path + "binlog.config");
+        String configJson = FileUtils.readFileToString(configPath, Charset.defaultCharset());
+        Binlog binlog = JsonUtil.jsonToObj(configJson, Binlog.class);
+        File binlogFile = new File(path + binlog.getBinlog());
+        out = new FileOutputStream(binlogFile, true);
+
+        final RandomAccessFile raf = new BufferedRandomAccessFile(binlogFile, "r");
+        raf.seek(binlog.getPos());
+        pipeline = new BinlogPipeline(raf);
+    }
+
+    @After
+    public void close() {
+        IOUtils.closeQuietly(out);
+        IOUtils.closeQuietly(pipeline.getRaf());
+    }
+
     @Test
-    public void testBinlogMessage() throws InvalidProtocolBufferException {
+    public void testBinlogMessage() throws IOException {
         LocalTime localTime = DateFormatUtil.stringToLocalTime("2022-06-18 22:59:59");
         String s = localTime.toString();
 
         BinlogMessage build = BinlogMessage.newBuilder()
-                .setTableGroupId("123456788888")
+                .setTableGroupId("123456700000")
                 .setEvent(EventEnum.UPDATE)
                 .addData(Data.newBuilder().putRow("aaa", ByteString.copyFromUtf8("hello,中国")).putRow("aaa111", ByteString.copyFromUtf8(s)))
                 .build();
-
         byte[] bytes = build.toByteArray();
         logger.info("序列化长度:{}", bytes.length);
         logger.info("{}", bytes);
 
-        BinlogMessage message = BinlogMessage.parseFrom(bytes);
-        logger.info(message.getTableGroupId());
-        logger.info(message.getEvent().name());
-        logger.info(message.getDataList().toString());
+//        build.writeDelimitedTo(out);
+
+        byte[] line;
+        while (null != (line = pipeline.readLine())) {
+            BinlogMessage binlogMessage = BinlogMessage.parseFrom(line);
+            logger.info(binlogMessage.toString());
+        }
     }
 
 }

+ 0 - 1
dbsyncer-storage/src/main/test/LuceneFactoryTest.java

@@ -4,7 +4,6 @@ import org.apache.lucene.analysis.cn.smart.SmartChineseAnalyzer;
 import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
 import org.apache.lucene.document.*;
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.lucene.queryparser.classic.QueryParser;