AE86 3 years ago
parent commit b01422188a

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/FileConfigChecker.java

@@ -39,7 +39,7 @@ public class FileConfigChecker implements ConnectorConfigChecker<FileConfig> {
         }
 
         fileConfig.setFileDir(fileDir);
-        fileConfig.setSeparator(separator);
+        fileConfig.setSeparator(separator.charAt(0));
         fileConfig.setSchema(schema);
     }
 

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/FileConfig.java

@@ -15,7 +15,7 @@ public class FileConfig extends ConnectorConfig {
     /**
      * Separator
      */
-    private String separator;
+    private char separator;
 
     /**
      * File description
@@ -30,11 +30,11 @@ public class FileConfig extends ConnectorConfig {
         this.fileDir = fileDir;
     }
 
-    public String getSeparator() {
+    public char getSeparator() {
         return separator;
     }
 
-    public void setSeparator(String separator) {
+    public void setSeparator(char separator) {
         this.separator = separator;
     }
 

+ 3 - 22
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnector.java

@@ -2,7 +2,6 @@ package org.dbsyncer.connector.file;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.LineIterator;
-import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
@@ -42,7 +41,7 @@ public final class FileConnector extends AbstractConnector implements Connector<
 
     private static final String FILE_NAME = "fileName";
     private static final String FILE_PATH = "filePath";
-    private final FileResolver resolver = new FileResolver();
+    private final FileResolver fileResolver = new FileResolver();
 
     @Override
     public ConnectorMapper connect(FileConfig config) {
@@ -110,7 +109,7 @@ public final class FileConnector extends AbstractConnector implements Connector<
             FileSchema fileSchema = getFileSchema(connectorMapper, config.getCommand().get(FILE_NAME));
             final List<Field> fields = fileSchema.getFields();
             Assert.notEmpty(fields, "The fields of file schema is empty.");
-            char separator = fileConfig.getSeparator().charAt(0);
+            final char separator = fileConfig.getSeparator();
 
             reader = new FileReader(new File(config.getCommand().get(FILE_PATH)));
             LineIterator lineIterator = IOUtils.lineIterator(reader);
@@ -119,7 +118,7 @@ public final class FileConnector extends AbstractConnector implements Connector<
             AtomicLong count = new AtomicLong();
             while (lineIterator.hasNext()) {
                 if (count.get() >= from && count.get() < to) {
-                    list.add(parseRow(fields, separator, lineIterator.next()));
+                    list.add(fileResolver.parseMap(fields, separator, lineIterator.next()));
                 } else {
                     lineIterator.next();
                 }
@@ -161,24 +160,6 @@ public final class FileConnector extends AbstractConnector implements Connector<
         return Collections.EMPTY_MAP;
     }
 
-    private Map<String, Object> parseRow(List<Field> fields, char separator, String next) {
-        Map<String, Object> row = new LinkedHashMap<>();
-        List<String> columns = new LinkedList<>();
-        Lexer lexer = new Lexer(next);
-        while (lexer.hasNext()) {
-            columns.add(lexer.nextToken(separator));
-        }
-
-        int columnSize = columns.size();
-        int fieldSize = fields.size();
-        for (int i = 0; i < fieldSize; i++) {
-            if (i < columnSize) {
-                row.put(fields.get(i).getName(), resolver.resolveValue(fields.get(i).getTypeName(), columns.get(i)));
-            }
-        }
-        return row;
-    }
-
     private FileSchema getFileSchema(FileConnectorMapper connectorMapper, String tableName) {
         List<FileSchema> fileSchemaList = getFileSchema(connectorMapper);
         for (FileSchema fileSchema : fileSchemaList) {

+ 38 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileResolver.java

@@ -1,7 +1,14 @@
 package org.dbsyncer.connector.file;
 
+import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.connector.file.column.ColumnValue;
 import org.dbsyncer.connector.file.column.FileColumnValue;
+import org.dbsyncer.connector.model.Field;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * @author AE86
@@ -12,6 +19,18 @@ public class FileResolver {
 
     private ColumnValue value = new FileColumnValue();
 
+    public Map<String, Object> parseMap(List<Field> fields, char separator, String line) {
+        Map<String, Object> row = new LinkedHashMap<>();
+        parse(fields, separator, line, (key, value) -> row.put(key, value));
+        return row;
+    }
+
+    public List<Object> parseList(List<Field> fields, char separator, String line) {
+        List<Object> data = new ArrayList<>();
+        parse(fields, separator, line, (key, value) -> data.add(value));
+        return data;
+    }
+
     /**
      * Resolve the value of a {@link ColumnValue}.
      *
@@ -19,7 +38,7 @@ public class FileResolver {
      * @param columnValue
      * @return
      */
-    protected Object resolveValue(String typeName, String columnValue) {
+    private Object resolveValue(String typeName, String columnValue) {
         value.setValue(columnValue);
 
         if (value.isNull()) {
@@ -63,4 +82,22 @@ public class FileResolver {
 
     }
 
+    private void parse(List<Field> fields, char separator, String line, ResultSetMapper mapper) {
+        int fieldSize = fields.size();
+        int i = 0;
+        Lexer lexer = new Lexer(line);
+        while (i < fieldSize) {
+            if (lexer.hasNext()) {
+                mapper.apply(fields.get(i).getName(), resolveValue(fields.get(i).getTypeName(), lexer.nextToken(separator)));
+            } else {
+                mapper.apply(fields.get(i).getName(), null);
+            }
+            i++;
+        }
+    }
+
+    private interface ResultSetMapper {
+        void apply(String key, Object value);
+    }
+
 }
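
As an aside, a minimal sketch of how the new parseMap/parseList entry points are meant to be called. The Field(name, typeName, type) constructor and the sample type names below are assumptions for illustration, not taken from this commit; in practice the field list comes from FileSchema.getFields().

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    import org.dbsyncer.connector.file.FileResolver;
    import org.dbsyncer.connector.model.Field;

    // Usage sketch only; the Field construction below is a hypothetical signature.
    public class FileResolverSketch {
        public static void main(String[] args) {
            FileResolver resolver = new FileResolver();
            List<Field> fields = Arrays.asList(
                    new Field("id", "INT", 4),          // assumed (name, typeName, type) constructor
                    new Field("name", "VARCHAR", 12));
            char separator = ',';

            // one text line -> column name to resolved value, in field order
            Map<String, Object> row = resolver.parseMap(fields, separator, "1,hello");

            // same parsing, but positional; fields beyond the last token become null
            List<Object> data = resolver.parseList(fields, separator, "1");

            System.out.println(row + " / " + data);
        }
    }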

+ 8 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/column/FileColumnValue.java

@@ -64,11 +64,15 @@ public class FileColumnValue implements ColumnValue {
 
     @Override
     public Timestamp asTimestamp() {
-        if (NumberUtil.isCreatable(value)) {
-            return new Timestamp(asLong());
+        try {
+            if (NumberUtil.isCreatable(value)) {
+                return new Timestamp(asLong());
+            }
+
+            return DateFormatUtil.stringToTimestamp(value);
+        } catch (Exception e) {
+            return null;
         }
-
-        return DateFormatUtil.stringToTimestamp(value);
     }
 
     @Override
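
The practical effect of the asTimestamp() change, as a hedged sketch: numeric strings are treated as epoch milliseconds, date strings are delegated to DateFormatUtil (the accepted pattern below is an assumption), and anything unparsable now yields null instead of throwing.

    import java.sql.Timestamp;

    import org.dbsyncer.connector.file.column.ColumnValue;
    import org.dbsyncer.connector.file.column.FileColumnValue;

    public class AsTimestampSketch {
        public static void main(String[] args) {
            ColumnValue v = new FileColumnValue();

            v.setValue("1651933620000");           // numeric -> new Timestamp(asLong())
            Timestamp fromMillis = v.asTimestamp();

            v.setValue("2022-05-07 22:27:00");     // assumed pattern handled by DateFormatUtil
            Timestamp fromText = v.asTimestamp();

            v.setValue("not-a-date");              // unparsable -> null after this commit
            Timestamp invalid = v.asTimestamp();

            System.out.println(fromMillis + " | " + fromText + " | " + invalid);
        }
    }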

+ 331 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/BufferedRandomAccessFile.java

@@ -0,0 +1,331 @@
+package org.dbsyncer.listener.file;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/7 22:27
+ */
+public class BufferedRandomAccessFile extends RandomAccessFile {
+    static final int LogBuffSz_ = 16; // 64K buffer
+    public static final int BuffSz_ = (1 << LogBuffSz_);
+    static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+
+    private String path_;
+
+    /*
+     * This implementation is based on the buffer implementation in Modula-3's
+     * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+     */
+    private boolean dirty_; // true iff unflushed bytes exist
+    private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so track sync separately
+    private long curr_; // current position in file
+    private long lo_, hi_; // bounds on characters in "buff"
+    private byte[] buff_; // local buffer
+    private long maxHi_; // this.lo + this.buff.length
+    private boolean hitEOF_; // buffer contains last file block?
+    private long diskPos_; // disk position
+
+    /*
+     * To describe the above fields, we introduce the following abstractions for
+     * the file "f":
+     *
+     * len(f) the length of the file curr(f) the current position in the file
+     * c(f) the abstract contents of the file disk(f) the contents of f's
+     * backing disk file closed(f) true iff the file is closed
+     *
+     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+     * operation has the effect of making "disk(f)" identical to "c(f)".
+     *
+     * A file is said to be *valid* if the following conditions hold:
+     *
+     * V1. The "closed" and "curr" fields are correct:
+     *
+     * f.closed == closed(f) f.curr == curr(f)
+     *
+     * V2. The current position is either contained in the buffer, or just past
+     * the buffer:
+     *
+     * f.lo <= f.curr <= f.hi
+     *
+     * V3. Any (possibly) unflushed characters are stored in "f.buff":
+     *
+     * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+     *
+     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+     *
+     * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+     * disk(f)[i])
+     *
+     * V5. "f.dirty" is true iff the buffer contains bytes that should be
+     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+     *
+     * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+     *
+     * V6. this.maxHi == this.lo + this.buff.length
+     *
+     * Note that "f.buff" can be "null" in a valid file, since the range of
+     * characters in V3 is empty when "f.lo == f.curr".
+     *
+     * A file is said to be *ready* if the buffer contains the current position,
+     * i.e., when:
+     *
+     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+     *
+     * When a file is ready, reading or writing a single byte can be performed
+     * by reading or writing the in-memory buffer without performing a disk
+     * operation.
+     */
+
+    /**
+     * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
+     * in mode <code>mode</code>, which should be "r" for reading only, or
+     * "rw" for reading and writing.
+     */
+    public BufferedRandomAccessFile(File file, String mode) throws IOException {
+        this(file, mode, 0);
+    }
+
+    public BufferedRandomAccessFile(File file, String mode, int size) throws IOException {
+        super(file, mode);
+        path_ = file.getAbsolutePath();
+        this.init(size);
+    }
+
+    /**
+     * Open a new <code>BufferedRandomAccessFile</code> on the file named
+     * <code>name</code> in mode <code>mode</code>, which should be "r" for
+     * reading only, or "rw" for reading and writing.
+     */
+    public BufferedRandomAccessFile(String name, String mode) throws IOException {
+        this(name, mode, 0);
+    }
+
+    public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException {
+        super(name, mode);
+        path_ = name;
+        this.init(size);
+    }
+
+    private void init(int size) {
+        this.dirty_ = false;
+        this.lo_ = this.curr_ = this.hi_ = 0;
+        this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
+        this.maxHi_ = (long) BuffSz_;
+        this.hitEOF_ = false;
+        this.diskPos_ = 0L;
+    }
+
+    public String getPath() {
+        return path_;
+    }
+
+    public void sync() throws IOException {
+        if (syncNeeded_) {
+            flush();
+            getChannel().force(true);
+            syncNeeded_ = false;
+        }
+    }
+
+//      public boolean isEOF() throws IOException
+//      {
+//          assert getFilePointer() <= length();
+//          return getFilePointer() == length();
+//      }
+
+    public void close() throws IOException {
+        this.flush();
+        this.buff_ = null;
+        super.close();
+    }
+
+    /**
+     * Flush any bytes in the file's buffer that have not yet been written to
+     * disk. If the file was created read-only, this method is a no-op.
+     */
+    public void flush() throws IOException {
+        this.flushBuffer();
+    }
+
+    /* Flush any dirty bytes in the buffer to disk. */
+    private void flushBuffer() throws IOException {
+        if (this.dirty_) {
+            if (this.diskPos_ != this.lo_)
+                super.seek(this.lo_);
+            int len = (int) (this.curr_ - this.lo_);
+            super.write(this.buff_, 0, len);
+            this.diskPos_ = this.curr_;
+            this.dirty_ = false;
+        }
+    }
+
+    /*
+     * Read at most "this.buff.length" bytes into "this.buff", returning the
+     * number of bytes read. If the return result is less than
+     * "this.buff.length", then EOF was read.
+     */
+    private int fillBuffer() throws IOException {
+        int cnt = 0;
+        int rem = this.buff_.length;
+        while (rem > 0) {
+            int n = super.read(this.buff_, cnt, rem);
+            if (n < 0)
+                break;
+            cnt += n;
+            rem -= n;
+        }
+        if (this.hitEOF_ = (cnt < this.buff_.length)) {
+            // make sure buffer that wasn't read is initialized with -1
+            Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
+        }
+        this.diskPos_ += cnt;
+        return cnt;
+    }
+
+    /*
+     * This method positions <code>this.curr</code> at position <code>pos</code>.
+     * If <code>pos</code> does not fall in the current buffer, it flushes the
+     * current buffer and loads the correct one.<p>
+     *
+     * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
+     * is at or past the end-of-file, which can only happen if the file was
+     * opened in read-only mode.
+     */
+    public void seek(long pos) throws IOException {
+        if (pos >= this.hi_ || pos < this.lo_) {
+            // seeking outside of current buffer -- flush and read
+            this.flushBuffer();
+            this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+            this.maxHi_ = this.lo_ + (long) this.buff_.length;
+            if (this.diskPos_ != this.lo_) {
+                super.seek(this.lo_);
+                this.diskPos_ = this.lo_;
+            }
+            int n = this.fillBuffer();
+            this.hi_ = this.lo_ + (long) n;
+        } else {
+            // seeking inside current buffer -- no read required
+            if (pos < this.curr_) {
+                // if seeking backwards, we must flush to maintain V4
+                this.flushBuffer();
+            }
+        }
+        this.curr_ = pos;
+    }
+
+    public long getFilePointer() {
+        return this.curr_;
+    }
+
+    public long length() throws IOException {
+        // max accounts for the case where we have written past the old file length, but not yet flushed our buffer
+        return Math.max(this.curr_, super.length());
+    }
+
+    public int read() throws IOException {
+        if (this.curr_ >= this.hi_) {
+            // test for EOF
+            // if (this.hi < this.maxHi) return -1;
+            if (this.hitEOF_)
+                return -1;
+
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        byte res = this.buff_[(int) (this.curr_ - this.lo_)];
+        this.curr_++;
+        return ((int) res) & 0xFF; // convert byte -> int
+    }
+
+    public int read(byte[] b) throws IOException {
+        return this.read(b, 0, b.length);
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (this.curr_ >= this.hi_) {
+            // test for EOF
+            // if (this.hi < this.maxHi) return -1;
+            if (this.hitEOF_)
+                return -1;
+
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(this.buff_, buffOff, b, off, len);
+        this.curr_ += len;
+        return len;
+    }
+
+    public void write(int b) throws IOException {
+        if (this.curr_ >= this.hi_) {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_) {
+                // at EOF -- bump "hi"
+                this.hi_++;
+            } else {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_) {
+                    // appending to EOF -- bump "hi"
+                    this.hi_++;
+                }
+            }
+        }
+        this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
+        this.curr_++;
+        this.dirty_ = true;
+        syncNeeded_ = true;
+    }
+
+    public void write(byte[] b) throws IOException {
+        this.write(b, 0, b.length);
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException {
+        while (len > 0) {
+            int n = this.writeAtMost(b, off, len);
+            off += n;
+            len -= n;
+            this.dirty_ = true;
+            syncNeeded_ = true;
+        }
+    }
+
+    /*
+     * Write at most "len" bytes to "b" starting at position "off", and return
+     * the number of bytes written.
+     */
+    private int writeAtMost(byte[] b, int off, int len) throws IOException {
+        if (this.curr_ >= this.hi_) {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_) {
+                // at EOF -- bump "hi"
+                this.hi_ = this.maxHi_;
+            } else {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_) {
+                    // appending to EOF -- bump "hi"
+                    this.hi_ = this.maxHi_;
+                }
+            }
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(b, off, this.buff_, buffOff, len);
+        this.curr_ += len;
+        return len;
+    }
+}
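
For orientation, a minimal sketch of the tail-style read pattern this class is introduced for. The path and offset handling are illustrative only; the real extractor keeps offsets in its snapshot map.

    import java.io.IOException;

    import org.dbsyncer.listener.file.BufferedRandomAccessFile;

    public class TailReadSketch {
        public static void main(String[] args) throws IOException {
            long savedOffset = 0L;                      // normally restored from the snapshot map
            byte[] buffer = new byte[4096];

            BufferedRandomAccessFile raf = new BufferedRandomAccessFile("/tmp/demo.csv", "r"); // hypothetical path
            try {
                raf.seek(savedOffset);                  // resume where the previous run stopped
                int len;
                while ((len = raf.read(buffer)) > 0) {
                    System.out.print(new String(buffer, 0, len, "UTF-8"));
                }
                savedOffset = raf.getFilePointer();     // persist for the next run
            } finally {
                raf.close();
            }
            System.out.println("next offset: " + savedOffset);
        }
    }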

+ 56 - 19
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -1,11 +1,15 @@
 package org.dbsyncer.listener.file;
 
 import org.apache.commons.io.IOUtils;
+import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.RandomUtil;
 import org.dbsyncer.connector.config.FileConfig;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.file.FileConnectorMapper;
+import org.dbsyncer.connector.file.FileResolver;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.FileSchema;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.ListenerException;
@@ -15,8 +19,8 @@ import org.springframework.util.Assert;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.file.*;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -32,14 +36,17 @@ public class FileExtractor extends AbstractExtractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private static final byte[] buffer = new byte[4096];
+    private static final String POS_PREFIX = "pos_";
+    private static final String CHARSET_NAME = "UTF-8";
     private final Lock connectLock = new ReentrantLock();
     private volatile boolean connected;
     private FileConnectorMapper connectorMapper;
     private WatchService watchService;
     private Worker worker;
-    private Map<String, RandomAccessFile> pipeline = new ConcurrentHashMap<>();
-    private static final byte[] buffer = new byte[4096];
-    private static final String POS_PREFIX = "pos_";
+    private Map<String, PipelineResolver> pipeline = new ConcurrentHashMap<>();
+    private final FileResolver fileResolver = new FileResolver();
+    private char separator;
 
     @Override
     public void start() {
@@ -55,11 +62,17 @@ public class FileExtractor extends AbstractExtractor {
             final String mapperCacheKey = connectorFactory.getConnector(connectorMapper).getConnectorMapperCacheKey(connectorConfig);
             connected = true;
 
+            separator = config.getSeparator();
             initPipeline(config.getFileDir(), config.getSchema());
             watchService = FileSystems.getDefault().newWatchService();
             Path p = Paths.get(config.getFileDir());
             p.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
 
+            for (String fileName : pipeline.keySet()) {
+                parseEvent(fileName);
+            }
+            forceFlushEvent();
+
             worker = new Worker();
             worker.setName(new StringBuilder("file-parser-").append(mapperCacheKey).append("_").append(RandomUtil.nextInt(1, 100)).toString());
             worker.setDaemon(false);
@@ -77,19 +90,19 @@ public class FileExtractor extends AbstractExtractor {
         List<FileSchema> fileSchemas = JsonUtil.jsonToArray(schema, FileSchema.class);
         Assert.notEmpty(fileSchemas, "found not file schema.");
         for (FileSchema fileSchema : fileSchemas) {
-            String file = fileDir.concat(fileSchema.getFileName());
+            String fileName = fileSchema.getFileName();
+            String file = fileDir.concat(fileName);
             Assert.isTrue(new File(file).exists(), String.format("found not file '%s'", file));
 
-            final RandomAccessFile raf = new RandomAccessFile(file, "r");
-            pipeline.put(fileSchema.getFileName(), raf);
-
-            final String filePosKey = getFilePosKey(fileSchema.getFileName());
+            final BufferedRandomAccessFile raf = new BufferedRandomAccessFile(file, "r");
+            final String filePosKey = getFilePosKey(fileName);
             if (snapshot.containsKey(filePosKey)) {
                 raf.seek(NumberUtil.toLong(snapshot.get(filePosKey), 0L));
-                continue;
+            } else {
+                raf.seek(raf.length());
             }
 
-            raf.seek(raf.length());
+            pipeline.put(fileName, new PipelineResolver(fileSchema.getFields(), raf));
         }
     }
 
@@ -109,7 +122,7 @@ public class FileExtractor extends AbstractExtractor {
 
     private void closePipelineAndWatch() {
         try {
-            pipeline.values().forEach(raf -> IOUtils.closeQuietly(raf));
+            pipeline.values().forEach(pipelineResolver -> IOUtils.closeQuietly(pipelineResolver.getRaf()));
             pipeline.clear();
 
             if (null != watchService) {
@@ -126,16 +139,17 @@ public class FileExtractor extends AbstractExtractor {
 
     private void parseEvent(String fileName) throws IOException {
         if (pipeline.containsKey(fileName)) {
-            final RandomAccessFile raf = pipeline.get(fileName);
+            PipelineResolver pipelineResolver = pipeline.get(fileName);
+            final BufferedRandomAccessFile raf = pipelineResolver.getRaf();
 
             int len = 0;
             while (-1 != len) {
+                // TODO multiple lines can arrive in a single read (sticky packets); split with readLine manually
                 len = raf.read(buffer);
                 if (0 < len) {
-                    // TODO parse line
-                    logger.info("offset:{}, len:{}", raf.getFilePointer(), len);
-                    logger.info(new String(buffer, 1, len, "UTF-8"));
-                    continue;
+                    String lines = new String(buffer, 0, len, CHARSET_NAME);
+                    List<Object> row = fileResolver.parseList(pipelineResolver.getFields(), separator, lines);
+                    changedEvent(new RowChangedEvent(fileName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, row));
                 }
             }
 
@@ -144,20 +158,43 @@ public class FileExtractor extends AbstractExtractor {
         }
     }
 
+    final class PipelineResolver {
+
+        List<Field> fields;
+        BufferedRandomAccessFile raf;
+
+        public PipelineResolver(List<Field> fields, BufferedRandomAccessFile raf) {
+            this.fields = fields;
+            this.raf = raf;
+        }
+
+        public List<Field> getFields() {
+            return fields;
+        }
+
+        public BufferedRandomAccessFile getRaf() {
+            return raf;
+        }
+    }
+
     final class Worker extends Thread {
 
         @Override
         public void run() {
             while (!isInterrupted() && connected) {
+                WatchKey watchKey = null;
                 try {
-                    WatchKey watchKey = watchService.take();
+                    watchKey = watchService.take();
                     List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
                     for (WatchEvent<?> event : watchEvents) {
                         parseEvent(event.context().toString());
                     }
-                    watchKey.reset();
                 } catch (Exception e) {
                     logger.error(e.getMessage());
+                } finally {
+                    if (null != watchKey) {
+                        watchKey.reset();
+                    }
                 }
             }
         }
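
Finally, a small self-contained sketch of the offset bookkeeping the extractor relies on. The "pos_" + fileName key format is an assumption based on the POS_PREFIX constant; the real code goes through getFilePosKey and the snapshot map of AbstractExtractor.

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetSnapshotSketch {
        public static void main(String[] args) {
            Map<String, String> snapshot = new HashMap<>();
            String fileName = "demo.csv";                 // hypothetical file
            long filePointer = 1024L;                     // value returned by raf.getFilePointer()

            // persisted after each parseEvent pass
            snapshot.put("pos_" + fileName, String.valueOf(filePointer));

            // on restart, initPipeline seeks to the stored offset, defaulting to 0 when absent
            long resumeAt = Long.parseLong(snapshot.getOrDefault("pos_" + fileName, "0"));
            System.out.println(resumeAt);                 // 1024
        }
    }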