AE86 3 gadi atpakaļ
vecāks
revīzija
5c6cf9d161

+ 16 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/FileConfigChecker.java

@@ -1,10 +1,15 @@
 package org.dbsyncer.biz.checker.impl.connector;
 package org.dbsyncer.biz.checker.impl.connector;
 
 
 import org.dbsyncer.biz.checker.ConnectorConfigChecker;
 import org.dbsyncer.biz.checker.ConnectorConfigChecker;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.FileConfig;
 import org.dbsyncer.connector.config.FileConfig;
+import org.dbsyncer.connector.model.FileSchema;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
 
 
+import java.io.File;
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
 /**
 /**
@@ -19,10 +24,19 @@ public class FileConfigChecker implements ConnectorConfigChecker<FileConfig> {
     public void modify(FileConfig fileConfig, Map<String, String> params) {
     public void modify(FileConfig fileConfig, Map<String, String> params) {
         String fileDir = params.get("fileDir");
         String fileDir = params.get("fileDir");
         String schema = params.get("schema");
         String schema = params.get("schema");
-        String separator = params.get("separator");
+        String separator = StringUtil.trim(params.get("separator"));
         Assert.hasText(fileDir, "fileDir is empty.");
         Assert.hasText(fileDir, "fileDir is empty.");
         Assert.hasText(schema, "schema is empty.");
         Assert.hasText(schema, "schema is empty.");
-        Assert.hasText(schema, "separator is empty.");
+        Assert.hasText(separator, "separator is empty.");
+
+        List<FileSchema> fileSchemas = JsonUtil.jsonToArray(schema, FileSchema.class);
+        Assert.notEmpty(fileSchemas, "found not file schema.");
+
+        fileDir += !StringUtil.endsWith(fileDir, File.separator) ? File.separator : "";
+        for (FileSchema fileSchema : fileSchemas) {
+            String file = fileDir.concat(fileSchema.getFileName());
+            Assert.isTrue(new File(file).exists(), String.format("found not file '%s'", file));
+        }
 
 
         fileConfig.setFileDir(fileDir);
         fileConfig.setFileDir(fileDir);
         fileConfig.setSeparator(separator);
         fileConfig.setSeparator(separator);

+ 4 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java

@@ -28,6 +28,10 @@ public abstract class StringUtil {
         return StringUtils.endsWith(str, suffix);
         return StringUtils.endsWith(str, suffix);
     }
     }
 
 
+    public static String trim(String text) {
+        return StringUtils.trim(text);
+    }
+
     public static String replace(String text, String searchString, String replacement) {
     public static String replace(String text, String searchString, String replacement) {
         return StringUtils.replace(text, searchString, replacement);
         return StringUtils.replace(text, searchString, replacement);
     }
     }

+ 41 - 34
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnector.java

@@ -1,11 +1,9 @@
 package org.dbsyncer.connector.file;
 package org.dbsyncer.connector.file;
 
 
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.LineIterator;
 import org.apache.commons.io.LineIterator;
 import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.model.Result;
-import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.AbstractConnector;
@@ -29,7 +27,6 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.net.UnknownHostException;
-import java.nio.charset.Charset;
 import java.util.*;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
@@ -107,42 +104,34 @@ public final class FileConnector extends AbstractConnector implements Connector<
     @Override
     @Override
     public Result reader(FileConnectorMapper connectorMapper, ReaderConfig config) {
     public Result reader(FileConnectorMapper connectorMapper, ReaderConfig config) {
         List<Map<String, Object>> list = new ArrayList<>();
         List<Map<String, Object>> list = new ArrayList<>();
+        FileReader reader = null;
         try {
         try {
-            String filePath = config.getCommand().get(FILE_PATH);
-            List<String> lines = FileUtils.readLines(new File(filePath), Charset.defaultCharset());
-
-            if (!CollectionUtils.isEmpty(lines)) {
-                int total = lines.size();
-                int from = (config.getPageIndex() - 1) * config.getPageSize();
-                int to = from + config.getPageSize() > total ? total : from + config.getPageSize();
-
-                if (from < total) {
-                    FileConfig fileConfig = connectorMapper.getConfig();
-                    FileSchema fileSchema = getFileSchema(connectorMapper, config.getCommand().get(FILE_NAME));
-                    final List<Field> fields = fileSchema.getFields();
-                    Assert.notEmpty(fields, "The fields of file schema is empty.");
-
-                    lines.subList(from, to).forEach(line -> {
-                        Map<String, Object> row = new LinkedHashMap<>();
-                        List<String> columns = new LinkedList<>();
-                        Lexer lexer = new Lexer(line);
-                        while (lexer.hasNext()) {
-                            columns.add(lexer.nextToken(fileConfig.getSeparator().charAt(0)));
-                        }
-
-                        int columnSize = columns.size();
-                        int fieldSize = fields.size();
-                        for (int i = 0; i < fieldSize; i++) {
-                            if (i < columnSize) {
-                                row.put(fields.get(i).getName(), resolver.resolveValue(fields.get(i).getTypeName(), columns.get(i)));
-                            }
-                        }
-                        list.add(row);
-                    });
+            FileConfig fileConfig = connectorMapper.getConfig();
+            FileSchema fileSchema = getFileSchema(connectorMapper, config.getCommand().get(FILE_NAME));
+            final List<Field> fields = fileSchema.getFields();
+            Assert.notEmpty(fields, "The fields of file schema is empty.");
+            char separator = fileConfig.getSeparator().charAt(0);
+
+            reader = new FileReader(new File(config.getCommand().get(FILE_PATH)));
+            LineIterator lineIterator = IOUtils.lineIterator(reader);
+            int from = (config.getPageIndex() - 1) * config.getPageSize();
+            int to = from + config.getPageSize();
+            AtomicLong count = new AtomicLong();
+            while (lineIterator.hasNext()) {
+                if (count.get() >= from && count.get() < to) {
+                    list.add(parseRow(fields, separator, lineIterator.next()));
+                } else {
+                    lineIterator.next();
+                }
+                count.addAndGet(1);
+                if (count.get() >= to) {
+                    break;
                 }
                 }
             }
             }
         } catch (IOException e) {
         } catch (IOException e) {
             throw new ConnectorException(e.getCause());
             throw new ConnectorException(e.getCause());
+        } finally {
+            IOUtils.closeQuietly(reader);
         }
         }
         return new Result(list);
         return new Result(list);
     }
     }
@@ -172,6 +161,24 @@ public final class FileConnector extends AbstractConnector implements Connector<
         return Collections.EMPTY_MAP;
         return Collections.EMPTY_MAP;
     }
     }
 
 
+    private Map<String, Object> parseRow(List<Field> fields, char separator, String next) {
+        Map<String, Object> row = new LinkedHashMap<>();
+        List<String> columns = new LinkedList<>();
+        Lexer lexer = new Lexer(next);
+        while (lexer.hasNext()) {
+            columns.add(lexer.nextToken(separator));
+        }
+
+        int columnSize = columns.size();
+        int fieldSize = fields.size();
+        for (int i = 0; i < fieldSize; i++) {
+            if (i < columnSize) {
+                row.put(fields.get(i).getName(), resolver.resolveValue(fields.get(i).getTypeName(), columns.get(i)));
+            }
+        }
+        return row;
+    }
+
     private FileSchema getFileSchema(FileConnectorMapper connectorMapper, String tableName) {
     private FileSchema getFileSchema(FileConnectorMapper connectorMapper, String tableName) {
         List<FileSchema> fileSchemaList = getFileSchema(connectorMapper);
         List<FileSchema> fileSchemaList = getFileSchema(connectorMapper);
         for (FileSchema fileSchema : fileSchemaList) {
         for (FileSchema fileSchema : fileSchemaList) {

+ 5 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java

@@ -3,6 +3,7 @@ package org.dbsyncer.listener.enums;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.listener.file.FileExtractor;
 import org.dbsyncer.listener.kafka.KafkaExtractor;
 import org.dbsyncer.listener.kafka.KafkaExtractor;
 import org.dbsyncer.listener.mysql.MysqlExtractor;
 import org.dbsyncer.listener.mysql.MysqlExtractor;
 import org.dbsyncer.listener.oracle.OracleExtractor;
 import org.dbsyncer.listener.oracle.OracleExtractor;
@@ -40,6 +41,10 @@ public enum ListenerEnum {
      * log_Kafka
      * log_Kafka
      */
      */
     LOG_KAFKA(ListenerTypeEnum.LOG.getType() + ConnectorEnum.KAFKA.getType(), KafkaExtractor.class),
     LOG_KAFKA(ListenerTypeEnum.LOG.getType() + ConnectorEnum.KAFKA.getType(), KafkaExtractor.class),
+    /**
+     * log_File
+     */
+    LOG_FILE(ListenerTypeEnum.LOG.getType() + ConnectorEnum.FILE.getType(), FileExtractor.class),
     /**
     /**
      * timing_Mysql
      * timing_Mysql
      */
      */

+ 60 - 23
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -1,19 +1,25 @@
 package org.dbsyncer.listener.file;
 package org.dbsyncer.listener.file;
 
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.IOUtils;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.RandomUtil;
 import org.dbsyncer.common.util.RandomUtil;
 import org.dbsyncer.connector.config.FileConfig;
 import org.dbsyncer.connector.config.FileConfig;
 import org.dbsyncer.connector.file.FileConnectorMapper;
 import org.dbsyncer.connector.file.FileConnectorMapper;
+import org.dbsyncer.connector.model.FileSchema;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.ListenerException;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
 
 
+import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.io.RandomAccessFile;
 import java.nio.file.*;
 import java.nio.file.*;
 import java.util.List;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantLock;
 
 
@@ -31,6 +37,9 @@ public class FileExtractor extends AbstractExtractor {
     private FileConnectorMapper connectorMapper;
     private FileConnectorMapper connectorMapper;
     private WatchService watchService;
     private WatchService watchService;
     private Worker worker;
     private Worker worker;
+    private Map<String, RandomAccessFile> pipeline = new ConcurrentHashMap<>();
+    private static final byte[] buffer = new byte[4096];
+    private static final String POS_PREFIX = "pos_";
 
 
     @Override
     @Override
     public void start() {
     public void start() {
@@ -42,10 +51,11 @@ public class FileExtractor extends AbstractExtractor {
             }
             }
 
 
             connectorMapper = (FileConnectorMapper) connectorFactory.connect(connectorConfig);
             connectorMapper = (FileConnectorMapper) connectorFactory.connect(connectorConfig);
-            FileConfig config = connectorMapper.getConfig();
+            final FileConfig config = connectorMapper.getConfig();
             final String mapperCacheKey = connectorFactory.getConnector(connectorMapper).getConnectorMapperCacheKey(connectorConfig);
             final String mapperCacheKey = connectorFactory.getConnector(connectorMapper).getConnectorMapperCacheKey(connectorConfig);
             connected = true;
             connected = true;
 
 
+            initPipeline(config.getFileDir(), config.getSchema());
             watchService = FileSystems.getDefault().newWatchService();
             watchService = FileSystems.getDefault().newWatchService();
             Path p = Paths.get(config.getFileDir());
             Path p = Paths.get(config.getFileDir());
             p.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
             p.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
@@ -56,54 +66,81 @@ public class FileExtractor extends AbstractExtractor {
             worker.start();
             worker.start();
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error("启动失败:{}", e.getMessage());
             logger.error("启动失败:{}", e.getMessage());
+            closePipelineAndWatch();
             throw new ListenerException(e);
             throw new ListenerException(e);
         } finally {
         } finally {
             connectLock.unlock();
             connectLock.unlock();
         }
         }
     }
     }
 
 
+    private void initPipeline(String fileDir, String schema) throws IOException {
+        List<FileSchema> fileSchemas = JsonUtil.jsonToArray(schema, FileSchema.class);
+        Assert.notEmpty(fileSchemas, "found not file schema.");
+        for (FileSchema fileSchema : fileSchemas) {
+            String file = fileDir.concat(fileSchema.getFileName());
+            Assert.isTrue(new File(file).exists(), String.format("found not file '%s'", file));
+
+            final RandomAccessFile raf = new RandomAccessFile(file, "r");
+            pipeline.put(fileSchema.getFileName(), raf);
+
+            final String filePosKey = getFilePosKey(fileSchema.getFileName());
+            if (snapshot.containsKey(filePosKey)) {
+                raf.seek(NumberUtil.toLong(snapshot.get(filePosKey), 0L));
+                continue;
+            }
+
+            raf.seek(raf.length());
+        }
+    }
+
     @Override
     @Override
     public void close() {
     public void close() {
         try {
         try {
+            closePipelineAndWatch();
             connected = false;
             connected = false;
             if (null != worker && !worker.isInterrupted()) {
             if (null != worker && !worker.isInterrupted()) {
                 worker.interrupt();
                 worker.interrupt();
                 worker = null;
                 worker = null;
             }
             }
-            if (null != watchService) {
-                watchService.close();
-            }
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error("关闭失败:{}", e.getMessage());
             logger.error("关闭失败:{}", e.getMessage());
         }
         }
     }
     }
 
 
-    private void parseEvent(String fileName) {
+    private void closePipelineAndWatch() {
+        try {
+            pipeline.values().forEach(raf -> IOUtils.closeQuietly(raf));
+            pipeline.clear();
 
 
+            if (null != watchService) {
+                watchService.close();
+            }
+        } catch (IOException ex) {
+            logger.error(ex.getMessage());
+        }
     }
     }
 
 
-    private void read(String file) {
-        RandomAccessFile raf = null;
-        byte[] buffer = new byte[4096];
-        try {
-            raf = new RandomAccessFile(file, "r");
-            raf.seek(raf.length());
-            logger.info("offset:{}", raf.getFilePointer());
+    private String getFilePosKey(String fileName) {
+        return POS_PREFIX.concat(fileName);
+    }
+
+    private void parseEvent(String fileName) throws IOException {
+        if (pipeline.containsKey(fileName)) {
+            final RandomAccessFile raf = pipeline.get(fileName);
 
 
-            while (true) {
-                int len = raf.read(buffer);
-                if (-1 != len) {
+            int len = 0;
+            while (-1 != len) {
+                len = raf.read(buffer);
+                if (0 < len) {
+                    // TODO 解析 line
                     logger.info("offset:{}, len:{}", raf.getFilePointer(), len);
                     logger.info("offset:{}, len:{}", raf.getFilePointer(), len);
                     logger.info(new String(buffer, 1, len, "UTF-8"));
                     logger.info(new String(buffer, 1, len, "UTF-8"));
+                    continue;
                 }
                 }
-                TimeUnit.SECONDS.sleep(1);
             }
             }
-        } catch (IOException e) {
-            logger.error(e.getMessage());
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage());
-        } finally {
-            IOUtils.closeQuietly(raf);
+
+            final String filePosKey = getFilePosKey(fileName);
+            snapshot.put(filePosKey, String.valueOf(raf.getFilePointer()));
         }
         }
     }
     }
 
 

+ 8 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -284,7 +284,14 @@ public class ParserFactory implements Parser {
             // 6、更新结果
             // 6、更新结果
             flush(task, writer);
             flush(task, writer);
 
 
-            // 7、更新分页数
+            // 7、判断尾页
+            if (data.size() < pageSize) {
+                params.clear();
+                logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
+                break;
+            }
+
+            // 8、更新分页数
             params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
             params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
         }
         }
     }
     }