浏览代码

!57 merge
Merge pull request !57 from AE86/V_1.0.0_Beta

AE86 3 年之前
父节点
当前提交
e7f846666c
共有 68 个文件被更改,包括 1084 次插入和 947 次删除
  1. 10 4
      README.md
  2. 6 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/AbstractDataBaseConfigChecker.java
  3. 22 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/DqlPostgreSQLConfigChecker.java
  4. 1 7
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/DqlSqlServerConfigChecker.java
  5. 20 0
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/PostgreSQLConfigChecker.java
  6. 1 7
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/SqlServerConfigChecker.java
  7. 16 25
      dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java
  8. 34 29
      dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java
  9. 2 2
      dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java
  10. 6 0
      dbsyncer-connector/pom.xml
  11. 0 8
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java
  12. 0 6
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java
  13. 15 10
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/DatabaseConfig.java
  14. 0 22
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlServerDatabaseConfig.java
  15. 49 8
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterBatchConfig.java
  16. 0 47
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterConfig.java
  17. 0 71
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterSingleConfig.java
  18. 5 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/constant/DatabaseConstant.java
  19. 144 215
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  20. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseConnectorMapper.java
  21. 4 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleDataSource.java
  22. 15 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/ConnectorEnum.java
  23. 23 71
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java
  24. 2 19
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java
  25. 55 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java
  26. 88 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java
  27. 3 20
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLMysqlConnector.java
  28. 3 28
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java
  29. 22 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLPostgreSQLConnector.java
  30. 3 28
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java
  31. 0 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServer.java
  32. 7 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java
  33. 8 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java
  34. 25 8
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/BinaryLogRemoteClient.java
  35. 2 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java
  36. 1 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java
  37. 1 1
      dbsyncer-listener/src/main/test/DBChangeNotificationTest.java
  38. 9 4
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  39. 3 12
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java
  40. 31 59
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  41. 31 16
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java
  42. 16 19
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java
  43. 1 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java
  44. 10 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferRequest.java
  45. 3 3
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferResponse.java
  46. 2 7
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushStrategy.java
  47. 4 6
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/DisableFullFlushStrategy.java
  48. 1 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/EnableFlushStrategy.java
  49. 4 7
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java
  50. 9 9
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java
  51. 35 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java
  52. 0 10
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractRequest.java
  53. 84 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractWriter.java
  54. 4 2
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/StorageRequest.java
  55. 3 1
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/StorageResponse.java
  56. 11 38
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterRequest.java
  57. 11 63
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterResponse.java
  58. 68 0
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/BatchWriter.java
  59. 5 9
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java
  60. 11 4
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java
  61. 7 7
      dbsyncer-web/src/main/java/org/dbsyncer/web/config/WebAppConfig.java
  62. 3 3
      dbsyncer-web/src/main/resources/application.properties
  63. 62 0
      dbsyncer-web/src/main/resources/public/connector/addDqlPostgreSQL.html
  64. 50 0
      dbsyncer-web/src/main/resources/public/connector/addPostgreSQL.html
  65. 4 2
      dbsyncer-web/src/main/resources/public/mapping/editTable.html
  66. 二进制
      dbsyncer-web/src/main/resources/static/img/DqlPostgreSQL.png
  67. 二进制
      dbsyncer-web/src/main/resources/static/img/PostgreSQL.png
  68. 8 0
      pom.xml

+ 10 - 4
README.md

@@ -1,5 +1,5 @@
 ## 介绍
-DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServer、Elasticsearch(ES)、Kafka、SQL(Mysql/Oracle/SqlServer)等同步场景。支持上传插件自定义同步转换业务,提供监控全量和增量数据统计图、应用性能预警等。
+DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServer、PostgreSQL、Elasticsearch(ES)、Kafka、SQL(Mysql/Oracle/SqlServer/PostgreSQL)等同步场景。支持上传插件自定义同步转换业务,提供监控全量和增量数据统计图、应用性能预警等。
 
 > 特点
 * 组合驱动,自定义库同步到库组合,关系型数据库与非关系型之间组合,任意搭配表同步映射关系
@@ -35,11 +35,17 @@ DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServ
                 <td>✔</td>
                 <td>2008以上</td>
             </tr>
+            <tr>
+                <td>PostgreSQL</td>
+                <td>✔</td>
+                <td>✔</td>
+                <td>9.5.25以上</td>
+            </tr>
             <tr>
                 <td>ES</td>
                 <td>✔</td>
                 <td>✔</td>
-                <td>6.X以上</td>
+                <td>6.0以上</td>
             </tr>
             <tr>
                 <td>Kafka</td>
@@ -51,11 +57,11 @@ DBSyncer是一款开源的数据同步中间件,提供Mysql、Oracle、SqlServ
                 <td>SQL</td>
                 <td>✔</td>
                 <td></td>
-                <td></td>
+                <td>支持以上关系型数据库</td>
             </tr>
             <tr>
                 <td>最近计划</td>
-                <td colspan="3">PostgreSQL(设计中)、Redis</td>
+                <td colspan="3">Redis</td>
             </tr>
         </tbody>
     </table>

+ 6 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/AbstractDataBaseConfigChecker.java

@@ -39,4 +39,10 @@ public abstract class AbstractDataBaseConfigChecker implements ConnectorConfigCh
         connectorConfig.setTable(table);
     }
 
+    protected void modifySchema(DatabaseConfig connectorConfig, Map<String, String> params) {
+        String schema = params.get("schema");
+        Assert.hasText(schema, "Schema is empty.");
+        connectorConfig.setSchema(schema);
+    }
+
 }

+ 22 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/DqlPostgreSQLConfigChecker.java

@@ -0,0 +1,22 @@
+package org.dbsyncer.biz.checker.impl.connector;
+
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/4/5 22:14
+ */
+@Component
+public class DqlPostgreSQLConfigChecker extends AbstractDataBaseConfigChecker {
+
+    @Override
+    public void modify(DatabaseConfig connectorConfig, Map<String, String> params) {
+        super.modify(connectorConfig, params);
+        super.modifyDql(connectorConfig, params);
+        super.modifySchema(connectorConfig, params);
+    }
+}

+ 1 - 7
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/DqlSqlServerConfigChecker.java

@@ -1,9 +1,7 @@
 package org.dbsyncer.biz.checker.impl.connector;
 
 import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.SqlServerDatabaseConfig;
 import org.springframework.stereotype.Component;
-import org.springframework.util.Assert;
 
 import java.util.Map;
 
@@ -19,10 +17,6 @@ public class DqlSqlServerConfigChecker extends AbstractDataBaseConfigChecker {
     public void modify(DatabaseConfig connectorConfig, Map<String, String> params) {
         super.modify(connectorConfig, params);
         super.modifyDql(connectorConfig, params);
-        String schema = params.get("schema");
-        Assert.hasText(schema, "Schema is empty.");
-
-        SqlServerDatabaseConfig config = (SqlServerDatabaseConfig) connectorConfig;
-        config.setSchema(schema);
+        super.modifySchema(connectorConfig, params);
     }
 }

+ 20 - 0
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/PostgreSQLConfigChecker.java

@@ -0,0 +1,20 @@
+package org.dbsyncer.biz.checker.impl.connector;
+
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/4/5 22:14
+ */
+@Component
+public class PostgreSQLConfigChecker extends AbstractDataBaseConfigChecker {
+    @Override
+    public void modify(DatabaseConfig connectorConfig, Map<String, String> params) {
+        super.modify(connectorConfig, params);
+        super.modifySchema(connectorConfig, params);
+    }
+}

+ 1 - 7
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/SqlServerConfigChecker.java

@@ -1,9 +1,7 @@
 package org.dbsyncer.biz.checker.impl.connector;
 
 import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.SqlServerDatabaseConfig;
 import org.springframework.stereotype.Component;
-import org.springframework.util.Assert;
 
 import java.util.Map;
 
@@ -17,10 +15,6 @@ public class SqlServerConfigChecker extends AbstractDataBaseConfigChecker {
     @Override
     public void modify(DatabaseConfig connectorConfig, Map<String, String> params) {
         super.modify(connectorConfig, params);
-        String schema = params.get("schema");
-        Assert.hasText(schema, "Schema is empty.");
-
-        SqlServerDatabaseConfig config = (SqlServerDatabaseConfig) connectorConfig;
-        config.setSchema(schema);
+        super.modifySchema(connectorConfig, params);
     }
 }

+ 16 - 25
dbsyncer-common/src/main/java/org/dbsyncer/common/event/RowChangedEvent.java

@@ -18,13 +18,13 @@ import java.util.Map;
 public class RowChangedEvent {
 
     private int tableGroupIndex;
-    private String tableName;
+    private String sourceTableName;
+    private String targetTableName;
     private String event;
     private List<Object> beforeData;
     private List<Object> afterData;
     private Map<String, Object> before;
     private Map<String, Object> after;
-    private boolean forceUpdate;
 
     public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after) {
         this.tableGroupIndex = tableGroupIndex;
@@ -33,16 +33,8 @@ public class RowChangedEvent {
         this.after = after;
     }
 
-    public RowChangedEvent(int tableGroupIndex, String event, Map<String, Object> before, Map<String, Object> after, boolean forceUpdate) {
-        this.tableGroupIndex = tableGroupIndex;
-        this.event = event;
-        this.before = before;
-        this.after = after;
-        this.forceUpdate = forceUpdate;
-    }
-
-    public RowChangedEvent(String tableName, String event, List<Object> beforeData, List<Object> afterData) {
-        this.tableName = tableName;
+    public RowChangedEvent(String sourceTableName, String event, List<Object> beforeData, List<Object> afterData) {
+        this.sourceTableName = sourceTableName;
         this.event = event;
         this.beforeData = beforeData;
         this.afterData = afterData;
@@ -52,12 +44,20 @@ public class RowChangedEvent {
         return tableGroupIndex;
     }
 
-    public void setTableName(String tableName) {
-        this.tableName = tableName;
+    public String getSourceTableName() {
+        return sourceTableName;
+    }
+
+    public void setSourceTableName(String sourceTableName) {
+        this.sourceTableName = sourceTableName;
+    }
+
+    public String getTargetTableName() {
+        return targetTableName;
     }
 
-    public String getTableName() {
-        return tableName;
+    public void setTargetTableName(String targetTableName) {
+        this.targetTableName = targetTableName;
     }
 
     public String getEvent() {
@@ -88,15 +88,6 @@ public class RowChangedEvent {
         this.after = after;
     }
 
-    public boolean isForceUpdate() {
-        return forceUpdate;
-    }
-
-    public RowChangedEvent setForceUpdate(boolean forceUpdate) {
-        this.forceUpdate = forceUpdate;
-        return this;
-    }
-
     @Override
     public String toString() {
         return JsonUtil.objToJson(this);

+ 34 - 29
dbsyncer-common/src/main/java/org/dbsyncer/common/model/Result.java

@@ -1,54 +1,59 @@
 package org.dbsyncer.common.model;
 
+import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
 
-public class Result {
+public class Result<T> {
 
-    // 读取数据
-    private List<Map> data;
+    // 成功数据
+    private List<T> successData = new LinkedList<>();
 
     // 错误数据
-    private Queue<Map> failData;
-
-    // 错误数
-    private AtomicLong fail;
+    private List<T> failData = new LinkedList<>();
 
     // 错误日志
-    private StringBuffer error;
+    private StringBuffer error = new StringBuffer();
 
-    public Result() {
-        init();
-    }
+    private final Object LOCK = new Object();
 
-    public Result(List<Map> data) {
-        init();
-        this.data = data;
+    public Result() {
     }
 
-    private void init(){
-        this.failData = new ConcurrentLinkedQueue<>();
-        this.fail = new AtomicLong(0);
-        this.error = new StringBuffer();
+    public Result(List<T> data) {
+        this.successData.addAll(data);
     }
 
-    public List<Map> getData() {
-        return data;
+    public List<T> getSuccessData() {
+        return successData;
     }
 
-    public Queue<Map> getFailData() {
+    public List<T> getFailData() {
         return failData;
     }
 
-    public AtomicLong getFail() {
-        return fail;
-    }
-
     public StringBuffer getError() {
         return error;
     }
 
+    /**
+     * 线程安全添加集合
+     *
+     * @param failData
+     */
+    public void addFailData(List failData) {
+        synchronized (LOCK) {
+            this.failData.addAll(failData);
+        }
+    }
+
+    /**
+     * 线程安全添加集合
+     *
+     * @param successData
+     */
+    public void addSuccessData(List successData) {
+        synchronized (LOCK) {
+            this.successData.addAll(successData);
+        }
+    }
 }

+ 2 - 2
dbsyncer-common/src/main/java/org/dbsyncer/common/scheduled/ScheduledTaskServiceImpl.java

@@ -34,13 +34,13 @@ public class ScheduledTaskServiceImpl implements ScheduledTaskService, Disposabl
 
     @Override
     public void start(String key, String cron, ScheduledTaskJob job) {
-        logger.info("{}-[{}], Started task [{}]", key, cron, job.getClass().getName());
+        logger.info("[{}], Started task [{}]", cron, job.getClass().getName());
         apply(key, () -> taskScheduler.schedule(job, (trigger) -> new CronTrigger(cron).nextExecutionTime(trigger)));
     }
 
     @Override
     public void start(String key, long period, ScheduledTaskJob job) {
-        logger.info("[period={}], Started task [{}]", period, key);
+        logger.info("[period={}], Started task [{}]", period, job.getClass().getName());
         apply(key, () -> taskScheduler.scheduleAtFixedRate(job, period));
     }
 

+ 6 - 0
dbsyncer-connector/pom.xml

@@ -42,6 +42,12 @@
             <artifactId>mssql-jdbc</artifactId>
         </dependency>
 
+        <!-- postgresql -->
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+        </dependency>
+
         <!-- smartcn中文分词器 -->
         <dependency>
             <groupId>org.apache.lucene</groupId>

+ 0 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java

@@ -90,14 +90,6 @@ public interface Connector<M, C> {
      */
     Result writer(M connectorMapper, WriterBatchConfig config);
 
-    /**
-     * 写入目标源数据
-     *
-     * @param config
-     * @return
-     */
-    Result writer(M connectorMapper, WriterSingleConfig config);
-
     /**
      * 获取数据源同步参数
      *

+ 0 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -137,12 +137,6 @@ public class ConnectorFactory implements DisposableBean {
         return result;
     }
 
-    public Result writer(ConnectorMapper connectorMapper, WriterSingleConfig config) {
-        Result result = getConnector(connectorMapper).writer(connectorMapper, config);
-        Assert.notNull(result, "Connector writer single result can not null");
-        return result;
-    }
-
     public Connector getConnector(ConnectorMapper connectorMapper) {
         return getConnector(connectorMapper.getConnectorType());
     }

+ 15 - 10
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/DatabaseConfig.java

@@ -26,49 +26,47 @@ public class DatabaseConfig extends ConnectorConfig {
     // 通过SQL获取表信息
     private String sql;
 
+    // 构架名
+    private String schema;
+
     public String getDriverClassName() {
         return driverClassName;
     }
 
-    public DatabaseConfig setDriverClassName(String driverClassName) {
+    public void setDriverClassName(String driverClassName) {
         this.driverClassName = driverClassName;
-        return this;
     }
 
     public String getUrl() {
         return url;
     }
 
-    public DatabaseConfig setUrl(String url) {
+    public void setUrl(String url) {
         this.url = url;
-        return this;
     }
 
     public String getUsername() {
         return username;
     }
 
-    public DatabaseConfig setUsername(String username) {
+    public void setUsername(String username) {
         this.username = username;
-        return this;
     }
 
     public String getPassword() {
         return password;
     }
 
-    public DatabaseConfig setPassword(String password) {
+    public void setPassword(String password) {
         this.password = password;
-        return this;
     }
 
     public String getTable() {
         return table;
     }
 
-    public DatabaseConfig setTable(String table) {
+    public void setTable(String table) {
         this.table = table;
-        return this;
     }
 
     public String getSql() {
@@ -79,4 +77,11 @@ public class DatabaseConfig extends ConnectorConfig {
         this.sql = sql;
     }
 
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
 }

+ 0 - 22
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlServerDatabaseConfig.java

@@ -1,22 +0,0 @@
-package org.dbsyncer.connector.config;
-
-/**
- * SqlServer连接配置
- *
- * @author AE86
- * @version 1.0.0
- * @date 2022/1/10 23:57
- */
-public class SqlServerDatabaseConfig extends DatabaseConfig {
-
-    // 构架名
-    private String schema;
-
-    public String getSchema() {
-        return schema;
-    }
-
-    public void setSchema(String schema) {
-        this.schema = schema;
-    }
-}

+ 49 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterBatchConfig.java

@@ -3,26 +3,67 @@ package org.dbsyncer.connector.config;
 import java.util.List;
 import java.util.Map;
 
-public class WriterBatchConfig extends WriterConfig {
+public class WriterBatchConfig {
 
+    /**
+     * 表名
+     */
+    private String tableName;
+    /**
+     * 事件
+     */
+    private String event;
+    /**
+     * 执行命令
+     */
+    private Map<String, String> command;
+    /**
+     * 字段信息
+     */
+    private List<Field> fields;
     /**
      * 集合数据
      */
     private List<Map> data;
+    /**
+     * 强制更新
+     */
+    private boolean isForceUpdate;
 
-    public WriterBatchConfig(String event, Map<String, String> command, List<Field> fields, List<Map> data) {
-        setEvent(event);
-        setCommand(command);
-        setFields(fields);
+    public WriterBatchConfig(String tableName, String event, Map<String, String> command, List<Field> fields, List<Map> data) {
+        this(tableName, event, command, fields, data, false);
+    }
+
+    public WriterBatchConfig(String tableName, String event, Map<String, String> command, List<Field> fields, List<Map> data, boolean isForceUpdate) {
+        this.tableName = tableName;
+        this.event = event;
+        this.command = command;
+        this.fields = fields;
         this.data = data;
+        this.isForceUpdate = isForceUpdate;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+    public List<Field> getFields() {
+        return fields;
     }
 
     public List<Map> getData() {
         return data;
     }
 
-    public WriterBatchConfig setData(List<Map> data) {
-        this.data = data;
-        return this;
+    public boolean isForceUpdate() {
+        return isForceUpdate;
     }
 }

+ 0 - 47
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterConfig.java

@@ -1,47 +0,0 @@
-package org.dbsyncer.connector.config;
-
-import java.util.List;
-import java.util.Map;
-
-public class WriterConfig {
-
-    /**
-     * 事件
-     */
-    private String event;
-    /**
-     * 执行命令
-     */
-    private Map<String, String> command;
-    /**
-     * 字段信息
-     */
-    private List<Field> fields;
-
-    public String getEvent() {
-        return event;
-    }
-
-    public void setEvent(String event) {
-        this.event = event;
-    }
-
-    public Map<String, String> getCommand() {
-        return command;
-    }
-
-    public WriterConfig setCommand(Map<String, String> command) {
-        this.command = command;
-        return this;
-    }
-
-    public List<Field> getFields() {
-        return fields;
-    }
-
-    public WriterConfig setFields(List<Field> fields) {
-        this.fields = fields;
-        return this;
-    }
-
-}

+ 0 - 71
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/WriterSingleConfig.java

@@ -1,71 +0,0 @@
-package org.dbsyncer.connector.config;
-
-import java.util.List;
-import java.util.Map;
-
-public class WriterSingleConfig extends WriterConfig {
-
-    /**
-     * 行数据
-     */
-    private Map<String, Object> data;
-
-    /**
-     * 表名
-     */
-    private String table;
-
-    /**
-     * 重试标记
-     */
-    private boolean retry;
-
-    /**
-     * 更新失败转插入
-     */
-    private boolean forceUpdate;
-
-    public WriterSingleConfig(List<Field> fields, Map<String, String> command, String event, Map<String, Object> data, String table, boolean forceUpdate) {
-        setEvent(event);
-        setCommand(command);
-        setFields(fields);
-        this.data = data;
-        this.table = table;
-        this.forceUpdate = forceUpdate;
-    }
-
-    public Map<String, Object> getData() {
-        return data;
-    }
-
-    public WriterSingleConfig setData(Map<String, Object> data) {
-        this.data = data;
-        return this;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public WriterSingleConfig setTable(String table) {
-        this.table = table;
-        return this;
-    }
-
-    public boolean isRetry() {
-        return retry;
-    }
-
-    public void setRetry(boolean retry) {
-        this.retry = retry;
-    }
-
-    public boolean isForceUpdate() {
-        return forceUpdate;
-    }
-
-    public WriterSingleConfig setForceUpdate(boolean forceUpdate) {
-        this.forceUpdate = forceUpdate;
-        return this;
-    }
-}

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/constant/DatabaseConstant.java

@@ -30,4 +30,9 @@ public class DatabaseConstant {
      */
     public static final String SQLSERVER_PAGE_SQL = "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY %s) AS SQLSERVER_ROW_ID, * FROM (%s) S) A WHERE A.SQLSERVER_ROW_ID BETWEEN ? AND ?";
 
+    //*********************************** PostgreSQL **************************************//
+    /**
+     * PostgreSQL分页语句
+     */
+    public static final String POSTGRESQL_PAGE_SQL = " limit ? OFFSET ?";
 }

+ 144 - 215
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -26,7 +26,8 @@ import java.sql.*;
 import java.util.*;
 import java.util.stream.Collectors;
 
-public abstract class AbstractDatabaseConnector extends AbstractConnector implements Connector<DatabaseConnectorMapper, DatabaseConfig>, Database {
+public abstract class AbstractDatabaseConnector extends AbstractConnector
+        implements Connector<DatabaseConnectorMapper, DatabaseConfig>, Database {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -71,7 +72,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     @Override
     public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
         String quotation = buildSqlWithQuotation();
-        StringBuilder queryMetaSql = new StringBuilder("SELECT * FROM ").append(quotation).append(tableName).append(quotation).append(" WHERE 1 != 1");
+        StringBuilder queryMetaSql = new StringBuilder("SELECT * FROM ").append(quotation).append(tableName).append(quotation).append(
+                " WHERE 1 != 1");
         return connectorMapper.execute(databaseTemplate -> getMetaInfo(databaseTemplate, queryMetaSql.toString(), tableName));
     }
 
@@ -95,10 +97,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         Collections.addAll(config.getArgs(), getPageArgs(config.getPageIndex(), config.getPageSize()));
 
         // 3、执行SQL
-        List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
+        List<Map<String, Object>> list = connectorMapper.execute(
+                databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
 
         // 4、返回结果集
-        return new Result(new ArrayList<>(list));
+        return new Result(list);
     }
 
     @Override
@@ -127,98 +130,39 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             fields.add(pkField);
         }
 
-        final int size = data.size();
-        final int fSize = fields.size();
         Result result = new Result();
-        try {
-            // 2、设置参数
-            connectorMapper.execute(databaseTemplate -> {
-                databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
-                    @Override
-                    public void setValues(PreparedStatement preparedStatement, int i) {
-                        batchRowsSetter(databaseTemplate.getConnection(), preparedStatement, fields, fSize, data.get(i));
-                    }
-
-                    @Override
-                    public int getBatchSize() {
-                        return size;
-                    }
-                });
-                return true;
-            });
-        } catch (Exception e) {
-            // 记录错误数据
-            result.getFail().set(size);
-            result.getError().append(e.getMessage()).append(System.lineSeparator());
-            logger.error(e.getMessage());
-        }
-        return result;
-    }
-
-    @Override
-    public Result writer(DatabaseConnectorMapper connectorMapper, WriterSingleConfig config) {
-        String event = config.getEvent();
-        List<Field> fields = config.getFields();
-        Map<String, Object> data = config.getData();
-        // 1、获取 SQL
-        String sql = config.getCommand().get(event);
-        Assert.hasText(sql, "执行语句不能为空.");
-        if (CollectionUtils.isEmpty(data) || CollectionUtils.isEmpty(fields)) {
-            logger.error("writer data can not be empty.");
-            throw new ConnectorException("writer data can not be empty.");
-        }
-
-        Field pkField = getPrimaryKeyField(fields);
-        // Update / Delete
-        if (!isInsert(event)) {
-            if (isDelete(event)) {
-                fields.clear();
-            }
-            fields.add(pkField);
-        }
-
-        int size = fields.size();
-        Result result = new Result();
-        int execute = 0;
+        int[] execute = null;
         try {
             // 2、设置参数
             execute = connectorMapper.execute(databaseTemplate ->
-                    databaseTemplate.update(sql, (ps) ->
-                            batchRowsSetter(databaseTemplate.getConnection(), ps, fields, size, data)
-                    )
+                    databaseTemplate.batchUpdate(executeSql, new BatchPreparedStatementSetter() {
+                        @Override
+                        public void setValues(PreparedStatement preparedStatement, int i) {
+                            batchRowsSetter(databaseTemplate.getConnection(), preparedStatement, fields, data.get(i));
+                        }
+
+                        @Override
+                        public int getBatchSize() {
+                            return data.size();
+                        }
+                    })
             );
         } catch (Exception e) {
-            // 记录错误数据
             if (!config.isForceUpdate()) {
-                result.getFailData().add(data);
-                result.getFail().set(1);
-                result.getError().append("SQL:").append(sql).append(System.lineSeparator())
-                        .append("DATA:").append(data).append(System.lineSeparator())
-                        .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
-                logger.error("SQL:{}, DATA:{}, ERROR:{}", sql, data, e.getMessage());
+                result.addFailData(data);
+                result.getError().append(e.getMessage());
             }
         }
 
-        if (0 == execute && !config.isRetry() && null != pkField) {
-            // 不存在转insert
-            if (isUpdate(event)) {
-                String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
-                if (!existRow(connectorMapper, queryCount, data.get(pkField.getName()))) {
-                    fields.remove(fields.size() - 1);
-                    config.setEvent(ConnectorConstant.OPERTION_INSERT);
-                    config.setRetry(true);
-                    logger.warn("{}表执行{}失败, 尝试执行{}, {}", config.getTable(), event, config.getEvent(), data);
-                    return writer(connectorMapper, config);
+        if (null != execute) {
+            int batchSize = execute.length;
+            for (int i = 0; i < batchSize; i++) {
+                if (execute[i] == 0) {
+                    forceUpdate(result, connectorMapper, config, pkField, data.get(i));
+                    continue;
                 }
-                return result;
+                result.getSuccessData().add(data.get(i));
             }
-            // 存在转update
-            if (isInsert(event)) {
-                config.setEvent(ConnectorConstant.OPERTION_UPDATE);
-                config.setRetry(true);
-                return writer(connectorMapper, config);
-            }
-
         }
         return result;
     }
@@ -278,74 +222,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return map;
     }
 
-    /**
-     * 获取DQL表信息
-     *
-     * @param config
-     * @return
-     */
-    protected List<Table> getDqlTable(DatabaseConnectorMapper config) {
-        MetaInfo metaInfo = getDqlMetaInfo(config);
-        Assert.notNull(metaInfo, "SQL解析异常.");
-        DatabaseConfig cfg = config.getConfig();
-        List<Table> tables = new ArrayList<>();
-        tables.add(new Table(cfg.getSql()));
-        return tables;
-    }
-
-    /**
-     * 获取DQL元信息
-     *
-     * @param config
-     * @return
-     */
-    protected MetaInfo getDqlMetaInfo(DatabaseConnectorMapper config) {
-        DatabaseConfig cfg = config.getConfig();
-        String sql = cfg.getSql().toUpperCase();
-        String queryMetaSql = StringUtil.contains(sql, " WHERE ") ? sql + " AND 1!=1 " : sql + " WHERE 1!=1 ";
-        return config.execute(databaseTemplate -> getMetaInfo(databaseTemplate, queryMetaSql, cfg.getTable()));
-    }
-
-    /**
-     * 获取DQL源配置
-     *
-     * @param commandConfig
-     * @param appendGroupByPK
-     * @return
-     */
-    protected Map<String, String> getDqlSourceCommand(CommandConfig commandConfig, boolean appendGroupByPK) {
-        // 获取过滤SQL
-        List<Filter> filter = commandConfig.getFilter();
-        String queryFilterSql = getQueryFilterSql(filter);
-
-        // 获取查询SQL
-        Table table = commandConfig.getTable();
-        Map<String, String> map = new HashMap<>();
-        String querySql = table.getName();
-
-        // 存在条件
-        if (StringUtil.isNotBlank(queryFilterSql)) {
-            querySql += queryFilterSql;
-        }
-        String quotation = buildSqlWithQuotation();
-        String pk = findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
-        map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(new PageSqlConfig(querySql, pk)));
-
-        // 获取查询总数SQL
-        StringBuilder queryCount = new StringBuilder();
-        queryCount.append("SELECT COUNT(1) FROM (").append(table.getName());
-        if (StringUtil.isNotBlank(queryFilterSql)) {
-            queryCount.append(queryFilterSql);
-        }
-        // Mysql
-        if (appendGroupByPK) {
-            queryCount.append(" GROUP BY ").append(pk);
-        }
-        queryCount.append(") DBSYNCER_T");
-        map.put(ConnectorConstant.OPERTION_QUERY_COUNT, queryCount.toString());
-        return map;
-    }
-
     /**
      * 查询语句表名和字段带上引号(默认不加)
      *
@@ -452,67 +328,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return "select 1";
     }
 
-    /**
-     * 根据过滤条件获取查询SQL
-     *
-     * @param queryOperator and/or
-     * @param filter
-     * @return
-     */
-    private String getFilterSql(String queryOperator, List<Filter> filter) {
-        List<Filter> list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList());
-        if (CollectionUtils.isEmpty(list)) {
-            return "";
-        }
-
-        int size = list.size();
-        int end = size - 1;
-        StringBuilder sql = new StringBuilder();
-        sql.append("(");
-        Filter c = null;
-        String quotation = buildSqlWithQuotation();
-        for (int i = 0; i < size; i++) {
-            c = list.get(i);
-            // "USER" = 'zhangsan'
-            sql.append(quotation).append(c.getName()).append(quotation).append(c.getFilter()).append("'").append(c.getValue()).append("'");
-            if (i < end) {
-                sql.append(" ").append(queryOperator).append(" ");
-            }
-        }
-        sql.append(")");
-        return sql.toString();
-    }
-
-    /**
-     * @param connection 连接
-     * @param ps         参数构造器
-     * @param fields     同步字段,例如[{name=ID, type=4}, {name=NAME, type=12}]
-     * @param fSize      同步字段个数
-     * @param row        同步字段对应的值,例如{ID=123, NAME=张三11}
-     */
-    private void batchRowsSetter(Connection connection, PreparedStatement ps, List<Field> fields, int fSize, Map row) {
-        Field f = null;
-        int type;
-        Object val = null;
-        for (int i = 0; i < fSize; i++) {
-            // 取出字段和对应值
-            f = fields.get(i);
-            type = f.getType();
-            val = row.get(f.getName());
-            SetterEnum.getSetter(type).set(connection, ps, i + 1, type, val);
-        }
-    }
-
-    private boolean existRow(DatabaseConnectorMapper connectorMapper, String sql, Object value) {
-        int rowNum = 0;
-        try {
-            rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, new Object[]{value}, Integer.class));
-        } catch (Exception e) {
-            logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, value);
-        }
-        return rowNum > 0;
-    }
-
     /**
      * 获取数据库表元数据信息
      *
@@ -521,7 +336,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
      * @param tableName        表名
      * @return
      */
-    private MetaInfo getMetaInfo(DatabaseTemplate databaseTemplate, String metaSql, String tableName) throws SQLException {
+    protected MetaInfo getMetaInfo(DatabaseTemplate databaseTemplate, String metaSql, String tableName) throws SQLException {
         SqlRowSet sqlRowSet = databaseTemplate.queryForRowSet(metaSql);
         ResultSetWrappingSqlRowSet rowSet = (ResultSetWrappingSqlRowSet) sqlRowSet;
         SqlRowSetMetaData metaData = rowSet.getMetaData();
@@ -566,7 +381,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
      * @param quotation
      * @return
      */
-    private String findTablePrimaryKey(Table table, String quotation) {
+    protected String findTablePrimaryKey(Table table, String quotation) {
         if (null != table) {
             List<Field> column = table.getColumn();
             if (!CollectionUtils.isEmpty(column)) {
@@ -583,6 +398,120 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return "";
     }
 
+    /**
+     * Builds one parenthesized WHERE fragment from the filters that use the given
+     * logical operator, e.g. ("USER" = 'zhangsan' AND "AGE" > '18').
+     * Returns an empty string when no filter matches the operator.
+     *
+     * NOTE(review): filter values are concatenated into the SQL between single
+     * quotes — assumes filter values are trusted configuration, not end-user
+     * input; confirm upstream sanitization.
+     *
+     * @param queryOperator logical operator joining the conditions: and / or
+     * @param filter        all configured filters; only those whose operation equals queryOperator are used
+     * @return SQL condition fragment, or "" when there is nothing to filter on
+     */
+    private String getFilterSql(String queryOperator, List<Filter> filter) {
+        List<Filter> list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(list)) {
+            return "";
+        }
+
+        int size = list.size();
+        int end = size - 1;
+        StringBuilder sql = new StringBuilder();
+        sql.append("(");
+        Filter c = null;
+        // Identifier quote character of the concrete database (e.g. " for PostgreSQL).
+        String quotation = buildSqlWithQuotation();
+        for (int i = 0; i < size; i++) {
+            c = list.get(i);
+            // "USER" = 'zhangsan'
+            sql.append(quotation).append(c.getName()).append(quotation).append(c.getFilter()).append("'").append(c.getValue()).append("'");
+            // Join successive conditions with the operator, but not after the last one.
+            if (i < end) {
+                sql.append(" ").append(queryOperator).append(" ");
+            }
+        }
+        sql.append(")");
+        return sql.toString();
+    }
+
+    /**
+     * Binds one row's values onto a PreparedStatement, positionally: parameter
+     * i+1 receives the value of fields[i], converted by the type-specific setter.
+     *
+     * @param connection JDBC connection (passed through to setters that need it, e.g. for LOB creation)
+     * @param ps         statement whose placeholders are bound
+     * @param fields     fields to sync, in placeholder order, e.g. [{name=ID, type=4}, {name=NAME, type=12}]
+     * @param row        column name to value map for this row, e.g. {ID=123, NAME=张三11}
+     */
+    private void batchRowsSetter(Connection connection, PreparedStatement ps, List<Field> fields, Map row) {
+        Field f = null;
+        int type;
+        Object val = null;
+        int size = fields.size();
+        for (int i = 0; i < size; i++) {
+            // Look up each field and its value; a missing key binds null.
+            f = fields.get(i);
+            type = f.getType();
+            val = row.get(f.getName());
+            // SetterEnum dispatches on the JDBC type code; placeholders are 1-based.
+            SetterEnum.getSetter(type).set(connection, ps, i + 1, type, val);
+        }
+    }
+
+    /**
+     * Fallback for a row whose batch statement affected 0 rows: an UPDATE that
+     * matched nothing is retried as an INSERT (after confirming the row is
+     * absent), and a failed INSERT is retried as an UPDATE. DELETE events have
+     * no fallback and fall through silently.
+     *
+     * @param result          accumulator for success/failure data
+     * @param connectorMapper target database access
+     * @param config          batch config carrying the event type and generated SQL commands
+     * @param pkField         primary key field used for the existence check and retry binding
+     * @param row             the single row to retry
+     */
+    private void forceUpdate(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField,
+                             Map row) {
+        // UPDATE matched no row: if it really does not exist, retry as INSERT.
+        if (isUpdate(config.getEvent())) {
+            String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
+            if (!existRow(connectorMapper, queryCount, row.get(pkField.getName()))) {
+                logger.warn("{}表执行{}失败, 尝试执行{}, {}", config.getTableName(), config.getEvent(), ConnectorConstant.OPERTION_INSERT, row);
+                writer(result, connectorMapper, config, pkField, row, ConnectorConstant.OPERTION_INSERT);
+            }
+            return;
+        }
+
+        // INSERT affected no row (likely already exists): retry as UPDATE.
+        if (isInsert(config.getEvent())) {
+            logger.warn("{}表执行{}失败, 尝试执行{}, {}", config.getTableName(), config.getEvent(), ConnectorConstant.OPERTION_UPDATE, row);
+            writer(result, connectorMapper, config, pkField, row, ConnectorConstant.OPERTION_UPDATE);
+        }
+    }
+
+    /**
+     * Executes a single-row write for the given event and records the outcome
+     * in {@code result}. An execution that affects 0 rows is treated as a
+     * failure (ConnectorException) and captured in the failure data/error log.
+     *
+     * @param result          accumulator for success/failure data and error text
+     * @param connectorMapper target database access
+     * @param config          carries the per-event SQL commands and the field list
+     * @param pkField         primary key field, bound as the trailing placeholder for UPDATE/DELETE
+     * @param row             the row to write
+     * @param event           operation to perform (insert/update/delete constant)
+     */
+    private void writer(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField, Map row,
+                        String event) {
+        // 1. Look up the pre-generated SQL for this event.
+        String sql = config.getCommand().get(event);
+
+        List<Field> fields = new ArrayList<>(config.getFields());
+        // UPDATE binds all fields then the PK (WHERE clause); DELETE binds only the PK.
+        if (!isInsert(event)) {
+            if (isDelete(event)) {
+                fields.clear();
+            }
+            fields.add(pkField);
+        }
+
+        try {
+            // 2. Bind parameters and execute; 0 affected rows counts as failure.
+            int execute = connectorMapper.execute(databaseTemplate ->
+                    databaseTemplate.update(sql, (ps) ->
+                            batchRowsSetter(databaseTemplate.getConnection(), ps, fields, row)
+                    )
+            );
+            if (execute == 0) {
+                throw new ConnectorException(String.format("尝试执行[%s]失败", event));
+            }
+            result.getSuccessData().add(row);
+        } catch (Exception e) {
+            // Record the row, the SQL and the cause so the failure can be replayed/diagnosed.
+            result.getFailData().add(row);
+            result.getError().append("SQL:").append(sql).append(System.lineSeparator())
+                    .append("DATA:").append(row).append(System.lineSeparator())
+                    .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
+            logger.error("执行{}失败: {}, DATA:{}", event, e.getMessage(), row);
+        }
+    }
+
+    /**
+     * Checks whether a row with the given primary-key value exists by running a
+     * COUNT query. Any query error is logged and treated as "does not exist"
+     * (returns false), which lets the caller fall back to an INSERT retry.
+     *
+     * @param connectorMapper target database access
+     * @param sql             COUNT SQL with a single PK placeholder
+     * @param value           primary-key value to probe
+     * @return true when at least one matching row exists
+     */
+    private boolean existRow(DatabaseConnectorMapper connectorMapper, String sql, Object value) {
+        int rowNum = 0;
+        try {
+            rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, new Object[]{value}, Integer.class));
+        } catch (Exception e) {
+            logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, value);
+        }
+        return rowNum > 0;
+    }
+
     private boolean isPk(Map<String, List<String>> tables, String tableName, String name) {
         List<String> pk = tables.get(tableName);
         return !CollectionUtils.isEmpty(pk) && pk.contains(name);

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/DatabaseConnectorMapper.java

@@ -18,7 +18,7 @@ public class DatabaseConnectorMapper implements ConnectorMapper<DatabaseConfig,
 
     public DatabaseConnectorMapper(DatabaseConfig config) {
         this.config = config;
-        this.dataSource = new SimpleDataSource(config.getUrl(), config.getUsername(), config.getPassword());
+        this.dataSource = new SimpleDataSource(config.getDriverClassName(), config.getUrl(), config.getUsername(), config.getPassword());
     }
 
     public <T> T execute(HandleCallback callback) {

+ 4 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleDataSource.java

@@ -16,11 +16,13 @@ public class SimpleDataSource implements DataSource, AutoCloseable {
 
     private final BlockingQueue<SimpleConnection> pool = new LinkedBlockingQueue<>(2000);
     private long lifeTime = 60 * 1000;
+    private String driverClassName;
     private String url;
     private String username;
     private String password;
 
-    public SimpleDataSource(String url, String username, String password) {
+    public SimpleDataSource(String driverClassName, String url, String username, String password) {
+        this.driverClassName = driverClassName;
         this.url = url;
         this.username = username;
         this.password = password;
@@ -93,7 +95,7 @@ public class SimpleDataSource implements DataSource, AutoCloseable {
     }
 
     private SimpleConnection createConnection() throws SQLException {
-        return new SimpleConnection(this, DatabaseUtil.getConnection(url, username, password));
+        return new SimpleConnection(this, DatabaseUtil.getConnection(driverClassName, url, username, password));
     }
 
     public BlockingQueue<SimpleConnection> getPool() {

+ 15 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/ConnectorEnum.java

@@ -3,13 +3,17 @@ package org.dbsyncer.connector.enums;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.ESConfig;
+import org.dbsyncer.connector.config.KafkaConfig;
 import org.dbsyncer.connector.es.ESConnector;
 import org.dbsyncer.connector.kafka.KafkaConnector;
 import org.dbsyncer.connector.mysql.MysqlConnector;
 import org.dbsyncer.connector.oracle.OracleConnector;
+import org.dbsyncer.connector.postgresql.PostgreSQLConnector;
 import org.dbsyncer.connector.sql.DQLMysqlConnector;
 import org.dbsyncer.connector.sql.DQLOracleConnector;
+import org.dbsyncer.connector.sql.DQLPostgreSQLConnector;
 import org.dbsyncer.connector.sql.DQLSqlServerConnector;
 import org.dbsyncer.connector.sqlserver.SqlServerConnector;
 
@@ -33,7 +37,11 @@ public enum ConnectorEnum {
     /**
      * SqlServer 连接器
      */
-    SQL_SERVER("SqlServer", new SqlServerConnector(), SqlServerDatabaseConfig.class),
+    SQL_SERVER("SqlServer", new SqlServerConnector(), DatabaseConfig.class),
+    /**
+     * PostgreSQL 连接器
+     */
+    POSTGRE_SQL("PostgreSQL", new PostgreSQLConnector(), DatabaseConfig.class),
     /**
      * Elasticsearch 连接器
      */
@@ -53,7 +61,11 @@ public enum ConnectorEnum {
     /**
      * DqlSqlServer 连接器
      */
-    DQL_SQL_SERVER("DqlSqlServer", new DQLSqlServerConnector(), SqlServerDatabaseConfig.class);
+    DQL_SQL_SERVER("DqlSqlServer", new DQLSqlServerConnector(), DatabaseConfig.class),
+    /**
+     * DqlPostgreSQL 连接器
+     */
+    DQL_POSTGRE_SQL("DqlPostgreSQL", new DQLPostgreSQLConnector(), DatabaseConfig.class);
 
     // 连接器名称
     private String type;

+ 23 - 71
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -14,7 +14,6 @@ import org.dbsyncer.connector.enums.ESFieldTypeEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.util.ESUtil;
-import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -160,7 +159,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             for (SearchHit hit : searchHits) {
                 list.add(hit.getSourceAsMap());
             }
-            return new Result(new ArrayList<>(list));
+            return new Result(list);
         } catch (IOException e) {
             logger.error(e.getMessage());
             throw new ConnectorException(e.getMessage());
@@ -175,71 +174,29 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             throw new ConnectorException("writer data can not be empty.");
         }
 
-        Result result = new Result();
+        final Result result = new Result();
         final ESConfig cfg = connectorMapper.getConfig();
-        Field pkField = getPrimaryKeyField(config.getFields());
+        final Field pkField = getPrimaryKeyField(config.getFields());
+        final String primaryKeyName = pkField.getName();
         try {
             BulkRequest request = new BulkRequest();
-            data.forEach(row -> {
-                IndexRequest r = new IndexRequest(cfg.getIndex(), cfg.getType(), String.valueOf(row.get(pkField.getName())));
-                r.source(row, XContentType.JSON);
-                request.add(r);
-            });
+            data.forEach(row -> addRequest(request, cfg.getIndex(), cfg.getType(), config.getEvent(), String.valueOf(row.get(primaryKeyName)), row));
 
             BulkResponse response = connectorMapper.getConnection().bulk(request, RequestOptions.DEFAULT);
             RestStatus restStatus = response.status();
             if (restStatus.getStatus() != RestStatus.OK.getStatus()) {
                 throw new ConnectorException(String.format("error code:%s", restStatus.getStatus()));
             }
+            result.addSuccessData(data);
         } catch (Exception e) {
             // 记录错误数据
-            result.getFail().set(data.size());
+            result.addFailData(data);
             result.getError().append(e.getMessage()).append(System.lineSeparator());
             logger.error(e.getMessage());
         }
         return result;
     }
 
-    @Override
-    public Result writer(ESConnectorMapper connectorMapper, WriterSingleConfig config) {
-        Map<String, Object> data = config.getData();
-        Field pkField = getPrimaryKeyField(config.getFields());
-        String pk = String.valueOf(data.get(pkField.getName()));
-
-        if (isUpdate(config.getEvent())) {
-            return execute(connectorMapper, data, pk, (index, type, id) -> {
-                try {
-                    UpdateRequest request = new UpdateRequest(index, type, id);
-                    request.doc(data, XContentType.JSON);
-                    connectorMapper.getConnection().update(request, RequestOptions.DEFAULT);
-                } catch (ElasticsearchStatusException e) {
-                    // 数据不存在则写入
-                    if (RestStatus.NOT_FOUND.getStatus() == e.status().getStatus()) {
-                        IndexRequest r = new IndexRequest(index, type, id);
-                        r.source(data, XContentType.JSON);
-                        connectorMapper.getConnection().index(r, RequestOptions.DEFAULT);
-                        return;
-                    }
-                    throw new ConnectorException(e.getMessage());
-                }
-            });
-        }
-        if (isInsert(config.getEvent())) {
-            return execute(connectorMapper, data, pk, (index, type, id) -> {
-                IndexRequest request = new IndexRequest(index, type, id);
-                request.source(data, XContentType.JSON);
-                connectorMapper.getConnection().index(request, RequestOptions.DEFAULT);
-            });
-        }
-        if (isDelete(config.getEvent())) {
-            return execute(connectorMapper, data, pk, (index, type, id) ->
-                    connectorMapper.getConnection().delete(new DeleteRequest(index, type, id), RequestOptions.DEFAULT)
-            );
-        }
-
-        throw new ConnectorException(String.format("Unsupported event: %s", config.getEvent()));
-    }
-
     @Override
     public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
         Map<String, String> command = new HashMap<>();
@@ -262,7 +219,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
     @Override
     public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
         Table table = commandConfig.getTable();
-        if(!CollectionUtils.isEmpty(table.getColumn())){
+        if (!CollectionUtils.isEmpty(table.getColumn())) {
             getPrimaryKeyField(table.getColumn());
         }
         return Collections.EMPTY_MAP;
@@ -323,27 +280,22 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         }
     }
 
-    private Result execute(ESConnectorMapper connectorMapper, Map<String, Object> data, String id, RequestMapper mapper) {
-        Result result = new Result();
-        final ESConfig config = connectorMapper.getConfig();
-        try {
-            mapper.apply(config.getIndex(), config.getType(), id);
-        } catch (Exception e) {
-            // 记录错误数据
-            result.getFailData().add(data);
-            result.getFail().set(1);
-            result.getError().append("INDEX:").append(config.getIndex()).append(System.lineSeparator())
-                    .append("TYPE:").append(config.getType()).append(System.lineSeparator())
-                    .append("ID:").append(id).append(System.lineSeparator())
-                    .append("DATA:").append(data).append(System.lineSeparator())
-                    .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
-            logger.error("INDEX:{}, TYPE:{}, ID:{}, DATA:{}, ERROR:{}", config.getIndex(), config.getType(), id, data, e.getMessage());
+    private void addRequest(BulkRequest request, String index, String type, String event, String id, Map data) {
+        if (isUpdate(event)) {
+            UpdateRequest req = new UpdateRequest(index, type, id);
+            req.doc(data, XContentType.JSON);
+            request.add(req);
+            return;
+        }
+        if (isInsert(event)) {
+            IndexRequest req = new IndexRequest(index, type, id);
+            req.source(data, XContentType.JSON);
+            request.add(req);
+            return;
+        }
+        if (isDelete(event)) {
+            request.add(new DeleteRequest(index, type, id));
         }
-        return result;
-    }
-
-    private interface RequestMapper {
-        void apply(String index, String type, String id) throws IOException;
     }
 
     private interface FilterMapper {

+ 2 - 19
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -83,27 +83,10 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
             String topic = cfg.getTopic();
             String pk = pkField.getName();
             data.forEach(row -> connectorMapper.getConnection().send(topic, String.valueOf(row.get(pk)), row));
+            result.addSuccessData(data);
         } catch (Exception e) {
             // 记录错误数据
-            result.getFail().set(data.size());
-            result.getError().append(e.getMessage()).append(System.lineSeparator());
-            logger.error(e.getMessage());
-        }
-        return result;
-    }
-
-    @Override
-    public Result writer(KafkaConnectorMapper connectorMapper, WriterSingleConfig config) {
-        Map<String, Object> data = config.getData();
-        Result result = new Result();
-        final KafkaConfig cfg = connectorMapper.getConfig();
-        Field pkField = getPrimaryKeyField(config.getFields());
-        try {
-            connectorMapper.getConnection().send(cfg.getTopic(), String.valueOf(data.get(pkField.getName())), data);
-        } catch (Exception e) {
-            // 记录错误数据
-            result.getFailData().add(data);
-            result.getFail().set(data.size());
+            result.addFailData(data);
             result.getError().append(e.getMessage()).append(System.lineSeparator());
             logger.error(e.getMessage());
         }

+ 55 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -0,0 +1,55 @@
+package org.dbsyncer.connector.postgresql;
+
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.PageSqlConfig;
+import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.constant.DatabaseConstant;
+import org.dbsyncer.connector.database.AbstractDatabaseConnector;
+import org.dbsyncer.connector.database.DatabaseConnectorMapper;
+import org.dbsyncer.connector.enums.TableTypeEnum;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * PostgreSQL connector: lists tables and views from the pg_tables / pg_views
+ * catalogs of the configured schema and builds PostgreSQL-specific paging SQL.
+ */
+public final class PostgreSQLConnector extends AbstractDatabaseConnector {
+
+    /**
+     * SQL listing all table names of the configured schema (pg_tables catalog).
+     * NOTE(review): the schema name is interpolated via String.format — assumes
+     * it is trusted connector configuration, not end-user input; confirm.
+     */
+    @Override
+    protected String getTableSql(DatabaseConfig config) {
+        return String.format("SELECT TABLENAME FROM PG_TABLES WHERE SCHEMANAME ='%s'", config.getSchema());
+    }
+
+    /**
+     * Returns all tables of the schema followed by all views, each tagged with
+     * its TableTypeEnum code.
+     */
+    @Override
+    public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
+        List<Table> list = new LinkedList<>();
+        DatabaseConfig config = connectorMapper.getConfig();
+        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(getTableSql(config), String.class));
+        if (!CollectionUtils.isEmpty(tableNames)) {
+            tableNames.forEach(name -> list.add(new Table(name, TableTypeEnum.TABLE.getCode())));
+        }
+        List<String> tableViewNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(getTableViewSql(config), String.class));
+        if (!CollectionUtils.isEmpty(tableViewNames)) {
+            tableViewNames.forEach(name -> list.add(new Table(name, TableTypeEnum.VIEW.getCode())));
+        }
+        return list;
+    }
+
+    /** Appends the PostgreSQL paging clause (see DatabaseConstant.POSTGRESQL_PAGE_SQL) to the query. */
+    @Override
+    public String getPageSql(PageSqlConfig config) {
+        return config.getQuerySql() + DatabaseConstant.POSTGRESQL_PAGE_SQL;
+    }
+
+    /**
+     * Paging arguments in the order the page clause expects: page size first,
+     * then the offset computed from the 1-based page index.
+     */
+    @Override
+    public Object[] getPageArgs(int pageIndex, int pageSize) {
+        return new Object[]{pageSize, (pageIndex - 1) * pageSize};
+    }
+
+    /** PostgreSQL quotes identifiers with double quotes. */
+    @Override
+    protected String buildSqlWithQuotation() {
+        return "\"";
+    }
+
+    /** SQL listing all view names of the configured schema (pg_views catalog). */
+    private String getTableViewSql(DatabaseConfig config) {
+        return String.format("SELECT VIEWNAME FROM PG_VIEWS WHERE SCHEMANAME ='%s'", config.getSchema());
+    }
+}

+ 88 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java

@@ -0,0 +1,88 @@
+package org.dbsyncer.connector.sql;
+
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.ConnectorException;
+import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.database.AbstractDatabaseConnector;
+import org.dbsyncer.connector.database.DatabaseConnectorMapper;
+import org.dbsyncer.connector.enums.SqlBuilderEnum;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for DQL connectors, where the "table" is a user-configured SELECT
+ * statement instead of a physical table. Provides metadata discovery and
+ * source-command generation shared by all DQL dialect subclasses.
+ *
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/4/6 22:16
+ */
+public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
+
+    /** DQL connectors have no catalog listing; table discovery by SQL is unsupported. */
+    @Override
+    protected String getTableSql(DatabaseConfig config) {
+        throw new ConnectorException("Unsupported method.");
+    }
+
+    /** Exposes the configured SQL statement as the single logical "table". */
+    @Override
+    public List<Table> getTable(DatabaseConnectorMapper config) {
+        DatabaseConfig cfg = config.getConfig();
+        List<Table> tables = new ArrayList<>();
+        tables.add(new Table(cfg.getSql()));
+        return tables;
+    }
+
+    @Override
+    public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
+        return getDqlSourceCommand(commandConfig, false);
+    }
+
+    /**
+     * Derives column metadata by executing the configured SQL with an
+     * always-false predicate (1!=1) appended, so the result set is empty but
+     * still carries the column definitions.
+     * NOTE(review): the whole SQL is upper-cased before the " WHERE " check and
+     * execution — this also upper-cases quoted identifiers and string literals;
+     * confirm that is acceptable for the supported dialects.
+     */
+    @Override
+    public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
+        DatabaseConfig cfg = connectorMapper.getConfig();
+        String sql = cfg.getSql().toUpperCase();
+        String queryMetaSql = StringUtil.contains(sql, " WHERE ") ? sql + " AND 1!=1 " : sql + " WHERE 1!=1 ";
+        return connectorMapper.execute(databaseTemplate -> super.getMetaInfo(databaseTemplate, queryMetaSql, cfg.getTable()));
+    }
+
+    /**
+     * Builds the source commands for a DQL sync: the paged query SQL and the
+     * total row-count SQL, both derived from the configured SELECT plus any
+     * configured filters.
+     *
+     * @param commandConfig   carries the configured SQL (as table name), filters and original table
+     * @param appendGroupByPK when true (MySQL subclass), de-duplicates the count by grouping on the primary key
+     * @return map of command name to generated SQL
+     */
+    protected Map<String, String> getDqlSourceCommand(CommandConfig commandConfig, boolean appendGroupByPK) {
+        // Build the WHERE fragment from the configured filters.
+        List<Filter> filter = commandConfig.getFilter();
+        String queryFilterSql = getQueryFilterSql(filter);
+
+        // The "table name" of a DQL connector is the user's SELECT statement.
+        Table table = commandConfig.getTable();
+        Map<String, String> map = new HashMap<>();
+        String querySql = table.getName();
+
+        // Append the filter conditions when any are configured.
+        if (StringUtil.isNotBlank(queryFilterSql)) {
+            querySql += queryFilterSql;
+        }
+        String quotation = buildSqlWithQuotation();
+        String pk = findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
+        map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(new PageSqlConfig(querySql, pk)));
+
+        // Row-count SQL: wrap the (filtered) query and count the wrapped rows.
+        StringBuilder queryCount = new StringBuilder();
+        queryCount.append("SELECT COUNT(1) FROM (").append(table.getName());
+        if (StringUtil.isNotBlank(queryFilterSql)) {
+            queryCount.append(queryFilterSql);
+        }
+        // MySQL only: group by the primary key so the count matches the paged query.
+        if (appendGroupByPK) {
+            queryCount.append(" GROUP BY ").append(pk);
+        }
+        queryCount.append(") DBSYNCER_T");
+        map.put(ConnectorConstant.OPERTION_QUERY_COUNT, queryCount.toString());
+        return map;
+    }
+}

+ 3 - 20
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLMysqlConnector.java

@@ -1,19 +1,12 @@
 package org.dbsyncer.connector.sql;
 
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.CommandConfig;
+import org.dbsyncer.connector.config.PageSqlConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
-import org.dbsyncer.connector.database.AbstractDatabaseConnector;
-import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 
-import java.util.List;
 import java.util.Map;
 
-public final class DQLMysqlConnector extends AbstractDatabaseConnector {
-
-    @Override
-    protected String getTableSql(DatabaseConfig config) {
-        return "show tables";
-    }
+public final class DQLMysqlConnector extends AbstractDQLConnector {
 
     @Override
     public String getPageSql(PageSqlConfig config) {
@@ -25,16 +18,6 @@ public final class DQLMysqlConnector extends AbstractDatabaseConnector {
         return new Object[]{(pageIndex - 1) * pageSize, pageSize};
     }
 
-    @Override
-    public List<Table> getTable(DatabaseConnectorMapper config) {
-        return super.getDqlTable(config);
-    }
-
-    @Override
-    public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
-        return super.getDqlMetaInfo(connectorMapper);
-    }
-
     @Override
     public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
         return super.getDqlSourceCommand(commandConfig, true);

+ 3 - 28
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java

@@ -1,19 +1,9 @@
 package org.dbsyncer.connector.sql;
 
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.PageSqlConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
-import org.dbsyncer.connector.database.AbstractDatabaseConnector;
-import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 
-import java.util.List;
-import java.util.Map;
-
-public final class DQLOracleConnector extends AbstractDatabaseConnector {
-
-    @Override
-    protected String getTableSql(DatabaseConfig config) {
-        return "SELECT TABLE_NAME,TABLE_TYPE FROM USER_TAB_COMMENTS";
-    }
+public final class DQLOracleConnector extends AbstractDQLConnector {
 
     @Override
     public String getPageSql(PageSqlConfig config) {
@@ -22,22 +12,7 @@ public final class DQLOracleConnector extends AbstractDatabaseConnector {
 
     @Override
     public Object[] getPageArgs(int pageIndex, int pageSize) {
-        return new Object[] {pageIndex * pageSize, (pageIndex - 1) * pageSize};
-    }
-
-    @Override
-    public List<Table> getTable(DatabaseConnectorMapper config) {
-        return super.getDqlTable(config);
-    }
-
-    @Override
-    public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
-        return super.getDqlMetaInfo(connectorMapper);
-    }
-
-    @Override
-    public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
-        return super.getDqlSourceCommand(commandConfig, false);
+        return new Object[]{pageIndex * pageSize, (pageIndex - 1) * pageSize};
     }
 
     @Override

+ 22 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLPostgreSQLConnector.java

@@ -0,0 +1,22 @@
+package org.dbsyncer.connector.sql;
+
+import org.dbsyncer.connector.config.PageSqlConfig;
+import org.dbsyncer.connector.constant.DatabaseConstant;
+
+public final class DQLPostgreSQLConnector extends AbstractDQLConnector {
+
+    @Override
+    public String getPageSql(PageSqlConfig config) {
+        return config.getQuerySql() + DatabaseConstant.POSTGRESQL_PAGE_SQL;
+    }
+
+    @Override
+    public Object[] getPageArgs(int pageIndex, int pageSize) {
+        return new Object[]{pageSize, (pageIndex - 1) * pageSize};
+    }
+
+    @Override
+    protected String buildSqlWithQuotation() {
+        return "\"";
+    }
+}

+ 3 - 28
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java

@@ -3,18 +3,14 @@ package org.dbsyncer.connector.sql;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.PageSqlConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
-import org.dbsyncer.connector.database.AbstractDatabaseConnector;
-import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.sqlserver.SqlServerConnectorMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Map;
-
-public final class DQLSqlServerConnector extends AbstractDatabaseConnector {
+public final class DQLSqlServerConnector extends AbstractDQLConnector {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -28,12 +24,6 @@ public final class DQLSqlServerConnector extends AbstractDatabaseConnector {
         }
     }
 
-    @Override
-    protected String getTableSql(DatabaseConfig config) {
-        SqlServerDatabaseConfig cfg = (SqlServerDatabaseConfig) config;
-        return String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s')", cfg.getSchema());
-    }
-
     @Override
     public String getPageSql(PageSqlConfig config) {
         if (StringUtil.isBlank(config.getPk())) {
@@ -48,19 +38,4 @@ public final class DQLSqlServerConnector extends AbstractDatabaseConnector {
         return new Object[]{(pageIndex - 1) * pageSize + 1, pageIndex * pageSize};
     }
 
-    @Override
-    public List<Table> getTable(DatabaseConnectorMapper config) {
-        return super.getDqlTable(config);
-    }
-
-    @Override
-    public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
-        return super.getDqlMetaInfo(connectorMapper);
-    }
-
-    @Override
-    public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
-        return super.getDqlSourceCommand(commandConfig, false);
-    }
-
 }

+ 0 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServer.java

@@ -1,5 +0,0 @@
-package org.dbsyncer.connector.sqlserver;
-
-public interface SqlServer {
-
-}

+ 7 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -3,7 +3,10 @@ package org.dbsyncer.connector.sqlserver;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.CommandConfig;
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.PageSqlConfig;
+import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
@@ -13,7 +16,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 
-public final class SqlServerConnector extends AbstractDatabaseConnector implements SqlServer {
+public final class SqlServerConnector extends AbstractDatabaseConnector {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -29,8 +32,7 @@ public final class SqlServerConnector extends AbstractDatabaseConnector implemen
 
     @Override
     protected String getTableSql(DatabaseConfig config) {
-        SqlServerDatabaseConfig cfg = (SqlServerDatabaseConfig) config;
-        return String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s') AND IS_MS_SHIPPED = 0", cfg.getSchema());
+        return String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s') AND IS_MS_SHIPPED = 0", config.getSchema());
     }
 
     @Override
@@ -64,7 +66,7 @@ public final class SqlServerConnector extends AbstractDatabaseConnector implemen
         if (StringUtil.isNotBlank(queryFilterSql)) {
             queryCount.append("SELECT COUNT(*) FROM ").append(table.getName()).append(queryFilterSql);
         } else {
-            SqlServerDatabaseConfig cfg = (SqlServerDatabaseConfig) commandConfig.getConnectorConfig();
+            DatabaseConfig cfg = (DatabaseConfig) commandConfig.getConnectorConfig();
             // 从存储过程查询(定时更新总数,可能存在误差)
             queryCount.append("SELECT ROWS FROM SYSINDEXES WHERE ID = OBJECT_ID('").append(cfg.getSchema()).append(".").append(table.getName()).append(
                     "') AND INDID IN (0, 1)");

+ 8 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.connector.util;
 
+import org.dbsyncer.connector.ConnectorException;
+
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -9,7 +11,12 @@ public abstract class DatabaseUtil {
     private DatabaseUtil() {
     }
 
-    public static Connection getConnection(String url, String username, String password) throws SQLException {
+    public static Connection getConnection(String driverClassName, String url, String username, String password) throws SQLException {
+        try {
+            Class.forName(driverClassName);
+        } catch (ClassNotFoundException e) {
+            throw new ConnectorException(e.getCause());
+        }
         return DriverManager.getConnection(url, username, password);
     }
 

+ 25 - 8
dbsyncer-listener/src/main/java/org/dbsyncer/listener/mysql/BinaryLogRemoteClient.java

@@ -19,6 +19,7 @@ import javax.net.ssl.X509TrustManager;
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
 import java.security.GeneralSecurityException;
@@ -36,7 +37,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
 
         @Override
         protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
-            sc.init(null, new TrustManager[] {
+            sc.init(null, new TrustManager[]{
                     new X509TrustManager() {
 
                         @Override
@@ -93,10 +94,10 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
     /**
      * Alias for BinaryLogRemoteClient(hostname, port, &lt;no schema&gt; = null, username, password).
      *
-     * @see BinaryLogRemoteClient#BinaryLogRemoteClient(String, int, String, String, String)
+     * @see BinaryLogRemoteClient#BinaryLogRemoteClient(String, int, String, String, String, long)
      */
-    public BinaryLogRemoteClient(String hostname, int port, String username, String password) {
-        this(hostname, port, null, username, password);
+    public BinaryLogRemoteClient(String hostname, int port, String username, String password) throws IOException {
+        this(hostname, port, null, username, password, 0L);
     }
 
     /**
@@ -106,13 +107,15 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
      *                 authentication.
      * @param username login name
      * @param password password
+     * @param serverId serverId
      */
-    public BinaryLogRemoteClient(String hostname, int port, String schema, String username, String password) {
+    public BinaryLogRemoteClient(String hostname, int port, String schema, String username, String password, long serverId) throws IOException {
         this.hostname = hostname;
         this.port = port;
         this.schema = schema;
         this.username = username;
         this.password = password;
+        this.serverId = randomPort(serverId);
     }
 
     @Override
@@ -207,8 +210,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         }
         synchronized (gtidSetAccessLock) {
             String position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
-            logger.info("Connected to {}:{} at {} ({}cid:{})", hostname, port, position, (blocking ? "sid:" + serverId + ", " : ""),
-                    connectionId);
+            logger.info("Connected to {}:{} at {} (sid:{}, cid:{})", hostname, port, position, serverId, connectionId);
         }
         connected = true;
     }
@@ -535,7 +537,7 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         }
 
         IdentityHashMap eventDataDeserializers = new IdentityHashMap();
-        if(null == eventDeserializer){
+        if (null == eventDeserializer) {
             this.eventDeserializer = new EventDeserializer(new EventHeaderV4Deserializer(), new NullEventDataDeserializer(), eventDataDeserializers, tableMapEventByTableId);
         }
 
@@ -577,6 +579,21 @@ public class BinaryLogRemoteClient implements BinaryLogClient {
         }
     }
 
+    private long randomPort(long serverId) throws IOException {
+        if (0 == serverId) {
+            ServerSocket serverSocket = null;
+            try {
+                serverSocket = new ServerSocket(0);
+                return serverSocket.getLocalPort();
+            } finally {
+                if (null != serverSocket) {
+                    serverSocket.close();
+                }
+            }
+        }
+        return serverId;
+    }
+
     @Override
     public String getBinlogFilename() {
         return binlogFilename;

+ 2 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -100,7 +100,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
         int pageIndex = 1;
         for (; ; ) {
             Result reader = connectorFactory.reader(connectionMapper, new ReaderConfig(point.getCommand(), point.getArgs(), pageIndex++, readNum));
-            List<Map> data = reader.getData();
+            List<Map> data = reader.getSuccessData();
             if (CollectionUtils.isEmpty(data)) {
                 break;
             }
@@ -108,7 +108,7 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
             Object event = null;
             for (Map<String, Object> row : data) {
                 if(StringUtil.isBlank(eventFieldName)){
-                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row, true));
+                    changedEvent(new RowChangedEvent(index, ConnectorConstant.OPERTION_UPDATE, Collections.EMPTY_MAP, row));
                     continue;
                 }
 

+ 1 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -5,7 +5,6 @@ import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.RandomUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.config.SqlServerDatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.listener.AbstractExtractor;
@@ -121,7 +120,7 @@ public class SqlServerExtractor extends AbstractExtractor {
     }
 
     private void connect() {
-        SqlServerDatabaseConfig cfg = (SqlServerDatabaseConfig) connectorConfig;
+        DatabaseConfig cfg = (DatabaseConfig) connectorConfig;
         if (connectorFactory.isAlive(cfg)) {
             connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(cfg);
             serverName = cfg.getUrl();

+ 1 - 1
dbsyncer-listener/src/main/test/DBChangeNotificationTest.java

@@ -28,7 +28,7 @@ public class DBChangeNotificationTest {
 
         final DBChangeNotification dcn = new DBChangeNotification(username, password, url);
         dcn.addRowEventListener((e) ->
-            logger.info("{}触发{}, before:{}, after:{}", e.getTableName(), e.getEvent(), e.getBeforeData(), e.getAfterData())
+            logger.info("{}触发{}, before:{}, after:{}", e.getSourceTableName(), e.getEvent(), e.getBeforeData(), e.getAfterData())
         );
         dcn.start();
 

+ 9 - 4
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -31,6 +31,7 @@ import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
@@ -73,6 +74,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
     @Autowired
     private ConnectorFactory connectorFactory;
 
+    @Qualifier("taskExecutor")
     @Autowired
     private Executor taskExecutor;
 
@@ -80,7 +82,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
 
     @PostConstruct
     private void init() {
-        scheduledTaskService.start("*/10 * * * * ?", this);
+        scheduledTaskService.start(10000, this);
     }
 
     @Override
@@ -240,10 +242,12 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         @Override
         public void changedEvent(RowChangedEvent rowChangedEvent) {
             final FieldPicker picker = tablePicker.get(rowChangedEvent.getTableGroupIndex());
-            rowChangedEvent.setTableName(picker.getTableGroup().getSourceTable().getName());
+            TableGroup tableGroup = picker.getTableGroup();
+            rowChangedEvent.setSourceTableName(tableGroup.getSourceTable().getName());
+            rowChangedEvent.setTargetTableName(tableGroup.getTargetTable().getName());
 
             // 处理过程有异常向上抛
-            parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
+            parser.execute(mapping, tableGroup, rowChangedEvent);
 
             // 标记有变更记录
             changed.compareAndSet(false, true);
@@ -296,7 +300,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         @Override
         public void changedEvent(RowChangedEvent rowChangedEvent) {
             // 处理过程有异常向上抛
-            List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getTableName());
+            List<FieldPicker> pickers = tablePicker.get(rowChangedEvent.getSourceTableName());
             if (!CollectionUtils.isEmpty(pickers)) {
                 pickers.parallelStream().forEach(picker -> {
                     final Map<String, Object> before = picker.getColumns(rowChangedEvent.getBeforeData());
@@ -304,6 +308,7 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
                     if (picker.filter(StringUtil.equals(ConnectorConstant.OPERTION_DELETE, rowChangedEvent.getEvent()) ? before : after)) {
                         rowChangedEvent.setBefore(before);
                         rowChangedEvent.setAfter(after);
+                        rowChangedEvent.setTargetTableName(picker.getTableGroup().getTargetTable().getName());
                         parser.execute(mapping, picker.getTableGroup(), rowChangedEvent);
                     }
                 });

+ 3 - 12
dbsyncer-parser/src/main/java/org/dbsyncer/parser/Parser.java

@@ -2,8 +2,6 @@ package org.dbsyncer.parser;
 
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.model.Result;
-import org.dbsyncer.connector.config.Field;
-import org.dbsyncer.parser.model.Task;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.connector.config.MetaInfo;
@@ -13,9 +11,7 @@ import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.listener.enums.QuartzFilterEnum;
 import org.dbsyncer.parser.enums.ConvertEnum;
-import org.dbsyncer.parser.model.Connector;
-import org.dbsyncer.parser.model.Mapping;
-import org.dbsyncer.parser.model.TableGroup;
+import org.dbsyncer.parser.model.*;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
 
 import java.util.List;
@@ -167,14 +163,9 @@ public interface Parser {
     /**
      * 批执行
      *
-     * @param connectorMapper
-     * @param command
-     * @param event
-     * @param fields
-     * @param dataList
-     * @param batchSize
+     * @param batchWriter
      * @return
      */
-    Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, String event, List<Field> fields, List<Map> dataList, int batchSize);
+    Result writeBatch(BatchWriter batchWriter);
 
 }

+ 31 - 59
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -27,12 +27,10 @@ import org.dbsyncer.parser.util.ConvertUtil;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.dbsyncer.plugin.PluginFactory;
 import org.dbsyncer.storage.enums.StorageDataStatusEnum;
-import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.ApplicationContext;
@@ -174,7 +172,6 @@ public class ParserFactory implements Parser {
         try {
             JSONObject conn = new JSONObject(json);
             JSONObject config = (JSONObject) conn.remove("config");
-            JSONArray table = (JSONArray) conn.remove("table");
             Connector connector = JsonUtil.jsonToObj(conn.toString(), Connector.class);
             Assert.notNull(connector, "Connector can not be null.");
             String connectorType = config.getString("connectorType");
@@ -182,19 +179,6 @@ public class ParserFactory implements Parser {
             ConnectorConfig obj = (ConnectorConfig) JsonUtil.jsonToObj(config.toString(), configClass);
             connector.setConfig(obj);
 
-            List<Table> tableList = new ArrayList<>();
-            boolean exist = false;
-            for (int i = 0; i < table.length(); i++) {
-                if (table.get(i) instanceof String) {
-                    tableList.add(new Table(table.getString(i)));
-                    exist = true;
-                }
-            }
-            if (!exist) {
-                tableList = JsonUtil.jsonToArray(table.toString(), Table.class);
-            }
-            connector.setTable(tableList);
-
             return connector;
         } catch (JSONException e) {
             logger.error(e.getMessage());
@@ -204,16 +188,8 @@ public class ParserFactory implements Parser {
 
     @Override
     public <T> T parseObject(String json, Class<T> clazz) {
-        try {
-            JSONObject obj = new JSONObject(json);
-            T t = JsonUtil.jsonToObj(obj.toString(), clazz);
-            String format = String.format("%s can not be null.", clazz.getSimpleName());
-            Assert.notNull(t, format);
-            return t;
-        } catch (JSONException e) {
-            logger.error(e.getMessage());
-            throw new ParserException(e.getMessage());
-        }
+        T t = JsonUtil.jsonToObj(json, clazz);
+        return t;
     }
 
     @Override
@@ -283,7 +259,7 @@ public class ParserFactory implements Parser {
             // 1、获取数据源数据
             int pageIndex = Integer.parseInt(params.get(ParserEnum.PAGE_INDEX.getCode()));
             Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), pageIndex, pageSize));
-            List<Map> data = reader.getData();
+            List<Map> data = reader.getSuccessData();
             if (CollectionUtils.isEmpty(data)) {
                 params.clear();
                 logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
@@ -291,7 +267,7 @@ public class ParserFactory implements Parser {
             }
 
             // 2、映射字段
-            List<Map> target = picker.pickData(reader.getData());
+            List<Map> target = picker.pickData(data);
 
             // 3、参数转换
             ConvertUtil.convert(group.getConvert(), target);
@@ -300,10 +276,10 @@ public class ParserFactory implements Parser {
             pluginFactory.convert(group.getPlugin(), data, target);
 
             // 5、写入目标源
-            Result writer = writeBatch(tConnectorMapper, command, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize);
+            Result writer = writeBatch(new BatchWriter(tConnectorMapper, command, sTableName, ConnectorConstant.OPERTION_INSERT, picker.getTargetFields(), target, batchSize));
 
             // 6、更新结果
-            flush(task, writer, target);
+            flush(task, writer);
 
             // 7、更新分页数
             params.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(++pageIndex));
@@ -311,15 +287,13 @@ public class ParserFactory implements Parser {
     }
 
     @Override
-    public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent rowChangedEvent) {
-        logger.info("Table[{}] {}, before:{}, after:{}", rowChangedEvent.getTableName(), rowChangedEvent.getEvent(),
-                rowChangedEvent.getBefore(), rowChangedEvent.getAfter());
-        final String metaId = mapping.getMetaId();
+    public void execute(Mapping mapping, TableGroup tableGroup, RowChangedEvent event) {
+        logger.info("Table[{}] {}, before:{}, after:{}", event.getSourceTableName(), event.getEvent(),
+                event.getBefore(), event.getAfter());
 
-        ConnectorMapper tConnectorMapper = connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId()));
         // 1、获取映射字段
-        final String event = rowChangedEvent.getEvent();
-        Map<String, Object> data = StringUtil.equals(ConnectorConstant.OPERTION_DELETE, event) ? rowChangedEvent.getBefore() : rowChangedEvent.getAfter();
+        final String eventName = event.getEvent();
+        Map<String, Object> data = StringUtil.equals(ConnectorConstant.OPERTION_DELETE, eventName) ? event.getBefore() : event.getAfter();
         Picker picker = new Picker(tableGroup.getFieldMapping(), data);
         Map target = picker.getTargetMap();
 
@@ -327,29 +301,32 @@ public class ParserFactory implements Parser {
         ConvertUtil.convert(tableGroup.getConvert(), target);
 
         // 3、插件转换
-        pluginFactory.convert(tableGroup.getPlugin(), event, data, target);
+        pluginFactory.convert(tableGroup.getPlugin(), eventName, data, target);
 
         // 4、写入缓冲执行器
-        writerBufferActuator.offer(new WriterRequest(metaId, tableGroup.getId(), event, tConnectorMapper, picker.getTargetFields(), tableGroup.getCommand(), target));
+        writerBufferActuator.offer(new WriterRequest(tableGroup.getId(), target, mapping.getMetaId(), mapping.getTargetConnectorId(), event.getSourceTableName(), event.getTargetTableName(), eventName, picker.getTargetFields(), tableGroup.getCommand()));
     }
 
     /**
      * 批量写入
      *
-     * @param connectorMapper
-     * @param command
-     * @param fields
-     * @param dataList
-     * @param batchSize
+     * @param batchWriter
      * @return
      */
     @Override
-    public Result writeBatch(ConnectorMapper connectorMapper, Map<String, String> command, String event, List<Field> fields, List<Map> dataList, int batchSize) {
+    public Result writeBatch(BatchWriter batchWriter) {
+        List<Map> dataList = batchWriter.getDataList();
+        int batchSize = batchWriter.getBatchSize();
+        String tableName = batchWriter.getTableName();
+        String event = batchWriter.getEvent();
+        Map<String, String> command = batchWriter.getCommand();
+        List<Field> fields = batchWriter.getFields();
+        boolean forceUpdate = batchWriter.isForceUpdate();
         // 总数
         int total = dataList.size();
         // 单次任务
         if (total <= batchSize) {
-            return connectorFactory.writer(connectorMapper, new WriterBatchConfig(event, command, fields, dataList));
+            return connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, dataList, forceUpdate));
         }
 
         // 批量任务, 拆分
@@ -372,13 +349,12 @@ public class ParserFactory implements Parser {
 
             taskExecutor.execute(() -> {
                 try {
-                    Result w = connectorFactory.writer(connectorMapper, new WriterBatchConfig(event, command, fields, data));
-                    // CAS
-                    result.getFailData().addAll(w.getFailData());
-                    result.getFail().getAndAdd(w.getFail().get());
+                    Result w = connectorFactory.writer(batchWriter.getConnectorMapper(), new WriterBatchConfig(tableName, event, command, fields, data, forceUpdate));
+                    result.addSuccessData(w.getSuccessData());
+                    result.addFailData(w.getFailData());
                     result.getError().append(w.getError());
                 } catch (Exception e) {
-                    result.getError().append(e.getMessage()).append(System.lineSeparator());
+                    logger.error(e.getMessage());
                 } finally {
                     latch.countDown();
                 }
@@ -397,10 +373,9 @@ public class ParserFactory implements Parser {
      *
      * @param task
      * @param writer
-     * @param data
      */
-    private void flush(Task task, Result writer, List<Map> data) {
-        flushStrategy.flushFullData(task.getId(), writer, ConnectorConstant.OPERTION_INSERT, data);
+    private void flush(Task task, Result writer) {
+        flushStrategy.flushFullData(task.getId(), writer, ConnectorConstant.OPERTION_INSERT);
 
         // 发布刷新事件给FullExtractor
         task.setEndTime(Instant.now().toEpochMilli());
@@ -430,9 +405,7 @@ public class ParserFactory implements Parser {
         Assert.hasText(connectorId, "Connector id can not be empty.");
         Connector conn = cacheService.get(connectorId, Connector.class);
         Assert.notNull(conn, "Connector can not be null.");
-        Connector connector = new Connector();
-        BeanUtils.copyProperties(conn, connector);
-        return connector;
+        return conn;
     }
 
     /**
@@ -442,8 +415,7 @@ public class ParserFactory implements Parser {
      * @return
      */
     private ConnectorConfig getConnectorConfig(String connectorId) {
-        Connector connector = getConnector(connectorId);
-        return connector.getConfig();
+        return getConnector(connectorId).getConfig();
     }
 
 }

+ 31 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -2,8 +2,6 @@ package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.common.scheduled.ScheduledTaskJob;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
-import org.dbsyncer.parser.flush.model.AbstractRequest;
-import org.dbsyncer.parser.flush.model.AbstractResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -14,6 +12,9 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * @author AE86
@@ -31,10 +32,12 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
 
     private Queue<Request> temp = new ConcurrentLinkedQueue();
 
-    private final Object LOCK = new Object();
+    private final Lock lock = new ReentrantLock(true);
 
     private volatile boolean running;
 
+    private final static long MAX_BATCH_COUNT = 1000L;
+
     @PostConstruct
     private void init() {
         scheduledTaskService.start(getPeriod(), this);
@@ -52,7 +55,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
      *
      * @return
      */
-    protected abstract AbstractResponse getValue();
+    protected abstract BufferResponse getValue();
 
     /**
      * 生成分区key
@@ -78,7 +81,7 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
     protected abstract void pull(Response response);
 
     @Override
-    public void offer(AbstractRequest request) {
+    public void offer(BufferRequest request) {
         if (running) {
             temp.offer((Request) request);
             return;
@@ -91,27 +94,39 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
         if (running) {
             return;
         }
-        synchronized (LOCK) {
-            if (running) {
-                return;
+
+        final Lock bufferLock = lock;
+        boolean locked = false;
+        try {
+            locked = bufferLock.tryLock();
+            if (locked) {
+                running = true;
+                flush(buffer);
+                running = false;
+                flush(temp);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        } finally {
+            if (locked) {
+                running = false;
+                bufferLock.unlock();
             }
-            running = true;
-            flush(buffer);
-            running = false;
-            flush(temp);
         }
     }
 
     private void flush(Queue<Request> queue) {
         if (!queue.isEmpty()) {
-            final Map<String, AbstractResponse> map = new LinkedHashMap<>();
-            while (!queue.isEmpty()) {
+            AtomicLong batchCounter = new AtomicLong();
+            final Map<String, BufferResponse> map = new LinkedHashMap<>();
+            while (!queue.isEmpty() && batchCounter.get() < MAX_BATCH_COUNT) {
                 Request poll = queue.poll();
                 String key = getPartitionKey(poll);
                 if (!map.containsKey(key)) {
                     map.putIfAbsent(key, getValue());
                 }
                 partition(poll, (Response) map.get(key));
+                batchCounter.incrementAndGet();
             }
 
             map.forEach((key, flushTask) -> {
@@ -119,9 +134,9 @@ public abstract class AbstractBufferActuator<Request, Response> implements Buffe
                 try {
                     pull((Response) flushTask);
                 } catch (Exception e) {
-                    logger.error("[{}]-flush异常{}", key);
+                    logger.error("[{}]异常{}", key);
                 }
-                logger.info("[{}]-flush{}条,耗时{}秒", key, flushTask.getTaskSize(), (Instant.now().toEpochMilli() - now) / 1000);
+                logger.info("[{}]{}条,耗时{}秒", key, flushTask.getTaskSize(), (Instant.now().toEpochMilli() - now) / 1000);
             });
             map.clear();
         }

+ 16 - 19
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractFlushStrategy.java

@@ -2,13 +2,11 @@ package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.parser.model.Meta;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.Assert;
 
-import java.util.List;
-import java.util.Map;
-
 /**
  * @author AE86
  * @version 1.0.0
@@ -23,31 +21,30 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
     private CacheService cacheService;
 
     @Override
-    public void flushFullData(String metaId, Result result, String event, List<Map> dataList) {
-        flush(metaId, result, event, dataList);
+    public void flushFullData(String metaId, Result result, String event) {
+        flush(metaId, result, event);
     }
 
     @Override
-    public void flushIncrementData(String metaId, Result result, String event, List<Map> dataList) {
-        flush(metaId, result, event, dataList);
+    public void flushIncrementData(String metaId, Result result, String event) {
+        flush(metaId, result, event);
     }
 
-    protected void flush(String metaId, Result result, String event, List<Map> dataList) {
-        refreshTotal(metaId, result, dataList);
+    protected void flush(String metaId, Result result, String event) {
+        refreshTotal(metaId, result);
 
-        boolean success = 0 == result.getFail().get();
-        if (!success) {
-            dataList.clear();
-            dataList.addAll(result.getFailData());
+        if (!CollectionUtils.isEmpty(result.getFailData())) {
+            flushService.asyncWrite(metaId, event, false, result.getFailData(), result.getError().toString());
+        }
+        if (!CollectionUtils.isEmpty(result.getSuccessData())) {
+            flushService.asyncWrite(metaId, event, true, result.getSuccessData(), "");
         }
-        flushService.asyncWrite(metaId, event, success, dataList, result.getError().toString());
     }
 
-    protected void refreshTotal(String metaId, Result writer, List<Map> dataList){
-        long fail = writer.getFail().get();
+    protected void refreshTotal(String metaId, Result writer) {
         Meta meta = getMeta(metaId);
-        meta.getFail().getAndAdd(fail);
-        meta.getSuccess().getAndAdd(dataList.size() - fail);
+        meta.getFail().getAndAdd(writer.getFailData().size());
+        meta.getSuccess().getAndAdd(writer.getSuccessData().size());
     }
 
     protected Meta getMeta(String metaId) {
@@ -57,4 +54,4 @@ public abstract class AbstractFlushStrategy implements FlushStrategy {
         return meta;
     }
 
-}
+}

+ 1 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferActuator.java

@@ -1,7 +1,5 @@
 package org.dbsyncer.parser.flush;
 
-import org.dbsyncer.parser.flush.model.AbstractRequest;
-
 /**
  * @author AE86
  * @version 1.0.0
@@ -9,6 +7,6 @@ import org.dbsyncer.parser.flush.model.AbstractRequest;
  */
 public interface BufferActuator {
 
-    void offer(AbstractRequest request);
+    void offer(BufferRequest request);
 
 }

+ 10 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferRequest.java

@@ -0,0 +1,10 @@
+package org.dbsyncer.parser.flush;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/3/27 16:57
+ */
+public interface BufferRequest {
+
+}

+ 3 - 3
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractResponse.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/BufferResponse.java

@@ -1,17 +1,17 @@
-package org.dbsyncer.parser.flush.model;
+package org.dbsyncer.parser.flush;
 
 /**
  * @author AE86
  * @version 1.0.0
  * @date 2022/3/27 18:11
  */
-public abstract class AbstractResponse {
+public interface BufferResponse {
 
     /**
      * 获取批处理数
      *
      * @return
      */
-    public abstract int getTaskSize();
+    int getTaskSize();
 
 }

+ 2 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/FlushStrategy.java

@@ -2,9 +2,6 @@ package org.dbsyncer.parser.flush;
 
 import org.dbsyncer.common.model.Result;
 
-import java.util.List;
-import java.util.Map;
-
 /**
  * 记录同步数据策略
  *
@@ -20,9 +17,8 @@ public interface FlushStrategy {
      * @param metaId
      * @param result
      * @param event
-     * @param dataList
      */
-    void flushFullData(String metaId, Result result, String event, List<Map> dataList);
+    void flushFullData(String metaId, Result result, String event);
 
     /**
      * 记录增量同步数据
@@ -30,8 +26,7 @@ public interface FlushStrategy {
      * @param metaId
      * @param result
      * @param event
-     * @param dataList
      */
-    void flushIncrementData(String metaId, Result result, String event, List<Map> dataList);
+    void flushIncrementData(String metaId, Result result, String event);
 
 }

+ 4 - 6
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/DisableFullFlushStrategy.java

@@ -1,14 +1,12 @@
 package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.parser.flush.AbstractFlushStrategy;
 import org.dbsyncer.parser.logger.LogService;
 import org.dbsyncer.parser.logger.LogType;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.List;
-import java.util.Map;
-
 /**
  * 不记录全量数据, 只记录增量同步数据, 将异常记录到系统日志中
  *
@@ -22,11 +20,11 @@ public final class DisableFullFlushStrategy extends AbstractFlushStrategy {
     private LogService logService;
 
     @Override
-    public void flushFullData(String metaId, Result result, String event, List<Map> dataList) {
+    public void flushFullData(String metaId, Result result, String event) {
         // 不记录全量数据,只统计成功失败总数
-        refreshTotal(metaId, result, dataList);
+        refreshTotal(metaId, result);
 
-        if (0 < result.getFail().get()) {
+        if (!CollectionUtils.isEmpty(result.getFailData())) {
             LogType logType = LogType.TableGroupLog.FULL_FAILED;
             logService.log(logType, "%s:%s", logType.getMessage(), result.getError().toString());
         }

+ 1 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/EnableFlushStrategy.java

@@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
  * @date 2021/11/18 22:21
  */
 @Component
-@ConditionalOnProperty(value = "dbsyncer.parser.flush.enabled", havingValue = "true")
+@ConditionalOnProperty(value = "dbsyncer.parser.flush.full.enabled", havingValue = "true")
 public final class EnableFlushStrategy extends AbstractFlushStrategy {
 
 }

+ 4 - 7
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushServiceImpl.java

@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSONException;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.parser.flush.BufferActuator;
 import org.dbsyncer.parser.flush.FlushService;
-import org.dbsyncer.parser.flush.model.FlushRequest;
+import org.dbsyncer.parser.flush.model.StorageRequest;
 import org.dbsyncer.storage.SnowflakeIdWorker;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.constant.ConfigConstant;
@@ -19,7 +19,6 @@ import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 /**
@@ -43,7 +42,7 @@ public class FlushServiceImpl implements FlushService {
     private SnowflakeIdWorker snowflakeIdWorker;
 
     @Autowired
-    private BufferActuator flushBufferActuator;
+    private BufferActuator storageBufferActuator;
 
     @Override
     public void asyncWrite(String type, String error) {
@@ -58,13 +57,12 @@ public class FlushServiceImpl implements FlushService {
     @Override
     public void asyncWrite(String metaId, String event, boolean success, List<Map> data, String error) {
         long now = Instant.now().toEpochMilli();
-        AtomicBoolean added = new AtomicBoolean(false);
         List<Map> list = data.parallelStream().map(r -> {
             Map<String, Object> params = new HashMap();
             params.put(ConfigConstant.CONFIG_MODEL_ID, String.valueOf(snowflakeIdWorker.nextId()));
             params.put(ConfigConstant.DATA_SUCCESS, success ? StorageDataStatusEnum.SUCCESS.getValue() : StorageDataStatusEnum.FAIL.getValue());
             params.put(ConfigConstant.DATA_EVENT, event);
-            params.put(ConfigConstant.DATA_ERROR, added.get() ? "" : error);
+            params.put(ConfigConstant.DATA_ERROR, error);
             try {
                 params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(r));
             } catch (JSONException e) {
@@ -72,11 +70,10 @@ public class FlushServiceImpl implements FlushService {
                 params.put(ConfigConstant.CONFIG_MODEL_JSON, r.toString());
             }
             params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
-            added.set(true);
             return params;
         }).collect(Collectors.toList());
 
-        flushBufferActuator.offer(new FlushRequest(metaId, list));
+        storageBufferActuator.offer(new StorageRequest(metaId, list));
     }
 
 }

+ 9 - 9
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/FlushBufferActuator.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/StorageBufferActuator.java

@@ -1,9 +1,9 @@
 package org.dbsyncer.parser.flush.impl;
 
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
-import org.dbsyncer.parser.flush.model.AbstractResponse;
-import org.dbsyncer.parser.flush.model.FlushRequest;
-import org.dbsyncer.parser.flush.model.FlushResponse;
+import org.dbsyncer.parser.flush.BufferResponse;
+import org.dbsyncer.parser.flush.model.StorageRequest;
+import org.dbsyncer.parser.flush.model.StorageResponse;
 import org.dbsyncer.storage.StorageService;
 import org.dbsyncer.storage.enums.StorageEnum;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -15,7 +15,7 @@ import org.springframework.stereotype.Component;
  * @date 2022/3/27 16:50
  */
 @Component
-public class FlushBufferActuator extends AbstractBufferActuator<FlushRequest, FlushResponse> {
+public class StorageBufferActuator extends AbstractBufferActuator<StorageRequest, StorageResponse> {
 
     @Autowired
     private StorageService storageService;
@@ -26,23 +26,23 @@ public class FlushBufferActuator extends AbstractBufferActuator<FlushRequest, Fl
     }
 
     @Override
-    protected AbstractResponse getValue() {
-        return new FlushResponse();
+    protected BufferResponse getValue() {
+        return new StorageResponse();
     }
 
     @Override
-    protected String getPartitionKey(FlushRequest bufferTask) {
+    protected String getPartitionKey(StorageRequest bufferTask) {
         return bufferTask.getMetaId();
     }
 
     @Override
-    protected void partition(FlushRequest request, FlushResponse response) {
+    protected void partition(StorageRequest request, StorageResponse response) {
         response.setMetaId(request.getMetaId());
         response.getDataList().addAll(request.getList());
     }
 
     @Override
-    protected void pull(FlushResponse response) {
+    protected void pull(StorageResponse response) {
         storageService.addData(StorageEnum.DATA, response.getMetaId(), response.getDataList());
     }
 }

+ 35 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/WriterBufferActuator.java

@@ -1,14 +1,21 @@
 package org.dbsyncer.parser.flush.impl;
 
+import org.dbsyncer.cache.CacheService;
 import org.dbsyncer.common.model.Result;
+import org.dbsyncer.connector.ConnectorFactory;
+import org.dbsyncer.connector.ConnectorMapper;
+import org.dbsyncer.connector.config.ConnectorConfig;
 import org.dbsyncer.parser.ParserFactory;
 import org.dbsyncer.parser.flush.AbstractBufferActuator;
-import org.dbsyncer.parser.flush.model.AbstractResponse;
+import org.dbsyncer.parser.flush.BufferResponse;
 import org.dbsyncer.parser.flush.FlushStrategy;
 import org.dbsyncer.parser.flush.model.WriterRequest;
 import org.dbsyncer.parser.flush.model.WriterResponse;
+import org.dbsyncer.parser.model.BatchWriter;
+import org.dbsyncer.parser.model.Connector;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
 
 import java.util.Collections;
 
@@ -20,12 +27,18 @@ import java.util.Collections;
 @Component
 public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest, WriterResponse> {
 
+    @Autowired
+    private ConnectorFactory connectorFactory;
+
     @Autowired
     private ParserFactory parserFactory;
 
     @Autowired
     private FlushStrategy flushStrategy;
 
+    @Autowired
+    private CacheService cacheService;
+
     private final static int BATCH_SIZE = 100;
 
     @Override
@@ -34,7 +47,7 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
     }
 
     @Override
-    protected AbstractResponse getValue() {
+    protected BufferResponse getValue() {
         return new WriterResponse();
     }
 
@@ -50,8 +63,10 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
             return;
         }
         response.setMetaId(request.getMetaId());
+        response.setTargetConnectorId(request.getTargetConnectorId());
+        response.setSourceTableName(request.getSourceTableName());
+        response.setTargetTableName(request.getTargetTableName());
         response.setEvent(request.getEvent());
-        response.setConnectorMapper(request.getConnectorMapper());
         response.setFields(Collections.unmodifiableList(request.getFields()));
         response.setCommand(request.getCommand());
         response.setMerged(true);
@@ -59,7 +74,22 @@ public class WriterBufferActuator extends AbstractBufferActuator<WriterRequest,
 
     @Override
     protected void pull(WriterResponse response) {
-        Result result = parserFactory.writeBatch(response.getConnectorMapper(), response.getCommand(), response.getEvent(), response.getFields(), response.getDataList(), BATCH_SIZE);
-        flushStrategy.flushIncrementData(response.getMetaId(), result, response.getEvent(), response.getDataList());
+        ConnectorMapper targetConnectorMapper = connectorFactory.connect(getConnectorConfig(response.getTargetConnectorId()));
+        Result result = parserFactory.writeBatch(new BatchWriter(targetConnectorMapper, response.getCommand(), response.getTargetTableName(), response.getEvent(),
+                response.getFields(), response.getDataList(), BATCH_SIZE, true));
+        flushStrategy.flushIncrementData(response.getMetaId(), result, response.getEvent());
+    }
+
+    /**
+     * 获取连接器配置
+     *
+     * @param connectorId
+     * @return
+     */
+    private ConnectorConfig getConnectorConfig(String connectorId) {
+        Assert.hasText(connectorId, "Connector id can not be empty.");
+        Connector conn = cacheService.get(connectorId, Connector.class);
+        Assert.notNull(conn, "Connector can not be null.");
+        return conn.getConfig();
     }
 }

+ 0 - 10
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractRequest.java

@@ -1,10 +0,0 @@
-package org.dbsyncer.parser.flush.model;
-
-/**
- * @author AE86
- * @version 1.0.0
- * @date 2022/3/27 16:57
- */
-public abstract class AbstractRequest {
-
-}

+ 84 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/AbstractWriter.java

@@ -0,0 +1,84 @@
+package org.dbsyncer.parser.flush.model;
+
+import org.dbsyncer.connector.config.Field;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/4/4 23:02
+ */
+public abstract class AbstractWriter {
+
+    private String metaId;
+
+    private String targetConnectorId;
+
+    private String sourceTableName;
+
+    private String targetTableName;
+
+    private List<Field> fields;
+
+    private Map<String, String> command;
+
+    private String event;
+
+    public String getMetaId() {
+        return metaId;
+    }
+
+    public void setMetaId(String metaId) {
+        this.metaId = metaId;
+    }
+
+    public String getTargetConnectorId() {
+        return targetConnectorId;
+    }
+
+    public void setTargetConnectorId(String targetConnectorId) {
+        this.targetConnectorId = targetConnectorId;
+    }
+
+    public String getSourceTableName() {
+        return sourceTableName;
+    }
+
+    public void setSourceTableName(String sourceTableName) {
+        this.sourceTableName = sourceTableName;
+    }
+
+    public String getTargetTableName() {
+        return targetTableName;
+    }
+
+    public void setTargetTableName(String targetTableName) {
+        this.targetTableName = targetTableName;
+    }
+
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    public void setFields(List<Field> fields) {
+        this.fields = fields;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+    public void setCommand(Map<String, String> command) {
+        this.command = command;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public void setEvent(String event) {
+        this.event = event;
+    }
+}

+ 4 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/FlushRequest.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/StorageRequest.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.parser.flush.model;
 
+import org.dbsyncer.parser.flush.BufferRequest;
+
 import java.util.List;
 import java.util.Map;
 
@@ -8,13 +10,13 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2022/3/27 16:57
  */
-public class FlushRequest extends AbstractRequest {
+public class StorageRequest implements BufferRequest {
 
     private String metaId;
 
     private List<Map> list;
 
-    public FlushRequest(String metaId, List<Map> list) {
+    public StorageRequest(String metaId, List<Map> list) {
         this.metaId = metaId;
         this.list = list;
     }

+ 3 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/FlushResponse.java → dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/StorageResponse.java

@@ -1,5 +1,7 @@
 package org.dbsyncer.parser.flush.model;
 
+import org.dbsyncer.parser.flush.BufferResponse;
+
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -9,7 +11,7 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2022/3/27 16:57
  */
-public class FlushResponse extends AbstractResponse {
+public class StorageResponse implements BufferResponse {
 
     private String metaId;
     private List<Map> dataList = new LinkedList<>();

+ 11 - 38
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterRequest.java

@@ -1,7 +1,7 @@
 package org.dbsyncer.parser.flush.model;
 
-import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.parser.flush.BufferRequest;
 
 import java.util.List;
 import java.util.Map;
@@ -11,57 +11,30 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2022/3/27 16:57
  */
-public class WriterRequest extends AbstractRequest {
-
-    private String metaId;
+public class WriterRequest extends AbstractWriter implements BufferRequest {
 
     private String tableGroupId;
 
-    private String event;
-
-    private ConnectorMapper connectorMapper;
-
-    private List<Field> fields;
-
-    private Map<String, String> command;
-
     private Map row;
 
-    public WriterRequest(String metaId, String tableGroupId, String event, ConnectorMapper connectorMapper, List<Field> fields, Map<String, String> command, Map row) {
-        this.metaId = metaId;
+    public WriterRequest(String tableGroupId, Map row, String metaId, String targetConnectorId, String sourceTableName, String targetTableName, String event, List<Field> fields, Map<String, String> command) {
+        setMetaId(metaId);
+        setTargetConnectorId(targetConnectorId);
+        setSourceTableName(sourceTableName);
+        setTargetTableName(targetTableName);
+        setEvent(event);
+        setFields(fields);
+        setCommand(command);
         this.tableGroupId = tableGroupId;
-        this.event = event;
-        this.connectorMapper = connectorMapper;
-        this.fields = fields;
-        this.command = command;
         this.row = row;
     }
 
-    public String getMetaId() {
-        return metaId;
-    }
-
     public String getTableGroupId() {
         return tableGroupId;
     }
 
-    public String getEvent() {
-        return event;
-    }
-
-    public ConnectorMapper getConnectorMapper() {
-        return connectorMapper;
-    }
-
-    public List<Field> getFields() {
-        return fields;
-    }
-
-    public Map<String, String> getCommand() {
-        return command;
-    }
-
     public Map getRow() {
         return row;
     }
+
 }

+ 11 - 63
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/model/WriterResponse.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.parser.flush.model;
 
-import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.parser.flush.BufferResponse;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -12,68 +11,15 @@ import java.util.Map;
  * @version 1.0.0
  * @date 2022/3/27 18:11
  */
-public class WriterResponse extends AbstractResponse {
-
-    private boolean isMerged;
-
-    private String metaId;
-
-    private String event;
-
-    private ConnectorMapper connectorMapper;
-
-    private List<Field> fields;
-
-    private Map<String, String> command;
+public class WriterResponse extends AbstractWriter implements BufferResponse {
 
     private List<Map> dataList = new LinkedList<>();
 
-    public boolean isMerged() {
-        return isMerged;
-    }
-
-    public void setMerged(boolean merged) {
-        isMerged = merged;
-    }
-
-    public String getMetaId() {
-        return metaId;
-    }
-
-    public void setMetaId(String metaId) {
-        this.metaId = metaId;
-    }
-
-    public String getEvent() {
-        return event;
-    }
-
-    public void setEvent(String event) {
-        this.event = event;
-    }
-
-    public ConnectorMapper getConnectorMapper() {
-        return connectorMapper;
-    }
-
-    public void setConnectorMapper(ConnectorMapper connectorMapper) {
-        this.connectorMapper = connectorMapper;
-    }
-
-    public List<Field> getFields() {
-        return fields;
-    }
-
-    public void setFields(List<Field> fields) {
-        this.fields = fields;
-    }
-
-    public Map<String, String> getCommand() {
-        return command;
-    }
+    private boolean isMerged;
 
-    public void setCommand(Map<String, String> command) {
-        this.command = command;
+    @Override
+    public int getTaskSize() {
+        return dataList.size();
     }
 
     public List<Map> getDataList() {
@@ -84,9 +30,11 @@ public class WriterResponse extends AbstractResponse {
         this.dataList = dataList;
     }
 
-    @Override
-    public int getTaskSize() {
-        return dataList.size();
+    public boolean isMerged() {
+        return isMerged;
     }
 
+    public void setMerged(boolean merged) {
+        isMerged = merged;
+    }
 }

+ 68 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/BatchWriter.java

@@ -0,0 +1,68 @@
+package org.dbsyncer.parser.model;
+
+import org.dbsyncer.connector.ConnectorMapper;
+import org.dbsyncer.connector.config.Field;
+
+import java.util.List;
+import java.util.Map;
+
+public final class BatchWriter {
+
+    private ConnectorMapper     connectorMapper;
+    private Map<String, String> command;
+    private String              tableName;
+    private String              event;
+    private List<Field>         fields;
+    private List<Map>           dataList;
+    private int                 batchSize;
+    private boolean             isForceUpdate;
+
+    public BatchWriter(ConnectorMapper connectorMapper, Map<String, String> command, String tableName, String event,
+                       List<Field> fields, List<Map> dataList, int batchSize) {
+        this(connectorMapper, command, tableName, event, fields, dataList, batchSize, false);
+    }
+
+    public BatchWriter(ConnectorMapper connectorMapper, Map<String, String> command, String tableName, String event,
+                       List<Field> fields, List<Map> dataList, int batchSize, boolean isForceUpdate) {
+        this.connectorMapper = connectorMapper;
+        this.command = command;
+        this.tableName = tableName;
+        this.event = event;
+        this.fields = fields;
+        this.dataList = dataList;
+        this.batchSize = batchSize;
+        this.isForceUpdate = isForceUpdate;
+    }
+
+    public ConnectorMapper getConnectorMapper() {
+        return connectorMapper;
+    }
+
+    public Map<String, String> getCommand() {
+        return command;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    public List<Map> getDataList() {
+        return dataList;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public boolean isForceUpdate() {
+        return isForceUpdate;
+    }
+}

+ 5 - 9
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -91,6 +91,7 @@ public class Shard {
     }
 
     public void close() throws IOException {
+        indexWriter.commit();
         indexReader.close();
         indexWriter.close();
     }
@@ -191,16 +192,11 @@ public class Shard {
     }
 
     private void execute(Object value, Callback callback) throws IOException {
-        if (null != value) {
-            synchronized (LOCK) {
-                if (indexWriter.isOpen()) {
-                    callback.execute();
-                    indexWriter.commit();
-                    return;
-                }
-            }
-            logger.error(value.toString());
+        if (null != value && indexWriter.isOpen()) {
+            callback.execute();
+            return;
         }
+        logger.error(value.toString());
     }
 
     interface Callback {

+ 11 - 4
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -5,7 +5,10 @@ import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.config.Field;
+import org.dbsyncer.connector.config.SqlBuilderConfig;
+import org.dbsyncer.connector.config.WriterBatchConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.Database;
@@ -38,6 +41,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -77,7 +81,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
     private String database;
 
     @PostConstruct
-    private void init() {
+    private void init() throws InterruptedException {
         logger.info("url:{}", config.getUrl());
         config.setConnectorType(ConnectorEnum.MYSQL.getType());
         connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(config);
@@ -177,7 +181,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
             Map<String, String> command = new HashMap<>();
             command.put(SqlBuilderEnum.INSERT.getName(), executor.getInsert());
             ConnectorMapper connectorMapper = connectorFactory.connect(config);
-            connectorFactory.writer(connectorMapper, new WriterBatchConfig(ConnectorConstant.OPERTION_INSERT, command, executor.getFields(), list));
+            connectorFactory.writer(connectorMapper, new WriterBatchConfig(table, ConnectorConstant.OPERTION_INSERT, command, executor.getFields(), list));
         }
 
     }
@@ -268,7 +272,7 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         }
     }
 
-    private void initTable() {
+    private void initTable() throws InterruptedException {
         // 配置
         FieldBuilder builder = new FieldBuilder();
         builder.build(ConfigConstant.CONFIG_MODEL_ID, ConfigConstant.CONFIG_MODEL_NAME, ConfigConstant.CONFIG_MODEL_TYPE, ConfigConstant.CONFIG_MODEL_CREATE_TIME, ConfigConstant.CONFIG_MODEL_UPDATE_TIME, ConfigConstant.CONFIG_MODEL_JSON);
@@ -294,6 +298,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
                 createTableIfNotExist(tableName, e);
             }
         });
+
+        // wait few seconds for execute sql
+        TimeUnit.SECONDS.sleep(1);
     }
 
     private void createTableIfNotExist(String table, Executor executor) {

+ 7 - 7
dbsyncer-web/src/main/java/org/dbsyncer/web/config/WebAppConfig.java

@@ -83,9 +83,9 @@ public class WebAppConfig extends WebSecurityConfigurerAdapter implements Authen
         return new SavedRequestAwareAuthenticationSuccessHandler() {
             @Override
             public void onAuthenticationSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) {
-                Object principal = authentication.getPrincipal();
-                logger.info("USER : " + principal + " LOGIN SUCCESS !  ");
-                write(response, RestResult.restSuccess("登录成功!"));
+                String msg = String.format("%s 登录成功!", authentication.getPrincipal());
+                write(response, RestResult.restSuccess(msg));
+                logger.info(msg);
             }
         };
     }
@@ -94,12 +94,12 @@ public class WebAppConfig extends WebSecurityConfigurerAdapter implements Authen
     public LogoutSuccessHandler logoutHandler() {
         return (request, response, authentication) -> {
             try {
-                Object principal = authentication.getPrincipal();
-                logger.info("USER : {} LOGOUT SUCCESS ! ", principal);
-                write(response, RestResult.restSuccess("注销成功!"));
+                String msg = String.format("%s 注销成功!", authentication.getPrincipal());
+                write(response, RestResult.restSuccess(msg));
+                logger.info(msg);
             } catch (Exception e) {
-                logger.info("LOGOUT EXCEPTION , e : {}", e.getMessage());
                 write(response, RestResult.restFail(e.getMessage(), 403));
+                logger.info("注销失败: {}", e.getMessage());
             }
         };
     }

+ 3 - 3
dbsyncer-web/src/main/resources/application.properties

@@ -4,8 +4,8 @@ server.port=18686
 #web
 dbsyncer.web.login.username=admin
 dbsyncer.web.login.password=0DPiKuNIrrVmD8IUCuw1hQxNqZc=
-dbsyncer.web.thread.pool.core.size=20
-dbsyncer.web.thread.pool.queue.capacity=5000
+dbsyncer.web.thread.pool.core.size=10
+dbsyncer.web.thread.pool.queue.capacity=2000
 server.servlet.session.timeout=1800
 server.servlet.context-path=/
 
@@ -17,7 +17,7 @@ dbsyncer.storage.id=1
 #dbsyncer.storage.support.mysql.config.password=123
 
 #parser
-#dbsyncer.parser.flush.enabled=true
+#dbsyncer.parser.flush.full.enabled=true
 
 #monitor
 management.endpoints.web.base-path=/app

+ 62 - 0
dbsyncer-web/src/main/resources/public/connector/addDqlPostgreSQL.html

@@ -0,0 +1,62 @@
+<!DOCTYPE html>
+<html lang="zh-CN"
+      xmlns="http://www.w3.org/1999/xhtml" xmlns:th="http://www.thymeleaf.org">
+
+<div th:fragment="content">
+    <div class="form-group">
+        <label class="col-sm-2 control-label">帐号 <strong class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-4">
+            <input class="form-control" dbsyncer-valid="require" maxlength="32" name="username" placeholder="admin"
+                   th:value="${connector?.config?.username}" type="text"/>
+        </div>
+        <label class="col-sm-2 control-label">密码 <strong class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-4 ">
+            <input class="form-control" dbsyncer-valid="require" maxlength="32" name="password"
+                   th:value="${connector?.config?.password}" type="password"/>
+        </div>
+    </div>
+    <div class="form-group">
+        <label class="col-sm-2 control-label">SQL <strong class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-10">
+            <textarea class="form-control dbsyncer_textarea_resize_none" dbsyncer-valid="require" id="sql"
+                      maxlength="1024" name="sql" rows="10"
+                      th:text="${connector?.config?.sql}?:'SELECT T1.* FROM &quot;USER&quot; T1'"></textarea>
+        </div>
+    </div>
+    <div class="form-group">
+        <label class="col-sm-2 control-label">主表 <strong class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-4">
+            <input class="form-control" dbsyncer-valid="require" maxlength="32" name="table" placeholder="USER"
+                   th:value="${connector?.config?.table}" type="text"/>
+        </div>
+        <div class="col-sm-6 text-right">
+            <a href="javascript:beautifySql();"><span class="fa fa-magic fa-1x fa-flip-horizontal dbsyncer_pointer"
+                                                      title="美化SQL"></span>美化SQL</a>
+        </div>
+    </div>
+    <div class="form-group">
+        <label class="col-sm-2 control-label">URL <strong class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-10">
+            <textarea class="form-control dbsyncer_textarea_resize_none" dbsyncer-valid="require" maxlength="1024"
+                      name="url" rows="5"
+                      th:text="${connector?.config?.url} ?: 'jdbc:postgresql://127.0.0.1:5432/test'"></textarea>
+        </div>
+    </div>
+    <div class="form-group">
+        <label class="col-sm-2 control-label">架构名 <strong class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-4">
+            <input class="form-control" dbsyncer-valid="require" maxlength="32" name="schema" placeholder="public"
+                   th:value="${connector?.config?.schema} ?: 'public'" type="text"/>
+        </div>
+        <div class="col-sm-6"></div>
+    </div>
+    <div class="form-group">
+        <label class="col-sm-2 control-label">驱动 </label>
+        <div class="col-sm-10">
+            <input class="form-control" name="driverClassName" readonly="true"
+                   th:value="${connector?.config?.driverClassName} ?: 'org.postgresql.Driver'" type="text"/>
+        </div>
+    </div>
+</div>
+
+</html>

+ 50 - 0
dbsyncer-web/src/main/resources/public/connector/addPostgreSQL.html

@@ -0,0 +1,50 @@
+<!DOCTYPE html>
+<html lang="zh-CN"
+      xmlns="http://www.w3.org/1999/xhtml" xmlns:th="http://www.thymeleaf.org">
+
+<div th:fragment="content">
+    <div class="form-group">
+        <label class="col-sm-2 control-label">帐号 <strong class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-4">
+            <input class="form-control" dbsyncer-valid="require" maxlength="32" name="username" placeholder="root"
+                   th:value="${connector?.config?.username}" type="text"/>
+        </div>
+        <label class="col-sm-2 control-label">密码 <strong class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-4 ">
+            <input class="form-control" dbsyncer-valid="require" maxlength="32" name="password"
+                   th:value="${connector?.config?.password}" type="password"/>
+        </div>
+    </div>
+    <div class="form-group">
+        <label class="col-sm-2 control-label">URL <strong class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-10">
+            <textarea class="form-control dbsyncer_textarea_resize_none" dbsyncer-valid="require" maxlength="1024"
+                      name="url" rows="5"
+                      th:text="${connector?.config?.url} ?: 'jdbc:postgresql://127.0.0.1:5432/test'"></textarea>
+        </div>
+    </div>
+    <div class="form-group">
+        <label class="col-sm-2 control-label">架构名 <strong class="driverVerifcateRequired">*</strong></label>
+        <div class="col-sm-4">
+            <input class="form-control" dbsyncer-valid="require" maxlength="32" name="schema" placeholder="public"
+                   th:value="${connector?.config?.schema} ?: 'public'" type="text"/>
+        </div>
+        <div class="col-sm-6"></div>
+    </div>
+    <div class="form-group">
+        <label class="col-sm-2 control-label">驱动 </label>
+        <div class="col-sm-10">
+            <input class="form-control" name="driverClassName" readonly="true"
+                   th:value="${connector?.config?.driverClassName} ?: 'org.postgresql.Driver'" type="text"/>
+        </div>
+    </div>
+
+    <script type="text/javascript">
+        $(function () {
+            // 初始化select插件
+            initSelectIndex($(".select-control"), 1);
+        })
+    </script>
+</div>
+
+</html>

+ 4 - 2
dbsyncer-web/src/main/resources/public/mapping/editTable.html

@@ -9,7 +9,8 @@
             <label class="col-sm-3 control-label text-right">数据源表</label>
             <div class="col-sm-9">
                 <select id="sourceTable" class="form-control select-control-table" multiple="multiple">
-                    <option th:each="t,s:${mapping?.sourceConnector?.table}" th:value="${t?.name}" th:text="${t?.name}" />
+                    <option th:each="t,s:${mapping?.sourceConnector?.table}" th:text="${t?.name} + ' ('+${t?.type}+')'"
+                            th:value="${t?.name}"/>
                 </select>
             </div>
         </div>
@@ -23,7 +24,8 @@
                 <label class="col-sm-3 control-label text-right">目标源表</label>
                 <div class="col-sm-9">
                     <select id="targetTable" class="form-control select-control-table" multiple="multiple">
-                        <option th:each="t,s:${mapping?.targetConnector?.table}" th:value="${t?.name}" th:text="${t?.name}" />
+                        <option th:each="t,s:${mapping?.targetConnector?.table}"
+                                th:text="${t?.name} + ' ('+${t?.type}+')'" th:value="${t?.name}"/>
                     </select>
                 </div>
             </div>

二进制
dbsyncer-web/src/main/resources/static/img/DqlPostgreSQL.png


二进制
dbsyncer-web/src/main/resources/static/img/PostgreSQL.png


+ 8 - 0
pom.xml

@@ -45,6 +45,7 @@
         <mysql.version>5.1.40</mysql.version>
         <mysql-binlog.version>0.21.0</mysql-binlog.version>
         <mssql-jdbc.version>8.2.0.jre8</mssql-jdbc.version>
+        <postgresql.version>42.3.3</postgresql.version>
         <kafka.version>0.9.0.0</kafka.version>
         <json.version>20090211</json.version>
         <fastjson.version>1.2.75</fastjson.version>
@@ -150,6 +151,13 @@
                 <version>${mssql-jdbc.version}</version>
             </dependency>
 
+            <!-- postgresql -->
+            <dependency>
+                <groupId>org.postgresql</groupId>
+                <artifactId>postgresql</artifactId>
+                <version>${postgresql.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>com.github.shyiko</groupId>
                 <artifactId>mysql-binlog-connector-java</artifactId>