AE86 3 anni fa
parent
commit
d7dab4c6c9

+ 12 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnector.java

@@ -1,6 +1,8 @@
 package org.dbsyncer.connector.file;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
 import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
@@ -23,6 +25,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -85,12 +88,18 @@ public final class FileConnector extends AbstractConnector implements Connector<
     @Override
     public long getCount(FileConnectorMapper connectorMapper, Map<String, String> command) {
         AtomicLong count = new AtomicLong();
+        FileReader reader = null;
         try {
-            String file = command.get(FILE_PATH);
-            List<String> lines = FileUtils.readLines(new File(file), Charset.defaultCharset());
-            count.addAndGet(lines.size());
+            reader = new FileReader(new File(command.get(FILE_PATH)));
+            LineIterator lineIterator = IOUtils.lineIterator(reader);
+            while (lineIterator.hasNext()) {
+                lineIterator.next();
+                count.addAndGet(1);
+            }
         } catch (IOException e) {
             throw new ConnectorException(e.getCause());
+        } finally {
+            IOUtils.closeQuietly(reader);
         }
         return count.get();
     }

+ 130 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -0,0 +1,130 @@
+package org.dbsyncer.listener.file;
+
+import org.apache.commons.io.IOUtils;
+import org.dbsyncer.common.util.RandomUtil;
+import org.dbsyncer.connector.config.FileConfig;
+import org.dbsyncer.connector.file.FileConnectorMapper;
+import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.ListenerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.*;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/6 21:42
+ */
+public class FileExtractor extends AbstractExtractor {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private final Lock connectLock = new ReentrantLock();
+    private volatile boolean connected;
+    private FileConnectorMapper connectorMapper;
+    private WatchService watchService;
+    private Worker worker;
+
+    @Override
+    public void start() {
+        try {
+            connectLock.lock();
+            if (connected) {
+                logger.error("FileExtractor is already started");
+                return;
+            }
+
+            connectorMapper = (FileConnectorMapper) connectorFactory.connect(connectorConfig);
+            FileConfig config = connectorMapper.getConfig();
+            final String mapperCacheKey = connectorFactory.getConnector(connectorMapper).getConnectorMapperCacheKey(connectorConfig);
+            connected = true;
+
+            watchService = FileSystems.getDefault().newWatchService();
+            Path p = Paths.get(config.getFileDir());
+            p.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
+
+            worker = new Worker();
+            worker.setName(new StringBuilder("file-parser-").append(mapperCacheKey).append("_").append(RandomUtil.nextInt(1, 100)).toString());
+            worker.setDaemon(false);
+            worker.start();
+        } catch (Exception e) {
+            logger.error("启动失败:{}", e.getMessage());
+            throw new ListenerException(e);
+        } finally {
+            connectLock.unlock();
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            connected = false;
+            if (null != worker && !worker.isInterrupted()) {
+                worker.interrupt();
+                worker = null;
+            }
+            if (null != watchService) {
+                watchService.close();
+            }
+        } catch (Exception e) {
+            logger.error("关闭失败:{}", e.getMessage());
+        }
+    }
+
+    private void parseEvent(String fileName) {
+
+    }
+
+    private void read(String file) {
+        RandomAccessFile raf = null;
+        byte[] buffer = new byte[4096];
+        try {
+            raf = new RandomAccessFile(file, "r");
+            raf.seek(raf.length());
+            logger.info("offset:{}", raf.getFilePointer());
+
+            while (true) {
+                int len = raf.read(buffer);
+                if (-1 != len) {
+                    logger.info("offset:{}, len:{}", raf.getFilePointer(), len);
+                    logger.info(new String(buffer, 1, len, "UTF-8"));
+                }
+                TimeUnit.SECONDS.sleep(1);
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        } finally {
+            IOUtils.closeQuietly(raf);
+        }
+    }
+
+    final class Worker extends Thread {
+
+        @Override
+        public void run() {
+            while (!isInterrupted() && connected) {
+                try {
+                    WatchKey watchKey = watchService.take();
+                    List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
+                    for (WatchEvent<?> event : watchEvents) {
+                        parseEvent(event.context().toString());
+                    }
+                    watchKey.reset();
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                }
+            }
+        }
+
+    }
+
+}

+ 83 - 0
dbsyncer-listener/src/main/test/FileWatchTest.java

@@ -0,0 +1,83 @@
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.*;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/6 21:32
+ */
+public class FileWatchTest {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private String path = "d:/test/";
+    private WatchService watchService;
+
+    @After
+    public void close() throws IOException {
+        if (null != watchService) {
+            watchService.close();
+        }
+    }
+
+    @Test
+    public void testFileWatch() throws IOException, InterruptedException {
+        watchService = FileSystems.getDefault().newWatchService();
+        Path p = Paths.get(path);
+        p.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
+
+        logger.info("启动监听");
+        long count = 0L;
+        while (count < 30) {
+            WatchKey watchKey = watchService.take();
+            List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
+            for (WatchEvent<?> event : watchEvents) {
+                Object context = event.context();
+                logger.info("[{}{}] 文件发生了[{}]事件", path, context, event.kind());
+            }
+            watchKey.reset();
+
+            TimeUnit.SECONDS.sleep(1);
+            count++;
+        }
+    }
+
+    @Test
+    public void testReadFile() {
+        read(path + "test.txt");
+    }
+
+    private void read(String file) {
+        RandomAccessFile raf = null;
+        byte[] buffer = new byte[4096];
+        try {
+            raf = new RandomAccessFile(file, "r");
+            raf.seek(raf.length());
+            logger.info("offset:{}", raf.getFilePointer());
+
+            while (true) {
+                int len = raf.read(buffer);
+                if (-1 != len) {
+                    logger.info("offset:{}, len:{}", raf.getFilePointer(), len);
+                    logger.info(new String(buffer, 1, len, "UTF-8"));
+                }
+                TimeUnit.SECONDS.sleep(1);
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        } finally {
+            IOUtils.closeQuietly(raf);
+        }
+    }
+
+}

+ 1 - 1
dbsyncer-listener/src/main/test/KafkaClientTest.java

@@ -1,12 +1,12 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.dbsyncer.common.util.JsonUtil;
-import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.KafkaConfig;
 import org.dbsyncer.connector.enums.KafkaFieldTypeEnum;
 import org.dbsyncer.connector.kafka.KafkaClient;
 import org.dbsyncer.connector.kafka.serialization.JsonToMapDeserializer;
 import org.dbsyncer.connector.kafka.serialization.MapToJsonSerializer;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.util.KafkaUtil;
 import org.junit.After;
 import org.junit.Before;