
Merge remote-tracking branch 'origin/V_1.0.0_Beta' into yjwang

yjwang 3 years ago
parent
commit
c89e9a9483
23 changed files with 1183 additions and 34 deletions
  1. +46 -0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/FileConfigChecker.java
  2. +1 -1
      dbsyncer-common/src/main/java/org/dbsyncer/common/column/Lexer.java
  3. +8 -0
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java
  4. +10 -2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java
  5. +13 -0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/FileConfig.java
  6. +4 -0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  7. +5 -0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BitSetter.java
  8. +101 -11
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnector.java
  9. +103 -0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileResolver.java
  10. +37 -0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/column/ColumnValue.java
  11. +87 -0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/column/FileColumnValue.java
  12. +36 -0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/FileSchema.java
  13. +5 -0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java
  14. +331 -0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/BufferedRandomAccessFile.java
  15. +223 -0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java
  16. +1 -1
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/TestDecodingMessageDecoder.java
  17. +83 -0
      dbsyncer-listener/src/main/test/FileWatchTest.java
  18. +1 -1
      dbsyncer-listener/src/main/test/KafkaClientTest.java
  19. +8 -1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  20. +64 -13
      dbsyncer-web/src/main/resources/public/connector/addFile.html
  21. +7 -2
      dbsyncer-web/src/main/resources/static/css/common.css
  22. +9 -2
      dbsyncer-web/src/main/resources/static/css/index/index.css
  23. BIN
      dbsyncer-web/src/main/resources/static/img/File.png

+ 46 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/FileConfigChecker.java

@@ -0,0 +1,46 @@
+package org.dbsyncer.biz.checker.impl.connector;
+
+import org.dbsyncer.biz.checker.ConnectorConfigChecker;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.config.FileConfig;
+import org.dbsyncer.connector.model.FileSchema;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/6 0:04
+ */
+@Component
+public class FileConfigChecker implements ConnectorConfigChecker<FileConfig> {
+
+    @Override
+    public void modify(FileConfig fileConfig, Map<String, String> params) {
+        String fileDir = params.get("fileDir");
+        String schema = params.get("schema");
+        String separator = StringUtil.trim(params.get("separator"));
+        Assert.hasText(fileDir, "fileDir is empty.");
+        Assert.hasText(schema, "schema is empty.");
+        Assert.hasText(separator, "separator is empty.");
+
+        List<FileSchema> fileSchemas = JsonUtil.jsonToArray(schema, FileSchema.class);
+        Assert.notEmpty(fileSchemas, "file schema not found.");
+
+        fileDir += !StringUtil.endsWith(fileDir, File.separator) ? File.separator : "";
+        for (FileSchema fileSchema : fileSchemas) {
+            String file = fileDir.concat(fileSchema.getFileName());
+            Assert.isTrue(new File(file).exists(), String.format("file '%s' not found", file));
+        }
+
+        fileConfig.setFileDir(fileDir);
+        fileConfig.setSeparator(separator.charAt(0));
+        fileConfig.setSchema(schema);
+    }
+
+}
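
A quick sanity check of what this checker expects (a hedged sketch, not project code): the fileDir, separator, and schema values below are hypothetical, and modify() asserts that every file named in the schema actually exists under fileDir, so the example only runs if test.unl is present there.

    import java.util.HashMap;
    import java.util.Map;
    import org.dbsyncer.biz.checker.impl.connector.FileConfigChecker;
    import org.dbsyncer.connector.config.FileConfig;

    public class FileConfigCheckerSketch {
        public static void main(String[] args) {
            Map<String, String> params = new HashMap<>();
            params.put("fileDir", "/data/files");   // hypothetical directory; test.unl must exist here
            params.put("separator", "|");
            params.put("schema", "[{\"fileName\":\"test.unl\",\"fields\":[{\"name\":\"id\",\"typeName\":\"string\",\"type\":12,\"pk\":true}]}]");

            FileConfig config = new FileConfig();
            new FileConfigChecker().modify(config, params);
            // After the call: fileDir ends with File.separator, separator is '|', schema holds the raw JSON.
        }
    }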

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/Lexer.java → dbsyncer-common/src/main/java/org/dbsyncer/common/column/Lexer.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.listener.postgresql.column;
+package org.dbsyncer.common.column;
 
 /**
  * @author AE86

+ 8 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java

@@ -24,6 +24,14 @@ public abstract class StringUtil {
         return StringUtils.contains(seq, searchSeq);
     }
 
+    public static boolean endsWith(final CharSequence str, final CharSequence suffix) {
+        return StringUtils.endsWith(str, suffix);
+    }
+
+    public static String trim(String text) {
+        return StringUtils.trim(text);
+    }
+
     public static String replace(String text, String searchString, String replacement) {
         return StringUtils.replace(text, searchString, replacement);
     }

+ 10 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.connector;
 
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.model.MetaInfo;
@@ -118,8 +119,15 @@ public class ConnectorFactory implements DisposableBean {
         String sType = sourceCommandConfig.getType();
         String tType = targetCommandConfig.getType();
         Map<String, String> map = new HashMap<>();
-        map.putAll(getConnector(sType).getSourceCommand(sourceCommandConfig));
-        map.putAll(getConnector(tType).getTargetCommand(targetCommandConfig));
+        Map sCmd = getConnector(sType).getSourceCommand(sourceCommandConfig);
+        if (!CollectionUtils.isEmpty(sCmd)) {
+            map.putAll(sCmd);
+        }
+
+        Map tCmd = getConnector(tType).getTargetCommand(targetCommandConfig);
+        if (!CollectionUtils.isEmpty(tCmd)) {
+            map.putAll(tCmd);
+        }
         return map;
     }
 

+ 13 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/FileConfig.java

@@ -12,6 +12,11 @@ public class FileConfig extends ConnectorConfig {
      */
     private String fileDir;
 
+    /**
+     * Separator
+     */
+    private char separator;
+
     /**
      * File schema description
      */
@@ -25,6 +30,14 @@ public class FileConfig extends ConnectorConfig {
         this.fileDir = fileDir;
     }
 
+    public char getSeparator() {
+        return separator;
+    }
+
+    public void setSeparator(char separator) {
+        this.separator = separator;
+    }
+
     public String getSchema() {
         return schema;
     }

+ 4 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -71,6 +71,10 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
 
     @Override
     public long getCount(DatabaseConnectorMapper connectorMapper, Map<String, String> command) {
+        if (CollectionUtils.isEmpty(command)) {
+            return 0L;
+        }
+
         // 1. Get the count query SQL
         String queryCountSql = command.get(ConnectorConstant.OPERTION_QUERY_COUNT);
         Assert.hasText(queryCountSql, "The count query SQL must not be empty.");

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BitSetter.java

@@ -21,6 +21,11 @@ public class BitSetter extends AbstractSetter<byte[]> {
             ps.setBytes(i, bitSet.toByteArray());
             return;
         }
+        if (val instanceof Integer) {
+            Integer integer = (Integer) val;
+            ps.setInt(i, integer);
+            return;
+        }
         throw new ConnectorException(String.format("BitSetter can not find type [%s], val [%s]", type, val));
     }
 

+ 101 - 11
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnector.java

@@ -1,19 +1,34 @@
 package org.dbsyncer.connector.file;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
+import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.CommandConfig;
+import org.dbsyncer.connector.config.FileConfig;
+import org.dbsyncer.connector.config.ReaderConfig;
+import org.dbsyncer.connector.config.WriterBatchConfig;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.FileSchema;
 import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
 
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * @author AE86
@@ -24,6 +39,10 @@ public final class FileConnector extends AbstractConnector implements Connector<
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private static final String FILE_NAME = "fileName";
+    private static final String FILE_PATH = "filePath";
+    private final FileResolver fileResolver = new FileResolver();
+
     @Override
     public ConnectorMapper connect(FileConfig config) {
         return new FileConnectorMapper(config);
@@ -36,7 +55,7 @@ public final class FileConnector extends AbstractConnector implements Connector<
 
     @Override
     public boolean isAlive(FileConnectorMapper connectorMapper) {
-        return false;
+        return connectorMapper.getConnection().exists();
     }
 
     @Override
@@ -48,27 +67,72 @@ public final class FileConnector extends AbstractConnector implements Connector<
             logger.error(e.getMessage());
             localIP = "127.0.0.1";
         }
-        return String.format("%s-%s", config.getConnectorType(), localIP, config.getFileDir());
+        return String.format("%s-%s", localIP, config.getFileDir());
     }
 
     @Override
     public List<Table> getTable(FileConnectorMapper connectorMapper) {
-        return null;
+        return getFileSchema(connectorMapper).stream().map(fileSchema -> new Table(fileSchema.getFileName())).collect(Collectors.toList());
     }
 
     @Override
     public MetaInfo getMetaInfo(FileConnectorMapper connectorMapper, String tableName) {
-        return null;
+        FileSchema fileSchema = getFileSchema(connectorMapper, tableName);
+        return new MetaInfo().setColumn(fileSchema.getFields());
     }
 
     @Override
     public long getCount(FileConnectorMapper connectorMapper, Map<String, String> command) {
-        return 0;
+        AtomicLong count = new AtomicLong();
+        FileReader reader = null;
+        try {
+            reader = new FileReader(new File(command.get(FILE_PATH)));
+            LineIterator lineIterator = IOUtils.lineIterator(reader);
+            while (lineIterator.hasNext()) {
+                lineIterator.next();
+                count.addAndGet(1);
+            }
+        } catch (IOException e) {
+            throw new ConnectorException(e.getCause());
+        } finally {
+            IOUtils.closeQuietly(reader);
+        }
+        return count.get();
     }
 
     @Override
     public Result reader(FileConnectorMapper connectorMapper, ReaderConfig config) {
-        return null;
+        List<Map<String, Object>> list = new ArrayList<>();
+        FileReader reader = null;
+        try {
+            FileConfig fileConfig = connectorMapper.getConfig();
+            FileSchema fileSchema = getFileSchema(connectorMapper, config.getCommand().get(FILE_NAME));
+            final List<Field> fields = fileSchema.getFields();
+            Assert.notEmpty(fields, "The fields of the file schema are empty.");
+            final char separator = fileConfig.getSeparator();
+
+            reader = new FileReader(new File(config.getCommand().get(FILE_PATH)));
+            LineIterator lineIterator = IOUtils.lineIterator(reader);
+            int from = (config.getPageIndex() - 1) * config.getPageSize();
+            int to = from + config.getPageSize();
+            AtomicLong count = new AtomicLong();
+            while (lineIterator.hasNext()) {
+                if (count.get() >= from && count.get() < to) {
+                    list.add(fileResolver.parseMap(fields, separator, lineIterator.next()));
+                } else {
+                    lineIterator.next();
+                }
+                count.addAndGet(1);
+                if (count.get() >= to) {
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            throw new ConnectorException(e.getCause());
+        } finally {
+            IOUtils.closeQuietly(reader);
+        }
+        return new Result(list);
     }
 
     @Override
@@ -78,11 +142,37 @@ public final class FileConnector extends AbstractConnector implements Connector<
 
     @Override
     public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
-        return null;
+        Map<String, String> command = new HashMap<>();
+        FileConfig fileConfig = (FileConfig) commandConfig.getConnectorConfig();
+        final String fileDir = fileConfig.getFileDir();
+        StringBuilder file = new StringBuilder(fileDir);
+        if (!StringUtil.endsWith(fileDir, File.separator)) {
+            file.append(File.separator);
+        }
+        file.append(commandConfig.getTable().getName());
+        command.put(FILE_PATH, file.toString());
+        command.put(FILE_NAME, commandConfig.getTable().getName());
+        return command;
     }
 
     @Override
     public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
-        return null;
+        return Collections.EMPTY_MAP;
+    }
+
+    private FileSchema getFileSchema(FileConnectorMapper connectorMapper, String tableName) {
+        List<FileSchema> fileSchemaList = getFileSchema(connectorMapper);
+        for (FileSchema fileSchema : fileSchemaList) {
+            if (StringUtil.equals(fileSchema.getFileName(), tableName)) {
+                return fileSchema;
+            }
+        }
+        throw new ConnectorException(String.format("can not find fileSchema by tableName '%s'", tableName));
+    }
+
+    private List<FileSchema> getFileSchema(FileConnectorMapper connectorMapper) {
+        List<FileSchema> fileSchemas = JsonUtil.jsonToArray(connectorMapper.getConfig().getSchema(), FileSchema.class);
+        Assert.notEmpty(fileSchemas, "The schema is empty.");
+        return fileSchemas;
     }
 }
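
The reader() above pages through a plain text file by counting lines: from = (pageIndex - 1) * pageSize is the first line of the page and to = from + pageSize is the exclusive upper bound. A minimal, self-contained sketch of the same window logic (the path and page values are placeholders, not project code):

    import org.apache.commons.io.IOUtils;
    import org.apache.commons.io.LineIterator;

    import java.io.FileReader;
    import java.io.Reader;
    import java.util.ArrayList;
    import java.util.List;

    public class PagedFileReadSketch {

        // Returns the lines belonging to one page, skipping everything before 'from'
        // and stopping as soon as 'to' lines have been seen.
        public static List<String> readPage(String path, int pageIndex, int pageSize) throws Exception {
            int from = (pageIndex - 1) * pageSize;
            int to = from + pageSize;
            List<String> page = new ArrayList<>();
            try (Reader reader = new FileReader(path)) {
                LineIterator lineIterator = IOUtils.lineIterator(reader);
                int count = 0;
                while (lineIterator.hasNext()) {
                    String line = lineIterator.next();
                    if (count >= from && count < to) {
                        page.add(line);
                    }
                    count++;
                    if (count >= to) {
                        break;
                    }
                }
            }
            return page;
        }
    }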

+ 103 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileResolver.java

@@ -0,0 +1,103 @@
+package org.dbsyncer.connector.file;
+
+import org.dbsyncer.common.column.Lexer;
+import org.dbsyncer.connector.file.column.ColumnValue;
+import org.dbsyncer.connector.file.column.FileColumnValue;
+import org.dbsyncer.connector.model.Field;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/6 15:46
+ */
+public class FileResolver {
+
+    private ColumnValue value = new FileColumnValue();
+
+    public Map<String, Object> parseMap(List<Field> fields, char separator, String line) {
+        Map<String, Object> row = new LinkedHashMap<>();
+        parse(fields, separator, line, (key, value) -> row.put(key, value));
+        return row;
+    }
+
+    public List<Object> parseList(List<Field> fields, char separator, String line) {
+        List<Object> data = new ArrayList<>();
+        parse(fields, separator, line, (key, value) -> data.add(value));
+        return data;
+    }
+
+    /**
+     * Resolve the value of a {@link ColumnValue}.
+     *
+     * @param typeName
+     * @param columnValue
+     * @return
+     */
+    private Object resolveValue(String typeName, String columnValue) {
+        value.setValue(columnValue);
+
+        if (value.isNull()) {
+            return null;
+        }
+
+        switch (typeName) {
+            case "string":
+                return value.asString();
+
+            case "integer":
+                return value.asInteger();
+
+            case "date":
+                return value.asDate();
+
+            case "timestamp":
+                return value.asTimestamp();
+
+            case "boolean":
+                return value.asBoolean();
+
+            case "long":
+                return value.asLong();
+
+            case "float":
+                return value.asFloat();
+
+            case "double":
+                return value.asDouble();
+
+            case "time":
+                return value.asTime();
+
+            case "bytea":
+                return value.asByteArray();
+
+            default:
+                return null;
+        }
+
+    }
+
+    private void parse(List<Field> fields, char separator, String line, ResultSetMapper mapper) {
+        int fieldSize = fields.size();
+        int i = 0;
+        Lexer lexer = new Lexer(line);
+        while (i < fieldSize) {
+            if (lexer.hasNext()) {
+                mapper.apply(fields.get(i).getName(), resolveValue(fields.get(i).getTypeName(), lexer.nextToken(separator)));
+            } else {
+                mapper.apply(fields.get(i).getName(), null);
+            }
+            i++;
+        }
+    }
+
+    private interface ResultSetMapper {
+        void apply(String key, Object value);
+    }
+
+}
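
A short usage sketch for FileResolver (illustrative only): the schema JSON mirrors the sample in addFile.html, but the typeName values are written in lowercase here because resolveValue() switches on lowercase names such as "string" and "integer".

    import java.util.List;
    import java.util.Map;
    import org.dbsyncer.common.util.JsonUtil;
    import org.dbsyncer.connector.file.FileResolver;
    import org.dbsyncer.connector.model.Field;
    import org.dbsyncer.connector.model.FileSchema;

    public class FileResolverSketch {
        public static void main(String[] args) {
            // Field metadata is parsed from the schema JSON, the same way FileConnector does it.
            String schema = "[{\"fileName\":\"test.unl\",\"fields\":["
                    + "{\"name\":\"id\",\"typeName\":\"string\",\"type\":12,\"pk\":true},"
                    + "{\"name\":\"age\",\"typeName\":\"integer\",\"type\":4}]}]";
            List<Field> fields = JsonUtil.jsonToArray(schema, FileSchema.class).get(0).getFields();

            // One '|'-separated line is mapped to column name -> typed value.
            Map<String, Object> row = new FileResolver().parseMap(fields, '|', "1001|25");
            System.out.println(row); // {id=1001, age=25}
        }
    }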

+ 37 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/column/ColumnValue.java

@@ -0,0 +1,37 @@
+package org.dbsyncer.connector.file.column;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/5 22:39
+ */
+public interface ColumnValue {
+
+    void setValue(String value);
+
+    boolean isNull();
+
+    String asString();
+
+    Boolean asBoolean();
+
+    Integer asInteger();
+
+    Long asLong();
+
+    Float asFloat();
+
+    Double asDouble();
+
+    Date asDate();
+
+    Timestamp asTimestamp();
+
+    Object asTime();
+
+    byte[] asByteArray();
+
+}

+ 87 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/column/FileColumnValue.java

@@ -0,0 +1,87 @@
+package org.dbsyncer.connector.file.column;
+
+import org.dbsyncer.common.util.DateFormatUtil;
+import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.common.util.StringUtil;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/6 15:48
+ */
+public class FileColumnValue implements ColumnValue {
+
+    private String value;
+
+    @Override
+    public void setValue(String value) {
+        this.value = value;
+    }
+
+    @Override
+    public boolean isNull() {
+        return value == null;
+    }
+
+    @Override
+    public String asString() {
+        return value;
+    }
+
+    @Override
+    public Boolean asBoolean() {
+        return "true".equalsIgnoreCase(value);
+    }
+
+    @Override
+    public Integer asInteger() {
+        return Integer.valueOf(value);
+    }
+
+    @Override
+    public Long asLong() {
+        return NumberUtil.toLong(value);
+    }
+
+    @Override
+    public Float asFloat() {
+        return Float.valueOf(value);
+    }
+
+    @Override
+    public Double asDouble() {
+        return Double.valueOf(value);
+    }
+
+    @Override
+    public Date asDate() {
+        return DateFormatUtil.stringToDate(asString());
+    }
+
+    @Override
+    public Timestamp asTimestamp() {
+        try {
+            if (NumberUtil.isCreatable(value)) {
+                return new Timestamp(asLong());
+            }
+
+            return DateFormatUtil.stringToTimestamp(value);
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    @Override
+    public Object asTime() {
+        return asString();
+    }
+
+    @Override
+    public byte[] asByteArray() {
+        return StringUtil.hexStringToByteArray(value.substring(2));
+    }
+}

+ 36 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/FileSchema.java

@@ -0,0 +1,36 @@
+package org.dbsyncer.connector.model;
+
+import java.util.List;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/6 0:04
+ */
+public class FileSchema {
+
+    /**
+     * File name
+     */
+    private String fileName;
+    /**
+     * Field definitions
+     */
+    private List<Field> fields;
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public void setFileName(String fileName) {
+        this.fileName = fileName;
+    }
+
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    public void setFields(List<Field> fields) {
+        this.fields = fields;
+    }
+}

+ 5 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java

@@ -3,6 +3,7 @@ package org.dbsyncer.listener.enums;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.listener.file.FileExtractor;
 import org.dbsyncer.listener.kafka.KafkaExtractor;
 import org.dbsyncer.listener.mysql.MysqlExtractor;
 import org.dbsyncer.listener.oracle.OracleExtractor;
@@ -40,6 +41,10 @@ public enum ListenerEnum {
      * log_Kafka
      */
     LOG_KAFKA(ListenerTypeEnum.LOG.getType() + ConnectorEnum.KAFKA.getType(), KafkaExtractor.class),
+    /**
+     * log_File
+     */
+    LOG_FILE(ListenerTypeEnum.LOG.getType() + ConnectorEnum.FILE.getType(), FileExtractor.class),
     /**
      * timing_Mysql
      */

+ 331 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/BufferedRandomAccessFile.java

@@ -0,0 +1,331 @@
+package org.dbsyncer.listener.file;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/7 22:27
+ */
+public class BufferedRandomAccessFile extends RandomAccessFile {
+    static final int LogBuffSz_ = 16; // 64K buffer
+    public static final int BuffSz_ = (1 << LogBuffSz_);
+    static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+
+    private String path_;
+
+    /*
+     * This implementation is based on the buffer implementation in Modula-3's
+     * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+     */
+    private boolean dirty_; // true iff unflushed bytes exist
+    private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so track sync separately
+    private long curr_; // current position in file
+    private long lo_, hi_; // bounds on characters in "buff"
+    private byte[] buff_; // local buffer
+    private long maxHi_; // this.lo + this.buff.length
+    private boolean hitEOF_; // buffer contains last file block?
+    private long diskPos_; // disk position
+
+    /*
+     * To describe the above fields, we introduce the following abstractions for
+     * the file "f":
+     *
+     * len(f) the length of the file curr(f) the current position in the file
+     * c(f) the abstract contents of the file disk(f) the contents of f's
+     * backing disk file closed(f) true iff the file is closed
+     *
+     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+     * operation has the effect of making "disk(f)" identical to "c(f)".
+     *
+     * A file is said to be *valid* if the following conditions hold:
+     *
+     * V1. The "closed" and "curr" fields are correct:
+     *
+     * f.closed == closed(f) f.curr == curr(f)
+     *
+     * V2. The current position is either contained in the buffer, or just past
+     * the buffer:
+     *
+     * f.lo <= f.curr <= f.hi
+     *
+     * V3. Any (possibly) unflushed characters are stored in "f.buff":
+     *
+     * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+     *
+     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+     *
+     * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+     * disk(f)[i])
+     *
+     * V5. "f.dirty" is true iff the buffer contains bytes that should be
+     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+     *
+     * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+     *
+     * V6. this.maxHi == this.lo + this.buff.length
+     *
+     * Note that "f.buff" can be "null" in a valid file, since the range of
+     * characters in V3 is empty when "f.lo == f.curr".
+     *
+     * A file is said to be *ready* if the buffer contains the current position,
+     * i.e., when:
+     *
+     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+     *
+     * When a file is ready, reading or writing a single byte can be performed
+     * by reading or writing the in-memory buffer without performing a disk
+     * operation.
+     */
+
+    /**
+     * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
+     * in mode <code>mode</code>, which should be "r" for reading only, or
+     * "rw" for reading and writing.
+     */
+    public BufferedRandomAccessFile(File file, String mode) throws IOException {
+        this(file, mode, 0);
+    }
+
+    public BufferedRandomAccessFile(File file, String mode, int size) throws IOException {
+        super(file, mode);
+        path_ = file.getAbsolutePath();
+        this.init(size);
+    }
+
+    /**
+     * Open a new <code>BufferedRandomAccessFile</code> on the file named
+     * <code>name</code> in mode <code>mode</code>, which should be "r" for
+     * reading only, or "rw" for reading and writing.
+     */
+    public BufferedRandomAccessFile(String name, String mode) throws IOException {
+        this(name, mode, 0);
+    }
+
+    public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException {
+        super(name, mode);
+        path_ = name;
+        this.init(size);
+    }
+
+    private void init(int size) {
+        this.dirty_ = false;
+        this.lo_ = this.curr_ = this.hi_ = 0;
+        this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
+        this.maxHi_ = (long) BuffSz_;
+        this.hitEOF_ = false;
+        this.diskPos_ = 0L;
+    }
+
+    public String getPath() {
+        return path_;
+    }
+
+    public void sync() throws IOException {
+        if (syncNeeded_) {
+            flush();
+            getChannel().force(true);
+            syncNeeded_ = false;
+        }
+    }
+
+//      public boolean isEOF() throws IOException
+//      {
+//          assert getFilePointer() <= length();
+//          return getFilePointer() == length();
+//      }
+
+    public void close() throws IOException {
+        this.flush();
+        this.buff_ = null;
+        super.close();
+    }
+
+    /**
+     * Flush any bytes in the file's buffer that have not yet been written to
+     * disk. If the file was created read-only, this method is a no-op.
+     */
+    public void flush() throws IOException {
+        this.flushBuffer();
+    }
+
+    /* Flush any dirty bytes in the buffer to disk. */
+    private void flushBuffer() throws IOException {
+        if (this.dirty_) {
+            if (this.diskPos_ != this.lo_)
+                super.seek(this.lo_);
+            int len = (int) (this.curr_ - this.lo_);
+            super.write(this.buff_, 0, len);
+            this.diskPos_ = this.curr_;
+            this.dirty_ = false;
+        }
+    }
+
+    /*
+     * Read at most "this.buff.length" bytes into "this.buff", returning the
+     * number of bytes read. If the return result is less than
+     * "this.buff.length", then EOF was read.
+     */
+    private int fillBuffer() throws IOException {
+        int cnt = 0;
+        int rem = this.buff_.length;
+        while (rem > 0) {
+            int n = super.read(this.buff_, cnt, rem);
+            if (n < 0)
+                break;
+            cnt += n;
+            rem -= n;
+        }
+        if ((cnt < 0) && (this.hitEOF_ = (cnt < this.buff_.length))) {
+            // make sure buffer that wasn't read is initialized with -1
+            Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
+        }
+        this.diskPos_ += cnt;
+        return cnt;
+    }
+
+    /*
+     * This method positions <code>this.curr</code> at position <code>pos</code>.
+     * If <code>pos</code> does not fall in the current buffer, it flushes the
+     * current buffer and loads the correct one.<p>
+     *
+     * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
+     * is at or past the end-of-file, which can only happen if the file was
+     * opened in read-only mode.
+     */
+    public void seek(long pos) throws IOException {
+        if (pos >= this.hi_ || pos < this.lo_) {
+            // seeking outside of current buffer -- flush and read
+            this.flushBuffer();
+            this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+            this.maxHi_ = this.lo_ + (long) this.buff_.length;
+            if (this.diskPos_ != this.lo_) {
+                super.seek(this.lo_);
+                this.diskPos_ = this.lo_;
+            }
+            int n = this.fillBuffer();
+            this.hi_ = this.lo_ + (long) n;
+        } else {
+            // seeking inside current buffer -- no read required
+            if (pos < this.curr_) {
+                // if seeking backwards, we must flush to maintain V4
+                this.flushBuffer();
+            }
+        }
+        this.curr_ = pos;
+    }
+
+    public long getFilePointer() {
+        return this.curr_;
+    }
+
+    public long length() throws IOException {
+        // max accounts for the case where we have written past the old file length, but not yet flushed our buffer
+        return Math.max(this.curr_, super.length());
+    }
+
+    public int read() throws IOException {
+        if (this.curr_ >= this.hi_) {
+            // test for EOF
+            // if (this.hi < this.maxHi) return -1;
+            if (this.hitEOF_)
+                return -1;
+
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        byte res = this.buff_[(int) (this.curr_ - this.lo_)];
+        this.curr_++;
+        return ((int) res) & 0xFF; // convert byte -> int
+    }
+
+    public int read(byte[] b) throws IOException {
+        return this.read(b, 0, b.length);
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (this.curr_ >= this.hi_) {
+            // test for EOF
+            // if (this.hi < this.maxHi) return -1;
+            if (this.hitEOF_)
+                return -1;
+
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(this.buff_, buffOff, b, off, len);
+        this.curr_ += len;
+        return len;
+    }
+
+    public void write(int b) throws IOException {
+        if (this.curr_ >= this.hi_) {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_) {
+                // at EOF -- bump "hi"
+                this.hi_++;
+            } else {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_) {
+                    // appending to EOF -- bump "hi"
+                    this.hi_++;
+                }
+            }
+        }
+        this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
+        this.curr_++;
+        this.dirty_ = true;
+        syncNeeded_ = true;
+    }
+
+    public void write(byte[] b) throws IOException {
+        this.write(b, 0, b.length);
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException {
+        while (len > 0) {
+            int n = this.writeAtMost(b, off, len);
+            off += n;
+            len -= n;
+            this.dirty_ = true;
+            syncNeeded_ = true;
+        }
+    }
+
+    /*
+     * Write at most "len" bytes to "b" starting at position "off", and return
+     * the number of bytes written.
+     */
+    private int writeAtMost(byte[] b, int off, int len) throws IOException {
+        if (this.curr_ >= this.hi_) {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_) {
+                // at EOF -- bump "hi"
+                this.hi_ = this.maxHi_;
+            } else {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_) {
+                    // appending to EOF -- bump "hi"
+                    this.hi_ = this.maxHi_;
+                }
+            }
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(b, off, this.buff_, buffOff, len);
+        this.curr_ += len;
+        return len;
+    }
+}
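
BufferedRandomAccessFile only adds buffering on top of java.io.RandomAccessFile, so it can be used as a drop-in replacement wherever the listener tails a file. A minimal sketch (path and offset are placeholders) that resumes reading from a saved position and returns the new file pointer:

    import java.io.IOException;
    import org.dbsyncer.listener.file.BufferedRandomAccessFile;

    public class TailReadSketch {

        // Prints everything appended after 'offset' and returns the position to save for next time.
        public static long readFrom(String path, long offset) throws IOException {
            try (BufferedRandomAccessFile raf = new BufferedRandomAccessFile(path, "r")) {
                raf.seek(Math.min(offset, raf.length()));   // never seek past the current end
                byte[] buffer = new byte[4096];
                int len;
                while ((len = raf.read(buffer)) != -1) {
                    System.out.print(new String(buffer, 0, len, "UTF-8"));
                }
                return raf.getFilePointer();
            }
        }
    }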

+ 223 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -0,0 +1,223 @@
+package org.dbsyncer.listener.file;
+
+import org.apache.commons.io.IOUtils;
+import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.common.util.RandomUtil;
+import org.dbsyncer.connector.config.FileConfig;
+import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.file.FileConnectorMapper;
+import org.dbsyncer.connector.file.FileResolver;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.FileSchema;
+import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.ListenerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/6 21:42
+ */
+public class FileExtractor extends AbstractExtractor {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private static final String POS_PREFIX = "pos_";
+    private static final String CHARSET_NAME = "UTF-8";
+    private final Lock connectLock = new ReentrantLock();
+    private volatile boolean connected;
+    private FileConnectorMapper connectorMapper;
+    private WatchService watchService;
+    private Worker worker;
+    private Map<String, PipelineResolver> pipeline = new ConcurrentHashMap<>();
+    private final FileResolver fileResolver = new FileResolver();
+    private char separator;
+
+    @Override
+    public void start() {
+        try {
+            connectLock.lock();
+            if (connected) {
+                logger.error("FileExtractor is already started");
+                return;
+            }
+
+            connectorMapper = (FileConnectorMapper) connectorFactory.connect(connectorConfig);
+            final FileConfig config = connectorMapper.getConfig();
+            final String mapperCacheKey = connectorFactory.getConnector(connectorMapper).getConnectorMapperCacheKey(connectorConfig);
+            connected = true;
+
+            separator = config.getSeparator();
+            initPipeline(config.getFileDir(), config.getSchema());
+            watchService = FileSystems.getDefault().newWatchService();
+            Path p = Paths.get(config.getFileDir());
+            p.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
+
+            for (String fileName : pipeline.keySet()) {
+                parseEvent(fileName);
+            }
+            forceFlushEvent();
+
+            worker = new Worker();
+            worker.setName(new StringBuilder("file-parser-").append(mapperCacheKey).append("_").append(RandomUtil.nextInt(1, 100)).toString());
+            worker.setDaemon(false);
+            worker.start();
+        } catch (Exception e) {
+            logger.error("启动失败:{}", e.getMessage());
+            closePipelineAndWatch();
+            throw new ListenerException(e);
+        } finally {
+            connectLock.unlock();
+        }
+    }
+
+    private void initPipeline(String fileDir, String schema) throws IOException {
+        List<FileSchema> fileSchemas = JsonUtil.jsonToArray(schema, FileSchema.class);
+        Assert.notEmpty(fileSchemas, "file schema not found.");
+        for (FileSchema fileSchema : fileSchemas) {
+            String fileName = fileSchema.getFileName();
+            String file = fileDir.concat(fileName);
+            Assert.isTrue(new File(file).exists(), String.format("file '%s' not found", file));
+
+            final RandomAccessFile raf = new BufferedRandomAccessFile(file, "r");
+            final String filePosKey = getFilePosKey(fileName);
+            if (snapshot.containsKey(filePosKey)) {
+                raf.seek(NumberUtil.toLong(snapshot.get(filePosKey), 0L));
+            } else {
+                raf.seek(raf.length());
+            }
+
+            pipeline.put(fileName, new PipelineResolver(fileSchema.getFields(), raf));
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            closePipelineAndWatch();
+            connected = false;
+            if (null != worker && !worker.isInterrupted()) {
+                worker.interrupt();
+                worker = null;
+            }
+        } catch (Exception e) {
+            logger.error("关闭失败:{}", e.getMessage());
+        }
+    }
+
+    private void closePipelineAndWatch() {
+        try {
+            pipeline.values().forEach(pipelineResolver -> IOUtils.closeQuietly(pipelineResolver.raf));
+            pipeline.clear();
+
+            if (null != watchService) {
+                watchService.close();
+            }
+        } catch (IOException ex) {
+            logger.error(ex.getMessage());
+        }
+    }
+
+    private String getFilePosKey(String fileName) {
+        return POS_PREFIX.concat(fileName);
+    }
+
+    private void parseEvent(String fileName) throws IOException {
+        if (pipeline.containsKey(fileName)) {
+            PipelineResolver pipelineResolver = pipeline.get(fileName);
+            final RandomAccessFile raf = pipelineResolver.raf;
+
+            final String filePosKey = getFilePosKey(fileName);
+            String line;
+            while (null != (line = pipelineResolver.readLine())) {
+                snapshot.put(filePosKey, String.valueOf(raf.getFilePointer()));
+                List<Object> row = fileResolver.parseList(pipelineResolver.fields, separator, line);
+                changedEvent(new RowChangedEvent(fileName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, row));
+            }
+
+        }
+    }
+
+    final class PipelineResolver {
+        List<Field> fields;
+        RandomAccessFile raf;
+        byte[] b;
+        long filePointer;
+
+        public PipelineResolver(List<Field> fields, RandomAccessFile raf) {
+            this.fields = fields;
+            this.raf = raf;
+        }
+
+        public String readLine() throws IOException {
+            this.filePointer = raf.getFilePointer();
+            if (filePointer >= raf.length()) {
+                b = new byte[0];
+                return null;
+            }
+            if (b == null || b.length == 0) {
+                b = new byte[(int) (raf.length() - filePointer)];
+            }
+            raf.read(b);
+
+            ByteArrayOutputStream stream = new ByteArrayOutputStream();
+            int read = 0;
+            for (int i = 0; i < b.length; i++) {
+                read++;
+                if (b[i] == '\n' || b[i] == '\r') {
+                    break;
+                }
+                stream.write(b[i]);
+            }
+            b = Arrays.copyOfRange(b, read, b.length);
+
+            raf.seek(this.filePointer + read);
+            byte[] _b = stream.toByteArray();
+            stream.close();
+            stream = null;
+            return new String(_b, CHARSET_NAME);
+        }
+    }
+
+    final class Worker extends Thread {
+
+        @Override
+        public void run() {
+            while (!isInterrupted() && connected) {
+                WatchKey watchKey = null;
+                try {
+                    watchKey = watchService.take();
+                    List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
+                    for (WatchEvent<?> event : watchEvents) {
+                        parseEvent(event.context().toString());
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                } finally {
+                    if (null != watchKey) {
+                        watchKey.reset();
+                    }
+                }
+            }
+        }
+
+    }
+
+}
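
The extractor persists one offset per file under a "pos_" + fileName key in the snapshot map, so a restart resumes exactly where the previous run stopped (or at the current end of file on the first run). A hedged sketch of that bookkeeping, using only the calls shown above (the directory layout is hypothetical):

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.Map;
    import org.dbsyncer.common.util.NumberUtil;
    import org.dbsyncer.listener.file.BufferedRandomAccessFile;

    public class ResumeOffsetSketch {

        // Opens a data file for tailing: seek to the saved offset if one exists,
        // otherwise skip the existing content and only pick up new lines.
        public static RandomAccessFile open(Map<String, String> snapshot, String fileDir, String fileName) throws IOException {
            RandomAccessFile raf = new BufferedRandomAccessFile(fileDir + fileName, "r");
            String filePosKey = "pos_" + fileName;
            if (snapshot.containsKey(filePosKey)) {
                raf.seek(NumberUtil.toLong(snapshot.get(filePosKey), 0L));
            } else {
                raf.seek(raf.length());
            }
            return raf;
        }
    }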

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/TestDecodingMessageDecoder.java

@@ -3,7 +3,7 @@ package org.dbsyncer.listener.postgresql.decoder;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
-import org.dbsyncer.listener.postgresql.column.Lexer;
+import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;

+ 83 - 0
dbsyncer-listener/src/main/test/FileWatchTest.java

@@ -0,0 +1,83 @@
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.*;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/6 21:32
+ */
+public class FileWatchTest {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private String path = "d:/test/";
+    private WatchService watchService;
+
+    @After
+    public void close() throws IOException {
+        if (null != watchService) {
+            watchService.close();
+        }
+    }
+
+    @Test
+    public void testFileWatch() throws IOException, InterruptedException {
+        watchService = FileSystems.getDefault().newWatchService();
+        Path p = Paths.get(path);
+        p.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
+
+        logger.info("启动监听");
+        long count = 0L;
+        while (count < 30) {
+            WatchKey watchKey = watchService.take();
+            List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
+            for (WatchEvent<?> event : watchEvents) {
+                Object context = event.context();
+                logger.info("[{}{}] 文件发生了[{}]事件", path, context, event.kind());
+            }
+            watchKey.reset();
+
+            TimeUnit.SECONDS.sleep(1);
+            count++;
+        }
+    }
+
+    @Test
+    public void testReadFile() {
+        read(path + "test.txt");
+    }
+
+    private void read(String file) {
+        RandomAccessFile raf = null;
+        byte[] buffer = new byte[4096];
+        try {
+            raf = new RandomAccessFile(file, "r");
+            raf.seek(raf.length());
+            logger.info("offset:{}", raf.getFilePointer());
+
+            while (true) {
+                int len = raf.read(buffer);
+                if (-1 != len) {
+                    logger.info("offset:{}, len:{}", raf.getFilePointer(), len);
+                    logger.info(new String(buffer, 0, len, "UTF-8"));
+                }
+                TimeUnit.SECONDS.sleep(1);
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        } finally {
+            IOUtils.closeQuietly(raf);
+        }
+    }
+
+}

+ 1 - 1
dbsyncer-listener/src/main/test/KafkaClientTest.java

@@ -1,12 +1,12 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.dbsyncer.common.util.JsonUtil;
-import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.KafkaConfig;
 import org.dbsyncer.connector.enums.KafkaFieldTypeEnum;
 import org.dbsyncer.connector.kafka.KafkaClient;
 import org.dbsyncer.connector.kafka.serialization.JsonToMapDeserializer;
 import org.dbsyncer.connector.kafka.serialization.MapToJsonSerializer;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.util.KafkaUtil;
 import org.junit.After;
 import org.junit.Before;

+ 8 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -284,7 +284,14 @@ public class ParserFactory implements Parser {
             // 6. Update the results
             flush(task, writer);
 
-            // 7. Update the page index
+            // 7. Check for the last page
+            if (data.size() < pageSize) {
+                params.clear();
+                logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
+                break;
+            }
+
+            // 8. Update the page index
             params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
         }
     }
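
The change above stops full sync as soon as a page comes back smaller than pageSize, instead of issuing one extra empty query. A generic sketch of that termination rule (readPage and write stand in for the parser's own read and flush steps):

    import java.util.List;
    import java.util.function.Consumer;
    import java.util.function.IntFunction;

    public class FullSyncLoopSketch {

        // Pages through a source until the tail page (smaller than pageSize) is reached.
        public static <T> void syncAll(IntFunction<List<T>> readPage, Consumer<List<T>> write, int pageSize) {
            int pageIndex = 1;
            while (true) {
                List<T> data = readPage.apply(pageIndex);
                if (data.isEmpty()) {
                    break;                      // nothing left to read
                }
                write.accept(data);
                if (data.size() < pageSize) {
                    break;                      // last page: no need to ask for another
                }
                pageIndex++;                    // full page: fetch the next one
            }
        }
    }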

+ 64 - 13
dbsyncer-web/src/main/resources/public/connector/addFile.html

@@ -11,12 +11,17 @@
             <input class="form-control" name="fileDir" type="text" maxlength="512" dbsyncer-valid="require"
                    th:value="${connector?.config?.fileDir}?:'/soft'"/>
         </div>
-        <div class="col-sm-6"></div>
+        <label class="col-sm-2 control-label">分割符 <strong
+                class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-4">
+            <input class="form-control" name="separator" type="text" maxlength="512" dbsyncer-valid="require"
+                   th:value="${connector?.config?.separator}?:'|'"/>
+        </div>
     </div>
 
     <div class="form-group">
         <label class="col-sm-2 control-label">schema <i class="fa fa-question-circle fa_gray" aria-hidden="true"
-                                                        title="支持11种字段类型。name字段名, typeName类型名称, type类型编码, pk是否为主键"></i><strong
+                                                        title="支持10种字段类型。name字段名, typeName类型名称, type类型编码, pk是否为主键"></i><strong
                 class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-9">
             <textarea id="schema" name="schema" class="form-control dbsyncer_textarea_resize_none" maxlength="4096"
@@ -34,17 +39,63 @@
             const $text = $("#schema");
             if ("" == $text.text()) {
                 const data = [
-                    {"name": "id", "typeName": "String", "type": 12, "pk": true},
-                    {"name": "age", "typeName": "Integer", "type": 4},
-                    {"name": "count", "typeName": "Long", "type": -5},
-                    {"name": "type", "typeName": "Short", "type": 5},
-                    {"name": "money", "typeName": "Float", "type": 6},
-                    {"name": "score", "typeName": "Double", "type": 8},
-                    {"name": "status", "typeName": "Boolean", "type": -7},
-                    // {"name":"photo","typeName":"byte[]","type":-2},
-                    {"name": "create_date", "typeName": "Date", "type": 91},
-                    {"name": "time", "typeName": "Time", "type": 92},
-                    {"name": "update_time", "typeName": "Timestamp", "type": 93}
+                    {
+                        "fileName": "test.unl",
+                        "separator": "|",
+                        "fields": [
+                            {
+                                "name": "id",
+                                "typeName": "String",
+                                "type": 12,
+                                "pk": true
+                            },
+                            {
+                                "name": "age",
+                                "typeName": "Integer",
+                                "type": 4
+                            },
+                            {
+                                "name": "count",
+                                "typeName": "Long",
+                                "type": -5
+                            },
+                            {
+                                "name": "type",
+                                "typeName": "Short",
+                                "type": 5
+                            },
+                            {
+                                "name": "money",
+                                "typeName": "Float",
+                                "type": 6
+                            },
+                            {
+                                "name": "score",
+                                "typeName": "Double",
+                                "type": 8
+                            },
+                            {
+                                "name": "status",
+                                "typeName": "Boolean",
+                                "type": -7
+                            },
+                            {
+                                "name": "create_date",
+                                "typeName": "Date",
+                                "type": 91
+                            },
+                            {
+                                "name": "time",
+                                "typeName": "Time",
+                                "type": 92
+                            },
+                            {
+                                "name": "update_time",
+                                "typeName": "Timestamp",
+                                "type": 93
+                            }
+                        ]
+                    }
                 ];
                 $text.val(JSON.stringify(data, null, 4));
                 return;

+ 7 - 2
dbsyncer-web/src/main/resources/static/css/common.css

@@ -1,7 +1,12 @@
 @CHARSET "UTF-8";
 
-.dbsyncer_img {border-radius: 5%; border:1px solid rosybrown;}
-.dbsyncer_pointer{cursor:pointer;}
+.dbsyncer_img {
+    border-radius: 5%;
+}
+
+.dbsyncer_pointer {
+    cursor: pointer;
+}
 .dbsyncer_over_hidden{overflow: hidden; text-overflow:ellipsis; white-space: nowrap;}
 .dbsyncer_block {border:1px solid #ffffff;}
 .dbsyncer_block:hover { cursor:pointer; background-color: #EBEBEB; -moz-box-shadow:2px 2px 5px; -webkit-box-shadow:2px 2px 5px; box-shadow:2px 2px 5px; -webkit-transform:translateY(-3px); transform: translateY(-3px); transition: all 0.3s ease-in-out;}

+ 9 - 2
dbsyncer-web/src/main/resources/static/css/index/index.css

@@ -1,6 +1,13 @@
 @charset "UTF-8";
-.connectorList img{width: 65px;height: 55px; border-radius: 20%;}
-.connectorList .well-sign-red{color: #ff0000;}
+.connectorList img {
+    width: 65px;
+    height: 65px;
+    border-radius: 20%;
+}
+
+.connectorList .well-sign-red {
+    color: #ff0000;
+}
 .connectorList .well-sign-operation{color: #999999; border-radius: 50%; }
 .connectorList .well-sign-operation:hover {color: #333333;}
 .connectorList .dropdown {position: absolute; right:16px; top: 0px;}

BIN
dbsyncer-web/src/main/resources/static/img/File.png