AE86 3 年之前
父节点
当前提交
c240ce3bfc
共有 1 个文件被更改,包括 122 次插入23 次删除
  1. 122 23
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

+ 122 - 23
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -17,9 +17,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.file.*;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -36,7 +39,8 @@ public class FileExtractor extends AbstractExtractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private static final byte[] buffer = new byte[4096];
+    private static final int BUFFER_SIZE = 55;
+    private static final byte[] buffer = new byte[BUFFER_SIZE];
     private static final String POS_PREFIX = "pos_";
     private static final String CHARSET_NAME = "UTF-8";
     private final Lock connectLock = new ReentrantLock();
@@ -94,7 +98,7 @@ public class FileExtractor extends AbstractExtractor {
             String file = fileDir.concat(fileName);
             Assert.isTrue(new File(file).exists(), String.format("found not file '%s'", file));
 
-            final BufferedRandomAccessFile raf = new BufferedRandomAccessFile(file, "r");
+            final RandomAccessFile raf = new BufferedRandomAccessFile(file, "r");
             final String filePosKey = getFilePosKey(fileName);
             if (snapshot.containsKey(filePosKey)) {
                 raf.seek(NumberUtil.toLong(snapshot.get(filePosKey), 0L));
@@ -122,7 +126,7 @@ public class FileExtractor extends AbstractExtractor {
 
     private void closePipelineAndWatch() {
         try {
-            pipeline.values().forEach(pipelineResolver -> IOUtils.closeQuietly(pipelineResolver.getRaf()));
+            pipeline.values().forEach(pipelineResolver -> IOUtils.closeQuietly(pipelineResolver.raf));
             pipeline.clear();
 
             if (null != watchService) {
@@ -140,40 +144,135 @@ public class FileExtractor extends AbstractExtractor {
     private void parseEvent(String fileName) throws IOException {
         if (pipeline.containsKey(fileName)) {
             PipelineResolver pipelineResolver = pipeline.get(fileName);
-            final BufferedRandomAccessFile raf = pipelineResolver.getRaf();
-
-            int len = 0;
-            while (-1 != len) {
-                // TODO 多行出现粘包,需手动readline
-                len = raf.read(buffer);
-                if (0 < len) {
-                    String lines = new String(buffer, 0, len, CHARSET_NAME);
-                    List<Object> row = fileResolver.parseList(pipelineResolver.getFields(), separator, lines);
-                    changedEvent(new RowChangedEvent(fileName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, row));
-                }
-            }
+            final RandomAccessFile raf = pipelineResolver.raf;
+            logger.info("{}", raf.getFilePointer());
 
+//            byte[] buffer = new byte[BUFFER_SIZE];
+//            int len = 0;
+//            while (-1 != len) {
+//                len = raf.read(buffer);
+//                if (0 < len) {
+//                    AtomicInteger pointer = new AtomicInteger();
+//                    AtomicInteger limit = new AtomicInteger(len);
+//                    String line;
+//                    while (null != (line = readLine(raf, buffer, pointer, limit))) {
+//                        List<Object> row = fileResolver.parseList(pipelineResolver.fields, separator, line);
+//                        changedEvent(new RowChangedEvent(fileName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, row));
+//                    }
+//                }
+//            }
             final String filePosKey = getFilePosKey(fileName);
-            snapshot.put(filePosKey, String.valueOf(raf.getFilePointer()));
+            String line;
+            while (null != (line = pipelineResolver.readLine())) {
+                logger.info(line);
+                logger.info("{}", raf.getFilePointer());
+                snapshot.put(filePosKey, String.valueOf(raf.getFilePointer()));
+                List<Object> row = fileResolver.parseList(pipelineResolver.fields, separator, line);
+                changedEvent(new RowChangedEvent(fileName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, row));
+            }
+
         }
     }
 
-    final class PipelineResolver {
+//    public final String readLine(RandomAccessFile raf, byte[] buffer, AtomicInteger pointer, AtomicInteger limit) throws IOException {
+//        String s = new String(buffer, 0, limit.get(), CHARSET_NAME);
+//
+//        int offset = pointer.get();
+//        int end = 0;
+//        boolean eol = false;
+//        while (!eol && pointer.get() < limit.get()) {
+//            char c = (char) buffer[pointer.get()];
+//            switch (c) {
+//                case '\n':
+//                case '\r':
+//                    eol = true;
+//                    break;
+//                default:
+//                    end++;
+//                    break;
+//            }
+//            pointer.getAndAdd(1);
+//        }
+//        if (end == 0) {
+//            return null;
+//        }
+//
+//        if (!eol) {
+//            long rollback = raf.getFilePointer() - pointer.get() - offset;
+//            raf.seek(rollback);
+//            return null;
+//        }
+//        String s1 = new String(buffer, offset, end, CHARSET_NAME);
+//        logger.info(s1);
+//        return s1;
+//    }
 
+    final class PipelineResolver {
         List<Field> fields;
-        BufferedRandomAccessFile raf;
+        RandomAccessFile raf;
+        byte[] b;
+        long filePointer;
 
-        public PipelineResolver(List<Field> fields, BufferedRandomAccessFile raf) {
+        public PipelineResolver(List<Field> fields, RandomAccessFile raf) {
             this.fields = fields;
             this.raf = raf;
         }
 
-        public List<Field> getFields() {
-            return fields;
+        public String readLine() throws IOException {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            int size = 0;
+            boolean ifCRLF = false;
+            int read = 0; // 已读数
+            while (true) {
+                // 填充byte[]
+                fill();
+                size = b.length;
+                if (b.length == 0) {
+                    return null;
+                }
+                for (int i = 0; i < size; i++) {
+                    read++;
+                    if (b[i] == '\n') {
+                        ifCRLF = true;
+                        break;
+                    }
+                    if (b[i] != '\r') {
+                        baos.write(b[i]);
+                    }
+                }
+
+                // 重置b
+                b = Arrays.copyOfRange(b, read, size);
+                if (ifCRLF)
+                    break;
+            }
+
+            byte[] b = baos.toByteArray();
+            raf.seek(this.filePointer + read);
+            baos.close();
+            baos = null;
+            return new String(b, CHARSET_NAME);
         }
 
-        public BufferedRandomAccessFile getRaf() {
-            return raf;
+        private void fill() throws IOException {
+            ByteArrayOutputStream stream = new ByteArrayOutputStream();
+            if (b != null && b.length > 0) {
+                stream.write(b, 0, b.length);
+            }
+            int size = 0;
+            long len = raf.length();
+            long pointer = raf.getFilePointer();
+            if (pointer >= len) {
+                b = new byte[0];
+                return;
+            }
+            byte[] _b = new byte[(int) (len - pointer)];
+            this.filePointer = raf.getFilePointer();
+            size = raf.read(_b);
+            stream.write(_b, 0, size);
+            _b = null;
+            b = stream.toByteArray();
+            stream = null;
         }
     }