Explorar o código

!120 upgrade version
Merge pull request !120 from AE86/v_1.0.0_dev

AE86 %!s(int64=2) %!d(string=hai) anos
pai
achega
e10821eba5
Modificáronse 66 ficheiros con 957 adicións e 594 borrados
  1. 1 1
      dbsyncer-biz/pom.xml
  2. 14 11
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/AbstractDataBaseConfigChecker.java
  3. 0 2
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/ElasticsearchConfigChecker.java
  4. 20 10
      dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java
  5. 1 1
      dbsyncer-cache/pom.xml
  6. 1 1
      dbsyncer-cluster/pom.xml
  7. 1 1
      dbsyncer-common/pom.xml
  8. 0 4
      dbsyncer-common/src/main/java/org/dbsyncer/common/model/AbstractConnectorConfig.java
  9. 4 0
      dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java
  10. 1 1
      dbsyncer-connector/pom.xml
  11. 10 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java
  12. 14 38
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/DatabaseConfig.java
  13. 0 12
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ESConfig.java
  14. 5 9
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java
  15. 6 6
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java
  16. 92 70
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  17. 8 3
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderDelete.java
  18. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQuery.java
  19. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryCursor.java
  20. 7 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderUpdate.java
  21. 6 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java
  22. 3 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java
  23. 0 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Field.java
  24. 16 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/MetaInfo.java
  25. 16 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/PageSql.java
  26. 43 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/SqlTable.java
  27. 26 15
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Table.java
  28. 27 19
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java
  29. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java
  30. 27 17
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java
  31. 36 15
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java
  32. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java
  33. 1 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLPostgreSQLConnector.java
  34. 8 5
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java
  35. 6 1
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java
  36. 70 8
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PrimaryKeyUtil.java
  37. 1 1
      dbsyncer-listener/pom.xml
  38. 68 49
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractDatabaseExtractor.java
  39. 23 24
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java
  40. 2 7
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java
  41. 2 2
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/DqlOracleExtractor.java
  42. 7 14
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java
  43. 6 5
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/TableGroupCommand.java
  44. 1 1
      dbsyncer-manager/pom.xml
  45. 5 3
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java
  46. 33 38
      dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java
  47. 1 1
      dbsyncer-monitor/pom.xml
  48. 1 1
      dbsyncer-parser/pom.xml
  49. 7 16
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java
  50. 5 5
      dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Task.java
  51. 1 1
      dbsyncer-plugin/pom.xml
  52. 1 1
      dbsyncer-storage/pom.xml
  53. 3 1
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java
  54. 1 1
      dbsyncer-web/pom.xml
  55. 1 1
      dbsyncer-web/src/main/resources/application.properties
  56. 5 26
      dbsyncer-web/src/main/resources/public/connector/addDqlMysql.html
  57. 15 30
      dbsyncer-web/src/main/resources/public/connector/addDqlOracle.html
  58. 5 27
      dbsyncer-web/src/main/resources/public/connector/addDqlPostgreSQL.html
  59. 12 27
      dbsyncer-web/src/main/resources/public/connector/addDqlSqlServer.html
  60. 4 11
      dbsyncer-web/src/main/resources/public/connector/addElasticsearch.html
  61. 5 5
      dbsyncer-web/src/main/resources/public/connector/addFile.html
  62. 7 7
      dbsyncer-web/src/main/resources/public/connector/addKafka.html
  63. 253 0
      dbsyncer-web/src/main/resources/public/connector/addSQL.html
  64. 5 4
      dbsyncer-web/src/main/resources/static/js/common.js
  65. 2 2
      dbsyncer-web/src/main/resources/static/js/monitor/index.js
  66. 1 1
      pom.xml

+ 1 - 1
dbsyncer-biz/pom.xml

@@ -5,7 +5,7 @@
 	<parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.2-RC</version>
+        <version>1.2.3-RC</version>
     </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-biz</artifactId>

+ 14 - 11
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/AbstractDataBaseConfigChecker.java

@@ -1,9 +1,13 @@
 package org.dbsyncer.biz.checker.impl.connector;
 
 import org.dbsyncer.biz.checker.ConnectorConfigChecker;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.model.SqlTable;
 import org.springframework.util.Assert;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -19,7 +23,6 @@ public abstract class AbstractDataBaseConfigChecker implements ConnectorConfigCh
         String password = params.get("password");
         String url = params.get("url");
         String driverClassName = params.get("driverClassName");
-        String primaryKey = params.get("primaryKey");
         Assert.hasText(username, "Username is empty.");
         Assert.hasText(password, "Password is empty.");
         Assert.hasText(url, "Url is empty.");
@@ -28,19 +31,19 @@ public abstract class AbstractDataBaseConfigChecker implements ConnectorConfigCh
         connectorConfig.setPassword(password);
         connectorConfig.setUrl(url);
         connectorConfig.setDriverClassName(driverClassName);
-        connectorConfig.setPrimaryKey(primaryKey);
     }
 
     protected void modifyDql(DatabaseConfig connectorConfig, Map<String, String> params) {
-        String sql = params.get("sql");
-        String table = params.get("table");
-        String primaryKey = params.get("primaryKey");
-        Assert.hasText(sql, "Sql is empty.");
-        Assert.hasText(table, "Table is empty.");
-        Assert.hasText(primaryKey, "PrimaryKey is empty.");
-        connectorConfig.setSql(sql);
-        connectorConfig.setTable(table);
-        connectorConfig.setPrimaryKey(primaryKey);
+        String sqlTableParams = params.get("sqlTableParams");
+        Assert.hasText(sqlTableParams, "sqlTableParams is empty.");
+        List<SqlTable> sqlTables = JsonUtil.jsonToArray(sqlTableParams, SqlTable.class);
+        Assert.isTrue(!CollectionUtils.isEmpty(sqlTables), "sqlTables is empty.");
+        sqlTables.forEach(sqlTable -> {
+            Assert.hasText(sqlTable.getSqlName(), "SqlName is empty.");
+            Assert.hasText(sqlTable.getSql(), "Sql is empty.");
+            Assert.hasText(sqlTable.getTable(), "Table is empty.");
+        });
+        connectorConfig.setSqlTables(sqlTables);
     }
 
     protected void modifySchema(DatabaseConfig connectorConfig, Map<String, String> params) {

+ 0 - 2
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/ElasticsearchConfigChecker.java

@@ -21,7 +21,6 @@ public class ElasticsearchConfigChecker implements ConnectorConfigChecker<ESConf
         String password = params.get("password");
         String index = params.get("index");
         String url = params.get("url");
-        String primaryKey = params.get("primaryKey");
         Assert.hasText(username, "Username is empty.");
         Assert.hasText(password, "Password is empty.");
         Assert.hasText(index, "Index is empty.");
@@ -31,6 +30,5 @@ public class ElasticsearchConfigChecker implements ConnectorConfigChecker<ESConf
         connectorConfig.setPassword(password);
         connectorConfig.setIndex(index);
         connectorConfig.setUrl(url);
-        connectorConfig.setPrimaryKey(primaryKey);
     }
 }

+ 20 - 10
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/tablegroup/TableGroupChecker.java

@@ -24,10 +24,13 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * @author AE86
@@ -132,19 +135,26 @@ public class TableGroupChecker extends AbstractChecker {
         checker.dealIncrementStrategy(mapping, tableGroup);
     }
 
-    private Table getTable(String connectorId, String tableName, String primaryKey) {
+    private Table getTable(String connectorId, String tableName, String primaryKeyStr) {
         MetaInfo metaInfo = manager.getMetaInfo(connectorId, tableName);
-        Assert.notNull(metaInfo, "无法获取连接器表信息.");
+        Assert.notNull(metaInfo, "无法获取连接器表信息:" + tableName);
         // 自定义主键
-        if (StringUtil.isNotBlank(primaryKey) && !CollectionUtils.isEmpty(metaInfo.getColumn())) {
-            for (Field field : metaInfo.getColumn()) {
-                if (StringUtil.equals(field.getName(), primaryKey)) {
-                    field.setPk(true);
-                    break;
-                }
-            }
+        List<String> primaryKeys = new ArrayList<>();
+        if (StringUtil.isNotBlank(primaryKeyStr)) {
+            String[] pks = StringUtil.split(primaryKeyStr, ",");
+            Set<String> keys = new LinkedHashSet<>(Arrays.asList(pks));
+            primaryKeys.addAll(keys);
+        }
+        if (!CollectionUtils.isEmpty(primaryKeys) && !CollectionUtils.isEmpty(metaInfo.getColumn())) {
+            primaryKeys.forEach(pk ->
+                metaInfo.getColumn().forEach(field -> {
+                    if(StringUtil.equalsIgnoreCase(field.getName(), pk)){
+                        field.setPk(true);
+                    }
+                })
+            );
         }
-        return new Table(tableName, metaInfo.getTableType(), primaryKey, metaInfo.getColumn());
+        return new Table(tableName, metaInfo.getTableType(), primaryKeys, metaInfo.getColumn(), metaInfo.getSql());
     }
 
     private void checkRepeatedTable(String mappingId, String sourceTable, String targetTable) {

+ 1 - 1
dbsyncer-cache/pom.xml

@@ -4,7 +4,7 @@
 	<parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.2-RC</version>
+        <version>1.2.3-RC</version>
     </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-cache</artifactId>

+ 1 - 1
dbsyncer-cluster/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.2-RC</version>
+        <version>1.2.3-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-cluster</artifactId>

+ 1 - 1
dbsyncer-common/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.2-RC</version>
+        <version>1.2.3-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-common</artifactId>

+ 0 - 4
dbsyncer-common/src/main/java/org/dbsyncer/common/model/AbstractConnectorConfig.java

@@ -22,8 +22,4 @@ public abstract class AbstractConnectorConfig {
         return this;
     }
 
-    public String getPrimaryKey() {
-        return "";
-    }
-
 }

+ 4 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/StringUtil.java

@@ -8,6 +8,10 @@ public abstract class StringUtil {
         return StringUtils.equals(cs1, cs2);
     }
 
+    public static boolean equalsIgnoreCase(CharSequence cs1, CharSequence cs2) {
+        return StringUtils.equalsIgnoreCase(cs1, cs2);
+    }
+
     public static boolean isBlank(CharSequence cs) {
         return StringUtils.isBlank(cs);
     }

+ 1 - 1
dbsyncer-connector/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.2-RC</version>
+        <version>1.2.3-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-connector</artifactId>

+ 10 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/AbstractConnector.java

@@ -11,6 +11,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -70,7 +71,7 @@ public abstract class AbstractConnector {
         for (Map row : config.getData()) {
             // 根据目标字段类型转换值
             for (Field f : config.getFields()) {
-                if(null == f){
+                if (null == f) {
                     continue;
                 }
                 // 根据字段类型转换值
@@ -88,13 +89,18 @@ public abstract class AbstractConnector {
         }
     }
 
-    protected Field getPrimaryKeyField(List<Field> fields) {
+    protected List<Field> getPrimaryKeys(List<Field> fields) {
+        List<Field> list = new ArrayList<>();
+
         for (Field f : fields) {
             if (f.isPk()) {
-                return f;
+                list.add(f);
             }
         }
-        throw new ConnectorException("主键为空");
+        if (CollectionUtils.isEmpty(list)) {
+            throw new ConnectorException("主键为空");
+        }
+        return list;
     }
 
     protected boolean isUpdate(String event) {

+ 14 - 38
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/DatabaseConfig.java

@@ -1,8 +1,10 @@
 package org.dbsyncer.connector.config;
 
 import org.dbsyncer.common.model.AbstractConnectorConfig;
+import org.dbsyncer.connector.model.SqlTable;
 
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -34,24 +36,14 @@ public class DatabaseConfig extends AbstractConnectorConfig {
     private String password;
 
     /**
-     * 主表
-     */
-    private String table;
-
-    /**
-     * 主键
-     */
-    private String primaryKey;
-
-    /**
-     * SQL
+     * 构架名
      */
-    private String sql;
+    private String schema;
 
     /**
-     * 构架名
+     * sql
      */
-    private String schema;
+    private List<SqlTable> sqlTables;
 
     /**
      * 参数配置
@@ -98,30 +90,6 @@ public class DatabaseConfig extends AbstractConnectorConfig {
         this.password = password;
     }
 
-    public String getTable() {
-        return table;
-    }
-
-    public void setTable(String table) {
-        this.table = table;
-    }
-
-    public String getPrimaryKey() {
-        return primaryKey;
-    }
-
-    public void setPrimaryKey(String primaryKey) {
-        this.primaryKey = primaryKey;
-    }
-
-    public String getSql() {
-        return sql;
-    }
-
-    public void setSql(String sql) {
-        this.sql = sql;
-    }
-
     public String getSchema() {
         return schema;
     }
@@ -130,6 +98,14 @@ public class DatabaseConfig extends AbstractConnectorConfig {
         this.schema = schema;
     }
 
+    public List<SqlTable> getSqlTables() {
+        return sqlTables;
+    }
+
+    public void setSqlTables(List<SqlTable> sqlTables) {
+        this.sqlTables = sqlTables;
+    }
+
     public Map<String, String> getProperties() {
         return properties;
     }

+ 0 - 12
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ESConfig.java

@@ -35,11 +35,6 @@ public class ESConfig extends AbstractConnectorConfig {
      */
     private String type = "_doc";
 
-    /**
-     * 主键
-     */
-    private String primaryKey;
-
     public String getUrl() {
         return url;
     }
@@ -80,11 +75,4 @@ public class ESConfig extends AbstractConnectorConfig {
         this.type = type;
     }
 
-    public String getPrimaryKey() {
-        return primaryKey;
-    }
-
-    public void setPrimaryKey(String primaryKey) {
-        this.primaryKey = primaryKey;
-    }
 }

+ 5 - 9
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/ReaderConfig.java

@@ -7,14 +7,14 @@ public class ReaderConfig {
 
     private Map<String, String> command;
     private List<Object> args;
-    private Object cursor;
+    private Object[] cursors;
     private int pageIndex;
     private int pageSize;
 
-    public ReaderConfig(Map<String,String> command, List<Object> args, Object cursor, int pageIndex, int pageSize) {
+    public ReaderConfig(Map<String,String> command, List<Object> args, Object[] cursors, int pageIndex, int pageSize) {
         this.command = command;
         this.args = args;
-        this.cursor = cursor;
+        this.cursors = cursors;
         this.pageIndex = pageIndex;
         this.pageSize = pageSize;
     }
@@ -27,12 +27,8 @@ public class ReaderConfig {
         return args;
     }
 
-    public Object getCursor() {
-        return cursor;
-    }
-
-    public void setCursor(String cursor) {
-        this.cursor = cursor;
+    public Object[] getCursors() {
+        return cursors;
     }
 
     public int getPageIndex() {

+ 6 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/SqlBuilderConfig.java

@@ -12,8 +12,8 @@ public class SqlBuilderConfig {
     private String schema;
     // 表名
     private String tableName;
-    // 主键
-    private String pk;
+    // 主键列表
+    private List<String> primaryKeys;
     // 字段
     private List<Field> fields;
     // 过滤条件
@@ -21,11 +21,11 @@ public class SqlBuilderConfig {
     // 引号
     private String quotation;
 
-    public SqlBuilderConfig(Database database, String schema, String tableName, String pk, List<Field> fields, String queryFilter, String quotation) {
+    public SqlBuilderConfig(Database database, String schema, String tableName, List<String> primaryKeys, List<Field> fields, String queryFilter, String quotation) {
         this.database = database;
         this.schema = schema;
         this.tableName = tableName;
-        this.pk = pk;
+        this.primaryKeys = primaryKeys;
         this.fields = fields;
         this.queryFilter = queryFilter;
         this.quotation = quotation;
@@ -43,8 +43,8 @@ public class SqlBuilderConfig {
         return tableName;
     }
 
-    public String getPk() {
-        return pk;
+    public List<String> getPrimaryKeys() {
+        return primaryKeys;
     }
 
     public List<Field> getFields() {

+ 92 - 70
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -121,7 +122,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
     @Override
     public Result reader(DatabaseConnectorMapper connectorMapper, ReaderConfig config) {
         // 1、获取select SQL
-        String queryKey = enableCursor() && null == config.getCursor() ? ConnectorConstant.OPERTION_QUERY_CURSOR : SqlBuilderEnum.QUERY.getName();
+        String queryKey = enableCursor() && null == config.getCursors() ? ConnectorConstant.OPERTION_QUERY_CURSOR : SqlBuilderEnum.QUERY.getName();
         String querySql = config.getCommand().get(queryKey);
         Assert.hasText(querySql, "查询语句不能为空.");
 
@@ -152,15 +153,15 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             throw new ConnectorException("writer data can not be empty.");
         }
         List<Field> fields = new ArrayList<>(config.getFields());
-        Field pkField = getPrimaryKeyField(config.getFields());
+        List<Field> pkFields = getPrimaryKeys(config.getFields());
         // Update / Delete
         if (!isInsert(event)) {
             if (isDelete(event)) {
                 fields.clear();
             } else if (isUpdate(event)) {
-                fields.remove(pkField);
+                removeFieldWithPk(fields, pkFields);
             }
-            fields.add(pkField);
+            fields.addAll(pkFields);
         }
 
         Result result = new Result();
@@ -170,7 +171,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             execute = connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(executeSql, batchRows(fields, data)));
         } catch (Exception e) {
             logger.error(e.getMessage());
-            data.forEach(row -> forceUpdate(result, connectorMapper, config, pkField, row));
+            data.forEach(row -> forceUpdate(result, connectorMapper, config, pkFields, row));
         }
 
         if (null != execute) {
@@ -180,7 +181,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
                     result.getSuccessData().add(data.get(i));
                     continue;
                 }
-                forceUpdate(result, connectorMapper, config, pkField, data.get(i));
+                forceUpdate(result, connectorMapper, config, pkFields, data.get(i));
             }
         }
         return result;
@@ -223,39 +224,41 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
 
         // 获取查询数据行是否存在
         String tableName = commandConfig.getTable().getName();
-        String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getTable());
-        StringBuilder queryCount = new StringBuilder("SELECT COUNT(1) FROM ").append(schema).append(quotation).append(tableName).append(quotation)
-                .append(" WHERE ").append(quotation).append(pk).append(quotation).append(" = ?");
+        List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
+        StringBuilder queryCount = new StringBuilder("SELECT COUNT(1) FROM ").append(schema).append(quotation).append(tableName).append(quotation).append(" WHERE ");
+        // id = ? AND uid = ?
+        PrimaryKeyUtil.buildSql(queryCount, primaryKeys, quotation, " AND ", " = ? ", true);
+
         String queryCountExist = ConnectorConstant.OPERTION_QUERY_COUNT_EXIST;
         map.put(queryCountExist, queryCount.toString());
         return map;
     }
 
     /**
-     * 是否支持游标查询
+     * 查询语句表名和字段带上引号(默认不加)
      *
      * @return
      */
-    protected boolean enableCursor() {
-        return false;
+    public String buildSqlWithQuotation() {
+        return "";
     }
 
     /**
-     * 健康检查
+     * 是否支持游标查询
      *
      * @return
      */
-    protected String getValidationQuery() {
-        return "select 1";
+    protected boolean enableCursor() {
+        return false;
     }
 
     /**
-     * 查询语句表名和字段带上引号(默认不加)
+     * 健康检
      *
      * @return
      */
-    protected String buildSqlWithQuotation() {
-        return "";
+    protected String getValidationQuery() {
+        return "select 1";
     }
 
     /**
@@ -278,33 +281,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return config.getSchema();
     }
 
-    /**
-     * 获取表列表
-     *
-     * @param connectorMapper
-     * @param catalog
-     * @param schema
-     * @param tableNamePattern
-     * @return
-     */
-    protected List<Table> getTable(DatabaseConnectorMapper connectorMapper, String catalog, String schema, String tableNamePattern) {
-        return connectorMapper.execute(databaseTemplate -> {
-            List<Table> tables = new ArrayList<>();
-            SimpleConnection connection = (SimpleConnection) databaseTemplate.getConnection();
-            Connection conn = connection.getConnection();
-            String databaseCatalog = null == catalog ? conn.getCatalog() : catalog;
-            String schemaNamePattern = null == schema ? conn.getSchema() : schema;
-            String[] types = {TableTypeEnum.TABLE.getCode(), TableTypeEnum.VIEW.getCode(), TableTypeEnum.MATERIALIZED_VIEW.getCode()};
-            final ResultSet rs = conn.getMetaData().getTables(databaseCatalog, schemaNamePattern, tableNamePattern, types);
-            while (rs.next()) {
-                final String tableName = rs.getString("TABLE_NAME");
-                final String tableType = rs.getString("TABLE_TYPE");
-                tables.add(new Table(tableName, tableType));
-            }
-            return tables;
-        });
-    }
-
     /**
      * 获取数据库表元数据信息
      *
@@ -366,14 +342,17 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
      */
     protected String getQueryCountSql(CommandConfig commandConfig, String schema, String quotation, String queryFilterSql) {
         String table = commandConfig.getTable().getName();
-        String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getTable());
-        StringBuilder queryCount = new StringBuilder();
-        queryCount.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(schema).append(quotation).append(table).append(quotation);
+        List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
+        StringBuilder sql = new StringBuilder();
+        sql.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(schema).append(quotation).append(table).append(quotation);
         if (StringUtil.isNotBlank(queryFilterSql)) {
-            queryCount.append(queryFilterSql);
+            sql.append(queryFilterSql);
         }
-        queryCount.append(" GROUP BY ").append(quotation).append(pk).append(quotation).append(") DBSYNCER_T");
-        return queryCount.toString();
+        sql.append(" GROUP BY ");
+        // id,uid
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        sql.append(") DBSYNCER_T");
+        return sql.toString();
     }
 
     /**
@@ -412,6 +391,33 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return sql.toString();
     }
 
+    /**
+     * 获取表列表
+     *
+     * @param connectorMapper
+     * @param catalog
+     * @param schema
+     * @param tableNamePattern
+     * @return
+     */
+    private List<Table> getTable(DatabaseConnectorMapper connectorMapper, String catalog, String schema, String tableNamePattern) {
+        return connectorMapper.execute(databaseTemplate -> {
+            List<Table> tables = new ArrayList<>();
+            SimpleConnection connection = (SimpleConnection) databaseTemplate.getConnection();
+            Connection conn = connection.getConnection();
+            String databaseCatalog = null == catalog ? conn.getCatalog() : catalog;
+            String schemaNamePattern = null == schema ? conn.getSchema() : schema;
+            String[] types = {TableTypeEnum.TABLE.getCode(), TableTypeEnum.VIEW.getCode(), TableTypeEnum.MATERIALIZED_VIEW.getCode()};
+            final ResultSet rs = conn.getMetaData().getTables(databaseCatalog, schemaNamePattern, tableNamePattern, types);
+            while (rs.next()) {
+                final String tableName = rs.getString("TABLE_NAME");
+                final String tableType = rs.getString("TABLE_TYPE");
+                tables.add(new Table(tableName, tableType));
+            }
+            return tables;
+        });
+    }
+
     /**
      * 根据过滤条件获取查询SQL
      *
@@ -466,7 +472,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         if (CollectionUtils.isEmpty(column)) {
             return null;
         }
-        String pk = null;
         Set<String> mark = new HashSet<>();
         List<Field> fields = new ArrayList<>();
         for (Field c : column) {
@@ -474,9 +479,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             if (StringUtil.isBlank(name)) {
                 throw new ConnectorException("The field name can not be empty.");
             }
-            if (c.isPk()) {
-                pk = name;
-            }
             if (!mark.contains(name)) {
                 fields.add(c);
                 mark.add(name);
@@ -491,11 +493,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             logger.error("Table name can not be empty.");
             throw new ConnectorException("Table name can not be empty.");
         }
-        if (StringUtil.isBlank(pk)) {
-            pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(table);
-        }
-
-        SqlBuilderConfig config = new SqlBuilderConfig(this, schema, tableName, pk, fields, queryFilterSQL, buildSqlWithQuotation());
+        List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
+        SqlBuilderConfig config = new SqlBuilderConfig(this, schema, tableName, primaryKeys, fields, queryFilterSQL, buildSqlWithQuotation());
         return SqlBuilderEnum.getSqlBuilder(type).buildSql(config);
     }
 
@@ -552,18 +551,24 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return args;
     }
 
-    private void forceUpdate(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField,
+    private void forceUpdate(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, List<Field> pkFields,
                              Map row) {
-        if(isUpdate(config.getEvent()) || isInsert(config.getEvent())){
+        if (isUpdate(config.getEvent()) || isInsert(config.getEvent())) {
             // 存在执行覆盖更新,否则写入
             final String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
-            final String event = existRow(connectorMapper, queryCount, row.get(pkField.getName())) ? ConnectorConstant.OPERTION_UPDATE : ConnectorConstant.OPERTION_INSERT;
+            int size = pkFields.size();
+            Object[] args = new Object[size];
+            for (int i = 0; i < size; i++) {
+                args[i] = row.get(pkFields.get(i).getName());
+            }
+            final String event = existRow(connectorMapper, queryCount, args) ? ConnectorConstant.OPERTION_UPDATE
+                    : ConnectorConstant.OPERTION_INSERT;
             logger.warn("{}表执行{}失败, 重新执行{}, {}", config.getTableName(), config.getEvent(), event, row);
-            writer(result, connectorMapper, config, pkField, row, event);
+            writer(result, connectorMapper, config, pkFields, row, event);
         }
     }
 
-    private void writer(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField, Map row,
+    private void writer(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, List<Field> pkFields, Map row,
                         String event) {
         // 1、获取 SQL
         String sql = config.getCommand().get(event);
@@ -574,9 +579,9 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             if (isDelete(event)) {
                 fields.clear();
             } else if (isUpdate(event)) {
-                fields.remove(pkField);
+                fields.remove(pkFields);
             }
-            fields.add(pkField);
+            fields.addAll(pkFields);
         }
 
         try {
@@ -595,12 +600,12 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         }
     }
 
-    private boolean existRow(DatabaseConnectorMapper connectorMapper, String sql, Object value) {
+    private boolean existRow(DatabaseConnectorMapper connectorMapper, String sql, Object[] args) {
         int rowNum = 0;
         try {
-            rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, new Object[]{value}, Integer.class));
+            rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, Integer.class, args));
         } catch (Exception e) {
-            logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, value);
+            logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, args);
         }
         return rowNum > 0;
     }
@@ -610,4 +615,21 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return !CollectionUtils.isEmpty(pk) && pk.contains(name);
     }
 
+    private void removeFieldWithPk(List<Field> fields, List<Field> pkFields) {
+        if (CollectionUtils.isEmpty(fields) || CollectionUtils.isEmpty(pkFields)) {
+            return;
+        }
+
+        pkFields.forEach(pkField -> {
+            Iterator<Field> iterator = fields.iterator();
+            while (iterator.hasNext()) {
+                Field next = iterator.next();
+                if (next != null && StringUtil.equals(next.getName(), pkField.getName())) {
+                    iterator.remove();
+                    break;
+                }
+            }
+        });
+    }
+
 }

+ 8 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderDelete.java

@@ -2,6 +2,9 @@ package org.dbsyncer.connector.database.sqlbuilder;
 
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
+
+import java.util.List;
 
 /**
  * @author AE86
@@ -14,9 +17,11 @@ public class SqlBuilderDelete extends AbstractSqlBuilder {
     public String buildSql(SqlBuilderConfig config) {
         String tableName = config.getTableName();
         String quotation = config.getQuotation();
-        // DELETE FROM "USER" WHERE "ID"=?
-        return new StringBuilder().append("DELETE FROM ").append(config.getSchema()).append(quotation).append(tableName).append(quotation).append(" WHERE ").append(quotation).append(config.getPk()).append(quotation)
-                .append("=?").toString();
+        List<String> primaryKeys = config.getPrimaryKeys();
+        // DELETE FROM "USER" WHERE "ID"=? AND "UID" = ?
+        StringBuilder sql = new StringBuilder().append("DELETE FROM ").append(config.getSchema()).append(quotation).append(tableName).append(quotation).append(" WHERE ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " = ? ", true);
+        return sql.toString();
     }
 
 }

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQuery.java

@@ -20,7 +20,7 @@ public class SqlBuilderQuery extends AbstractSqlBuilder {
     public String buildSql(SqlBuilderConfig config) {
         // 分页语句
         Database database = config.getDatabase();
-        PageSql pageSql = new PageSql(config, buildQuerySql(config), config.getPk());
+        PageSql pageSql = new PageSql(config, buildQuerySql(config), config.getPrimaryKeys());
         return database.getPageSql(pageSql);
     }
 

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderQueryCursor.java

@@ -15,7 +15,7 @@ public class SqlBuilderQueryCursor extends SqlBuilderQuery {
     public String buildSql(SqlBuilderConfig config) {
         // 分页语句
         Database database = config.getDatabase();
-        PageSql pageSql = new PageSql(config, buildQuerySql(config), config.getPk());
+        PageSql pageSql = new PageSql(config, buildQuerySql(config), config.getPrimaryKeys());
         return database.getPageCursorSql(pageSql);
     }
 

+ 7 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/sqlbuilder/SqlBuilderUpdate.java

@@ -4,8 +4,10 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 import org.dbsyncer.connector.database.AbstractSqlBuilder;
 import org.dbsyncer.connector.model.Field;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
 import java.util.List;
+import java.util.Set;
 
 /**
  * @author AE86
@@ -20,8 +22,8 @@ public class SqlBuilderUpdate extends AbstractSqlBuilder {
         List<Field> fields = config.getFields();
         String quotation = config.getQuotation();
         StringBuilder sql = new StringBuilder();
+        List<String> primaryKeys = config.getPrimaryKeys();
         int size = fields.size();
-        int end = size - 1;
         sql.append("UPDATE ").append(config.getSchema()).append(quotation).append(tableName).append(quotation).append(" SET ");
         for (int i = 0; i < size; i++) {
             // skip pk
@@ -31,7 +33,7 @@ public class SqlBuilderUpdate extends AbstractSqlBuilder {
 
             // "USERNAME"=?
             sql.append(quotation).append(fields.get(i).getName()).append(quotation).append("=?");
-            if (i < end) {
+            if (i < size - 1) {
                 sql.append(",");
             }
         }
@@ -42,8 +44,9 @@ public class SqlBuilderUpdate extends AbstractSqlBuilder {
             sql.deleteCharAt(last);
         }
 
-        // UPDATE "USER" SET "USERNAME"=?,"AGE"=? WHERE "ID"=?
-        sql.append(" WHERE ").append(quotation).append(config.getPk()).append(quotation).append("=?");
+        // UPDATE "USER" SET "USERNAME"=?,"AGE"=? WHERE "ID"=? AND "UID" = ?
+        sql.append(" WHERE ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " = ? ", true);
         return sql.toString();
     }
 

+ 6 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -126,7 +126,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             }
             properties.forEach((k, v) -> {
                 String columnType = (String) v.get("type");
-                fields.add(new Field(k, columnType, ESFieldTypeEnum.getType(columnType), StringUtil.equals(config.getPrimaryKey(), k)));
+                fields.add(new Field(k, columnType, ESFieldTypeEnum.getType(columnType)));
             });
         } catch (IOException e) {
             logger.error(e.getMessage());
@@ -188,11 +188,12 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
 
         final Result result = new Result();
         final ESConfig cfg = connectorMapper.getConfig();
-        final Field pkField = getPrimaryKeyField(config.getFields());
-        final String primaryKeyName = pkField.getName();
+        final List<Field> pkFields = getPrimaryKeys(config.getFields());
         try {
             BulkRequest request = new BulkRequest();
-            data.forEach(row -> addRequest(request, cfg.getIndex(), cfg.getType(), config.getEvent(), String.valueOf(row.get(primaryKeyName)), row));
+            // 默认取第一个主键
+            final String pk = pkFields.get(0).getName();
+            data.forEach(row -> addRequest(request, cfg.getIndex(), cfg.getType(), config.getEvent(), String.valueOf(row.get(pk)), row));
 
             BulkResponse response = connectorMapper.getConnection().bulk(request, RequestOptions.DEFAULT);
             RestStatus restStatus = response.status();
@@ -232,7 +233,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
     public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
         Table table = commandConfig.getTable();
         if (!CollectionUtils.isEmpty(table.getColumn())) {
-            getPrimaryKeyField(table.getColumn());
+            getPrimaryKeys(table.getColumn());
         }
         return Collections.EMPTY_MAP;
     }

+ 3 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -84,10 +84,11 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
 
         Result result = new Result();
         final KafkaConfig cfg = connectorMapper.getConfig();
-        Field pkField = getPrimaryKeyField(config.getFields());
+        final List<Field> pkNames = getPrimaryKeys(config.getFields());
         try {
             String topic = cfg.getTopic();
-            String pk = pkField.getName();
+            // 默认取第一个主键
+            final String pk = pkNames.get(0).getName();
             data.forEach(row -> connectorMapper.getConnection().send(topic, String.valueOf(row.get(pk)), row));
             result.addSuccessData(data);
         } catch (Exception e) {

+ 0 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Field.java

@@ -44,11 +44,6 @@ public class Field {
     public Field() {
     }
 
-    public Field(String name, String labelName) {
-        this.name = name;
-        this.labelName = labelName;
-    }
-
     public Field(String name, String typeName, int type) {
         this.name = name;
         this.typeName = typeName;

+ 16 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/MetaInfo.java

@@ -16,12 +16,18 @@ public class MetaInfo {
      * 表类型
      */
     private String tableType;
+
     /**
      * 属性字段
      * 格式:[{"name":"ID","typeName":"INT","type":"4"},{"name":"NAME","typeName":"VARCHAR","type":"12"}]
      */
     private List<Field> column;
 
+    /**
+     * sql
+     */
+    private String sql;
+
     public String getTableType() {
         return tableType;
     }
@@ -39,6 +45,16 @@ public class MetaInfo {
         this.column = column;
         return this;
     }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public MetaInfo setSql(String sql) {
+        this.sql = sql;
+        return this;
+    }
+
     @Override
     public String toString() {
         return new StringBuilder().append("MetaInfo{").append("tableType=").append(tableType).append(", ").append("column=").append(column).append('}').toString();

+ 16 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/PageSql.java

@@ -2,23 +2,28 @@ package org.dbsyncer.connector.model;
 
 import org.dbsyncer.connector.config.SqlBuilderConfig;
 
+import java.util.List;
+
 public class PageSql {
 
     private SqlBuilderConfig sqlBuilderConfig;
 
     private String querySql;
 
-    private String pk;
+    private String quotation;
+
+    private List<String> primaryKeys;
 
-    public PageSql(String querySql, String pk) {
+    public PageSql(String querySql, String quotation, List<String> primaryKeys) {
         this.querySql = querySql;
-        this.pk = pk;
+        this.quotation = quotation;
+        this.primaryKeys = primaryKeys;
     }
 
-    public PageSql(SqlBuilderConfig sqlBuilderConfig, String querySql, String pk) {
+    public PageSql(SqlBuilderConfig sqlBuilderConfig, String querySql, List<String> primaryKeys) {
         this.sqlBuilderConfig = sqlBuilderConfig;
         this.querySql = querySql;
-        this.pk = pk;
+        this.primaryKeys = primaryKeys;
     }
 
     public SqlBuilderConfig getSqlBuilderConfig() {
@@ -29,7 +34,11 @@ public class PageSql {
         return querySql;
     }
 
-    public String getPk() {
-        return pk;
+    public String getQuotation() {
+        return quotation;
+    }
+
+    public List<String> getPrimaryKeys() {
+        return primaryKeys;
     }
 }

+ 43 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/SqlTable.java

@@ -0,0 +1,43 @@
+package org.dbsyncer.connector.model;
+
+public class SqlTable {
+
+    private String sqlName;
+
+    private String sql;
+
+    private String table;
+
+    public SqlTable() {
+    }
+
+    public SqlTable(String sqlName, String sql, String table) {
+        this.sqlName = sqlName;
+        this.sql = sql;
+        this.table = table;
+    }
+
+    public String getSqlName() {
+        return sqlName;
+    }
+
+    public void setSqlName(String sqlName) {
+        this.sqlName = sqlName;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+}

+ 26 - 15
dbsyncer-connector/src/main/java/org/dbsyncer/connector/model/Table.java

@@ -22,9 +22,9 @@ public class Table {
     private String type;
 
     /**
-     * 主键
+     * 主键列表
      */
-    private String primaryKey;
+    private List<String> primaryKeys;
 
     /**
      * 属性字段
@@ -32,6 +32,11 @@ public class Table {
      */
     private List<Field> column;
 
+    /**
+     * sql
+     */
+    private String sql;
+
     // 总数
     private long count;
 
@@ -43,49 +48,55 @@ public class Table {
     }
 
     public Table(String name, String type) {
-        this(name, type, null, null);
+        this(name, type, null, null, null);
     }
 
-    public Table(String name, String type, String primaryKey, List<Field> column) {
+    public Table(String name, String type, List<String> primaryKeys, List<Field> column, String sql) {
         this.name = name;
         this.type = type;
-        this.primaryKey = primaryKey;
+        this.primaryKeys = primaryKeys;
         this.column = column;
+        this.sql = sql;
     }
 
     public String getName() {
         return name;
     }
 
-    public Table setName(String name) {
+    public void setName(String name) {
         this.name = name;
-        return this;
     }
 
     public String getType() {
         return type;
     }
 
-    public Table setType(String type) {
+    public void setType(String type) {
         this.type = type;
-        return this;
     }
 
-    public String getPrimaryKey() {
-        return primaryKey;
+    public List<String> getPrimaryKeys() {
+        return primaryKeys;
     }
 
-    public void setPrimaryKey(String primaryKey) {
-        this.primaryKey = primaryKey;
+    public void setPrimaryKeys(List<String> primaryKeys) {
+        this.primaryKeys = primaryKeys;
     }
 
     public List<Field> getColumn() {
         return column;
     }
 
-    public Table setColumn(List<Field> column) {
+    public void setColumn(List<Field> column) {
         this.column = column;
-        return this;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
     }
 
     public long getCount() {

+ 27 - 19
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -4,47 +4,55 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
+
+import java.util.List;
 
 public final class MysqlConnector extends AbstractDatabaseConnector {
 
     @Override
-    protected String buildSqlWithQuotation() {
+    public String buildSqlWithQuotation() {
         return "`";
     }
 
     @Override
     public String getPageSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
-        final String pk = config.getPk();
-        // select * from test.`my_user` where `id` > ? order by `id` limit ?
-        StringBuilder querySql = new StringBuilder(config.getQuerySql());
-        String queryFilter = config.getSqlBuilderConfig().getQueryFilter();
-        if (StringUtil.isNotBlank(queryFilter)) {
-            querySql.append(" AND ");
-        } else {
-            querySql.append(" WHERE ");
-        }
-        querySql.append(quotation).append(pk).append(quotation).append(" > ? ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
-        return querySql.toString();
+        final List<String> primaryKeys = config.getPrimaryKeys();
+        // select * from test.`my_user` where `id` > ? and `uid` > ? order by `id`,`uid` limit ?
+        StringBuilder sql = new StringBuilder(config.getQuerySql());
+        boolean blank = StringUtil.isBlank(config.getSqlBuilderConfig().getQueryFilter());
+        sql.append(blank ? " WHERE " : " AND ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " > ? ", blank);
+        sql.append(" ORDER BY ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        sql.append(" LIMIT ?");
+        return sql.toString();
     }
 
     @Override
     public String getPageCursorSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
-        final String pk = config.getPk();
-        // select * from test.`my_user` order by `id` limit ?
-        StringBuilder querySql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
-        return querySql.toString();
+        final List<String> primaryKeys = config.getPrimaryKeys();
+        // select * from test.`my_user` order by `id`,`uid` limit ?
+        StringBuilder sql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        sql.append(" LIMIT ?");
+        return sql.toString();
     }
 
     @Override
     public Object[] getPageArgs(ReaderConfig config) {
         int pageSize = config.getPageSize();
-        Object cursor = config.getCursor();
-        if (null == cursor) {
+        Object[] cursors = config.getCursors();
+        if (null == cursors) {
             return new Object[]{pageSize};
         }
-        return new Object[]{cursor, pageSize};
+        int cursorsLen = cursors.length;
+        Object[] newCursors = new Object[cursorsLen + 1];
+        System.arraycopy(cursors, 0, newCursors, 0, cursorsLen);
+        newCursors[cursorsLen] = pageSize;
+        return newCursors;
     }
 
     @Override

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -31,7 +31,7 @@ public final class OracleConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    protected String buildSqlWithQuotation() {
+    public String buildSqlWithQuotation() {
         return "\"";
     }
 

+ 27 - 17
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -8,8 +8,10 @@ import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
 import java.sql.Types;
+import java.util.List;
 
 public final class PostgreSQLConnector extends AbstractDatabaseConnector {
 
@@ -18,41 +20,49 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    protected String buildSqlWithQuotation() {
+    public String buildSqlWithQuotation() {
         return "\"";
     }
 
     @Override
     public String getPageSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
-        final String pk = config.getPk();
-        StringBuilder querySql = new StringBuilder(config.getQuerySql());
-        String queryFilter = config.getSqlBuilderConfig().getQueryFilter();
-        if (StringUtil.isNotBlank(queryFilter)) {
-            querySql.append(" AND ");
-        } else {
-            querySql.append(" WHERE ");
-        }
-        querySql.append(quotation).append(pk).append(quotation).append(" > ? ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
-        return querySql.toString();
+        final List<String> primaryKeys = config.getPrimaryKeys();
+        // select * from test.`my_user` where `id` > ? and `uid` > ? order by `id`,`uid` limit ?
+        StringBuilder sql = new StringBuilder(config.getQuerySql());
+        boolean blank = StringUtil.isBlank(config.getSqlBuilderConfig().getQueryFilter());
+        sql.append(blank ? " WHERE " : " AND ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, " AND ", " > ? ", blank);
+        sql.append(" ORDER BY ");
+        // id,uid
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        sql.append(" LIMIT ?");
+        return sql.toString();
     }
 
     @Override
     public String getPageCursorSql(PageSql config) {
         final String quotation = buildSqlWithQuotation();
-        final String pk = config.getPk();
-        StringBuilder querySql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ").append(quotation).append(pk).append(quotation).append(" LIMIT ?");
-        return querySql.toString();
+        final List<String> primaryKeys = config.getPrimaryKeys();
+        // select * from test.`my_user` order by `id`,`uid` limit ?
+        StringBuilder sql = new StringBuilder(config.getQuerySql()).append(" ORDER BY ");
+        PrimaryKeyUtil.buildSql(sql, primaryKeys, quotation, ",", "", true);
+        sql.append(" LIMIT ?");
+        return sql.toString();
     }
 
     @Override
     public Object[] getPageArgs(ReaderConfig config) {
         int pageSize = config.getPageSize();
-        Object cursor = config.getCursor();
-        if (null == cursor) {
+        Object[] cursors = config.getCursors();
+        if (null == cursors) {
             return new Object[]{pageSize};
         }
-        return new Object[]{cursor, pageSize};
+        int cursorsLen = cursors.length;
+        Object[] newCursors = new Object[cursorsLen + 1];
+        System.arraycopy(cursors, 0, newCursors, 0, cursorsLen);
+        newCursors[cursorsLen] = pageSize;
+        return newCursors;
     }
 
     @Override

+ 36 - 15
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.sql;
 
+import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
@@ -7,10 +8,15 @@ import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.enums.SqlBuilderEnum;
+import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.MetaInfo;
 import org.dbsyncer.connector.model.PageSql;
+import org.dbsyncer.connector.model.SqlTable;
 import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -23,9 +29,16 @@ import java.util.Map;
 public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
 
     @Override
-    public List<Table> getTable(DatabaseConnectorMapper config) {
-        DatabaseConfig cfg = config.getConfig();
-        return super.getTable(config, null, getSchema(cfg), cfg.getTable());
+    public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
+        DatabaseConfig cfg = connectorMapper.getConfig();
+        List<SqlTable> sqlTables = cfg.getSqlTables();
+        List<Table> tables = new ArrayList<>();
+        if (!CollectionUtils.isEmpty(sqlTables)) {
+            sqlTables.forEach(s ->
+                tables.add(new Table(s.getSqlName(), TableTypeEnum.TABLE.getCode(), Collections.EMPTY_LIST, Collections.EMPTY_LIST, s.getSql()))
+            );
+        }
+        return tables;
     }
 
     @Override
@@ -34,14 +47,20 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
+    public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String sqlName) {
         DatabaseConfig cfg = connectorMapper.getConfig();
-        String sql = cfg.getSql().toUpperCase();
-        sql = sql.replace("\t", " ");
-        sql = sql.replace("\r", " ");
-        sql = sql.replace("\n", " ");
-        String queryMetaSql = StringUtil.contains(sql, " WHERE ") ? cfg.getSql() + " AND 1!=1 " : cfg.getSql() + " WHERE 1!=1 ";
-        return connectorMapper.execute(databaseTemplate -> super.getMetaInfo(databaseTemplate, queryMetaSql, getSchema(cfg), cfg.getTable()));
+        List<SqlTable> sqlTables = cfg.getSqlTables();
+        for (SqlTable s : sqlTables) {
+            if (StringUtil.equals(s.getSqlName(), sqlName)) {
+                String sql = s.getSql().toUpperCase();
+                sql = sql.replace("\t", " ");
+                sql = sql.replace("\r", " ");
+                sql = sql.replace("\n", " ");
+                String queryMetaSql = StringUtil.contains(sql, " WHERE ") ? s.getSql() + " AND 1!=1 " : s.getSql() + " WHERE 1!=1 ";
+                return connectorMapper.execute(databaseTemplate -> super.getMetaInfo(databaseTemplate, queryMetaSql, getSchema(cfg), s.getTable()));
+            }
+        }
+        return null;
     }
 
     /**
@@ -54,19 +73,19 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
     protected Map<String, String> getDqlSourceCommand(CommandConfig commandConfig, boolean groupByPK) {
         // 获取过滤SQL
         String queryFilterSql = getQueryFilterSql(commandConfig.getFilter());
-        DatabaseConfig cfg = (DatabaseConfig) commandConfig.getConnectorConfig();
+        Table table = commandConfig.getTable();
+        List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(commandConfig.getOriginalTable());
 
         // 获取查询SQL
         Map<String, String> map = new HashMap<>();
-        String querySql = cfg.getSql();
+        String querySql = table.getSql();
 
         // 存在条件
         if (StringUtil.isNotBlank(queryFilterSql)) {
             querySql += queryFilterSql;
         }
         String quotation = buildSqlWithQuotation();
-        String pk = new StringBuilder(quotation).append(cfg.getPrimaryKey()).append(quotation).toString();
-        map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(new PageSql(querySql, pk)));
+        map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(new PageSql(querySql, quotation, primaryKeys)));
 
         // 获取查询总数SQL
         StringBuilder queryCount = new StringBuilder();
@@ -74,7 +93,9 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
 
         // Mysql
         if (groupByPK) {
-            queryCount.append(" GROUP BY ").append(pk);
+            queryCount.append(" GROUP BY ");
+            // id,id2
+            PrimaryKeyUtil.buildSql(queryCount, primaryKeys, quotation, ",", "", true);
         }
         queryCount.append(") DBSYNCER_T");
         map.put(ConnectorConstant.OPERTION_QUERY_COUNT, queryCount.toString());

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLOracleConnector.java

@@ -19,7 +19,7 @@ public final class DQLOracleConnector extends AbstractDQLConnector {
     }
 
     @Override
-    protected String buildSqlWithQuotation() {
+    public String buildSqlWithQuotation() {
         return "\"";
     }
 

+ 1 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLPostgreSQLConnector.java

@@ -19,7 +19,7 @@ public final class DQLPostgreSQLConnector extends AbstractDQLConnector {
     }
 
     @Override
-    protected String buildSqlWithQuotation() {
+    public String buildSqlWithQuotation() {
         return "\"";
     }
 }

+ 8 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java

@@ -3,16 +3,19 @@ package org.dbsyncer.connector.sql;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.model.PageSql;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
-public final class DQLSqlServerConnector extends AbstractDQLConnector {
+import java.util.List;
 
-    private final Logger logger = LoggerFactory.getLogger(getClass());
+public final class DQLSqlServerConnector extends AbstractDQLConnector {
 
     @Override
     public String getPageSql(PageSql config) {
-        return String.format(DatabaseConstant.SQLSERVER_PAGE_SQL, config.getPk(), config.getQuerySql());
+        String quotation = config.getQuotation();
+        List<String> primaryKeys = config.getPrimaryKeys();
+        StringBuilder orderBy = new StringBuilder();
+        PrimaryKeyUtil.buildSql(orderBy, primaryKeys, quotation, " AND ", " = ? ", true);
+        return String.format(DatabaseConstant.SQLSERVER_PAGE_SQL, orderBy.toString(), config.getQuerySql());
     }
 
     @Override

+ 6 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -11,6 +11,7 @@ import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.enums.TableTypeEnum;
 import org.dbsyncer.connector.model.PageSql;
 import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -32,7 +33,11 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
 
     @Override
     public String getPageSql(PageSql config) {
-        return String.format(DatabaseConstant.SQLSERVER_PAGE_SQL, config.getPk(), config.getQuerySql());
+        String quotation = config.getQuotation();
+        List<String> primaryKeys = config.getPrimaryKeys();
+        StringBuilder orderBy = new StringBuilder();
+        PrimaryKeyUtil.buildSql(orderBy, primaryKeys, quotation, " AND ", " = ? ", true);
+        return String.format(DatabaseConstant.SQLSERVER_PAGE_SQL, orderBy.toString(), config.getQuerySql());
     }
 
     @Override

+ 70 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/PrimaryKeyUtil.java

@@ -1,12 +1,18 @@
 package org.dbsyncer.connector.util;
 
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.Table;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class PrimaryKeyUtil {
 
@@ -16,28 +22,84 @@ public abstract class PrimaryKeyUtil {
      * @param table
      * @return
      */
-    public static String findOriginalTablePrimaryKey(Table table) {
+    public static List<String> findOriginalTablePrimaryKey(Table table) {
         if (null == table) {
-            return null;
+            throw new ConnectorException("The table is null.");
         }
 
         // 获取自定义主键
-        String pk = table.getPrimaryKey();
-        if (StringUtil.isNotBlank(pk)) {
-            return pk;
+        if (!CollectionUtils.isEmpty(table.getPrimaryKeys())) {
+            return Collections.unmodifiableList(table.getPrimaryKeys());
         }
 
         // 获取表原始主键
+        List<String> primaryKeys = new ArrayList<>();
         List<Field> column = table.getColumn();
         if (!CollectionUtils.isEmpty(column)) {
             for (Field c : column) {
-                if (c.isPk()) {
-                    return c.getName();
+                if (c.isPk() && !primaryKeys.contains(c.getName())) {
+                    primaryKeys.add(c.getName());
                 }
             }
         }
 
-        throw new ConnectorException(String.format("The primary key of table '%s' is null.", table.getName()));
+        if (CollectionUtils.isEmpty(primaryKeys)) {
+            throw new ConnectorException(String.format("The primary key of table '%s' is null.", table.getName()));
+        }
+        return Collections.unmodifiableList(primaryKeys);
+    }
+
+    public static void buildSql(StringBuilder sql, List<String> primaryKeys, String quotation, String join, String value, boolean skipFirst) {
+        AtomicBoolean added = new AtomicBoolean();
+        primaryKeys.forEach(pk -> {
+            // skip first pk
+            if (!skipFirst || added.get()) {
+                if (StringUtil.isNotBlank(join)) {
+                    sql.append(join);
+                }
+            }
+            sql.append(quotation).append(pk).append(quotation);
+            if (StringUtil.isNotBlank(value)) {
+                sql.append(value);
+            }
+            added.set(true);
+        });
     }
 
+    /**
+     * 获取最新游标值
+     *
+     * @param data
+     * @param primaryKeys
+     * @return
+     */
+    public static Object[] getLastCursors(List<Map> data, List<String> primaryKeys) {
+        if (CollectionUtils.isEmpty(data)) {
+            return null;
+        }
+        Map last = data.get(data.size() - 1);
+        Object[] cursors = new Object[primaryKeys.size()];
+        Iterator<String> iterator = primaryKeys.iterator();
+        int i = 0;
+        while (iterator.hasNext()) {
+            String pk = iterator.next();
+            cursors[i] = last.get(pk);
+            i++;
+        }
+        return cursors;
+    }
+
+    public static Object[] getLastCursors(String str) {
+        Object[] cursors = null;
+        if (StringUtil.isNotBlank(str)) {
+            String[] split = StringUtil.split(str, ",");
+            int length = split.length;
+            cursors = new Object[length];
+            for (int i = 0; i < length; i++) {
+                String val = split[i];
+                cursors[i] = NumberUtil.isCreatable(val) ? NumberUtil.toLong(val) : val;
+            }
+        }
+        return cursors;
+    }
 }

+ 1 - 1
dbsyncer-listener/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.2-RC</version>
+        <version>1.2.3-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-listener</artifactId>

+ 68 - 49
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractDatabaseExtractor.java

@@ -3,15 +3,20 @@ package org.dbsyncer.listener;
 import org.dbsyncer.common.event.RowChangedEvent;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.database.AbstractDatabaseConnector;
 import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.dbsyncer.connector.model.Field;
 import org.dbsyncer.connector.model.MetaInfo;
+import org.dbsyncer.connector.model.Table;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.springframework.util.Assert;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author AE86
@@ -20,14 +25,14 @@ import java.util.Map;
  */
 public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
 
-    private DqlMapper dqlMapper;
+    private Map<String, DqlMapper> dqlMap = new ConcurrentHashMap<>();
 
     /**
      * 发送增量事件
      *
      * @param event
      */
-    protected void sendChangedEvent(RowChangedEvent event){
+    protected void sendChangedEvent(RowChangedEvent event) {
         changedEvent(event);
     }
 
@@ -37,16 +42,19 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
      * @param event
      */
     protected void sendDqlChangedEvent(RowChangedEvent event) {
-        if (null != event && event.getSourceTableName().equals(dqlMapper.tableName)) {
-            switch (event.getEvent()) {
-                case ConnectorConstant.OPERTION_UPDATE:
-                case ConnectorConstant.OPERTION_INSERT:
-                    event.setDataList(queryDqlData(event.getDataList()));
-                    break;
-                default:
-                    break;
+        if (null != event) {
+            DqlMapper dqlMapper = dqlMap.get(event.getSourceTableName());
+            if (null != dqlMapper) {
+                switch (event.getEvent()) {
+                    case ConnectorConstant.OPERTION_UPDATE:
+                    case ConnectorConstant.OPERTION_INSERT:
+                        event.setDataList(queryDqlData(dqlMapper, event.getDataList()));
+                        break;
+                    default:
+                        break;
+                }
+                changedEvent(event);
             }
-            changedEvent(event);
         }
     }
 
@@ -55,53 +63,66 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
      */
     protected void postProcessDqlBeforeInitialization() {
         DatabaseConnectorMapper mapper = (DatabaseConnectorMapper) connectorFactory.connect(connectorConfig);
-        DatabaseConfig cfg = mapper.getConfig();
-        final String tableName = cfg.getTable();
-        final String primaryKey = cfg.getPrimaryKey();
-        Assert.hasText(tableName, String.format("The table name '%s' is null.", tableName));
-        Assert.hasText(primaryKey, "The primaryKey is null.");
-        MetaInfo metaInfo = connectorFactory.getMetaInfo(mapper, tableName);
-        final List<Field> column = metaInfo.getColumn();
-        Assert.notEmpty(column, String.format("The column of table name '%s' is empty.", tableName));
-
-        String sql = cfg.getSql().toUpperCase();
-        sql = sql.replace("\t", " ");
-        sql = sql.replace("\r", " ");
-        sql = sql.replace("\n", " ");
-        StringBuilder querySql = new StringBuilder(cfg.getSql());
-        if(StringUtil.contains(sql, " WHERE ")){
-            querySql.append(" AND ");
-        }else{
-            querySql.append(" WHERE ");
+        AbstractDatabaseConnector connector = (AbstractDatabaseConnector) connectorFactory.getConnector(mapper);
+        String quotation = connector.buildSqlWithQuotation();
+
+        Map<String, String> tableMap = new HashMap<>();
+        mapper.getConfig().getSqlTables().forEach(s -> tableMap.put(s.getSqlName(), s.getTable()));
+
+        for (Table t : sourceTable) {
+            String sql = t.getSql();
+            String sqlName = t.getName();
+            List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(t);
+            String tableName = tableMap.get(sqlName);
+            Assert.hasText(sql, "The sql is null.");
+            Assert.hasText(tableName, "The tableName is null.");
+
+            MetaInfo metaInfo = connectorFactory.getMetaInfo(mapper, sqlName);
+            final List<Field> column = metaInfo.getColumn();
+            Assert.notEmpty(column, String.format("The column of table name '%s' is empty.", sqlName));
+
+            sql = sql.toUpperCase().replace("\t", " ");
+            sql = sql.replace("\r", " ");
+            sql = sql.replace("\n", " ");
+
+            StringBuilder querySql = new StringBuilder(sql);
+            boolean notContainsWhere = !StringUtil.contains(sql, " WHERE ");
+            querySql.append(notContainsWhere ? " WHERE " : " AND ");
+            PrimaryKeyUtil.buildSql(querySql, primaryKeys, quotation, " AND ", " = ? ", notContainsWhere);
+            DqlMapper dqlMapper = new DqlMapper(mapper, querySql.toString(), column, getPrimaryKeyIndexArray(column, primaryKeys));
+            dqlMap.putIfAbsent(tableName, dqlMapper);
         }
-        querySql.append(primaryKey).append("=?");
-        dqlMapper = new DqlMapper(mapper, querySql.toString(), tableName, column, getPKIndex(column, primaryKey));
     }
 
     /**
      * 获取主表主键索引
      *
      * @param column
-     * @param primaryKey
+     * @param primaryKeys
      * @return
      */
-    protected int getPKIndex(List<Field> column, String primaryKey) {
-        int pkIndex = 0;
-        boolean findPkIndex = false;
+    protected Integer[] getPrimaryKeyIndexArray(List<Field> column, List<String> primaryKeys) {
+        List<Integer> indexList = new ArrayList<>();
         for (Field f : column) {
-            if (f.getName().equals(primaryKey)) {
-                pkIndex = column.indexOf(f);
-                findPkIndex = true;
-                break;
+            if (primaryKeys.contains(f.getName())) {
+                indexList.add(column.indexOf(f));
             }
         }
-        Assert.isTrue(findPkIndex, "The primaryKey is invalid.");
-        return pkIndex;
+        Assert.isTrue(!CollectionUtils.isEmpty(indexList), "The primaryKeys is invalid.");
+        Integer[] indexArray = (Integer[]) indexList.toArray();
+        return indexArray;
     }
 
-    private List<Object> queryDqlData(List<Object> data) {
+    private List<Object> queryDqlData(DqlMapper dqlMapper, List<Object> data) {
         if (!CollectionUtils.isEmpty(data)) {
-            Map<String, Object> row = dqlMapper.mapper.execute(databaseTemplate -> databaseTemplate.queryForMap(dqlMapper.sql, data.get(dqlMapper.pkIndex)));
+            Map<String, Object> row = dqlMapper.mapper.execute(databaseTemplate -> {
+                int size = dqlMapper.primaryKeyIndexArray.length;
+                Object[] args = new Object[size];
+                for (int i = 0; i < size; i++) {
+                    args[i] = data.get(dqlMapper.primaryKeyIndexArray[i]);
+                }
+                return databaseTemplate.queryForMap(dqlMapper.sql, args);
+            });
             if (!CollectionUtils.isEmpty(row)) {
                 data.clear();
                 dqlMapper.column.forEach(field -> data.add(row.get(field.getName())));
@@ -113,15 +134,13 @@ public abstract class AbstractDatabaseExtractor extends AbstractExtractor {
     final class DqlMapper {
         DatabaseConnectorMapper mapper;
         String sql;
-        String tableName;
         List<Field> column;
-        int pkIndex;
+        Integer[] primaryKeyIndexArray;
 
-        public DqlMapper(DatabaseConnectorMapper mapper, String sql, String tableName, List<Field> column, int pkIndex) {
+        public DqlMapper(DatabaseConnectorMapper mapper, String sql, List<Field> column, Integer[] primaryKeyIndexArray) {
             this.mapper = mapper;
-            this.tableName = tableName;
             this.column = column;
-            this.pkIndex = pkIndex;
+            this.primaryKeyIndexArray = primaryKeyIndexArray;
             this.sql = sql;
         }
     }

+ 23 - 24
dbsyncer-listener/src/main/java/org/dbsyncer/listener/AbstractExtractor.java

@@ -6,6 +6,7 @@ import org.dbsyncer.common.model.AbstractConnectorConfig;
 import org.dbsyncer.common.scheduled.ScheduledTaskService;
 import org.dbsyncer.connector.ConnectorFactory;
 import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.model.Table;
 import org.dbsyncer.listener.config.ListenerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -13,7 +14,6 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -24,25 +24,19 @@ import java.util.concurrent.TimeUnit;
 public abstract class AbstractExtractor implements Extractor {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
-    protected String metaId;
     protected ConnectorFactory connectorFactory;
     protected ScheduledTaskService scheduledTaskService;
     protected AbstractConnectorConfig connectorConfig;
     protected ListenerConfig listenerConfig;
-    protected Map<String, String> snapshot;
     protected Set<String> filterTable;
-    private final List<Event> watcher = new CopyOnWriteArrayList<>();
-
-    @Override
-    public void addListener(Event event) {
-        if (null != event) {
-            watcher.add(event);
-        }
-    }
+    protected List<Table> sourceTable;
+    protected Map<String, String> snapshot;
+    protected String metaId;
+    protected Event watcher;
 
     @Override
-    public void clearAllListener() {
-        watcher.clear();
+    public void register(Event event) {
+        watcher = event;
     }
 
     @Override
@@ -69,23 +63,23 @@ public abstract class AbstractExtractor implements Extractor {
 
     @Override
     public void flushEvent() {
-        watcher.forEach(w -> w.flushEvent(snapshot));
+        watcher.flushEvent(snapshot);
     }
 
     @Override
     public void forceFlushEvent() {
         logger.info("Force flush:{}", snapshot);
-        watcher.forEach(w -> w.forceFlushEvent(snapshot));
+        watcher.forceFlushEvent(snapshot);
     }
 
     @Override
     public void errorEvent(Exception e) {
-        watcher.forEach(w -> w.errorEvent(e));
+        watcher.errorEvent(e);
     }
 
     @Override
     public void interruptException(Exception e) {
-        watcher.forEach(w -> w.interruptException(e));
+        watcher.interruptException(e);
     }
 
     protected void sleepInMills(long timeout) {
@@ -104,14 +98,10 @@ public abstract class AbstractExtractor implements Extractor {
      */
     private void processEvent(boolean permitEvent, RowChangedEvent event) {
         if (permitEvent) {
-            watcher.forEach(w -> w.changedEvent(event));
+            watcher.changedEvent(event);
         }
     }
 
-    public void setMetaId(String metaId) {
-        this.metaId = metaId;
-    }
-
     public void setConnectorFactory(ConnectorFactory connectorFactory) {
         this.connectorFactory = connectorFactory;
     }
@@ -128,11 +118,20 @@ public abstract class AbstractExtractor implements Extractor {
         this.listenerConfig = listenerConfig;
     }
 
+    public void setFilterTable(Set<String> filterTable) {
+        this.filterTable = filterTable;
+    }
+
+    public AbstractExtractor setSourceTable(List<Table> sourceTable) {
+        this.sourceTable = sourceTable;
+        return this;
+    }
+
     public void setSnapshot(Map<String, String> snapshot) {
         this.snapshot = snapshot;
     }
 
-    public void setFilterTable(Set<String> filterTable) {
-        this.filterTable = filterTable;
+    public void setMetaId(String metaId) {
+        this.metaId = metaId;
     }
 }

+ 2 - 7
dbsyncer-listener/src/main/java/org/dbsyncer/listener/Extractor.java

@@ -16,16 +16,11 @@ public interface Extractor {
     void close();
 
     /**
-     * 添加监听器(获取增量数据)
+     * 注册监听事件(获取增量数据)
      *
      * @param event
      */
-    void addListener(Event event);
-
-    /**
-     * 清空监听器
-     */
-    void clearAllListener();
+    void register(Event event);
 
     /**
      * 数据变更事件

+ 2 - 2
dbsyncer-listener/src/main/java/org/dbsyncer/listener/oracle/DqlOracleExtractor.java

@@ -24,9 +24,9 @@ public class DqlOracleExtractor extends OracleExtractor {
     }
 
     @Override
-    protected int getPKIndex(List<Field> column, String primaryKey) {
+    protected Integer[] getPrimaryKeyIndexArray(List<Field> column, List<String> primaryKeys) {
         // ROW_ID
-        return 0;
+        return new Integer[]{0};
     }
 
 }

+ 7 - 14
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/AbstractQuartzExtractor.java

@@ -9,6 +9,7 @@ import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.common.util.UUIDUtil;
 import org.dbsyncer.connector.config.ReaderConfig;
 import org.dbsyncer.connector.constant.ConnectorConstant;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.dbsyncer.listener.AbstractExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,15 +97,16 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
     private void execute(TableGroupCommand tableGroupCommand, int index) {
         final Map<String, String> command = tableGroupCommand.getCommand();
-        final String pk = tableGroupCommand.getPk();
+        final List<String> primaryKeys = tableGroupCommand.getPrimaryKeys();
 
         // 检查增量点
         ConnectorMapper connectionMapper = connectorFactory.connect(connectorConfig);
         Point point = checkLastPoint(command, index);
         int pageIndex = 1;
-        String cursor = snapshot.get(index + CURSOR);
+        Object[] cursors = PrimaryKeyUtil.getLastCursors(snapshot.get(index + CURSOR));
+
         while (running) {
-            Result reader = connectorFactory.reader(connectionMapper, new ReaderConfig(point.getCommand(), point.getArgs(), cursor, pageIndex++, READ_NUM));
+            Result reader = connectorFactory.reader(connectionMapper, new ReaderConfig(point.getCommand(), point.getArgs(), cursors, pageIndex++, READ_NUM));
             List<Map> data = reader.getSuccessData();
             if (CollectionUtils.isEmpty(data)) {
                 break;
@@ -133,19 +135,18 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
 
             }
             // 更新记录点
-            cursor = getLastCursor(data, pk);
+            cursors = PrimaryKeyUtil.getLastCursors(data, primaryKeys);
             point.refresh();
 
             if (data.size() < READ_NUM) {
                 break;
             }
-
         }
 
         // 持久化
         if (point.refreshed()) {
             snapshot.putAll(point.getPosition());
-            snapshot.put(index + CURSOR, cursor);
+            snapshot.put(index + CURSOR, StringUtil.join(cursors, ","));
         }
 
     }
@@ -154,12 +155,4 @@ public abstract class AbstractQuartzExtractor extends AbstractExtractor implemen
         this.commands = commands;
     }
 
-    private String getLastCursor(List<Map> data, String pk) {
-        if (!CollectionUtils.isEmpty(data)) {
-            Object value = data.get(data.size() - 1).get(pk);
-            return value == null ? "" : String.valueOf(value);
-        }
-        return "";
-    }
-
 }

+ 6 - 5
dbsyncer-listener/src/main/java/org/dbsyncer/listener/quartz/TableGroupCommand.java

@@ -1,20 +1,21 @@
 package org.dbsyncer.listener.quartz;
 
+import java.util.List;
 import java.util.Map;
 
 public class TableGroupCommand {
 
-    private String pk;
+    private List<String> primaryKeys;
 
     private Map<String, String> command;
 
-    public TableGroupCommand(String pk, Map<String, String> command) {
-        this.pk = pk;
+    public TableGroupCommand(List<String> primaryKeys, Map<String, String> command) {
+        this.primaryKeys = primaryKeys;
         this.command = command;
     }
 
-    public String getPk() {
-        return pk;
+    public List<String> getPrimaryKeys() {
+        return primaryKeys;
     }
 
     public Map<String, String> getCommand() {

+ 1 - 1
dbsyncer-manager/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.2-RC</version>
+        <version>1.2.3-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-manager</artifactId>

+ 5 - 3
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/FullPuller.java

@@ -1,6 +1,8 @@
 package org.dbsyncer.manager.puller;
 
 import org.dbsyncer.common.util.NumberUtil;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.util.PrimaryKeyUtil;
 import org.dbsyncer.manager.Manager;
 import org.dbsyncer.parser.Parser;
 import org.dbsyncer.parser.enums.ParserEnum;
@@ -100,7 +102,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         task.setPageIndex(NumberUtil.toInt(snapshot.get(ParserEnum.PAGE_INDEX.getCode()), ParserEnum.PAGE_INDEX.getDefaultValue()));
         // 反序列化游标值类型(通常为数字或字符串类型)
         String cursorValue = snapshot.get(ParserEnum.CURSOR.getCode());
-        task.setCursor(NumberUtil.isCreatable(cursorValue) ? NumberUtil.toLong(cursorValue) : cursorValue);
+        task.setCursors(PrimaryKeyUtil.getLastCursors(cursorValue));
         task.setTableGroupIndex(NumberUtil.toInt(snapshot.get(ParserEnum.TABLE_GROUP_INDEX.getCode()), ParserEnum.TABLE_GROUP_INDEX.getDefaultValue()));
         flush(task);
 
@@ -111,7 +113,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
                 break;
             }
             task.setPageIndex(ParserEnum.PAGE_INDEX.getDefaultValue());
-            task.setCursor(null);
+            task.setCursors(null);
             task.setTableGroupIndex(++i);
             flush(task);
         }
@@ -136,7 +138,7 @@ public class FullPuller extends AbstractPuller implements ApplicationListener<Fu
         meta.setEndTime(task.getEndTime());
         Map<String, String> snapshot = meta.getSnapshot();
         snapshot.put(ParserEnum.PAGE_INDEX.getCode(), String.valueOf(task.getPageIndex()));
-        snapshot.put(ParserEnum.CURSOR.getCode(), String.valueOf(task.getCursor()));
+        snapshot.put(ParserEnum.CURSOR.getCode(), StringUtil.join(task.getCursors(), ","));
         snapshot.put(ParserEnum.TABLE_GROUP_INDEX.getCode(), String.valueOf(task.getTableGroupIndex()));
         manager.editConfigModel(meta);
     }

+ 33 - 38
dbsyncer-manager/src/main/java/org/dbsyncer/manager/puller/IncrementPuller.java

@@ -30,7 +30,6 @@ import org.dbsyncer.parser.model.TableGroup;
 import org.dbsyncer.parser.util.PickerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
@@ -47,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -81,10 +79,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
     @Resource
     private ConnectorFactory connectorFactory;
 
-    @Qualifier("taskExecutor")
-    @Resource
-    private Executor taskExecutor;
-
     private Map<String, Extractor> map = new ConcurrentHashMap<>();
 
     @PostConstruct
@@ -127,7 +121,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
     public void close(String metaId) {
         Extractor extractor = map.get(metaId);
         if (null != extractor) {
-            extractor.clearAllListener();
             extractor.close();
         }
         map.remove(metaId);
@@ -141,50 +134,56 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         map.forEach((k, v) -> v.flushEvent());
     }
 
-    private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta)
-            throws InstantiationException, IllegalAccessException {
+    private AbstractExtractor getExtractor(Mapping mapping, Connector connector, List<TableGroup> list, Meta meta) throws InstantiationException, IllegalAccessException {
         AbstractConnectorConfig connectorConfig = connector.getConfig();
         ListenerConfig listenerConfig = mapping.getListener();
 
         // timing/log
         final String listenerType = listenerConfig.getListenerType();
 
+        AbstractExtractor extractor = null;
         // 默认定时抽取
         if (ListenerTypeEnum.isTiming(listenerType)) {
-            AbstractQuartzExtractor extractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
-            extractor.setCommands(list.stream().map(t -> {
-                String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(t.getSourceTable());
-                return new TableGroupCommand(pk, t.getCommand());
+            AbstractQuartzExtractor quartzExtractor = listener.getExtractor(ListenerTypeEnum.TIMING, connectorConfig.getConnectorType(), AbstractQuartzExtractor.class);
+            quartzExtractor.setCommands(list.stream().map(t -> {
+                List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(t.getSourceTable());
+                return new TableGroupCommand(primaryKeys, t.getCommand());
             }).collect(Collectors.toList()));
-            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), new QuartzListener(mapping, list));
-            return extractor;
+            quartzExtractor.register(new QuartzListener(mapping, list));
+            extractor = quartzExtractor;
         }
 
         // 基于日志抽取
         if (ListenerTypeEnum.isLog(listenerType)) {
-            AbstractExtractor extractor = listener.getExtractor(ListenerTypeEnum.LOG, connectorConfig.getConnectorType(), AbstractExtractor.class);
+            extractor = listener.getExtractor(ListenerTypeEnum.LOG, connectorConfig.getConnectorType(), AbstractExtractor.class);
+            extractor.register(new LogListener(mapping, list, extractor));
+        }
+
+        if (null != extractor) {
             Set<String> filterTable = new HashSet<>();
-            LogListener logListener = new LogListener(mapping, list, extractor);
-            logListener.getTablePicker().forEach((k, fieldPickers) -> filterTable.add(k));
+            List<Table> sourceTable = new ArrayList<>();
+            list.forEach(t -> {
+                Table table = t.getSourceTable();
+                if (!filterTable.contains(t.getName())) {
+                    sourceTable.add(table);
+                }
+                filterTable.add(table.getName());
+            });
+
+            extractor.setConnectorFactory(connectorFactory);
+            extractor.setScheduledTaskService(scheduledTaskService);
+            extractor.setConnectorConfig(connectorConfig);
+            extractor.setListenerConfig(listenerConfig);
             extractor.setFilterTable(filterTable);
-            setExtractorConfig(extractor, connectorConfig, listenerConfig, meta.getSnapshot(), logListener);
+            extractor.setSourceTable(sourceTable);
+            extractor.setSnapshot(meta.getSnapshot());
+            extractor.setMetaId(meta.getId());
             return extractor;
         }
 
         throw new ManagerException("未知的监听配置.");
     }
 
-    private void setExtractorConfig(AbstractExtractor extractor, AbstractConnectorConfig connector, ListenerConfig listener,
-                                    Map<String, String> snapshot, AbstractListener event) {
-        extractor.setConnectorFactory(connectorFactory);
-        extractor.setScheduledTaskService(scheduledTaskService);
-        extractor.setConnectorConfig(connector);
-        extractor.setListenerConfig(listener);
-        extractor.setSnapshot(snapshot);
-        extractor.addListener(event);
-        extractor.setMetaId(event.metaId);
-    }
-
     abstract class AbstractListener implements Event {
         private static final int FLUSH_DELAYED_SECONDS = 30;
         protected Mapping mapping;
@@ -245,11 +244,11 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
 
         private List<FieldPicker> tablePicker;
 
-        public QuartzListener(Mapping mapping, List<TableGroup> list) {
+        public QuartzListener(Mapping mapping, List<TableGroup> tableGroups) {
             this.mapping = mapping;
             this.metaId = mapping.getMetaId();
             this.tablePicker = new LinkedList<>();
-            list.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
+            tableGroups.forEach(t -> tablePicker.add(new FieldPicker(PickerUtil.mergeTableGroupConfig(mapping, t))));
         }
 
         @Override
@@ -290,13 +289,13 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
         private AtomicInteger eventCounter;
         private static final int MAX_LOG_CACHE_SIZE = 128;
 
-        public LogListener(Mapping mapping, List<TableGroup> list, Extractor extractor) {
+        public LogListener(Mapping mapping, List<TableGroup> tableGroups, Extractor extractor) {
             this.mapping = mapping;
             this.metaId = mapping.getMetaId();
             this.extractor = extractor;
             this.tablePicker = new LinkedHashMap<>();
             this.eventCounter = new AtomicInteger();
-            list.forEach(t -> {
+            tableGroups.forEach(t -> {
                 final Table table = t.getSourceTable();
                 final String tableName = table.getName();
                 List<Field> pkList = t.getTargetTable().getColumn().stream().filter(field -> field.isPk()).collect(Collectors.toList());
@@ -328,10 +327,6 @@ public class IncrementPuller extends AbstractPuller implements ScheduledTaskJob
                 eventCounter.set(0);
             }
         }
-
-        public Map<String, List<FieldPicker>> getTablePicker() {
-            return tablePicker;
-        }
     }
 
 }

+ 1 - 1
dbsyncer-monitor/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.2-RC</version>
+        <version>1.2.3-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-monitor</artifactId>

+ 1 - 1
dbsyncer-parser/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.2-RC</version>
+        <version>1.2.3-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-parser</artifactId>

+ 7 - 16
dbsyncer-parser/src/main/java/org/dbsyncer/parser/ParserFactory.java

@@ -52,6 +52,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -139,6 +140,7 @@ public class ParserFactory implements Parser {
             for (Table t : connector.getTable()) {
                 if (t.getName().equals(tableName)) {
                     metaInfo.setTableType(t.getType());
+                    metaInfo.setSql(t.getSql());
                     break;
                 }
             }
@@ -152,8 +154,8 @@ public class ParserFactory implements Parser {
         AbstractConnectorConfig tConnConfig = getConnectorConfig(mapping.getTargetConnectorId());
         Table sourceTable = tableGroup.getSourceTable();
         Table targetTable = tableGroup.getTargetTable();
-        Table sTable = new Table(sourceTable.getName(), sourceTable.getType(), sourceTable.getPrimaryKey(), new ArrayList<>());
-        Table tTable = new Table(targetTable.getName(), targetTable.getType(), targetTable.getPrimaryKey(), new ArrayList<>());
+        Table sTable = new Table(sourceTable.getName(), sourceTable.getType(), sourceTable.getPrimaryKeys(), new ArrayList<>(), sourceTable.getSql());
+        Table tTable = new Table(targetTable.getName(), targetTable.getType(), targetTable.getPrimaryKeys(), new ArrayList<>(), sourceTable.getSql());
         List<FieldMapping> fieldMapping = tableGroup.getFieldMapping();
         if (!CollectionUtils.isEmpty(fieldMapping)) {
             fieldMapping.forEach(m -> {
@@ -247,7 +249,7 @@ public class ParserFactory implements Parser {
         Assert.notEmpty(fieldMapping, String.format("数据源表[%s]同步到目标源表[%s], 映射关系不能为空.", sTableName, tTableName));
         // 获取同步字段
         Picker picker = new Picker(fieldMapping);
-        String pk = PrimaryKeyUtil.findOriginalTablePrimaryKey(tableGroup.getSourceTable());
+        List<String> primaryKeys = PrimaryKeyUtil.findOriginalTablePrimaryKey(tableGroup.getSourceTable());
 
         int pageSize = mapping.getReadNum();
         int batchSize = mapping.getBatchNum();
@@ -263,7 +265,7 @@ public class ParserFactory implements Parser {
             }
 
             // 1、获取数据源数据
-            Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), task.getCursor(), task.getPageIndex(), pageSize));
+            Result reader = connectorFactory.reader(sConnectorMapper, new ReaderConfig(command, new ArrayList<>(), task.getCursors(), task.getPageIndex(), pageSize));
             List<Map> source = reader.getSuccessData();
             if (CollectionUtils.isEmpty(source)) {
                 logger.info("完成全量同步任务:{}, [{}] >> [{}]", metaId, sTableName, tTableName);
@@ -290,7 +292,7 @@ public class ParserFactory implements Parser {
 
             // 7、更新结果
             task.setPageIndex(task.getPageIndex() + 1);
-            task.setCursor(getLastCursor(source, pk));
+            task.setCursors(PrimaryKeyUtil.getLastCursors(source, primaryKeys));
             result.setTableGroupId(tableGroup.getId());
             result.setTargetTableGroupName(tTableName);
             flush(task, result);
@@ -425,15 +427,4 @@ public class ParserFactory implements Parser {
         return getConnector(connectorId).getConfig();
     }
 
-    /**
-     * 获取最新游标值
-     *
-     * @param source
-     * @param pk
-     * @return
-     */
-    private Object getLastCursor(List<Map> source, String pk) {
-        return CollectionUtils.isEmpty(source) ? null : source.get(source.size() - 1).get(pk);
-    }
-
 }

+ 5 - 5
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/Task.java

@@ -10,7 +10,7 @@ public class Task {
 
     private int pageIndex;
 
-    private Object cursor;
+    private Object[] cursors;
 
     private long beginTime;
 
@@ -56,12 +56,12 @@ public class Task {
         this.pageIndex = pageIndex;
     }
 
-    public Object getCursor() {
-        return cursor;
+    public Object[] getCursors() {
+        return cursors;
     }
 
-    public void setCursor(Object cursor) {
-        this.cursor = cursor;
+    public void setCursors(Object[] cursors) {
+        this.cursors = cursors;
     }
 
     public long getBeginTime() {

+ 1 - 1
dbsyncer-plugin/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.2-RC</version>
+        <version>1.2.3-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-plugin</artifactId>

+ 1 - 1
dbsyncer-storage/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.2-RC</version>
+        <version>1.2.3-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-storage</artifactId>

+ 3 - 1
dbsyncer-storage/src/main/java/org/dbsyncer/storage/support/MysqlStorageServiceImpl.java

@@ -401,7 +401,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
         }
 
         List<Field> fields = executor.getFields();
-        final SqlBuilderConfig config = new SqlBuilderConfig(connector, "", table, ConfigConstant.CONFIG_MODEL_ID, fields, "", "");
+        List<String> primaryKeys = new ArrayList<>();
+        primaryKeys.add(ConfigConstant.CONFIG_MODEL_ID);
+        final SqlBuilderConfig config = new SqlBuilderConfig(connector, "", table, primaryKeys, fields, "", "");
 
         String query = SqlBuilderEnum.QUERY.getSqlBuilder().buildQuerySql(config);
         String insert = SqlBuilderEnum.INSERT.getSqlBuilder().buildSql(config);

+ 1 - 1
dbsyncer-web/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.2-RC</version>
+        <version>1.2.3-RC</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-web</artifactId>

+ 1 - 1
dbsyncer-web/src/main/resources/application.properties

@@ -41,7 +41,7 @@ management.endpoints.web.exposure.include=*
 management.endpoint.health.show-details=always
 management.health.elasticsearch.enabled=false
 info.app.name=DBSyncer
-info.app.version=1.2.2-RC
+info.app.version=1.2.3-RC
 info.app.copyright=&copy;2022 ${info.app.name}(${info.app.version})<footer>Designed By <a href='https://gitee.com/ghi/dbsyncer' target='_blank' >AE86</a></footer>
 
 #All < Trace < Debug < Info < Warn < Error < Fatal < OFF

+ 5 - 26
dbsyncer-web/src/main/resources/public/connector/addDqlMysql.html

@@ -14,32 +14,6 @@
                    th:value="${connector?.config?.password}"/>
         </div>
     </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">SQL <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-10">
-            <textarea id="sql" name="sql" class="form-control" maxlength="8192" dbsyncer-valid="require" rows="10"
-                      th:text="${connector?.config?.sql}?:'SELECT T1.* FROM USER T1'"></textarea>
-        </div>
-    </div>
-    <div class="form-group">
-        <div class="col-sm-6"></div>
-        <div class="col-sm-6 text-right">
-            <a href="javascript:beautifySql();"><span class="fa fa-magic fa-1x fa-flip-horizontal dbsyncer_pointer"
-                                                      title="美化SQL"></span>美化SQL</a>
-        </div>
-    </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">主表 <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-4">
-            <input class="form-control" name="table" type="text" maxlength="32" dbsyncer-valid="require"
-                   placeholder="USER" th:value="${connector?.config?.table}"/>
-        </div>
-        <label class="col-sm-2 control-label">主键 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="如果使用的表没有主键,可以自定义主键(大小写必须一致,添加表映射关系时,主键不能为空)。"></i><strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-4">
-            <input class="form-control" name="primaryKey" type="text" maxlength="32" dbsyncer-valid="require"
-                   placeholder="ID" th:value="${connector?.config?.primaryKey}"/>
-        </div>
-    </div>
     <div class="form-group">
         <label class="col-sm-2 control-label">URL <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-10">
@@ -49,10 +23,15 @@
         </div>
     </div>
 
+    <!-- SQL配置 -->
+    <div th:replace="connector/addSQL :: content"></div>
+
     <script type="text/javascript">
         $(function () {
             // 初始化select插件
             initSelectIndex($(".select-control"), 1);
+
+            $("#sql").val("SELECT T1.* FROM USER T1");
         })
     </script>
 </div>

+ 15 - 30
dbsyncer-web/src/main/resources/public/connector/addDqlOracle.html

@@ -15,31 +15,11 @@
         </div>
     </div>
     <div class="form-group">
-        <label class="col-sm-2 control-label">SQL <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-10">
-            <textarea id="sql" name="sql" class="form-control dbsyncer_textarea_resize_none" maxlength="8192"
-                      dbsyncer-valid="require" rows="10"
-                      th:text="${connector?.config?.sql}?:'SELECT T1.*,ROWIDTOCHAR(ROWID) as RID FROM &quot;USER&quot; T1'"></textarea>
-        </div>
-    </div>
-    <div class="form-group">
-        <div class="col-sm-6"></div>
-        <div class="col-sm-6 text-right">
-            <a href="javascript:beautifySql();"><span class="fa fa-magic fa-1x fa-flip-horizontal dbsyncer_pointer"
-                                                      title="美化SQL"></span>美化SQL</a>
-        </div>
-    </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">主表 <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-4">
-            <input class="form-control" name="table" type="text" maxlength="32" dbsyncer-valid="require"
-                   placeholder="USER" th:value="${connector?.config?.table}"/>
-        </div>
-        <label class="col-sm-2 control-label">主键 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="如果使用的表没有主键,可以自定义主键(大小写必须一致,添加表映射关系时,主键不能为空)。"></i><strong class="driverVerifcateRequired">*</strong></label>
+        <label class="col-sm-2 control-label">架构名 </label>
         <div class="col-sm-4">
-            <input class="form-control" name="primaryKey" type="text" maxlength="32" dbsyncer-valid="require"
-                   placeholder="ID" th:value="${connector?.config?.primaryKey}"/>
+            <input class="form-control" name="schema" type="text" maxlength="32" th:value="${connector?.config?.schema}"/>
         </div>
+        <div class="col-sm-6"></div>
     </div>
     <div class="form-group">
         <label class="col-sm-2 control-label">URL <strong class="driverVerifcateRequired">*</strong></label>
@@ -47,19 +27,24 @@
             <textarea name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="512" dbsyncer-valid="require" rows="5" th:text="${connector?.config?.url}?:'jdbc:oracle:thin:@127.0.0.1:1521:ORCL'"></textarea>
         </div>
     </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">架构名 </label>
-        <div class="col-sm-4">
-            <input class="form-control" name="schema" type="text" maxlength="32" th:value="${connector?.config?.schema}"/>
-        </div>
-        <div class="col-sm-6"></div>
-    </div>
     <div class="form-group">
         <label class="col-sm-2 control-label">驱动 </label>
         <div class="col-sm-10">
             <input class="form-control" readonly="true" name="driverClassName" type="text" value="oracle.jdbc.OracleDriver" />
         </div>
     </div>
+
+    <!-- SQL配置 -->
+    <div th:replace="connector/addSQL :: content"></div>
+
+    <script type="text/javascript">
+        $(function () {
+            // 初始化select插件
+            initSelectIndex($(".select-control"), 1);
+
+            $("#sql").val("SELECT T1.*,ROWIDTOCHAR(ROWID) as RID FROM \"USER\" T1");
+        })
+    </script>
 </div>
 
 </html>

+ 5 - 27
dbsyncer-web/src/main/resources/public/connector/addDqlPostgreSQL.html

@@ -15,33 +15,6 @@
                    th:value="${connector?.config?.password}" type="password"/>
         </div>
     </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">SQL <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-10">
-            <textarea class="form-control dbsyncer_textarea_resize_none" maxlength="8192" dbsyncer-valid="require"
-                      id="sql" name="sql" rows="10"
-                      th:text="${connector?.config?.sql}?:'SELECT T1.* FROM &quot;USER&quot; T1'"></textarea>
-        </div>
-    </div>
-    <div class="form-group">
-        <div class="col-sm-6"></div>
-        <div class="col-sm-6 text-right">
-            <a href="javascript:beautifySql();"><span class="fa fa-magic fa-1x fa-flip-horizontal dbsyncer_pointer"
-                                                      title="美化SQL"></span>美化SQL</a>
-        </div>
-    </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">主表 <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-4">
-            <input class="form-control" name="table" type="text" maxlength="32" dbsyncer-valid="require"
-                   placeholder="USER" th:value="${connector?.config?.table}"/>
-        </div>
-        <label class="col-sm-2 control-label">主键 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="如果使用的表没有主键,可以自定义主键(大小写必须一致,添加表映射关系时,主键不能为空)。"></i><strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-4">
-            <input class="form-control" name="primaryKey" type="text" maxlength="32" dbsyncer-valid="require"
-                   placeholder="ID" th:value="${connector?.config?.primaryKey}"/>
-        </div>
-    </div>
     <div class="form-group">
         <label class="col-sm-2 control-label">URL <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-10">
@@ -81,6 +54,9 @@
         </div>
     </div>
 
+    <!-- SQL配置 -->
+    <div th:replace="connector/addSQL :: content"></div>
+
     <script type="text/javascript">
         $(function () {
             $('#dropSlotOnCloseSwitch').bootstrapSwitch({
@@ -92,6 +68,8 @@
             });
             // 初始化select插件
             initSelectIndex($(".select-control"), 1);
+
+            $("#sql").val("SELECT T1.* FROM \"USER\" T1");
         })
     </script>
 </div>

+ 12 - 27
dbsyncer-web/src/main/resources/public/connector/addDqlSqlServer.html

@@ -14,33 +14,6 @@
                    th:value="${connector?.config?.password}"/>
         </div>
     </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">SQL <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-10">
-            <textarea id="sql" name="sql" class="form-control dbsyncer_textarea_resize_none" maxlength="1024"
-                      dbsyncer-valid="require" rows="10"
-                      th:text="${connector?.config?.sql}?:'SELECT T1.* FROM USER T1'"></textarea>
-        </div>
-    </div>
-    <div class="form-group">
-        <div class="col-sm-6"></div>
-        <div class="col-sm-6 text-right">
-            <a href="javascript:beautifySql();"><span class="fa fa-magic fa-1x fa-flip-horizontal dbsyncer_pointer"
-                                                      title="美化SQL"></span>美化SQL</a>
-        </div>
-    </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">主表 <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-4">
-            <input class="form-control" name="table" type="text" maxlength="32" dbsyncer-valid="require"
-                   placeholder="USER" th:value="${connector?.config?.table}"/>
-        </div>
-        <label class="col-sm-2 control-label">主键 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="如果使用的表没有主键,可以自定义主键(大小写必须一致,添加表映射关系时,主键不能为空)。"></i><strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-4">
-            <input class="form-control" name="primaryKey" type="text" maxlength="32" dbsyncer-valid="require"
-                   placeholder="ID" th:value="${connector?.config?.primaryKey}"/>
-        </div>
-    </div>
     <div class="form-group">
         <label class="col-sm-2 control-label">URL <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-10">
@@ -60,6 +33,18 @@
             <input class="form-control" readonly="true" name="driverClassName" type="text" th:value="${connector?.config?.driverClassName} ?: 'com.microsoft.sqlserver.jdbc.SQLServerDriver'" />
         </div>
     </div>
+
+    <!-- SQL配置 -->
+    <div th:replace="connector/addSQL :: content"></div>
+
+    <script type="text/javascript">
+        $(function () {
+            // 初始化select插件
+            initSelectIndex($(".select-control"), 1);
+
+            $("#sql").val("SELECT T1.* FROM USER T1");
+        })
+    </script>
 </div>
 
 </html>

+ 4 - 11
dbsyncer-web/src/main/resources/public/connector/addElasticsearch.html

@@ -14,21 +14,14 @@
         </div>
     </div>
     <div class="form-group">
-        <label class="col-sm-2 control-label">index <strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-4">
-            <input class="form-control" name="index" type="text" maxlength="32" dbsyncer-valid="require" placeholder="test" th:value="${connector?.config?.index}?:'test'"/>
-        </div>
-        <label class="col-sm-2 control-label">主键 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="如果使用的表没有主键,可以自定义主键(大小写必须一致,添加表映射关系时,主键不能为空)。"></i><strong class="driverVerifcateRequired">*</strong></label>
+        <label class="col-sm-2 control-label">URL <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="多个使用英文逗号,例如:http://192.168.1.100:9200,http://192.168.1.200:9200"></i> <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-4">
-            <input class="form-control" name="primaryKey" type="text" maxlength="32" dbsyncer-valid="require" placeholder="id" th:value="${connector?.config?.primaryKey}?:'id'"/>
+            <input class="form-control" name="url" type="text" maxlength="1024" dbsyncer-valid="require" th:value="${connector?.config?.url}?:'http://127.0.0.1:9200'"/>
         </div>
-    </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">地址 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="多个使用英文逗号,例如:http://192.168.1.100:9200,http://192.168.1.200:9200"></i> <strong class="driverVerifcateRequired">*</strong></label>
+        <label class="col-sm-2 control-label">index <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-4">
-            <textarea id="sql" name="url" class="form-control dbsyncer_textarea_resize_none" maxlength="1024" dbsyncer-valid="require" rows="2" th:text="${connector?.config?.url}?:'http://127.0.0.1:9200'"></textarea>
+            <input class="form-control" name="index" type="text" maxlength="32" dbsyncer-valid="require" placeholder="test" th:value="${connector?.config?.index}?:'test'"/>
         </div>
-        <div class="col-sm-6"></div>
     </div>
 
     <script type="text/javascript">

+ 5 - 5
dbsyncer-web/src/main/resources/public/connector/addFile.html

@@ -23,14 +23,14 @@
         <label class="col-sm-2 control-label">schema <i class="fa fa-question-circle fa_gray" aria-hidden="true"
                                                         title="支持10种字段类型。name字段名, typeName类型名称, type类型编码, pk是否为主键"></i><strong
                 class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-9">
+        <div class="col-sm-10">
             <textarea id="schema" name="schema" class="form-control dbsyncer_textarea_resize_none" maxlength="4096"
                       dbsyncer-valid="require" rows="20" th:text="${connector?.config?.schema}"></textarea>
         </div>
-        <div class="col-sm-1">
-            <button type="button" class="btn btn-default" onclick="format()">
-                <span class="fa fa-magic"></span>美化
-            </button>
+    </div>
+    <div class="form-group">
+        <div class="col-sm-12 text-right">
+            <a href="javascript:format();"><span class="fa fa-magic fa-1x fa-flip-horizontal dbsyncer_pointer" title="美化"></span>美化</a>
         </div>
     </div>
 

+ 7 - 7
dbsyncer-web/src/main/resources/public/connector/addKafka.html

@@ -4,9 +4,9 @@
 
 <div th:fragment="content">
     <div class="form-group">
-        <label class="col-sm-2 control-label">地址 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="多个使用英文逗号,例如:192.168.1.100:9092,192.168.1.200:9092"></i> <strong class="driverVerifcateRequired">*</strong></label>
+        <label class="col-sm-2 control-label">URL <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="多个使用英文逗号,例如:192.168.1.100:9092,192.168.1.200:9092"></i> <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-4">
-            <textarea name="bootstrapServers" class="form-control dbsyncer_textarea_resize_none" maxlength="1024" dbsyncer-valid="require" rows="2" th:text="${connector?.config?.bootstrapServers}?:'127.0.0.1:9092'"></textarea>
+            <input class="form-control" name="bootstrapServers" type="text" maxlength="1024" dbsyncer-valid="require" th:value="${connector?.config?.bootstrapServers}?:'127.0.0.1:9092'"/>
         </div>
         <label class="col-sm-2 control-label">Topic <strong class="driverVerifcateRequired">*</strong></label>
         <div class="col-sm-4">
@@ -16,13 +16,13 @@
 
     <div class="form-group">
         <label class="col-sm-2 control-label">字段 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="支持11种字段类型。name字段名, typeName类型名称, type类型编码, pk是否为主键"></i><strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-9">
+        <div class="col-sm-10">
             <textarea id="fields" name="fields" class="form-control dbsyncer_textarea_resize_none" maxlength="4096" dbsyncer-valid="require" rows="20" th:text="${connector?.config?.fields}"></textarea>
         </div>
-        <div class="col-sm-1">
-            <button type="button" class="btn btn-default" onclick="format()">
-                <span class="fa fa-magic"></span>美化
-            </button>
+    </div>
+    <div class="form-group">
+        <div class="col-sm-12 text-right">
+            <a href="javascript:format();"><span class="fa fa-magic fa-1x fa-flip-horizontal dbsyncer_pointer" title="美化"></span>美化</a>
         </div>
     </div>
 

+ 253 - 0
dbsyncer-web/src/main/resources/public/connector/addSQL.html

@@ -0,0 +1,253 @@
+<!DOCTYPE html>
+<html xmlns="http://www.w3.org/1999/xhtml"
+      xmlns:th="http://www.thymeleaf.org" lang="zh-CN">
+
+<div th:fragment="content">
+    <!-- SQL配置 -->
+    <div class="page-header"></div>
+
+    <!-- 已添加的SQL配置 -->
+    <div class="form-group">
+        <label class="col-sm-2 control-label">SQL表</label>
+        <div class="col-sm-4">
+            <select id="sqlTableSelect" class="form-control select-control">
+                <option th:each="sqlTable,state:${connector?.config?.sqlTables}" th:text="${sqlTable?.sqlName}" />
+            </select>
+        </div>
+        <div class="col-sm-6 text-right">
+            <button id="delSqlTableBtn" type="button" class="btn btn-default hidden"> <span class="fa fa-remove"></span>删除</button>
+            <button id="editSqlTableBtn" type="button" class="btn btn-default hidden"> <span class="fa fa-pencil"></span>修改</button>
+            <button id="addSqlTableBtn" type="button" class="btn btn-primary"> <span class="fa fa-plus"></span>添加(<span id="sqlTableCount">[[${connector?.config?.sqlTables?.size()} ?: 0]]</span>)</button>
+        </div>
+        <input type="hidden" id="sqlTableParams" name="sqlTableParams"/>
+    </div>
+
+    <div class="form-group">
+        <label class="col-sm-2 control-label">SQL名称</label>
+        <div class="col-sm-4">
+            <input class="form-control" id="sqlName" type="text" maxlength="32" placeholder="ERP用户表"/>
+        </div>
+        <label class="col-sm-2 control-label">主表 <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="用于增量同步,根据监听的主表获取增量数据"></i></label>
+        <div class="col-sm-4">
+            <input class="form-control" id="table" type="text" maxlength="32" placeholder="USER"/>
+        </div>
+    </div>
+    <div class="form-group">
+        <label class="col-sm-2 control-label">SQL</label>
+        <div class="col-sm-10">
+            <textarea id="sql" name="sql" class="sql form-control dbsyncer_textarea_resize_none" maxlength="8192" rows="10"></textarea>
+        </div>
+    </div>
+    <div class="form-group">
+        <div class="col-sm-12 text-right">
+            <a href="javascript:beautifySql();"><span class="fa fa-magic fa-1x fa-flip-horizontal dbsyncer_pointer" title="美化SQL"></span>美化SQL</a>
+        </div>
+    </div>
+
+<script type="text/javascript" th:inline="javascript">
+    $(function () {
+        let sqlTables = [[${connector?.config?.sqlTables}]];
+        // SQL配置模板
+        let $template = {
+            selector: initSelectIndex($("#sqlTableSelect"), 1),
+            addSqlTableBtn: $("#addSqlTableBtn"),
+            editSqlTableBtn: $("#editSqlTableBtn"),
+            delSqlTableBtn: $("#delSqlTableBtn"),
+            sqlTableCount: $("#sqlTableCount"),
+            sqlTableParams: $("#sqlTableParams"),
+            sqlTableArray: sqlTables == null ? [] : sqlTables,
+            sqlNameInput: $("#sqlName"),
+            tableInput: $("#table"),
+            sqlInput: $("#sql"),
+            // 禁用按钮
+            before: function ($btn) {
+                $btn.prop('disabled', true);
+            },
+            // 校验表单
+            validate: function (callback) {
+                let itemSqlName = this.validateItem(this.sqlNameInput, "SQL名称不能空.");
+                if (!itemSqlName) {
+                    return;
+                }
+                let itemTable = this.validateItem(this.tableInput, "主表不能空.");
+                if (!itemTable) {
+                    return;
+                }
+                let itemSql = this.validateItem(this.sqlInput, "SQL不能空.");
+                if (!itemSql) {
+                    return;
+                }
+                callback();
+            },
+            // 校验参数
+            validateItem: function ($input, $msg) {
+                if (isBlank($input.val())) {
+                    bootGrowl($msg, "danger");
+                    return false;
+                }
+                return true;
+            },
+            // 加入新项到下拉
+            addOption: function () {
+                let sqlName = this.sqlNameInput.val();
+                this.sqlTableArray.push({"sqlName": sqlName, "table": this.tableInput.val(), "sql": this.sqlInput.val()});
+                this.selector.append('<option>' + sqlName + '</option>');
+                this.selector.selectpicker('refresh');
+                this.selector.selectpicker('val', sqlName);
+                this.stash();
+            },
+            // 删除下拉选中项
+            removeOption: function () {
+                this.removeSqlTable(this.selectedVal());
+                this.selector.find('option:selected').remove();
+                this.selector.selectpicker('refresh');
+                this.stash();
+            },
+            // 暂存配置
+            stash: function () {
+                this.sqlTableParams.val(JSON.stringify(this.sqlTableArray));
+            },
+            // 清空表单
+            clear: function () {
+                this.sqlNameInput.val('');
+                this.tableInput.val('');
+                this.sqlInput.val('');
+            },
+            // 显示默认配置
+            popStash: function(){
+                if (this.hasOption()) {
+                    this.selector.selectpicker('val', this.sqlTableArray[0].sqlName);
+                }
+            },
+            // 回显下拉选中配置
+            apply: function () {
+                let sqlTable = this.getSqlTable(this.selectedVal());
+                if (sqlTable != null) {
+                    this.sqlNameInput.val(sqlTable.sqlName);
+                    this.tableInput.val(sqlTable.table);
+                    this.sqlInput.val(sqlTable.sql);
+                }
+                this.reset();
+            },
+            // 显示操作菜单
+            showMenu: function () {
+                if (this.hasOption()) {
+                    this.editSqlTableBtn.removeClass("hidden");
+                    this.delSqlTableBtn.removeClass("hidden");
+                } else {
+                    this.editSqlTableBtn.addClass("hidden");
+                    this.delSqlTableBtn.addClass("hidden");
+                }
+            },
+            // 更新添加按钮数量显示
+            refreshCount: function () {
+                this.sqlTableCount.text(this.sqlTableArray.length);
+            },
+            // 是否有配置
+            hasOption: function () {
+                return this.sqlTableArray.length > 0;
+            },
+            // 下拉选择值
+            selectedVal: function () {
+                return this.selector.selectpicker('val');
+            },
+            // 清空美化记录
+            reset: function () {
+                this.sqlInput.removeAttr('tmp');
+            },
+            // 恢复按钮
+            after: function ($btn) {
+                $btn.removeAttr('disabled');
+            },
+            getSqlTable: function (sqlName) {
+                if (!isBlank(sqlName)) {
+                    let tLen = this.sqlTableArray.length;
+                    for (let j = 0; j < tLen; j++) {
+                        if (this.sqlTableArray[j].sqlName == sqlName) {
+                            return this.sqlTableArray[j];
+                        }
+                    }
+                }
+                return null;
+            },
+            removeSqlTable: function (sqlName) {
+                let newArray = [];
+                if (!isBlank(sqlName)) {
+                    let tLen = this.sqlTableArray.length;
+                    for (let j = 0; j < tLen; j++) {
+                        if (this.sqlTableArray[j].sqlName != sqlName) {
+                            newArray.push(this.sqlTableArray[j]);
+                        }
+                    }
+                }
+                this.sqlTableArray = newArray;
+            }
+        }
+
+        $template.showMenu();
+        $template.selector.on('changed.bs.select', function () {
+            $template.apply();
+        });
+        $template.stash();
+        $template.popStash();
+        bindSqlTableAddEvent($template);
+        bindSqlTableEditEvent($template);
+        bindSqlTableDelEvent($template);
+    })
+
+    function bindSqlTableAddEvent($template) {
+        $template.addSqlTableBtn.click(function () {
+            $template.before($(this));
+            $template.validate(function () {
+                // 重复校验
+                let newSqlName = $template.sqlNameInput.val();
+                let sqlTable = $template.getSqlTable(newSqlName);
+                if (sqlTable != null) {
+                    bootGrowl(newSqlName + " 已存在.", "danger");
+                } else {
+                    $template.addOption();
+                    $template.refreshCount();
+                    $template.showMenu();
+                }
+            });
+            $template.reset();
+            $template.after($(this));
+        });
+    }
+    function bindSqlTableEditEvent($template) {
+        $template.editSqlTableBtn.click(function () {
+            $template.before($(this));
+            $template.validate(function () {
+                let oldSqlName = $template.selectedVal();
+                if (!isBlank(oldSqlName)) {
+                    let newSqlName = $template.sqlNameInput.val();
+                    // 重命名的SQL名称已存在
+                    if (oldSqlName != newSqlName && $template.getSqlTable(newSqlName) != null) {
+                        bootGrowl(newSqlName + " 已存在.", "danger");
+                        return;
+                    }
+                    $template.removeOption();
+                    $template.addOption();
+                } else {
+                    bootGrowl("修改无效.", "danger");
+                }
+            });
+            $template.reset();
+            $template.after($(this));
+        });
+    }
+    function bindSqlTableDelEvent($template) {
+        $template.delSqlTableBtn.click(function () {
+            $template.before($(this));
+            $template.removeOption();
+            $template.refreshCount();
+            $template.clear();
+            $template.showMenu();
+            $template.popStash();
+            $template.reset();
+            $template.after($(this));
+        });
+    }
+</script>
+</div>
+</html>

+ 5 - 4
dbsyncer-web/src/main/resources/static/js/common.js

@@ -38,7 +38,7 @@ function backIndexPage(projectGroupId) {
 
 // 美化SQL
 function beautifySql(){
-    var $sql = $("#sql");
+    var $sql = $(".sql");
     var $tmp = $sql.attr('tmp');
     if(null == $tmp){
         $sql.attr('tmp', $sql.val());
@@ -51,10 +51,10 @@ function beautifySql(){
 
 // 初始化select组件,默认选中
 function initSelectIndex($select, $selectedIndex){
-    initSelect($select);
+    let select = initSelect($select);
 
     if($selectedIndex < 0){
-        return;
+        return select;
     }
 
     $.each($select, function () {
@@ -66,9 +66,10 @@ function initSelectIndex($select, $selectedIndex){
             }
         }
     });
+    return select;
 }
 function initSelect($select){
-    $select.selectpicker({
+    return $select.selectpicker({
         "style":'dbsyncer_btn-info',
         "title":"请选择",
         "actionsBox":true,

+ 2 - 2
dbsyncer-web/src/main/resources/static/js/monitor/index.js

@@ -69,9 +69,9 @@ function bindQueryErrorDetailEvent() {
         var html = '<div class="row driver_break_word">' + json + '</div>';
         BootstrapDialog.show({
             title: "异常详细",
-            type: BootstrapDialog.TYPE_INFO,
+            size: BootstrapDialog.SIZE_WIDE,
             message: html,
-            size: BootstrapDialog.SIZE_NORMAL,
+            type: BootstrapDialog.TYPE_WARNING,
             buttons: [{
                 label: "关闭",
                 action: function (dialog) {

+ 1 - 1
pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>org.ghi</groupId>
     <artifactId>dbsyncer</artifactId>
-    <version>1.2.2-RC</version>
+    <version>1.2.3-RC</version>
     <packaging>pom</packaging>
     <name>dbsyncer</name>
     <url>https://gitee.com/ghi/dbsyncer</url>