Forráskód Böngészése

!61 merge
Merge pull request !61 from AE86/V_1.0.0_Beta

AE86 3 éve
szülő
commit
f0966e295b
100 módosított fájl, 1969 hozzáadás és 512 törlés
  1. 25 8
      README.md
  2. 1 1
      dbsyncer-biz/pom.xml
  3. 1 1
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/AbstractChecker.java
  4. 1 1
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/ConnectorChecker.java
  5. 46 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/FileConfigChecker.java
  6. 3 3
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/OracleConfigChecker.java
  7. 3 3
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java
  8. 1 1
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/TableGroupServiceImpl.java
  9. 1 1
      dbsyncer-cache/pom.xml
  10. 1 1
      dbsyncer-cluster/pom.xml
  11. 1 1
      dbsyncer-common/pom.xml
  12. 1 1
      dbsyncer-common/src/main/java/org/dbsyncer/common/column/Lexer.java
  13. 11 5
      dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java
  14. 7 11
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/DateFormatUtil.java
  15. 8 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java
  16. 1 1
      dbsyncer-connector/pom.xml
  17. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java
  18. 2 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java
  19. 12 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java
  20. 2 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorMapper.java
  21. 3 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/CommandConfig.java
  22. 48 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/FileConfig.java
  23. 6 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java
  24. 2 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterBatchConfig.java
  25. 16 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  26. 2 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/Database.java
  27. 5 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BitSetter.java
  28. 11 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/DateSetter.java
  29. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/TimestampSetter.java
  30. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderInsert.java
  31. 3 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQuery.java
  32. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderUpdate.java
  33. 6 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/ConnectorEnum.java
  34. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/KafkaFieldTypeEnum.java
  35. 4 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java
  36. 206 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnector.java
  37. 81 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnectorMapper.java
  38. 103 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileResolver.java
  39. 37 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/column/ColumnValue.java
  40. 87 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/column/FileColumnValue.java
  41. 3 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java
  42. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Field.java
  43. 36 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/FileSchema.java
  44. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Filter.java
  45. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/MetaInfo.java
  46. 3 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/PageSql.java
  47. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Table.java
  48. 3 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java
  49. 3 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java
  50. 3 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java
  51. 5 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java
  52. 2 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLMysqlConnector.java
  53. 2 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java
  54. 2 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLPostgreSQLConnector.java
  55. 2 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java
  56. 3 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java
  57. 69 0
      dbsyncer-connector/src/main/test/FileConnectionTest.java
  58. 1 1
      dbsyncer-listener/pom.xml
  59. 3 3
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java
  60. 5 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java
  61. 331 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/BufferedRandomAccessFile.java
  62. 224 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java
  63. 147 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/AbstractMessageDecoder.java
  64. 2 1
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/MessageDecoder.java
  65. 18 23
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java
  66. 12 7
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/AbstractColumnValue.java
  67. 6 4
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/ColumnValue.java
  68. 0 158
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/ColumnValueResolver.java
  69. 58 58
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/PgColumnValue.java
  70. 116 117
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/PgOutputMessageDecoder.java
  71. 2 5
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/TestDecodingMessageDecoder.java
  72. 1 1
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ESQuartzExtractor.java
  73. 3 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/filter/DateFilter.java
  74. 83 0
      dbsyncer-listener/src/main/test/FileWatchTest.java
  75. 1 1
      dbsyncer-listener/src/main/test/KafkaClientTest.java
  76. 1 1
      dbsyncer-manager/pom.xml
  77. 2 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java
  78. 2 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java
  79. 2 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/FieldPicker.java
  80. 2 2
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  81. 1 1
      dbsyncer-monitor/pom.xml
  82. 1 1
      dbsyncer-parser/pom.xml
  83. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java
  84. 11 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  85. 3 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/DateHandler.java
  86. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/DateToChineseStandardTimeHandler.java
  87. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/TimestampToDateHandler.java
  88. 21 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  89. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java
  90. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractWriter.java
  91. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterRequest.java
  92. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/AbstractConfigModel.java
  93. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/BatchWriter.java
  94. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Connector.java
  95. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/FieldMapping.java
  96. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Mapping.java
  97. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java
  98. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/TableGroup.java
  99. 2 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/PickerUtil.java
  100. 1 1
      dbsyncer-plugin/pom.xml

+ 25 - 8
README.md

@@ -1,5 +1,5 @@
 ## 介绍
-DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServer、PostgreSQL、Elasticsearch(ES)、Kafka、SQL(Mysql/Oracle/SqlServer/PostgreSQL)等同步场景。支持上传插件自定义同步转换业务,提供监控全量和增量数据统计图、应用性能预警等。
+DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServer、PostgreSQL、Elasticsearch(ES)、Kafka、File、SQL(Mysql/Oracle/SqlServer/PostgreSQL)等同步场景。支持上传插件自定义同步转换业务,提供监控全量和增量数据统计图、应用性能预警等。
 
 > 特点
 * 组合驱动,自定义库同步到库组合,关系型数据库与非关系型之间组合,任意搭配表同步映射关系
@@ -53,6 +53,12 @@ DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServ
                 <td>✔</td>
                 <td>2.10-0.9.0.0以上</td>
             </tr>
+            <tr>
+                <td>File</td>
+                <td>✔</td>
+                <td>✔</td>
+                <td>*.txt, *.unl</td>
+            </tr>
             <tr>
                 <td>SQL</td>
                 <td>✔</td>
@@ -60,7 +66,7 @@ DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServ
                 <td>支持以上关系型数据库</td>
             </tr>
             <tr>
-                <td>最近计划</td>
+                <td>后期计划</td>
                 <td colspan="3">Redis</td>
             </tr>
         </tbody>
@@ -79,8 +85,7 @@ DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServ
 
 ##### Mysql
 * Dump Binlog二进制日志。Master同步Slave, 创建IO线程读取数据,写入relaylog,基于消息订阅捕获增量数据。
-* 配置
-> 修改my.ini文件
+> 修改my.ini文件,重启服务
 ```bash
 #服务唯一ID
 server_id=1
@@ -95,7 +100,6 @@ replicate-do-db=test
 
 ##### Oracle
 * DCN注册订阅。监听增删改事件,得到rowid,根据rowid执行SQL查询,得到变化数据。
-* 配置
 > 授予账号监听权限, 同时要求目标源表必须定义一个长度为18的varchar字段,通过接收rowid值实现增删改操作。
 ```roomsql
 grant change notification to 你的账号
@@ -103,16 +107,25 @@ grant change notification to 你的账号
 
 ##### SqlServer
 * SQL Server 2008提供了内建的方法变更数据捕获(Change Data Capture 即CDC)以实现异步跟踪用户表的数据修改。
-* 配置
 > 要求2008版本以上, 启动代理服务(Agent服务), 连接账号具有 sysadmin 固定服务器角色或 db_owner 固定数据库角色的成员身份。对于所有其他用户,具有源表SELECT 权限;如果已定义捕获实例的访问控制角色,则还要求具有该数据库角色的成员身份。
 
+##### PostgreSQL
+* 通过复制流技术监听增量事件,基于内置插件pgoutput、test_decoding实现解析wal日志
+> 修改postgresql.conf文件,重启服务
+``` shell
+wal_level=logical
+```
+
+##### File
+* 监听文件修改时间得到变化文件,通过文件偏移量读取最新数据
+> [监听文件实现方案](https://gitee.com/ghi/dbsyncer/issues/I55EP5)
+
 ##### ES
 * 定时获取增量数据。
-* 配置
 > 账号具有访问权限。
 
 ##### 日志
-> 建议Mysql和SqlServer都使用日志
+> 建议Mysql、SqlServer、PostgreSQL都使用日志
 
 ![日志](https://images.gitee.com/uploads/images/2021/0906/181036_1f9a9e78_376718.png "日志.png")
 
@@ -138,6 +151,10 @@ grant change notification to 你的账号
 ### 上传插件
 ![上传插件](https://images.gitee.com/uploads/images/2021/0806/232643_9b1f3f64_376718.png "上传插件.png")
 
+## 🎨设计
+#### 架构图
+<img src="http://assets.processon.com/chart_image/5d63b0bce4b0ac2b61877037.png" />
+
 ## 🔗开发依赖
 * [JDK - 1.8.0_40](https://www.oracle.com/java/technologies/jdk8-downloads.html)(推荐版本以上)
 * [Maven - 3.3.9](https://dlcdn.apache.org/maven/maven-3/)(推荐版本以上)

+ 1 - 1
dbsyncer-biz/pom.xml

@@ -5,7 +5,7 @@
 	<parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.1.7-Beta</version>
+        <version>1.1.8-Beta</version>
     </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-biz</artifactId>

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/AbstractChecker.java

@@ -4,7 +4,7 @@ import org.dbsyncer.biz.BizException;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.Filter;
+import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.model.AbstractConfigModel;
 import org.dbsyncer.parser.model.ConfigModel;

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/ConnectorChecker.java

@@ -6,7 +6,7 @@ import org.dbsyncer.biz.checker.ConnectorConfigChecker;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.logger.LogService;

+ 46 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/FileConfigChecker.java

@@ -0,0 +1,46 @@
+package org.dbsyncer.biz.checker.impl.connector;
+
+import org.dbsyncer.biz.checker.ConnectorConfigChecker;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.config.FileConfig;
+import org.dbsyncer.connector.model.FileSchema;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/6 0:04
+ */
+@Component
+public class FileConfigChecker implements ConnectorConfigChecker<FileConfig> {
+
+    @Override
+    public void modify(FileConfig fileConfig, Map<String, String> params) {
+        String fileDir = params.get("fileDir");
+        String schema = params.get("schema");
+        String separator = StringUtil.trim(params.get("separator"));
+        Assert.hasText(fileDir, "fileDir is empty.");
+        Assert.hasText(schema, "schema is empty.");
+        Assert.hasText(separator, "separator is empty.");
+
+        List<FileSchema> fileSchemas = JsonUtil.jsonToArray(schema, FileSchema.class);
+        Assert.notEmpty(fileSchemas, "found not file schema.");
+
+        fileDir += !StringUtil.endsWith(fileDir, File.separator) ? File.separator : "";
+        for (FileSchema fileSchema : fileSchemas) {
+            String file = fileDir.concat(fileSchema.getFileName());
+            Assert.isTrue(new File(file).exists(), String.format("found not file '%s'", file));
+        }
+
+        fileConfig.setFileDir(fileDir);
+        fileConfig.setSeparator(separator.charAt(0));
+        fileConfig.setSchema(schema);
+    }
+
+}

+ 3 - 3
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/OracleConfigChecker.java

@@ -3,9 +3,9 @@ package org.dbsyncer.biz.checker.impl.connector;
 import org.dbsyncer.biz.enums.OracleIncrementEnum;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.connector.config.MetaInfo;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.Mapping;

+ 3 - 3
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -5,9 +5,9 @@ import org.dbsyncer.biz.checker.AbstractChecker;
 import org.dbsyncer.biz.checker.ConnectorConfigChecker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.connector.config.MetaInfo;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.enums.ModelEnum;
 import org.dbsyncer.parser.model.ConfigModel;

+ 1 - 1
dbsyncer-biz/src/main/java/org/dbsyncer/biz/impl/TableGroupServiceImpl.java

@@ -4,7 +4,7 @@ import org.dbsyncer.biz.TableGroupService;
 import org.dbsyncer.biz.checker.Checker;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.parser.logger.LogType;
 import org.dbsyncer.parser.model.Mapping;
 import org.dbsyncer.parser.model.TableGroup;

+ 1 - 1
dbsyncer-cache/pom.xml

@@ -4,7 +4,7 @@
 	<parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.1.7-Beta</version>
+        <version>1.1.8-Beta</version>
     </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-cache</artifactId>

+ 1 - 1
dbsyncer-cluster/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.1.7-Beta</version>
+        <version>1.1.8-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-cluster</artifactId>

+ 1 - 1
dbsyncer-common/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.1.7-Beta</version>
+        <version>1.1.8-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-common</artifactId>

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/Lexer.java → dbsyncer-common/src/main/java/org/dbsyncer/common/column/Lexer.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.listener.postgresql.column;
+package org.dbsyncer.common.column;
 
 /**
  * @author AE86

+ 11 - 5
dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java

@@ -5,13 +5,19 @@ import java.util.List;
 
 public class Result<T> {
 
-    // 成功数据
-    private List<T> successData = new LinkedList<>();
+    /**
+     * 成功数据
+     */
+    private final List<T> successData = new LinkedList<>();
 
-    // 错误数据
-    private List<T> failData = new LinkedList<>();
+    /**
+     * 错误数据
+     */
+    private final List<T> failData = new LinkedList<>();
 
-    // 错误日志
+    /**
+     * 错误日志
+     */
     private StringBuffer error = new StringBuffer();
 
     private final Object LOCK = new Object();

+ 7 - 11
dbsyncer-common/src/main/java/org/dbsyncer/common/util/DateFormatUtil.java

@@ -1,10 +1,11 @@
 package org.dbsyncer.common.util;
 
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.time.*;
 import java.time.format.*;
 import java.time.temporal.ChronoField;
 import java.time.temporal.TemporalAccessor;
-import java.util.Date;
 
 public abstract class DateFormatUtil {
 
@@ -79,19 +80,17 @@ public abstract class DateFormatUtil {
     }
 
     public static Date stringToDate(String s) {
-        LocalDate localDate = LocalDate.parse(s, DATE_FORMATTER);
-        Instant instant = localDate.atStartOfDay().atZone(zoneId).toInstant();
-        return Date.from(instant);
-    }
-
-    public static LocalDate stringToLocalDate(String s) {
-        return LocalDate.parse(s, DATE_FORMATTER);
+        return Date.valueOf(LocalDate.parse(s, DATE_FORMATTER));
     }
 
     public static LocalTime stringToLocalTime(String s) {
         return LocalTime.parse(s, CHINESE_STANDARD_TIME_FORMATTER);
     }
 
+    public static Timestamp stringToTimestamp(String s) {
+        return Timestamp.valueOf(LocalDateTime.from(TS_FORMAT.parse(s)));
+    }
+
     public static OffsetTime timeWithTimeZone(String s) {
         return OffsetTime.parse(s, TIME_TZ_FORMAT).withOffsetSameInstant(ZoneOffset.UTC);
     }
@@ -106,7 +105,4 @@ public abstract class DateFormatUtil {
         return OffsetDateTime.from(parsedTimestamp).withOffsetSameInstant(ZoneOffset.UTC);
     }
 
-    public static Instant timestampToInstant(String s) {
-        return LocalDateTime.from(TS_FORMAT.parse(s)).toInstant(ZoneOffset.UTC);
-    }
 }

+ 8 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java

@@ -24,6 +24,14 @@ public abstract class StringUtil {
         return StringUtils.contains(seq, searchSeq);
     }
 
+    public static boolean endsWith(final CharSequence str, final CharSequence suffix) {
+        return StringUtils.endsWith(str, suffix);
+    }
+
+    public static String trim(String text) {
+        return StringUtils.trim(text);
+    }
+
     public static String replace(String text, String searchString, String replacement) {
         return StringUtils.replace(text, searchString, replacement);
     }

+ 1 - 1
dbsyncer-connector/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.1.7-Beta</version>
+        <version>1.1.8-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-connector</artifactId>

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.connector;
 
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 
 import java.util.List;

+ 2 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java

@@ -2,6 +2,8 @@ package org.dbsyncer.connector;
 
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.Table;
 
 import java.util.List;
 import java.util.Map;

+ 12 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -1,8 +1,11 @@
 package org.dbsyncer.connector;
 
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.enums.ConnectorEnum;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.Table;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.util.Assert;
 
@@ -116,8 +119,15 @@ public class ConnectorFactory implements DisposableBean {
         String sType = sourceCommandConfig.getType();
         String tType = targetCommandConfig.getType();
         Map<String, String> map = new HashMap<>();
-        map.putAll(getConnector(sType).getSourceCommand(sourceCommandConfig));
-        map.putAll(getConnector(tType).getTargetCommand(targetCommandConfig));
+        Map sCmd = getConnector(sType).getSourceCommand(sourceCommandConfig);
+        if (!CollectionUtils.isEmpty(sCmd)) {
+            map.putAll(sCmd);
+        }
+
+        Map tCmd = getConnector(tType).getTargetCommand(targetCommandConfig);
+        if (!CollectionUtils.isEmpty(tCmd)) {
+            map.putAll(tCmd);
+        }
         return map;
     }
 

+ 2 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorMapper.java

@@ -23,9 +23,7 @@ public interface ConnectorMapper<K, V> {
 
     K getConfig();
 
-    default V getConnection() throws Exception {
-        throw new ConnectorException("Unsupported method.");
-    }
+    V getConnection() throws Exception;
 
-    default void close() {}
+    void close();
 }

+ 3 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/CommandConfig.java

@@ -1,5 +1,8 @@
 package org.dbsyncer.connector.config;
 
+import org.dbsyncer.connector.model.Filter;
+import org.dbsyncer.connector.model.Table;
+
 import java.util.List;
 
 /**

+ 48 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/FileConfig.java

@@ -0,0 +1,48 @@
+package org.dbsyncer.connector.config;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/5 23:19
+ */
+public class FileConfig extends ConnectorConfig {
+
+    /**
+     * 文件目录
+     */
+    private String fileDir;
+
+    /**
+     * 分隔符
+     */
+    private char separator;
+
+    /**
+     * 文件描述信息
+     */
+    private String schema;
+
+    public String getFileDir() {
+        return fileDir;
+    }
+
+    public void setFileDir(String fileDir) {
+        this.fileDir = fileDir;
+    }
+
+    public char getSeparator() {
+        return separator;
+    }
+
+    public void setSeparator(char separator) {
+        this.separator = separator;
+    }
+
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+}

+ 6 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java

@@ -1,22 +1,23 @@
 package org.dbsyncer.connector.config;
 
 import org.dbsyncer.connector.database.Database;
+import org.dbsyncer.connector.model.Field;
 
 import java.util.List;
 
 public class SqlBuilderConfig {
 
-    private Database    database;
+    private Database database;
     // 表名
-    private String      tableName;
+    private String tableName;
     // 主键
-    private String      pk;
+    private String pk;
     // 字段
     private List<Field> fields;
     // 过滤条件
-    private String      queryFilter;
+    private String queryFilter;
     // 引号
-    private String      quotation;
+    private String quotation;
 
     public SqlBuilderConfig(String name, String pk) {
         this.tableName = name;

+ 2 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterBatchConfig.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.connector.config;
 
+import org.dbsyncer.connector.model.Field;
+
 import java.util.List;
 import java.util.Map;
 

+ 16 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -13,6 +13,10 @@ import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.enums.SetterEnum;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
 import org.dbsyncer.connector.enums.TableTypeEnum;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.Filter;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.util.DatabaseUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,6 +71,10 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
 
     @Override
     public long getCount(DatabaseConnectorMapper connectorMapper, Map<String, String> command) {
+        if (CollectionUtils.isEmpty(command)) {
+            return 0L;
+        }
+
         // 1、获取select SQL
         String queryCountSql = command.get(ConnectorConstant.OPERTION_QUERY_COUNT);
         Assert.hasText(queryCountSql, "查询总数语句不能为空.");
@@ -85,8 +93,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         Collections.addAll(config.getArgs(), getPageArgs(config.getPageIndex(), config.getPageSize()));
 
         // 3、执行SQL
-        List<Map<String, Object>> list = connectorMapper.execute(
-                databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
+        List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
 
         // 4、返回结果集
         return new Result(list);
@@ -136,17 +143,19 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
                     })
             );
         } catch (Exception e) {
-            if (!config.isForceUpdate()) {
-                result.addFailData(data);
-                result.getError().append(e.getMessage());
-            }
+            result.addFailData(data);
+            result.getError().append(e.getMessage());
         }
 
         if (null != execute) {
             int batchSize = execute.length;
             for (int i = 0; i < batchSize; i++) {
                 if (execute[i] == 0) {
-                    forceUpdate(result, connectorMapper, config, pkField, data.get(i));
+                    if (config.isForceUpdate()) {
+                        forceUpdate(result, connectorMapper, config, pkField, data.get(i));
+                    } else {
+                        result.getFailData().add(data.get(i));
+                    }
                     continue;
                 }
                 result.getSuccessData().add(data.get(i));

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/Database.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.connector.database;
 
-import org.dbsyncer.connector.config.PageSqlConfig;
+import org.dbsyncer.connector.model.PageSql;
 
 public interface Database {
 
@@ -10,7 +10,7 @@ public interface Database {
      * @param config
      * @return
      */
-    String getPageSql(PageSqlConfig config);
+    String getPageSql(PageSql config);
 
     /**
      * 获取分页SQL

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BitSetter.java

@@ -21,6 +21,11 @@ public class BitSetter extends AbstractSetter<byte[]> {
             ps.setBytes(i, bitSet.toByteArray());
             return;
         }
+        if (val instanceof Integer) {
+            Integer integer = (Integer) val;
+            ps.setInt(i, integer);
+            return;
+        }
         throw new ConnectorException(String.format("BitSetter can not find type [%s], val [%s]", type, val));
     }
 

+ 11 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/DateSetter.java

@@ -1,10 +1,12 @@
 package org.dbsyncer.connector.database.setter;
 
+import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.database.AbstractSetter;
 
 import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.sql.Timestamp;
 
 public class DateSetter extends AbstractSetter<Date> {
 
@@ -13,4 +15,13 @@ public class DateSetter extends AbstractSetter<Date> {
         ps.setDate(i, val);
     }
 
+    @Override
+    protected void setIfValueTypeNotMatch(PreparedFieldMapper mapper, PreparedStatement ps, int i, int type, Object val) throws SQLException {
+        if (val instanceof Timestamp) {
+            Timestamp timestamp = (Timestamp) val;
+            ps.setDate(i, Date.valueOf(timestamp.toLocalDateTime().toLocalDate()));
+            return;
+        }
+        throw new ConnectorException(String.format("DateSetter can not find type [%s], val [%s]", type, val));
+    }
 }

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/TimestampSetter.java

@@ -2,7 +2,7 @@ package org.dbsyncer.connector.database.setter;
 
 import org.dbsyncer.connector.database.AbstractSetter;
 
-import java.util.Date;
+import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Timestamp;

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderInsert.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.connector.database.sqlbuilder;
 
-import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
 

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQuery.java

@@ -1,8 +1,8 @@
 package org.dbsyncer.connector.database.sqlbuilder;
 
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.connector.config.PageSqlConfig;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
 import org.dbsyncer.connector.database.Database;
@@ -20,7 +20,7 @@ public class SqlBuilderQuery extends AbstractSqlBuilder {
     public String buildSql(SqlBuilderConfig config) {
         // 分页语句
         Database database = config.getDatabase();
-        return database.getPageSql(new PageSqlConfig(buildQuerySql(config), config.getPk()));
+        return database.getPageSql(new PageSql(buildQuerySql(config), config.getPk()));
     }
 
     @Override

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderUpdate.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.connector.database.sqlbuilder;
 
-import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
 import org.slf4j.Logger;

+ 6 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/ConnectorEnum.java

@@ -5,8 +5,10 @@ import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.ESConfig;
+import org.dbsyncer.connector.config.FileConfig;
 import org.dbsyncer.connector.config.KafkaConfig;
 import org.dbsyncer.connector.es.ESConnector;
+import org.dbsyncer.connector.file.FileConnector;
 import org.dbsyncer.connector.kafka.KafkaConnector;
 import org.dbsyncer.connector.mysql.MysqlConnector;
 import org.dbsyncer.connector.oracle.OracleConnector;
@@ -50,6 +52,10 @@ public enum ConnectorEnum {
      * Kafka 连接器
      */
     KAFKA("Kafka", new KafkaConnector(), KafkaConfig.class),
+    /**
+     * File 连接器
+     */
+    FILE("File", new FileConnector(), FileConfig.class),
     /**
      * DqlMysql 连接器
      */

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/KafkaFieldTypeEnum.java

@@ -3,10 +3,10 @@ package org.dbsyncer.connector.enums;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
 
+import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
-import java.util.Date;
 
 /**
  * Kafka字段类型

+ 4 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -13,6 +13,10 @@ import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.ESFieldTypeEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.Filter;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.util.ESUtil;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;

+ 206 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnector.java

@@ -0,0 +1,206 @@
package org.dbsyncer.connector.file;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.dbsyncer.common.model.Result;
import org.dbsyncer.common.util.CollectionUtils;
import org.dbsyncer.common.util.StringUtil;
import org.dbsyncer.connector.AbstractConnector;
import org.dbsyncer.connector.Connector;
import org.dbsyncer.connector.ConnectorException;
import org.dbsyncer.connector.ConnectorMapper;
import org.dbsyncer.connector.config.CommandConfig;
import org.dbsyncer.connector.config.FileConfig;
import org.dbsyncer.connector.config.ReaderConfig;
import org.dbsyncer.connector.config.WriterBatchConfig;
import org.dbsyncer.connector.model.Field;
import org.dbsyncer.connector.model.FileSchema;
import org.dbsyncer.connector.model.MetaInfo;
import org.dbsyncer.connector.model.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

import java.io.*;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * Connector that synchronizes separated-value text files. Every "table" is one
 * file inside the configured directory, described by a {@link FileSchema}.
 *
 * @author AE86
 * @version 1.0.0
 * @date 2022/5/5 23:19
 */
public final class FileConnector extends AbstractConnector implements Connector<FileConnectorMapper, FileConfig> {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    // Command keys shared by getSourceCommand()/getTargetCommand() and reader()/writer().
    private static final String FILE_NAME = "fileName";
    private static final String FILE_PATH = "filePath";
    private final FileResolver fileResolver = new FileResolver();

    @Override
    public ConnectorMapper connect(FileConfig config) {
        return new FileConnectorMapper(config);
    }

    @Override
    public void disconnect(FileConnectorMapper connectorMapper) {
        // Nothing to release: files are opened and closed per operation.
    }

    /**
     * Alive when the configured directory and every schema file exist on disk.
     */
    @Override
    public boolean isAlive(FileConnectorMapper connectorMapper) {
        String fileDir = connectorMapper.getConnection();
        boolean alive = new File(fileDir).exists();
        if (!alive) {
            logger.warn("can not find fileDir:{}", fileDir);
            return false;
        }
        for (FileSchema fileSchema : connectorMapper.getFileSchemaList()) {
            String filePath = connectorMapper.getFilePath(fileSchema.getFileName());
            if (!new File(filePath).exists()) {
                logger.warn("can not find file:{}", filePath);
                alive = false;
            }
        }
        return alive;
    }

    /**
     * Cache key: local host address + file directory, so one mapper is reused
     * per directory on this machine. Falls back to 127.0.0.1 when the host
     * address can not be resolved.
     */
    @Override
    public String getConnectorMapperCacheKey(FileConfig config) {
        String localIP;
        try {
            localIP = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            logger.error(e.getMessage());
            localIP = "127.0.0.1";
        }
        return String.format("%s-%s", localIP, config.getFileDir());
    }

    /**
     * One table per schema file; the table name is the file name.
     */
    @Override
    public List<Table> getTable(FileConnectorMapper connectorMapper) {
        return connectorMapper.getFileSchemaList().stream().map(fileSchema -> new Table(fileSchema.getFileName())).collect(Collectors.toList());
    }

    @Override
    public MetaInfo getMetaInfo(FileConnectorMapper connectorMapper, String tableName) {
        FileSchema fileSchema = connectorMapper.getFileSchema(tableName);
        return new MetaInfo().setColumn(fileSchema.getFields());
    }

    /**
     * Count rows by streaming the file once and counting its lines.
     */
    @Override
    public long getCount(FileConnectorMapper connectorMapper, Map<String, String> command) {
        AtomicLong count = new AtomicLong();
        FileReader reader = null;
        try {
            // NOTE(review): FileReader uses the platform default charset while
            // writer() emits UTF-8 — confirm the expected file encoding.
            reader = new FileReader(new File(command.get(FILE_PATH)));
            LineIterator lineIterator = IOUtils.lineIterator(reader);
            while (lineIterator.hasNext()) {
                lineIterator.next();
                count.addAndGet(1);
            }
        } catch (IOException e) {
            // Wrap the exception itself; IOException#getCause() is frequently null.
            throw new ConnectorException(e);
        } finally {
            IOUtils.closeQuietly(reader);
        }
        return count.get();
    }

    /**
     * Page through the file: skip lines before the requested page, parse the
     * lines inside the page with the schema, and stop once the page is full.
     */
    @Override
    public Result reader(FileConnectorMapper connectorMapper, ReaderConfig config) {
        List<Map<String, Object>> list = new ArrayList<>();
        FileReader reader = null;
        try {
            FileConfig fileConfig = connectorMapper.getConfig();
            FileSchema fileSchema = connectorMapper.getFileSchema(config.getCommand().get(FILE_NAME));
            final List<Field> fields = fileSchema.getFields();
            Assert.notEmpty(fields, "The fields of file schema is empty.");
            final char separator = fileConfig.getSeparator();

            reader = new FileReader(new File(config.getCommand().get(FILE_PATH)));
            LineIterator lineIterator = IOUtils.lineIterator(reader);
            // Page window [from, to) in absolute line numbers (pageIndex is 1-based).
            int from = (config.getPageIndex() - 1) * config.getPageSize();
            int to = from + config.getPageSize();
            AtomicLong count = new AtomicLong();
            while (lineIterator.hasNext()) {
                if (count.get() >= from && count.get() < to) {
                    list.add(fileResolver.parseMap(fields, separator, lineIterator.next()));
                } else {
                    // Outside the window: consume the line without parsing it.
                    lineIterator.next();
                }
                count.addAndGet(1);
                if (count.get() >= to) {
                    break;
                }
            }
        } catch (IOException e) {
            // Wrap the exception itself; IOException#getCause() is frequently null.
            throw new ConnectorException(e);
        } finally {
            IOUtils.closeQuietly(reader);
        }
        return new Result(list);
    }

    /**
     * Append the batch to the target file, one separated-value line per row.
     * Failures mark the whole batch as failed in the returned {@link Result}.
     */
    @Override
    public Result writer(FileConnectorMapper connectorMapper, WriterBatchConfig config) {
        List<Map> data = config.getData();
        if (CollectionUtils.isEmpty(data)) {
            logger.error("writer data can not be empty.");
            throw new ConnectorException("writer data can not be empty.");
        }

        final List<Field> fields = config.getFields();
        final String separator = String.valueOf(connectorMapper.getConfig().getSeparator());

        Result result = new Result();
        OutputStream output = null;
        try {
            final String filePath = connectorMapper.getFilePath(config.getCommand().get(FILE_NAME));
            // Open in append mode so existing rows are preserved.
            output = new FileOutputStream(filePath, true);
            List<String> lines = data.stream().map(row -> {
                List<String> array = new ArrayList<>();
                fields.forEach(field -> {
                    Object o = row.get(field.getName());
                    // Missing values become empty columns.
                    array.add(null != o ? String.valueOf(o) : "");
                });
                return StringUtil.join(array.toArray(), separator);
            }).collect(Collectors.toList());
            IOUtils.writeLines(lines, System.lineSeparator(), output, "UTF-8");
        } catch (Exception e) {
            result.addFailData(data);
            result.getError().append(e.getMessage()).append(System.lineSeparator());
            // Keep the stack trace in the log, not just the message.
            logger.error(e.getMessage(), e);
        } finally {
            IOUtils.closeQuietly(output);
        }
        return result;
    }

    /**
     * Source command: absolute file path plus file name for the table. A path
     * separator is appended to the directory if it is missing.
     */
    @Override
    public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
        Map<String, String> command = new HashMap<>();
        FileConfig fileConfig = (FileConfig) commandConfig.getConnectorConfig();
        final String fileDir = fileConfig.getFileDir();
        StringBuilder file = new StringBuilder(fileDir);
        if (!StringUtil.endsWith(fileDir, File.separator)) {
            file.append(File.separator);
        }
        file.append(commandConfig.getTable().getName());
        command.put(FILE_PATH, file.toString());
        command.put(FILE_NAME, commandConfig.getTable().getName());
        return command;
    }

    /**
     * Target command: only the file name; the path is resolved by the mapper.
     */
    @Override
    public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
        Map<String, String> command = new HashMap<>();
        command.put(FILE_NAME, commandConfig.getTable().getName());
        return command;
    }

}

+ 81 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileConnectorMapper.java

@@ -0,0 +1,81 @@
package org.dbsyncer.connector.file;

import org.dbsyncer.common.util.JsonUtil;
import org.dbsyncer.connector.ConnectorMapper;
import org.dbsyncer.connector.config.FileConfig;
import org.dbsyncer.connector.model.Field;
import org.dbsyncer.connector.model.FileSchema;
import org.springframework.util.Assert;

import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Connection handle for the File connector: parses the JSON schema from the
 * config and indexes one schema-and-path holder per file name.
 *
 * @author AE86
 * @version 1.0.0
 * @date 2022/5/5 23:19
 */
public final class FileConnectorMapper implements ConnectorMapper<FileConfig, String> {

    private final FileConfig config;
    private final List<FileSchema> fileSchemaList;
    private final Map<String, FileResolver> fileSchemaMap = new ConcurrentHashMap<>();

    /**
     * @throws IllegalArgumentException if the schema is empty, a schema has no
     *                                  fields, or a schema file does not exist
     */
    public FileConnectorMapper(FileConfig config) {
        this.config = config;
        fileSchemaList = JsonUtil.jsonToArray(config.getSchema(), FileSchema.class);
        Assert.notEmpty(fileSchemaList, "The schema is empty.");
        for (FileSchema fileSchema : fileSchemaList) {
            final List<Field> fields = fileSchema.getFields();
            Assert.notEmpty(fields, "The fields of file schema is empty.");

            // First schema wins when the same file name appears twice.
            if (!fileSchemaMap.containsKey(fileSchema.getFileName())) {
                fileSchemaMap.put(fileSchema.getFileName(), new FileResolver(fileSchema));
            }
        }
    }

    public FileConfig getConfig() {
        return config;
    }

    /** The connection identity is the configured file directory. */
    @Override
    public String getConnection() {
        return config.getFileDir();
    }

    @Override
    public void close() {
        fileSchemaMap.clear();
    }

    public List<FileSchema> getFileSchemaList() {
        return fileSchemaList;
    }

    /**
     * @throws IllegalArgumentException if no schema is registered for the table
     */
    public FileSchema getFileSchema(String tableName) {
        FileResolver fileResolver = fileSchemaMap.get(tableName);
        Assert.notNull(fileResolver, String.format("can not find fileSchema by tableName '%s'", tableName));
        return fileResolver.fileSchema;
    }

    /**
     * @throws IllegalArgumentException if no schema is registered for the table
     */
    public String getFilePath(String tableName) {
        FileResolver fileResolver = fileSchemaMap.get(tableName);
        Assert.notNull(fileResolver, String.format("can not find fileSchema by tableName '%s'", tableName));
        return fileResolver.filePath;
    }

    // NOTE(review): this inner class shadows org.dbsyncer.connector.file.FileResolver;
    // inside this class the simple name refers to this holder, not the line parser.
    class FileResolver {
        final FileSchema fileSchema;
        final String filePath;

        public FileResolver(FileSchema fileSchema) {
            this.fileSchema = fileSchema;
            // NOTE(review): assumes fileDir already ends with a path separator —
            // TODO confirm; FileConnector#getSourceCommand appends one defensively.
            this.filePath = config.getFileDir().concat(fileSchema.getFileName());
            File file = new File(filePath);
            Assert.isTrue(file.exists(), String.format("can not find file '%s'", filePath));
        }
    }
}

+ 103 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/FileResolver.java

@@ -0,0 +1,103 @@
package org.dbsyncer.connector.file;

import org.dbsyncer.common.column.Lexer;
import org.dbsyncer.connector.file.column.ColumnValue;
import org.dbsyncer.connector.file.column.FileColumnValue;
import org.dbsyncer.connector.model.Field;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Parses one line of a separated-value text file into typed column values,
 * driven by the {@link Field} list of the file's schema.
 *
 * <p>NOTE(review): {@code value} is a shared mutable instance field, so one
 * FileResolver instance is not safe for concurrent parse calls — confirm
 * callers use it from a single thread at a time.
 *
 * @author AE86
 * @version 1.0.0
 * @date 2022/5/6 15:46
 */
public class FileResolver {

    // Reusable holder for the raw token being converted; mutated by resolveValue().
    private ColumnValue value = new FileColumnValue();

    /**
     * Parse a line into an ordered map of field name → typed value.
     * Iteration order follows the schema's field order (LinkedHashMap).
     */
    public Map<String, Object> parseMap(List<Field> fields, char separator, String line) {
        Map<String, Object> row = new LinkedHashMap<>();
        parse(fields, separator, line, (key, value) -> row.put(key, value));
        return row;
    }

    /**
     * Parse a line into a list of typed values, one per schema field.
     */
    public List<Object> parseList(List<Field> fields, char separator, String line) {
        List<Object> data = new ArrayList<>();
        parse(fields, separator, line, (key, value) -> data.add(value));
        return data;
    }

    /**
     * Resolve the value of a {@link ColumnValue}.
     *
     * @param typeName    schema type name (lower case, e.g. "string", "integer")
     * @param columnValue raw token text; may be null
     * @return the converted value, or null for a null token or an unknown type name
     */
    private Object resolveValue(String typeName, String columnValue) {
        value.setValue(columnValue);

        if (value.isNull()) {
            return null;
        }

        switch (typeName) {
            case "string":
                return value.asString();

            case "integer":
                return value.asInteger();

            case "date":
                return value.asDate();

            case "timestamp":
                return value.asTimestamp();

            case "boolean":
                return value.asBoolean();

            case "long":
                return value.asLong();

            case "float":
                return value.asFloat();

            case "double":
                return value.asDouble();

            case "time":
                return value.asTime();

            case "bytea":
                return value.asByteArray();

            default:
                // Unknown type names are silently mapped to null.
                return null;
        }

    }

    // Tokenize the line with Lexer and feed (name, value) pairs to the mapper.
    // If the line has fewer tokens than the schema has fields, the missing
    // trailing fields are reported as null.
    private void parse(List<Field> fields, char separator, String line, ResultSetMapper mapper) {
        int fieldSize = fields.size();
        int i = 0;
        Lexer lexer = new Lexer(line);
        while (i < fieldSize) {
            if (lexer.hasNext()) {
                mapper.apply(fields.get(i).getName(), resolveValue(fields.get(i).getTypeName(), lexer.nextToken(separator)));
            } else {
                mapper.apply(fields.get(i).getName(), null);
            }
            i++;
        }
    }

    // Callback receiving one resolved (column name, value) pair.
    private interface ResultSetMapper {
        void apply(String key, Object value);
    }

}

+ 37 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/column/ColumnValue.java

@@ -0,0 +1,37 @@
package org.dbsyncer.connector.file.column;

import java.sql.Date;
import java.sql.Timestamp;

/**
 * A single raw column token with typed accessors. Implementations hold one
 * value at a time: call {@link #setValue(String)} first, then one of the
 * {@code asXxx()} methods to convert the current token.
 *
 * @author AE86
 * @version 1.0.0
 * @date 2022/5/5 22:39
 */
public interface ColumnValue {

    /** Set the raw token to be converted by the next {@code asXxx()} call. */
    void setValue(String value);

    /** Whether the current raw token is null. */
    boolean isNull();

    /** The current token as text. */
    String asString();

    /** The current token as a boolean. */
    Boolean asBoolean();

    /** The current token as a 32-bit integer. */
    Integer asInteger();

    /** The current token as a 64-bit integer. */
    Long asLong();

    /** The current token as a single-precision float. */
    Float asFloat();

    /** The current token as a double-precision float. */
    Double asDouble();

    /** The current token as a SQL date. */
    Date asDate();

    /** The current token as a SQL timestamp. */
    Timestamp asTimestamp();

    /** The current token as a time value; the concrete type is implementation-defined. */
    Object asTime();

    /** The current token decoded to raw bytes. */
    byte[] asByteArray();

}

+ 87 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/file/column/FileColumnValue.java

@@ -0,0 +1,87 @@
package org.dbsyncer.connector.file.column;

import org.dbsyncer.common.util.DateFormatUtil;
import org.dbsyncer.common.util.NumberUtil;
import org.dbsyncer.common.util.StringUtil;

import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDateTime;

/**
 * {@link ColumnValue} backed by a plain String token read from a text file.
 *
 * @author AE86
 * @version 1.0.0
 * @date 2022/5/6 15:48
 */
public class FileColumnValue implements ColumnValue {

    // The raw token currently being converted; replaced by each setValue() call.
    private String value;

    @Override
    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public boolean isNull() {
        return value == null;
    }

    @Override
    public String asString() {
        return value;
    }

    @Override
    public Boolean asBoolean() {
        // Only the literal "true" (any case) is true; e.g. "1"/"yes" map to false.
        return "true".equalsIgnoreCase(value);
    }

    @Override
    public Integer asInteger() {
        // NOTE(review): throws NumberFormatException on bad input, unlike asLong()
        // which goes through NumberUtil — confirm the asymmetry is intended.
        return Integer.valueOf(value);
    }

    @Override
    public Long asLong() {
        return NumberUtil.toLong(value);
    }

    @Override
    public Float asFloat() {
        return Float.valueOf(value);
    }

    @Override
    public Double asDouble() {
        return Double.valueOf(value);
    }

    @Override
    public Date asDate() {
        return DateFormatUtil.stringToDate(asString());
    }

    @Override
    public Timestamp asTimestamp() {
        try {
            if (NumberUtil.isCreatable(value)) {
                // Numeric token: interpreted as epoch milliseconds.
                return new Timestamp(asLong());
            }

            return DateFormatUtil.stringToTimestamp(value);
        } catch (Exception e) {
            // Unparseable timestamps are swallowed and reported as null.
            return null;
        }
    }

    @Override
    public Object asTime() {
        // Time values are passed through as raw text.
        return asString();
    }

    @Override
    public byte[] asByteArray() {
        // Assumes a two-character prefix (e.g. "\\x" or "0x") before the hex
        // digits — TODO confirm; a shorter token would throw IndexOutOfBounds.
        return StringUtil.hexStringToByteArray(value.substring(2));
    }
}

+ 3 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -8,6 +8,9 @@ import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/Field.java → dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Field.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.connector.config;
+package org.dbsyncer.connector.model;
 
 import org.dbsyncer.common.util.JsonUtil;
 

+ 36 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/FileSchema.java

@@ -0,0 +1,36 @@
package org.dbsyncer.connector.model;

import java.util.List;

/**
 * Schema describing one file handled by the File connector: the file name
 * (which serves as the table name) and its ordered column definitions.
 *
 * @author AE86
 * @version 1.0.0
 * @date 2022/5/6 0:04
 */
public class FileSchema {

    /**
     * File name (used as the table name)
     */
    private String fileName;
    /**
     * Field definitions, in column order
     */
    private List<Field> fields;

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    public List<Field> getFields() {
        return fields;
    }

    public void setFields(List<Field> fields) {
        this.fields = fields;
    }
}

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/Filter.java → dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Filter.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.connector.config;
+package org.dbsyncer.connector.model;
 
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/MetaInfo.java → dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/MetaInfo.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.connector.config;
+package org.dbsyncer.connector.model;
 
 import java.util.List;
 

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/PageSqlConfig.java → dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/PageSql.java

@@ -1,12 +1,12 @@
-package org.dbsyncer.connector.config;
+package org.dbsyncer.connector.model;
 
-public class PageSqlConfig {
+public class PageSql {
 
     private String querySql;
 
     private String pk;
 
-    public PageSqlConfig(String querySql, String pk) {
+    public PageSql(String querySql, String pk) {
         this.querySql = querySql;
         this.pk = pk;
     }

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/Table.java → dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Table.java

@@ -1,4 +1,4 @@
-package org.dbsyncer.connector.config;
+package org.dbsyncer.connector.model;
 
 import org.dbsyncer.connector.enums.TableTypeEnum;
 

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.connector.mysql;
 
-import org.dbsyncer.connector.config.PageSqlConfig;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
@@ -11,7 +11,7 @@ import java.util.List;
 public final class MysqlConnector extends AbstractDatabaseConnector {
 
     @Override
-    public String getPageSql(PageSqlConfig config) {
+    public String getPageSql(PageSql config) {
         return config.getQuerySql() + DatabaseConstant.MYSQL_PAGE_SQL;
     }
 

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -1,8 +1,8 @@
 package org.dbsyncer.connector.oracle;
 
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.connector.config.PageSqlConfig;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
@@ -25,7 +25,7 @@ public final class OracleConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public String getPageSql(PageSqlConfig config) {
+    public String getPageSql(PageSql config) {
         return DatabaseConstant.ORACLE_PAGE_SQL_START + config.getQuerySql() + DatabaseConstant.ORACLE_PAGE_SQL_END;
     }
 

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -2,8 +2,8 @@ package org.dbsyncer.connector.postgresql;
 
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.PageSqlConfig;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
@@ -33,7 +33,7 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public String getPageSql(PageSqlConfig config) {
+    public String getPageSql(PageSql config) {
         return config.getQuerySql() + DatabaseConstant.POSTGRESQL_PAGE_SQL;
     }
 

+ 5 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java

@@ -6,6 +6,10 @@ import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
+import org.dbsyncer.connector.model.Filter;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.model.Table;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -66,7 +70,7 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
         }
         String quotation = buildSqlWithQuotation();
         String pk = findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
-        map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(new PageSqlConfig(querySql, pk)));
+        map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(new PageSql(querySql, pk)));
 
         // 获取查询总数SQL
         StringBuilder queryCount = new StringBuilder();

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLMysqlConnector.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.connector.sql;
 
 import org.dbsyncer.connector.config.CommandConfig;
-import org.dbsyncer.connector.config.PageSqlConfig;
+import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 
 import java.util.Map;
@@ -9,7 +9,7 @@ import java.util.Map;
 public final class DQLMysqlConnector extends AbstractDQLConnector {
 
     @Override
-    public String getPageSql(PageSqlConfig config) {
+    public String getPageSql(PageSql config) {
         return config.getQuerySql() + DatabaseConstant.MYSQL_PAGE_SQL;
     }
 

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java

@@ -1,12 +1,12 @@
 package org.dbsyncer.connector.sql;
 
-import org.dbsyncer.connector.config.PageSqlConfig;
+import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 
 public final class DQLOracleConnector extends AbstractDQLConnector {
 
     @Override
-    public String getPageSql(PageSqlConfig config) {
+    public String getPageSql(PageSql config) {
         return DatabaseConstant.ORACLE_PAGE_SQL_START + config.getQuerySql() + DatabaseConstant.ORACLE_PAGE_SQL_END;
     }
 

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLPostgreSQLConnector.java

@@ -1,12 +1,12 @@
 package org.dbsyncer.connector.sql;
 
-import org.dbsyncer.connector.config.PageSqlConfig;
+import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 
 public final class DQLPostgreSQLConnector extends AbstractDQLConnector {
 
     @Override
-    public String getPageSql(PageSqlConfig config) {
+    public String getPageSql(PageSql config) {
         return config.getQuerySql() + DatabaseConstant.POSTGRESQL_PAGE_SQL;
     }
 

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java

@@ -2,7 +2,7 @@ package org.dbsyncer.connector.sql;
 
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.config.PageSqlConfig;
+import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -12,7 +12,7 @@ public final class DQLSqlServerConnector extends AbstractDQLConnector {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Override
-    public String getPageSql(PageSqlConfig config) {
+    public String getPageSql(PageSql config) {
         if (StringUtil.isBlank(config.getPk())) {
             logger.error("Table primary key can not be empty.");
             throw new ConnectorException("Table primary key can not be empty.");

+ 3 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -4,8 +4,8 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.PageSqlConfig;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
@@ -28,7 +28,7 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public String getPageSql(PageSqlConfig config) {
+    public String getPageSql(PageSql config) {
         if (StringUtil.isBlank(config.getPk())) {
             logger.error("Table primary key can not be empty.");
             throw new ConnectorException("Table primary key can not be empty.");

+ 69 - 0
dbsyncer-connector/src/main/test/FileConnectionTest.java

@@ -0,0 +1,69 @@
+import org.apache.commons.io.IOUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/11 20:19
+ */
+public class FileConnectionTest {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private CountDownLatch latch;
+
+    @Test
+    public void testConnection() throws InterruptedException, IOException {
+        File f = new File("D:\\test\\abc.txt");
+        RandomAccessFile file = new RandomAccessFile(f, "rw");
+
+        int count = 10;
+        latch = new CountDownLatch(count);
+        for (int i = 0; i < count; i++) {
+            Thread t = new Thread(new WritingThread(i, file.getChannel()));
+            t.start();
+        }
+        latch.await();
+        file.close();
+        InputStream fileR = new FileInputStream(f);
+        List<String> strings = IOUtils.readLines(fileR, Charset.defaultCharset());
+        strings.forEach(line -> logger.info("{}", line));
+        fileR.close();
+    }
+
+    class WritingThread implements Runnable {
+
+        private FileChannel channel;
+
+        private int id;
+
+        public WritingThread(int id, FileChannel channel) {
+            this.channel = channel;
+            this.id = id;
+        }
+
+        @Override
+        public void run() {
+            logger.info("Thread {} is Writing", id);
+            try {
+                for (int i = 1; i <= 5; i++) {
+                    String msg = String.format("%s, %s, %s\n", Thread.currentThread().getName(), id, i);
+                    this.channel.write(ByteBuffer.wrap(msg.getBytes()));
+                }
+            } catch (IOException e) {
+                logger.error(e.getMessage());
+            }
+            latch.countDown();
+        }
+    }
+
+}

+ 1 - 1
dbsyncer-listener/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.1.7-Beta</version>
+        <version>1.1.8-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-listener</artifactId>

+ 3 - 3
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -31,7 +31,7 @@ public abstract class AbstractExtractor implements Extractor {
     protected ListenerConfig listenerConfig;
     protected Map<String, String> snapshot;
     protected Set<String> filterTable;
-    private List<Event> watcher = new CopyOnWriteArrayList<>();
+    private final List<Event> watcher = new CopyOnWriteArrayList<>();
 
     @Override
     public void addListener(Event event) {
@@ -48,7 +48,7 @@ public abstract class AbstractExtractor implements Extractor {
     @Override
     public void changedEvent(RowChangedEvent event) {
         if(null != event){
-            taskExecutor.execute(() -> watcher.forEach(w -> w.changedEvent(event)));
+            watcher.forEach(w -> w.changedEvent(event));
         }
     }
 
@@ -77,7 +77,7 @@ public abstract class AbstractExtractor implements Extractor {
         try {
             TimeUnit.MILLISECONDS.sleep(timeout);
         } catch (InterruptedException e) {
-            logger.error(e.getMessage());
+            logger.info(e.getMessage());
         }
     }
 

+ 5 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java

@@ -3,6 +3,7 @@ package org.dbsyncer.listener.enums;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.listener.ListenerException;
+import org.dbsyncer.listener.file.FileExtractor;
 import org.dbsyncer.listener.kafka.KafkaExtractor;
 import org.dbsyncer.listener.mysql.MysqlExtractor;
 import org.dbsyncer.listener.oracle.OracleExtractor;
@@ -40,6 +41,10 @@ public enum ListenerEnum {
      * log_Kafka
      */
     LOG_KAFKA(ListenerTypeEnum.LOG.getType() + ConnectorEnum.KAFKA.getType(), KafkaExtractor.class),
+    /**
+     * log_File
+     */
+    LOG_FILE(ListenerTypeEnum.LOG.getType() + ConnectorEnum.FILE.getType(), FileExtractor.class),
     /**
      * timing_Mysql
      */

+ 331 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/BufferedRandomAccessFile.java

@@ -0,0 +1,331 @@
+package org.dbsyncer.listener.file;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/7 22:27
+ */
+public class BufferedRandomAccessFile extends RandomAccessFile {
+    static final int LogBuffSz_ = 16; // 64K buffer
+    public static final int BuffSz_ = (1 << LogBuffSz_);
+    static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+
+    private String path_;
+
+    /*
+     * This implementation is based on the buffer implementation in Modula-3's
+     * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+     */
+    private boolean dirty_; // true iff unflushed bytes exist
+    private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so track sync separately
+    private long curr_; // current position in file
+    private long lo_, hi_; // bounds on characters in "buff"
+    private byte[] buff_; // local buffer
+    private long maxHi_; // this.lo + this.buff.length
+    private boolean hitEOF_; // buffer contains last file block?
+    private long diskPos_; // disk position
+
+    /*
+     * To describe the above fields, we introduce the following abstractions for
+     * the file "f":
+     *
+     * len(f) the length of the file curr(f) the current position in the file
+     * c(f) the abstract contents of the file disk(f) the contents of f's
+     * backing disk file closed(f) true iff the file is closed
+     *
+     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+     * operation has the effect of making "disk(f)" identical to "c(f)".
+     *
+     * A file is said to be *valid* if the following conditions hold:
+     *
+     * V1. The "closed" and "curr" fields are correct:
+     *
+     * f.closed == closed(f) f.curr == curr(f)
+     *
+     * V2. The current position is either contained in the buffer, or just past
+     * the buffer:
+     *
+     * f.lo <= f.curr <= f.hi
+     *
+     * V3. Any (possibly) unflushed characters are stored in "f.buff":
+     *
+     * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+     *
+     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+     *
+     * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+     * disk(f)[i])
+     *
+     * V5. "f.dirty" is true iff the buffer contains bytes that should be
+     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+     *
+     * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+     *
+     * V6. this.maxHi == this.lo + this.buff.length
+     *
+     * Note that "f.buff" can be "null" in a valid file, since the range of
+     * characters in V3 is empty when "f.lo == f.curr".
+     *
+     * A file is said to be *ready* if the buffer contains the current position,
+     * i.e., when:
+     *
+     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+     *
+     * When a file is ready, reading or writing a single byte can be performed
+     * by reading or writing the in-memory buffer without performing a disk
+     * operation.
+     */
+
+    /**
+     * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
+     * in mode <code>mode</code>, which should be "r" for reading only, or
+     * "rw" for reading and writing.
+     */
+    public BufferedRandomAccessFile(File file, String mode) throws IOException {
+        this(file, mode, 0);
+    }
+
+    public BufferedRandomAccessFile(File file, String mode, int size) throws IOException {
+        super(file, mode);
+        path_ = file.getAbsolutePath();
+        this.init(size);
+    }
+
+    /**
+     * Open a new <code>BufferedRandomAccessFile</code> on the file named
+     * <code>name</code> in mode <code>mode</code>, which should be "r" for
+     * reading only, or "rw" for reading and writing.
+     */
+    public BufferedRandomAccessFile(String name, String mode) throws IOException {
+        this(name, mode, 0);
+    }
+
+    public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException {
+        super(name, mode);
+        path_ = name;
+        this.init(size);
+    }
+
+    private void init(int size) {
+        this.dirty_ = false;
+        this.lo_ = this.curr_ = this.hi_ = 0;
+        this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
+        this.maxHi_ = (long) BuffSz_;
+        this.hitEOF_ = false;
+        this.diskPos_ = 0L;
+    }
+
+    public String getPath() {
+        return path_;
+    }
+
+    public void sync() throws IOException {
+        if (syncNeeded_) {
+            flush();
+            getChannel().force(true);
+            syncNeeded_ = false;
+        }
+    }
+
+//      public boolean isEOF() throws IOException
+//      {
+//          assert getFilePointer() <= length();
+//          return getFilePointer() == length();
+//      }
+
+    public void close() throws IOException {
+        this.flush();
+        this.buff_ = null;
+        super.close();
+    }
+
+    /**
+     * Flush any bytes in the file's buffer that have not yet been written to
+     * disk. If the file was created read-only, this method is a no-op.
+     */
+    public void flush() throws IOException {
+        this.flushBuffer();
+    }
+
+    /* Flush any dirty bytes in the buffer to disk. */
+    private void flushBuffer() throws IOException {
+        if (this.dirty_) {
+            if (this.diskPos_ != this.lo_)
+                super.seek(this.lo_);
+            int len = (int) (this.curr_ - this.lo_);
+            super.write(this.buff_, 0, len);
+            this.diskPos_ = this.curr_;
+            this.dirty_ = false;
+        }
+    }
+
+    /*
+     * Read at most "this.buff.length" bytes into "this.buff", returning the
+     * number of bytes read. If the return result is less than
+     * "this.buff.length", then EOF was read.
+     */
+    private int fillBuffer() throws IOException {
+        int cnt = 0;
+        int rem = this.buff_.length;
+        while (rem > 0) {
+            int n = super.read(this.buff_, cnt, rem);
+            if (n < 0)
+                break;
+            cnt += n;
+            rem -= n;
+        }
+        if ((cnt < 0) && (this.hitEOF_ = (cnt < this.buff_.length))) {
+            // make sure buffer that wasn't read is initialized with -1
+            Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
+        }
+        this.diskPos_ += cnt;
+        return cnt;
+    }
+
+    /*
+     * This method positions <code>this.curr</code> at position <code>pos</code>.
+     * If <code>pos</code> does not fall in the current buffer, it flushes the
+     * current buffer and loads the correct one.<p>
+     *
+     * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
+     * is at or past the end-of-file, which can only happen if the file was
+     * opened in read-only mode.
+     */
+    public void seek(long pos) throws IOException {
+        if (pos >= this.hi_ || pos < this.lo_) {
+            // seeking outside of current buffer -- flush and read
+            this.flushBuffer();
+            this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+            this.maxHi_ = this.lo_ + (long) this.buff_.length;
+            if (this.diskPos_ != this.lo_) {
+                super.seek(this.lo_);
+                this.diskPos_ = this.lo_;
+            }
+            int n = this.fillBuffer();
+            this.hi_ = this.lo_ + (long) n;
+        } else {
+            // seeking inside current buffer -- no read required
+            if (pos < this.curr_) {
+                // if seeking backwards, we must flush to maintain V4
+                this.flushBuffer();
+            }
+        }
+        this.curr_ = pos;
+    }
+
+    public long getFilePointer() {
+        return this.curr_;
+    }
+
+    public long length() throws IOException {
+        // max accounts for the case where we have written past the old file length, but not yet flushed our buffer
+        return Math.max(this.curr_, super.length());
+    }
+
+    public int read() throws IOException {
+        if (this.curr_ >= this.hi_) {
+            // test for EOF
+            // if (this.hi < this.maxHi) return -1;
+            if (this.hitEOF_)
+                return -1;
+
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        byte res = this.buff_[(int) (this.curr_ - this.lo_)];
+        this.curr_++;
+        return ((int) res) & 0xFF; // convert byte -> int
+    }
+
+    public int read(byte[] b) throws IOException {
+        return this.read(b, 0, b.length);
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (this.curr_ >= this.hi_) {
+            // test for EOF
+            // if (this.hi < this.maxHi) return -1;
+            if (this.hitEOF_)
+                return -1;
+
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(this.buff_, buffOff, b, off, len);
+        this.curr_ += len;
+        return len;
+    }
+
+    public void write(int b) throws IOException {
+        if (this.curr_ >= this.hi_) {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_) {
+                // at EOF -- bump "hi"
+                this.hi_++;
+            } else {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_) {
+                    // appending to EOF -- bump "hi"
+                    this.hi_++;
+                }
+            }
+        }
+        this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
+        this.curr_++;
+        this.dirty_ = true;
+        syncNeeded_ = true;
+    }
+
+    public void write(byte[] b) throws IOException {
+        this.write(b, 0, b.length);
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException {
+        while (len > 0) {
+            int n = this.writeAtMost(b, off, len);
+            off += n;
+            len -= n;
+            this.dirty_ = true;
+            syncNeeded_ = true;
+        }
+    }
+
+    /*
+     * Write at most "len" bytes to "b" starting at position "off", and return
+     * the number of bytes written.
+     */
+    private int writeAtMost(byte[] b, int off, int len) throws IOException {
+        if (this.curr_ >= this.hi_) {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_) {
+                // at EOF -- bump "hi"
+                this.hi_ = this.maxHi_;
+            } else {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_) {
+                    // appending to EOF -- bump "hi"
+                    this.hi_ = this.maxHi_;
+                }
+            }
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(b, off, this.buff_, buffOff, len);
+        this.curr_ += len;
+        return len;
+    }
+}

+ 224 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/file/FileExtractor.java

@@ -0,0 +1,224 @@
+package org.dbsyncer.listener.file;
+
+import org.apache.commons.io.IOUtils;
+import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.common.util.JsonUtil;
+import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.common.util.RandomUtil;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.config.FileConfig;
+import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.file.FileConnectorMapper;
+import org.dbsyncer.connector.file.FileResolver;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.FileSchema;
+import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.ListenerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/6 21:42
+ */
+public class FileExtractor extends AbstractExtractor {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private static final String POS_PREFIX = "pos_";
+    private static final String CHARSET_NAME = "UTF-8";
+    private final Lock connectLock = new ReentrantLock();
+    private volatile boolean connected;
+    private FileConnectorMapper connectorMapper;
+    private WatchService watchService;
+    private Worker worker;
+    private Map<String, PipelineResolver> pipeline = new ConcurrentHashMap<>();
+    private final FileResolver fileResolver = new FileResolver();
+    private char separator;
+
+    @Override
+    public void start() {
+        try {
+            connectLock.lock();
+            if (connected) {
+                logger.error("FileExtractor is already started");
+                return;
+            }
+
+            connectorMapper = (FileConnectorMapper) connectorFactory.connect(connectorConfig);
+            final FileConfig config = connectorMapper.getConfig();
+            final String mapperCacheKey = connectorFactory.getConnector(connectorMapper).getConnectorMapperCacheKey(connectorConfig);
+            connected = true;
+
+            separator = config.getSeparator();
+            initPipeline(config.getFileDir());
+            watchService = FileSystems.getDefault().newWatchService();
+            Path p = Paths.get(config.getFileDir());
+            p.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
+
+            for (String fileName : pipeline.keySet()) {
+                parseEvent(fileName);
+            }
+            forceFlushEvent();
+
+            worker = new Worker();
+            worker.setName(new StringBuilder("file-parser-").append(mapperCacheKey).append("_").append(RandomUtil.nextInt(1, 100)).toString());
+            worker.setDaemon(false);
+            worker.start();
+        } catch (Exception e) {
+            logger.error("启动失败:{}", e.getMessage());
+            closePipelineAndWatch();
+            throw new ListenerException(e);
+        } finally {
+            connectLock.unlock();
+        }
+    }
+
+    private void initPipeline(String fileDir) throws IOException {
+        for (FileSchema fileSchema : connectorMapper.getFileSchemaList()) {
+            String fileName = fileSchema.getFileName();
+            String file = fileDir.concat(fileName);
+            Assert.isTrue(new File(file).exists(), String.format("found not file '%s'", file));
+
+            final RandomAccessFile raf = new BufferedRandomAccessFile(file, "r");
+            final String filePosKey = getFilePosKey(fileName);
+            if (snapshot.containsKey(filePosKey)) {
+                raf.seek(NumberUtil.toLong(snapshot.get(filePosKey), 0L));
+            } else {
+                raf.seek(raf.length());
+            }
+
+            pipeline.put(fileName, new PipelineResolver(fileSchema.getFields(), raf));
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            closePipelineAndWatch();
+            connected = false;
+            if (null != worker && !worker.isInterrupted()) {
+                worker.interrupt();
+                worker = null;
+            }
+        } catch (Exception e) {
+            logger.error("关闭失败:{}", e.getMessage());
+        }
+    }
+
+    private void closePipelineAndWatch() {
+        try {
+            pipeline.values().forEach(pipelineResolver -> IOUtils.closeQuietly(pipelineResolver.raf));
+            pipeline.clear();
+
+            if (null != watchService) {
+                watchService.close();
+            }
+        } catch (IOException ex) {
+            logger.error(ex.getMessage());
+        }
+    }
+
+    private String getFilePosKey(String fileName) {
+        return POS_PREFIX.concat(fileName);
+    }
+
+    private void parseEvent(String fileName) throws IOException {
+        if (pipeline.containsKey(fileName)) {
+            PipelineResolver pipelineResolver = pipeline.get(fileName);
+            final RandomAccessFile raf = pipelineResolver.raf;
+
+            final String filePosKey = getFilePosKey(fileName);
+            String line;
+            while (null != (line = pipelineResolver.readLine())) {
+                snapshot.put(filePosKey, String.valueOf(raf.getFilePointer()));
+                if (StringUtil.isNotBlank(line)) {
+                    List<Object> row = fileResolver.parseList(pipelineResolver.fields, separator, line);
+                    changedEvent(new RowChangedEvent(fileName, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, row));
+                }
+            }
+
+        }
+    }
+
+    final class PipelineResolver {
+        List<Field> fields;
+        RandomAccessFile raf;
+        byte[] b;
+        long filePointer;
+
+        public PipelineResolver(List<Field> fields, RandomAccessFile raf) {
+            this.fields = fields;
+            this.raf = raf;
+        }
+
+        public String readLine() throws IOException {
+            this.filePointer = raf.getFilePointer();
+            if (filePointer >= raf.length()) {
+                b = new byte[0];
+                return null;
+            }
+            if (b == null || b.length == 0) {
+                b = new byte[(int) (raf.length() - filePointer)];
+            }
+            raf.read(b);
+
+            ByteArrayOutputStream stream = new ByteArrayOutputStream();
+            int read = 0;
+            for (int i = 0; i < b.length; i++) {
+                read++;
+                if (b[i] == '\n' || b[i] == '\r') {
+                    break;
+                }
+                stream.write(b[i]);
+            }
+            b = Arrays.copyOfRange(b, read, b.length);
+
+            raf.seek(this.filePointer + read);
+            byte[] _b = stream.toByteArray();
+            stream.close();
+            stream = null;
+            return new String(_b, CHARSET_NAME);
+        }
+    }
+
+    final class Worker extends Thread {
+
+        @Override
+        public void run() {
+            while (!isInterrupted() && connected) {
+                WatchKey watchKey = null;
+                try {
+                    watchKey = watchService.take();
+                    List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
+                    for (WatchEvent<?> event : watchEvents) {
+                        parseEvent(event.context().toString());
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                } finally {
+                    if (null != watchKey) {
+                        watchKey.reset();
+                    }
+                }
+            }
+        }
+
+    }
+
+}

+ 147 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/AbstractMessageDecoder.java

@@ -1,8 +1,11 @@
 package org.dbsyncer.listener.postgresql;
 
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.listener.postgresql.column.ColumnValue;
+import org.dbsyncer.listener.postgresql.column.PgColumnValue;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.util.PGmoney;
 
 import java.nio.ByteBuffer;
 
@@ -15,6 +18,8 @@ public abstract class AbstractMessageDecoder implements MessageDecoder {
 
     protected DatabaseConfig config;
 
+    private ColumnValue value = new PgColumnValue();
+
     @Override
     public boolean skipMessage(ByteBuffer buffer, LogSequenceNumber startLsn, LogSequenceNumber lastReceiveLsn) {
         if (null == lastReceiveLsn || lastReceiveLsn.asLong() == 0 || startLsn.equals(lastReceiveLsn)) {
@@ -51,4 +56,146 @@ public abstract class AbstractMessageDecoder implements MessageDecoder {
     public void setConfig(DatabaseConfig config) {
         this.config = config;
     }
+
+    /**
+     * Resolve the value of a {@link ColumnValue}.
+     *
+     * @param typeName
+     * @param columnValue
+     * @return
+     */
+    protected Object resolveValue(String typeName, String columnValue) {
+        value.setValue(columnValue);
+
+        if (value.isNull()) {
+            // nulls are null
+            return null;
+        }
+
+        switch (typeName) {
+            // include all types from https://www.postgresql.org/docs/current/static/datatype.html#DATATYPE-TABLE
+            case "boolean":
+            case "bool":
+                return value.asBoolean();
+
+            case "integer":
+            case "int":
+            case "int4":
+            case "smallint":
+            case "int2":
+            case "smallserial":
+            case "serial":
+            case "serial2":
+            case "serial4":
+                return value.asInteger();
+
+            case "bigint":
+            case "bigserial":
+            case "int8":
+            case "oid":
+                return value.asLong();
+
+            case "real":
+            case "float4":
+                return value.asFloat();
+
+            case "double precision":
+            case "float8":
+                return value.asDouble();
+
+            case "numeric":
+            case "decimal":
+                return value.asDecimal();
+
+            case "character":
+            case "char":
+            case "character varying":
+            case "varchar":
+            case "bpchar":
+            case "text":
+            case "hstore":
+                return value.asString();
+
+            case "date":
+                return value.asDate();
+
+            case "timestamp with time zone":
+            case "timestamptz":
+                return value.asOffsetDateTimeAtUtc();
+
+            case "timestamp":
+            case "timestamp without time zone":
+                return value.asTimestamp();
+
+            case "time":
+                return value.asTime();
+
+            case "time without time zone":
+                return value.asLocalTime();
+
+            case "time with time zone":
+            case "timetz":
+                return value.asOffsetTimeUtc();
+
+            case "bytea":
+                return value.asByteArray();
+
+            // these are all PG-specific types and we use the JDBC representations
+            // note that, with the exception of point, no converters for these types are implemented yet,
+            // i.e. those values won't actually be propagated to the outbound message until that's the case
+            case "box":
+                return value.asBox();
+            case "circle":
+                return value.asCircle();
+            case "interval":
+                return value.asInterval();
+            case "line":
+                return value.asLine();
+            case "lseg":
+                return value.asLseg();
+            case "money":
+                final Object v = value.asMoney();
+                return (v instanceof PGmoney) ? ((PGmoney) v).val : v;
+            case "path":
+                return value.asPath();
+            case "point":
+                return value.asPoint();
+            case "polygon":
+                return value.asPolygon();
+
+            // PostGIS types are HexEWKB strings
+            // ValueConverter turns them into the correct types
+            case "geometry":
+            case "geography":
+            case "citext":
+            case "bit":
+            case "bit varying":
+            case "varbit":
+            case "json":
+            case "jsonb":
+            case "xml":
+            case "uuid":
+            case "tsrange":
+            case "tstzrange":
+            case "daterange":
+            case "inet":
+            case "cidr":
+            case "macaddr":
+            case "macaddr8":
+            case "int4range":
+            case "numrange":
+            case "int8range":
+                return value.asString();
+
+            // catch-all for other known/builtin PG types
+            case "pg_lsn":
+            case "tsquery":
+            case "tsvector":
+            case "txid_snapshot":
+                // catch-all for unknown (extension module/custom) types
+            default:
+                return null;
+        }
+
+    }
 }

+ 2 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/MessageDecoder.java

@@ -1,6 +1,7 @@
 package org.dbsyncer.listener.postgresql;
 
 import org.dbsyncer.common.event.RowChangedEvent;
+import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.postgresql.replication.LogSequenceNumber;
@@ -15,7 +16,7 @@ import java.nio.ByteBuffer;
  */
 public interface MessageDecoder {
 
-    default void postProcessBeforeInitialization(DatabaseConnectorMapper connectorMapper) {
+    default void postProcessBeforeInitialization(ConnectorFactory connectorFactory, DatabaseConnectorMapper connectorMapper) {
     }
 
     boolean skipMessage(ByteBuffer buffer, LogSequenceNumber startLsn, LogSequenceNumber lastReceiveLsn);

+ 18 - 23
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java

@@ -40,6 +40,7 @@ public class PostgreSQLExtractor extends AbstractExtractor {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     private static final String GET_SLOT = "select count(1) from pg_replication_slots where database = ? and slot_name = ? and plugin = ?";
+    private static final String GET_RESTART_LSN = "select restart_lsn from pg_replication_slots where database = ? and slot_name = ? and plugin = ?";
     private static final String GET_ROLE = "SELECT r.rolcanlogin AS login, r.rolreplication AS replication, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rds_superuser') AS BOOL) IS TRUE AS superuser, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rdsadmin') AS BOOL) IS TRUE AS admin, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rdsrepladmin') AS BOOL) IS TRUE AS rep_admin FROM pg_roles r WHERE r.rolname = current_user";
     private static final String GET_DATABASE = "SELECT current_database()";
     private static final String GET_WAL_LEVEL = "SHOW WAL_LEVEL";
@@ -57,6 +58,7 @@ public class PostgreSQLExtractor extends AbstractExtractor {
     private MessageDecoder messageDecoder;
     private Worker worker;
     private LogSequenceNumber startLsn;
+    private String database;
 
     @Override
     public void start() {
@@ -88,9 +90,10 @@ public class PostgreSQLExtractor extends AbstractExtractor {
                 throw new ListenerException(String.format("Postgres roles LOGIN and REPLICATION are not assigned to user: %s", config.getUsername()));
             }
 
+            database = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_DATABASE, String.class));
             messageDecoder = MessageDecoderEnum.getMessageDecoder(config.getProperty(PLUGIN_NAME));
             messageDecoder.setConfig(config);
-            messageDecoder.postProcessBeforeInitialization(connectorMapper);
+            messageDecoder.postProcessBeforeInitialization(connectorFactory, connectorMapper);
             dropSlotOnClose = BooleanUtil.toBoolean(config.getProperty(DROP_SLOT_ON_CLOSE, "true"));
 
             connect();
@@ -144,20 +147,7 @@ public class PostgreSQLExtractor extends AbstractExtractor {
         sleepInMills(10L);
     }
 
-    private LogSequenceNumber readLastLsn() throws SQLException {
-        if (!snapshot.containsKey(LSN_POSITION)) {
-            LogSequenceNumber lsn = currentXLogLocation();
-            if (null == lsn || lsn.asLong() == 0) {
-                throw new ListenerException("No maximum LSN recorded in the database");
-            }
-            snapshot.put(LSN_POSITION, lsn.asString());
-        }
-
-        return LogSequenceNumber.valueOf(snapshot.get(LSN_POSITION));
-    }
-
     private void createReplicationStream(PGConnection pgConnection) throws SQLException {
-        this.startLsn = readLastLsn();
         ChainedLogicalStreamBuilder streamBuilder = pgConnection
                 .getReplicationAPI()
                 .replicationStream()
@@ -171,7 +161,6 @@ public class PostgreSQLExtractor extends AbstractExtractor {
     }
 
     private void createReplicationSlot(PGConnection pgConnection) throws SQLException {
-        String database = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_DATABASE, String.class));
         String slotName = messageDecoder.getSlotName();
         String plugin = messageDecoder.getOutputPlugin();
         boolean existSlot = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_SLOT, new Object[]{database, slotName, plugin}, Integer.class) > 0);
@@ -182,7 +171,20 @@ public class PostgreSQLExtractor extends AbstractExtractor {
                     .withSlotName(slotName)
                     .withOutputPlugin(plugin)
                     .make();
+
+            // wait for create replication slot to have finished
+            sleepInMills(300);
         }
+
+        if (!snapshot.containsKey(LSN_POSITION)) {
+            LogSequenceNumber lsn = connectorMapper.execute(databaseTemplate -> LogSequenceNumber.valueOf(databaseTemplate.queryForObject(GET_RESTART_LSN, new Object[] {database, slotName, plugin}, String.class)));
+            if (null == lsn || lsn.asLong() == 0) {
+                throw new ListenerException("No maximum LSN recorded in the database");
+            }
+            snapshot.put(LSN_POSITION, lsn.asString());
+        }
+
+        this.startLsn = LogSequenceNumber.valueOf(snapshot.get(LSN_POSITION));
     }
 
     private void dropReplicationSlot() {
@@ -222,12 +224,6 @@ public class PostgreSQLExtractor extends AbstractExtractor {
         }
     }
 
-    private LogSequenceNumber currentXLogLocation() throws SQLException {
-        int majorVersion = connection.getMetaData().getDatabaseMajorVersion();
-        String sql = majorVersion >= 10 ? "select * from pg_current_wal_lsn()" : "select * from pg_current_xlog_location()";
-        return connectorMapper.execute(databaseTemplate -> LogSequenceNumber.valueOf(databaseTemplate.queryForObject(sql, String.class)));
-    }
-
     private void recover() {
         connectLock.lock();
         try {
@@ -283,13 +279,12 @@ public class PostgreSQLExtractor extends AbstractExtractor {
                     flushLsn(lsn);
                     // process decoder
                     changedEvent(messageDecoder.processMessage(msg));
-                    forceFlushEvent();
 
                     // feedback
                     stream.setAppliedLSN(lsn);
                     stream.setFlushedLSN(lsn);
                     stream.forceUpdateStatus();
-                } catch (IllegalStateException e) {
+                } catch (IllegalStateException | ListenerException e) {
                     logger.error(e.getMessage());
                 } catch (Exception e) {
                     logger.error(e.getMessage());

+ 12 - 7
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/AbstractColumnValue.java

@@ -10,7 +10,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.SQLException;
-import java.time.*;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.OffsetTime;
+import java.time.ZoneOffset;
 import java.util.concurrent.TimeUnit;
 
 public abstract class AbstractColumnValue implements ColumnValue {
@@ -18,8 +23,8 @@ public abstract class AbstractColumnValue implements ColumnValue {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Override
-    public LocalDate asLocalDate() {
-        return DateFormatUtil.stringToLocalDate(asString());
+    public Date asDate() {
+        return DateFormatUtil.stringToDate(asString());
     }
 
     @Override
@@ -48,13 +53,13 @@ public abstract class AbstractColumnValue implements ColumnValue {
     }
 
     @Override
-    public Instant asInstant() {
+    public Timestamp asTimestamp() {
         if ("infinity".equals(asString())) {
-            return toInstantFromMicros(PGStatement.DATE_POSITIVE_INFINITY);
+            return Timestamp.from(toInstantFromMicros(PGStatement.DATE_POSITIVE_INFINITY));
         } else if ("-infinity".equals(asString())) {
-            return toInstantFromMicros(PGStatement.DATE_NEGATIVE_INFINITY);
+            return Timestamp.from(toInstantFromMicros(PGStatement.DATE_NEGATIVE_INFINITY));
         }
-        return DateFormatUtil.timestampToInstant(asString());
+        return DateFormatUtil.stringToTimestamp(asString());
     }
 
     @Override

+ 6 - 4
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/ColumnValue.java

@@ -3,8 +3,8 @@ package org.dbsyncer.listener.postgresql.column;
 import org.postgresql.geometric.*;
 import org.postgresql.util.PGmoney;
 
-import java.time.Instant;
-import java.time.LocalDate;
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.time.OffsetDateTime;
 import java.time.OffsetTime;
 
@@ -16,6 +16,8 @@ import java.time.OffsetTime;
  */
 public interface ColumnValue {
 
+    void setValue(String value);
+
     boolean isNull();
 
     String asString();
@@ -32,11 +34,11 @@ public interface ColumnValue {
 
     Object asDecimal();
 
-    LocalDate asLocalDate();
+    Date asDate();
 
     OffsetDateTime asOffsetDateTimeAtUtc();
 
-    Instant asInstant();
+    Timestamp asTimestamp();
 
     Object asTime();
 

+ 0 - 158
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/ColumnValueResolver.java

@@ -1,158 +0,0 @@
-package org.dbsyncer.listener.postgresql.column;
-
-import org.postgresql.util.PGmoney;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/4/23 22:45
- */
-public class ColumnValueResolver {
-
-    /**
-     * Resolve the value of a {@link ColumnValue}.
-     *
-     * @param type
-     * @param value
-     * @return
-     */
-    public Object resolveValue(String type, ColumnValue value) {
-        if (value.isNull()) {
-            // nulls are null
-            return null;
-        }
-
-        switch (type) {
-            // include all types from https://www.postgresql.org/docs/current/static/datatype.html#DATATYPE-TABLE
-            // plus aliases from the shorter names produced by older wal2json
-            case "boolean":
-            case "bool":
-                return value.asBoolean();
-
-            case "hstore":
-                return value.asString();
-
-            case "integer":
-            case "int":
-            case "int4":
-            case "smallint":
-            case "int2":
-            case "smallserial":
-            case "serial":
-            case "serial2":
-            case "serial4":
-                return value.asInteger();
-
-            case "bigint":
-            case "bigserial":
-            case "int8":
-            case "oid":
-                return value.asLong();
-
-            case "real":
-            case "float4":
-                return value.asFloat();
-
-            case "double precision":
-            case "float8":
-                return value.asDouble();
-
-            case "numeric":
-            case "decimal":
-                return value.asDecimal();
-
-            case "character":
-            case "char":
-            case "character varying":
-            case "varchar":
-            case "bpchar":
-            case "text":
-                return value.asString();
-
-            case "date":
-                return value.asLocalDate();
-
-            case "timestamp with time zone":
-            case "timestamptz":
-                return value.asOffsetDateTimeAtUtc();
-
-            case "timestamp":
-            case "timestamp without time zone":
-                return value.asInstant();
-
-            case "time":
-                return value.asTime();
-
-            case "time without time zone":
-                return value.asLocalTime();
-
-            case "time with time zone":
-            case "timetz":
-                return value.asOffsetTimeUtc();
-
-            case "bytea":
-                return value.asByteArray();
-
-            // these are all PG-specific types and we use the JDBC representations
-            // note that, with the exception of point, no converters for these types are implemented yet,
-            // i.e. those values won't actually be propagated to the outbound message until that's the case
-            case "box":
-                return value.asBox();
-            case "circle":
-                return value.asCircle();
-            case "interval":
-                return value.asInterval();
-            case "line":
-                return value.asLine();
-            case "lseg":
-                return value.asLseg();
-            case "money":
-                final Object v = value.asMoney();
-                return (v instanceof PGmoney) ? ((PGmoney) v).val : v;
-            case "path":
-                return value.asPath();
-            case "point":
-                return value.asPoint();
-            case "polygon":
-                return value.asPolygon();
-
-            // PostGIS types are HexEWKB strings
-            // ValueConverter turns them into the correct types
-            case "geometry":
-            case "geography":
-                return value.asString();
-
-            case "citext":
-            case "bit":
-            case "bit varying":
-            case "varbit":
-            case "json":
-            case "jsonb":
-            case "xml":
-            case "uuid":
-            case "tsrange":
-            case "tstzrange":
-            case "daterange":
-            case "inet":
-            case "cidr":
-            case "macaddr":
-            case "macaddr8":
-            case "int4range":
-            case "numrange":
-            case "int8range":
-                return value.asString();
-
-            // catch-all for other known/builtin PG types
-            // TODO: improve with more specific/useful classes here?
-            case "pg_lsn":
-            case "tsquery":
-            case "tsvector":
-            case "txid_snapshot":
-                // catch-all for unknown (extension module/custom) types
-            default:
-                return null;
-        }
-
-    }
-
-}

+ 58 - 58
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/TestDecodingColumnValue.java → dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/column/PgColumnValue.java

@@ -1,59 +1,59 @@
-package org.dbsyncer.listener.postgresql.column;
-
-import org.dbsyncer.common.util.StringUtil;
-
-import java.math.BigDecimal;
-
-public final class TestDecodingColumnValue extends AbstractColumnValue {
-
-    private String value;
-
-    public TestDecodingColumnValue(String value) {
-        this.value = value;
-    }
-
-    @Override
-    public boolean isNull() {
-        return value == null;
-    }
-
-    @Override
-    public String asString() {
-        return value;
-    }
-
-    @Override
-    public Boolean asBoolean() {
-        return "t".equalsIgnoreCase(value);
-    }
-
-    @Override
-    public Integer asInteger() {
-        return Integer.valueOf(value);
-    }
-
-    @Override
-    public Long asLong() {
-        return Long.valueOf(value);
-    }
-
-    @Override
-    public Float asFloat() {
-        return Float.valueOf(value);
-    }
-
-    @Override
-    public Double asDouble() {
-        return Double.valueOf(value);
-    }
-
-    @Override
-    public Object asDecimal() {
-        return new BigDecimal(value);
-    }
-
-    @Override
-    public byte[] asByteArray() {
-        return StringUtil.hexStringToByteArray(value.substring(2));
-    }
package org.dbsyncer.listener.postgresql.column;

import org.dbsyncer.common.util.StringUtil;

import java.math.BigDecimal;

/**
 * Mutable, string-backed {@link ColumnValue} used while decoding PostgreSQL
 * logical-replication messages. The raw text form of a column is injected via
 * {@link #setValue(String)} and converted on demand by the typed accessors.
 * Not thread-safe: one instance is meant to be reused on a single decoder thread.
 */
public final class PgColumnValue extends AbstractColumnValue {

    private String value;

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public boolean isNull() {
        return null == value;
    }

    @Override
    public String asString() {
        return value;
    }

    @Override
    public Boolean asBoolean() {
        // PostgreSQL's text output renders booleans as "t" / "f".
        return "t".equalsIgnoreCase(value);
    }

    @Override
    public Integer asInteger() {
        return Integer.parseInt(value);
    }

    @Override
    public Long asLong() {
        return Long.parseLong(value);
    }

    @Override
    public Float asFloat() {
        return Float.parseFloat(value);
    }

    @Override
    public Double asDouble() {
        return Double.parseDouble(value);
    }

    @Override
    public Object asDecimal() {
        return new BigDecimal(value);
    }

    @Override
    public byte[] asByteArray() {
        // bytea text form is "\x<hex>"; strip the 2-char prefix before decoding.
        return StringUtil.hexStringToByteArray(value.substring(2));
    }
}

+ 116 - 117
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/PgOutputMessageDecoder.java

@@ -1,7 +1,10 @@
 package org.dbsyncer.listener.postgresql.decoder;
 
 import org.dbsyncer.common.event.RowChangedEvent;
-import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
@@ -10,12 +13,11 @@ import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
 
 import java.nio.ByteBuffer;
 import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 
 /**
  * @author AE86
@@ -27,29 +29,17 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     private static final LocalDateTime PG_EPOCH = LocalDateTime.of(2000, 1, 1, 0, 0, 0);
+    private static final String GET_TABLE_SCHEMA = "select oid,relname as tableName from pg_class t inner join (select ns.oid as nspoid, ns.nspname from pg_namespace ns where ns.nspname = (select (current_schemas(false))[s.r] from generate_series(1, array_upper(current_schemas(false), 1)) as s(r))) as n on n.nspoid = t.relnamespace where relkind = 'r'";
+    private static final Map<Integer, TableId> tables = new LinkedHashMap<>();
+    private ConnectorFactory connectorFactory;
+    private DatabaseConnectorMapper connectorMapper;
 
     @Override
-    public void postProcessBeforeInitialization(DatabaseConnectorMapper connectorMapper) {
-        String pubName = getPubName();
-        String selectPublication = String.format("SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", pubName);
-        Integer count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(selectPublication, Integer.class));
-        if (0 < count) {
-            return;
-        }
-
-        logger.info("Creating new publication '{}' for plugin '{}'", pubName, getOutputPlugin());
-        try {
-            String createPublication = String.format("CREATE PUBLICATION %s FOR ALL TABLES", pubName);
-            logger.info("Creating Publication with statement '{}'", createPublication);
-            connectorMapper.execute(databaseTemplate -> {
-                databaseTemplate.execute(createPublication);
-                return true;
-            });
-        } catch (Exception e) {
-            throw new ListenerException(e.getCause());
-        }
-
-        // TODO read table schema
+    public void postProcessBeforeInitialization(ConnectorFactory connectorFactory, DatabaseConnectorMapper connectorMapper) {
+        this.connectorFactory = connectorFactory;
+        this.connectorMapper = connectorMapper;
+        initPublication();
+        readSchema();
     }
 
     @Override
@@ -58,20 +48,12 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
             throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
         }
 
-        RowChangedEvent event = null;
         MessageTypeEnum type = MessageTypeEnum.getType((char) buffer.get());
         switch (type) {
             case UPDATE:
-                event = parseUpdate(buffer);
-                break;
-
             case INSERT:
-                event = parseInsert(buffer);
-                break;
-
             case DELETE:
-                event = parseDelete(buffer);
-                break;
+                return parseData(type, buffer);
 
             case BEGIN:
                 long beginLsn = buffer.getLong();
@@ -92,73 +74,9 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
                 logger.info("Type {} not implemented", type.name());
         }
 
-        if (null != event) {
-            logger.info(event.toString());
-        }
-
         return null;
     }
 
-    private RowChangedEvent parseDelete(ByteBuffer buffer) {
-        int relationId = buffer.getInt();
-        logger.info("Delete table {}", relationId);
-
-        List<Object> data = new ArrayList<>();
-        String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
-
-        switch (newTuple) {
-            case "K":
-                readTupleData(buffer, data);
-                break;
-            default:
-                logger.info("K not set, got instead {}", newTuple);
-        }
-        return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_INSERT, data, Collections.EMPTY_LIST);
-    }
-
-    private RowChangedEvent parseInsert(ByteBuffer buffer) {
-        int relationId = buffer.getInt();
-        logger.info("Insert table {}", relationId);
-
-        List<Object> data = new ArrayList<>();
-        String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
-        switch (newTuple) {
-            case "N":
-                readTupleData(buffer, data);
-                break;
-            default:
-                logger.info("N not set, got instead {}", newTuple);
-        }
-        return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_INSERT, Collections.EMPTY_LIST, data);
-    }
-
-    private RowChangedEvent parseUpdate(ByteBuffer buffer) {
-        int relationId = buffer.getInt();
-        logger.info("Update table {}", relationId);
-
-        List<Object> data = new ArrayList<>();
-        String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
-        switch (newTuple) {
-            case "K":
-                logger.info("Key update");
-                logger.info("Old Key");
-                readTupleData(buffer, data);
-                break;
-            case "O":
-                logger.info("Value update");
-                logger.info("Old Value");
-                readTupleData(buffer, data);
-                break;
-            case "N":
-                readTupleData(buffer, data);
-                break;
-            default:
-                logger.info("K or O Byte1 not set, got instead {}", newTuple);
-        }
-
-        return new RowChangedEvent(String.valueOf(relationId), ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_LIST, data);
-    }
-
     @Override
     public String getOutputPlugin() {
         return MessageDecoderEnum.PG_OUTPUT.getType();
@@ -174,31 +92,112 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder {
         return String.format("dbs_pub_%s_%s", config.getSchema(), config.getUsername());
     }
 
-    private void readTupleData(ByteBuffer msg, List<Object> data) {
-        short nColumn = msg.getShort();
-        for (int n = 0; n < nColumn; n++) {
-            String tupleContentType = new String(new byte[]{msg.get()}, 0, 1);
-            if (tupleContentType.equals("t")) {
-                int size = msg.getInt();
-                byte[] text = new byte[size];
-
-                for (int z = 0; z < size; z++) {
-                    text[z] = msg.get();
-                }
-                String content = new String(text, 0, size);
-                data.add(content);
-                continue;
+    private void initPublication() {
+        String pubName = getPubName();
+        String selectPublication = String.format("SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", pubName);
+        Integer count = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(selectPublication, Integer.class));
+        if (0 < count) {
+            return;
+        }
+
+        logger.info("Creating new publication '{}' for plugin '{}'", pubName, getOutputPlugin());
+        try {
+            String createPublication = String.format("CREATE PUBLICATION %s FOR ALL TABLES", pubName);
+            logger.info("Creating Publication with statement '{}'", createPublication);
+            connectorMapper.execute(databaseTemplate -> {
+                databaseTemplate.execute(createPublication);
+                return true;
+            });
+        } catch (Exception e) {
+            throw new ListenerException(e.getCause());
+        }
+    }
+
+    private void readSchema() {
+        List<Map> schemas = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(GET_TABLE_SCHEMA));
+        if (!CollectionUtils.isEmpty(schemas)) {
+            schemas.forEach(map -> {
+                Long oid = (Long) map.get("oid");
+                String tableName = (String) map.get("tableName");
+                MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableName);
+                Assert.notEmpty(metaInfo.getColumn(), String.format("The table column for '%s' must not be empty.", tableName));
+                tables.put(oid.intValue(), new TableId(oid.intValue(), tableName, metaInfo.getColumn()));
+            });
+        }
+    }
+
+    private RowChangedEvent parseData(MessageTypeEnum type, ByteBuffer buffer) {
+        final int relationId = buffer.getInt();
+        final TableId tableId = tables.get(relationId);
+        if (null != tableId) {
+            String newTuple = new String(new byte[]{buffer.get()}, 0, 1);
+            switch (newTuple) {
+                case "N":
+                case "K":
+                case "O":
+                    List<Object> data = new ArrayList<>();
+                    readTupleData(tableId, buffer, data);
+                    if (MessageTypeEnum.DELETE == type) {
+                        return new RowChangedEvent(tableId.tableName, type.name(), data, Collections.EMPTY_LIST);
+                    }
+                    return new RowChangedEvent(tableId.tableName, type.name(), Collections.EMPTY_LIST, data);
+
+                default:
+                    logger.info("N, K, O not set, got instead {}", newTuple);
             }
+        }
+        return null;
+    }
+
+    private void readTupleData(TableId tableId, ByteBuffer msg, List<Object> data) {
+        short nColumn = msg.getShort();
+        if (nColumn != tableId.fields.size()) {
+            logger.warn("The column size of table '{}' is {}, but we has been received column size is {}.", tableId.tableName, tableId.fields.size(), nColumn);
 
-            if (tupleContentType.equals("n")) {
-                data.add(null);
-                continue;
+            // The table schema has been changed, we should be get a new table schema from db.
+            MetaInfo metaInfo = connectorFactory.getMetaInfo(connectorMapper, tableId.tableName);
+            if (CollectionUtils.isEmpty(metaInfo.getColumn())) {
+                throw new ListenerException(String.format("The table column for '%s' is empty.", tableId.tableName));
             }
+            tableId.fields = metaInfo.getColumn();
+            return;
+        }
 
-            if (tupleContentType.equals("u")) {
-                data.add("TOASTED");
+        for (int n = 0; n < nColumn; n++) {
+            String type = new String(new byte[]{msg.get()}, 0, 1);
+            switch (type) {
+                case "t":
+                    int size = msg.getInt();
+                    byte[] text = new byte[size];
+                    for (int z = 0; z < size; z++) {
+                        text[z] = msg.get();
+                    }
+                    data.add(resolveValue(tableId.fields.get(n).getTypeName(), new String(text, 0, size)));
+                    break;
+
+                case "n":
+                    data.add(null);
+                    break;
+
+                case "u":
+                    data.add("TOASTED");
+                    break;
+                default:
+                    logger.info("t, n, u not set, got instead {}", type);
             }
         }
     }
 
+    final class TableId {
+        Integer oid;
+        String tableName;
+        List<Field> fields;
+
+        public TableId(Integer oid, String tableName, List<Field> fields) {
+            this.oid = oid;
+            this.tableName = tableName;
+            this.fields = fields;
+        }
+    }
+
 }

+ 2 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/decoder/TestDecodingMessageDecoder.java

@@ -3,9 +3,7 @@ package org.dbsyncer.listener.postgresql.decoder;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.postgresql.AbstractMessageDecoder;
-import org.dbsyncer.listener.postgresql.column.ColumnValueResolver;
-import org.dbsyncer.listener.postgresql.column.Lexer;
-import org.dbsyncer.listener.postgresql.column.TestDecodingColumnValue;
+import org.dbsyncer.common.column.Lexer;
 import org.dbsyncer.listener.postgresql.enums.MessageDecoderEnum;
 import org.dbsyncer.listener.postgresql.enums.MessageTypeEnum;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
@@ -25,7 +23,6 @@ import java.util.List;
 public class TestDecodingMessageDecoder extends AbstractMessageDecoder {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
-    private static final ColumnValueResolver resolver = new ColumnValueResolver();
 
     @Override
     public RowChangedEvent processMessage(ByteBuffer buffer) {
@@ -77,7 +74,7 @@ public class TestDecodingMessageDecoder extends AbstractMessageDecoder {
             String type = parseType(lexer);
             lexer.skip(1);
             String value = parseValue(lexer);
-            data.add(resolver.resolveValue(type, new TestDecodingColumnValue(value)));
+            data.add(resolveValue(type, value));
         }
 
         RowChangedEvent event = null;

+ 1 - 1
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/ESQuartzExtractor.java

@@ -3,7 +3,7 @@ package org.dbsyncer.listener.quartz;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.Filter;
+import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.ListenerException;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;

+ 3 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/filter/DateFilter.java

@@ -3,7 +3,8 @@ package org.dbsyncer.listener.quartz.filter;
 import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.listener.quartz.QuartzFilter;
 
-import java.util.Date;
+import java.sql.Date;
+import java.time.LocalDate;
 
 public class DateFilter implements QuartzFilter {
 
@@ -15,7 +16,7 @@ public class DateFilter implements QuartzFilter {
 
     @Override
     public Object getObject() {
-        return new Date();
+        return Date.valueOf(LocalDate.now());
     }
 
     @Override

+ 83 - 0
dbsyncer-listener/src/main/test/FileWatchTest.java

@@ -0,0 +1,83 @@
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.*;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/5/6 21:32
+ */
+public class FileWatchTest {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private String path = "d:/test/";
+    private WatchService watchService;
+
+    @After
+    public void close() throws IOException {
+        if (null != watchService) {
+            watchService.close();
+        }
+    }
+
+    @Test
+    public void testFileWatch() throws IOException, InterruptedException {
+        watchService = FileSystems.getDefault().newWatchService();
+        Path p = Paths.get(path);
+        p.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
+
+        logger.info("启动监听");
+        long count = 0L;
+        while (count < 30) {
+            WatchKey watchKey = watchService.take();
+            List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
+            for (WatchEvent<?> event : watchEvents) {
+                Object context = event.context();
+                logger.info("[{}{}] 文件发生了[{}]事件", path, context, event.kind());
+            }
+            watchKey.reset();
+
+            TimeUnit.SECONDS.sleep(1);
+            count++;
+        }
+    }
+
+    @Test
+    public void testReadFile() {
+        read(path + "test.txt");
+    }
+
+    private void read(String file) {
+        RandomAccessFile raf = null;
+        byte[] buffer = new byte[4096];
+        try {
+            raf = new RandomAccessFile(file, "r");
+            raf.seek(raf.length());
+            logger.info("offset:{}", raf.getFilePointer());
+
+            while (true) {
+                int len = raf.read(buffer);
+                if (-1 != len) {
+                    logger.info("offset:{}, len:{}", raf.getFilePointer(), len);
+                    logger.info(new String(buffer, 1, len, "UTF-8"));
+                }
+                TimeUnit.SECONDS.sleep(1);
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        } finally {
+            IOUtils.closeQuietly(raf);
+        }
+    }
+
+}

+ 1 - 1
dbsyncer-listener/src/main/test/KafkaClientTest.java

@@ -1,12 +1,12 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.dbsyncer.common.util.JsonUtil;
-import org.dbsyncer.connector.config.Field;
 import org.dbsyncer.connector.config.KafkaConfig;
 import org.dbsyncer.connector.enums.KafkaFieldTypeEnum;
 import org.dbsyncer.connector.kafka.KafkaClient;
 import org.dbsyncer.connector.kafka.serialization.JsonToMapDeserializer;
 import org.dbsyncer.connector.kafka.serialization.MapToJsonSerializer;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.util.KafkaUtil;
 import org.junit.After;
 import org.junit.Before;

+ 1 - 1
dbsyncer-manager/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.1.7-Beta</version>
+        <version>1.1.8-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-manager</artifactId>

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/Manager.java

@@ -3,8 +3,8 @@ package org.dbsyncer.manager;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.config.MetaInfo;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/ManagerFactory.java

@@ -5,8 +5,8 @@ import org.dbsyncer.manager.event.ClosedEvent;
 import org.dbsyncer.common.model.Paging;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.config.MetaInfo;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/config/FieldPicker.java

@@ -3,8 +3,8 @@ package org.dbsyncer.manager.config;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.CompareFilter;
-import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.connector.config.Filter;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.parser.model.FieldMapping;

+ 2 - 2
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -8,8 +8,8 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.dbsyncer.listener.Extractor;

+ 1 - 1
dbsyncer-monitor/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.1.7-Beta</version>
+        <version>1.1.8-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-monitor</artifactId>

+ 1 - 1
dbsyncer-parser/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.1.7-Beta</version>
+        <version>1.1.8-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-parser</artifactId>

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -4,8 +4,8 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.config.MetaInfo;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;

+ 11 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -13,6 +13,9 @@ import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.ConnectorEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
 import org.dbsyncer.parser.enums.ParserEnum;
@@ -281,7 +284,14 @@ public class ParserFactory implements Parser {
             // 6、更新结果
             flush(task, writer);
 
-            // 7、更新分页数
+            // 7、判断尾页
+            if (data.size() < pageSize) {
+                params.clear();
+                logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
+                break;
+            }
+
+            // 8、更新分页数
             params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
         }
     }

+ 3 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/DateHandler.java

@@ -3,7 +3,8 @@ package org.dbsyncer.parser.convert.handler;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.parser.convert.Handler;
 
-import java.util.Date;
+import java.sql.Date;
+import java.time.LocalDate;
 
 /**
  * 系统日期
@@ -16,6 +17,6 @@ public class DateHandler implements Handler {
 
     @Override
     public Object handle(String args, Object value) {
-        return null == value || StringUtil.isBlank(String.valueOf(value)) ? new Date() : value;
+        return null == value || StringUtil.isBlank(String.valueOf(value)) ? Date.valueOf(LocalDate.now()) : value;
     }
 }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/DateToChineseStandardTimeHandler.java

@@ -3,7 +3,7 @@ package org.dbsyncer.parser.convert.handler;
 import org.dbsyncer.common.util.DateFormatUtil;
 import org.dbsyncer.parser.convert.AbstractHandler;
 
-import java.util.Date;
+import java.sql.Date;
 
 /**
  * Date转中国标准时间

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/convert/handler/TimestampToDateHandler.java

@@ -2,9 +2,9 @@ package org.dbsyncer.parser.convert.handler;
 
 import org.dbsyncer.parser.convert.AbstractHandler;
 
+import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.ZoneId;
-import java.util.Date;
 
 /**
  * 时间戳转日期

+ 21 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -11,7 +11,8 @@ import java.time.Instant;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -28,9 +29,11 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     @Autowired
     private ScheduledTaskService scheduledTaskService;
 
-    private Queue<Request> buffer = new ConcurrentLinkedQueue();
+    private static final int CAPACITY = 10_0000;
 
-    private Queue<Request> temp = new ConcurrentLinkedQueue();
+    private Queue<Request> buffer = new LinkedBlockingQueue(CAPACITY);
+
+    private Queue<Request> temp = new LinkedBlockingQueue(CAPACITY);
 
     private final Lock lock = new ReentrantLock(true);
 
@@ -82,11 +85,24 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
     @Override
     public void offer(BufferRequest request) {
+        int size = 0;
         if (running) {
             temp.offer((Request) request);
-            return;
+            size = temp.size();
+        } else {
+            buffer.offer((Request) request);
+            size = buffer.size();
+        }
+
+        // TODO 临时解决方案:生产大于消费问题,限制生产速度
+        if (size >= CAPACITY) {
+            try {
+                logger.warn("当前任务队列大小{}已达上限{},请稍等{}秒", size, CAPACITY, 30);
+                TimeUnit.SECONDS.sleep(30);
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage());
+            }
         }
-        buffer.offer((Request) request);
     }
 
     @Override

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -22,7 +22,7 @@ public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest
 
     @Override
     protected long getPeriod() {
-        return 3000;
+        return 500;
     }
 
     @Override

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractWriter.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.parser.flush.model;
 
-import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.model.Field;
 
 import java.util.List;
 import java.util.Map;

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterRequest.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.parser.flush.model;
 
-import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.parser.flush.BufferRequest;
 
 import java.util.List;

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/AbstractConfigModel.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.parser.model;
 
-import org.dbsyncer.connector.config.Filter;
+import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.plugin.config.Plugin;
 
 import java.util.List;

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/BatchWriter.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.parser.model;
 
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.model.Field;
 
 import java.util.List;
 import java.util.Map;

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Connector.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.parser.model;
 
 import org.dbsyncer.connector.config.ConnectorConfig;
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.Table;
 
 import java.util.List;
 

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/FieldMapping.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.parser.model;
 
-import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.model.Field;
 
 /**
  * 字段映射关系

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Mapping.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.parser.model;
 
-import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.dbsyncer.parser.enums.ModelEnum;
 

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Picker.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.parser.model;
 
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.model.Field;
 
 import java.util.*;
 

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/TableGroup.java

@@ -1,6 +1,6 @@
 package org.dbsyncer.parser.model;
 
-import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.model.Table;
 
 import java.util.List;
 import java.util.Map;

+ 2 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/util/PickerUtil.java

@@ -2,8 +2,8 @@ package org.dbsyncer.parser.util;
 
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.connector.config.Filter;
+import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.model.Filter;
 import org.dbsyncer.parser.model.Convert;
 import org.dbsyncer.parser.model.FieldMapping;
 import org.dbsyncer.parser.model.Mapping;

+ 1 - 1
dbsyncer-plugin/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.1.7-Beta</version>
+        <version>1.1.8-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-plugin</artifactId>

Nem az összes módosított fájl került megjelenítésre, mert túl sok fájl változott