|
@@ -39,8 +39,6 @@ public class FileExtractor extends AbstractExtractor {
|
|
|
|
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
- private static final int BUFFER_SIZE = 55;
|
|
|
|
- private static final byte[] buffer = new byte[BUFFER_SIZE];
|
|
|
|
private static final String POS_PREFIX = "pos_";
|
|
private static final String POS_PREFIX = "pos_";
|
|
private static final String CHARSET_NAME = "UTF-8";
|
|
private static final String CHARSET_NAME = "UTF-8";
|
|
private final Lock connectLock = new ReentrantLock();
|
|
private final Lock connectLock = new ReentrantLock();
|
|
@@ -147,20 +145,6 @@ public class FileExtractor extends AbstractExtractor {
|
|
final RandomAccessFile raf = pipelineResolver.raf;
|
|
final RandomAccessFile raf = pipelineResolver.raf;
|
|
logger.info("{}", raf.getFilePointer());
|
|
logger.info("{}", raf.getFilePointer());
|
|
|
|
|
|
-// byte[] buffer = new byte[BUFFER_SIZE];
|
|
|
|
-// int len = 0;
|
|
|
|
-// while (-1 != len) {
|
|
|
|
-// len = raf.read(buffer);
|
|
|
|
-// if (0 < len) {
|
|
|
|
-// AtomicInteger pointer = new AtomicInteger();
|
|
|
|
-// AtomicInteger limit = new AtomicInteger(len);
|
|
|
|
-// String line;
|
|
|
|
-// while (null != (line = readLine(raf, buffer, pointer, limit))) {
|
|
|
|
-// List<Object> row = fileResolver.parseList(pipelineResolver.fields, separator, line);
|
|
|
|
-// changedEvent(new RowChangedEvent(fileName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, row));
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
final String filePosKey = getFilePosKey(fileName);
|
|
final String filePosKey = getFilePosKey(fileName);
|
|
String line;
|
|
String line;
|
|
while (null != (line = pipelineResolver.readLine())) {
|
|
while (null != (line = pipelineResolver.readLine())) {
|
|
@@ -174,39 +158,6 @@ public class FileExtractor extends AbstractExtractor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-// public final String readLine(RandomAccessFile raf, byte[] buffer, AtomicInteger pointer, AtomicInteger limit) throws IOException {
|
|
|
|
-// String s = new String(buffer, 0, limit.get(), CHARSET_NAME);
|
|
|
|
-//
|
|
|
|
-// int offset = pointer.get();
|
|
|
|
-// int end = 0;
|
|
|
|
-// boolean eol = false;
|
|
|
|
-// while (!eol && pointer.get() < limit.get()) {
|
|
|
|
-// char c = (char) buffer[pointer.get()];
|
|
|
|
-// switch (c) {
|
|
|
|
-// case '\n':
|
|
|
|
-// case '\r':
|
|
|
|
-// eol = true;
|
|
|
|
-// break;
|
|
|
|
-// default:
|
|
|
|
-// end++;
|
|
|
|
-// break;
|
|
|
|
-// }
|
|
|
|
-// pointer.getAndAdd(1);
|
|
|
|
-// }
|
|
|
|
-// if (end == 0) {
|
|
|
|
-// return null;
|
|
|
|
-// }
|
|
|
|
-//
|
|
|
|
-// if (!eol) {
|
|
|
|
-// long rollback = raf.getFilePointer() - pointer.get() - offset;
|
|
|
|
-// raf.seek(rollback);
|
|
|
|
-// return null;
|
|
|
|
-// }
|
|
|
|
-// String s1 = new String(buffer, offset, end, CHARSET_NAME);
|
|
|
|
-// logger.info(s1);
|
|
|
|
-// return s1;
|
|
|
|
-// }
|
|
|
|
-
|
|
|
|
final class PipelineResolver {
|
|
final class PipelineResolver {
|
|
List<Field> fields;
|
|
List<Field> fields;
|
|
RandomAccessFile raf;
|
|
RandomAccessFile raf;
|
|
@@ -219,60 +170,36 @@ public class FileExtractor extends AbstractExtractor {
|
|
}
|
|
}
|
|
|
|
|
|
public String readLine() throws IOException {
|
|
public String readLine() throws IOException {
|
|
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
|
- int size = 0;
|
|
|
|
|
|
+ this.filePointer = raf.getFilePointer();
|
|
|
|
+ if (filePointer >= raf.length()) {
|
|
|
|
+ b = new byte[0];
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ if (b == null || b.length == 0) {
|
|
|
|
+ b = new byte[(int) (raf.length() - filePointer)];
|
|
|
|
+ }
|
|
|
|
+ raf.read(b);
|
|
|
|
+
|
|
|
|
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
|
|
boolean ifCRLF = false;
|
|
boolean ifCRLF = false;
|
|
- int read = 0; // 已读数
|
|
|
|
- while (true) {
|
|
|
|
- // 填充byte[]
|
|
|
|
- fill();
|
|
|
|
- size = b.length;
|
|
|
|
- if (b.length == 0) {
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- for (int i = 0; i < size; i++) {
|
|
|
|
|
|
+ int read = 0;
|
|
|
|
+ while (!ifCRLF) {
|
|
|
|
+ for (int i = 0; i < b.length; i++) {
|
|
read++;
|
|
read++;
|
|
- if (b[i] == '\n') {
|
|
|
|
|
|
+ if (b[i] == '\n' || b[i] == '\r') {
|
|
ifCRLF = true;
|
|
ifCRLF = true;
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
- if (b[i] != '\r') {
|
|
|
|
- baos.write(b[i]);
|
|
|
|
- }
|
|
|
|
|
|
+ stream.write(b[i]);
|
|
}
|
|
}
|
|
-
|
|
|
|
- // 重置b
|
|
|
|
- b = Arrays.copyOfRange(b, read, size);
|
|
|
|
- if (ifCRLF)
|
|
|
|
- break;
|
|
|
|
}
|
|
}
|
|
|
|
+ b = Arrays.copyOfRange(b, read, b.length);
|
|
|
|
|
|
- byte[] b = baos.toByteArray();
|
|
|
|
raf.seek(this.filePointer + read);
|
|
raf.seek(this.filePointer + read);
|
|
- baos.close();
|
|
|
|
- baos = null;
|
|
|
|
- return new String(b, CHARSET_NAME);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void fill() throws IOException {
|
|
|
|
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
|
|
|
|
- if (b != null && b.length > 0) {
|
|
|
|
- stream.write(b, 0, b.length);
|
|
|
|
- }
|
|
|
|
- int size = 0;
|
|
|
|
- long len = raf.length();
|
|
|
|
- long pointer = raf.getFilePointer();
|
|
|
|
- if (pointer >= len) {
|
|
|
|
- b = new byte[0];
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- byte[] _b = new byte[(int) (len - pointer)];
|
|
|
|
- this.filePointer = raf.getFilePointer();
|
|
|
|
- size = raf.read(_b);
|
|
|
|
- stream.write(_b, 0, size);
|
|
|
|
- _b = null;
|
|
|
|
- b = stream.toByteArray();
|
|
|
|
|
|
+ byte[] _b = stream.toByteArray();
|
|
|
|
+ stream.close();
|
|
stream = null;
|
|
stream = null;
|
|
|
|
+ return new String(_b, CHARSET_NAME);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|