ソースを参照

!58 merge
Merge pull request !58 from AE86/V_1.0.0_Beta

AE86 3 年 前
コミット
c0576dac4f
33 ファイル変更446 行追加244 行削除
  1. 4 4
      dbsyncer-biz/pom.xml
  2. 4 4
      dbsyncer-cache/pom.xml
  3. 1 1
      dbsyncer-cluster/pom.xml
  4. 1 1
      dbsyncer-common/pom.xml
  5. 6 1
      dbsyncer-connector/pom.xml
  6. 71 62
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java
  7. 5 2
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleConnection.java
  8. 5 0
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BigintSetter.java
  9. 8 6
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java
  10. 1 7
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java
  11. 5 11
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java
  12. 5 13
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java
  13. 0 13
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java
  14. 5 13
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java
  15. 0 59
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnectorMapper.java
  16. 7 4
      dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java
  17. 68 0
      dbsyncer-connector/src/main/test/SqlServerConnectionTest.java
  18. 1 1
      dbsyncer-listener/pom.xml
  19. 8 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java
  20. 115 0
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java
  21. 16 27
      dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java
  22. 1 1
      dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java
  23. 93 0
      dbsyncer-listener/src/main/test/PGReplicationTest.java
  24. 1 1
      dbsyncer-manager/pom.xml
  25. 1 1
      dbsyncer-monitor/pom.xml
  26. 1 1
      dbsyncer-parser/pom.xml
  27. 1 1
      dbsyncer-plugin/pom.xml
  28. 1 1
      dbsyncer-storage/pom.xml
  29. 2 0
      dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java
  30. 5 5
      dbsyncer-storage/src/main/test/LuceneFactoryTest.java
  31. 1 1
      dbsyncer-web/pom.xml
  32. 1 1
      dbsyncer-web/src/main/resources/application.properties
  33. 2 2
      pom.xml

+ 4 - 4
dbsyncer-biz/pom.xml

@@ -3,10 +3,10 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<parent>
-		<artifactId>dbsyncer</artifactId>
-		<groupId>org.ghi</groupId>
-		<version>1.1.6-Beta</version>
-	</parent>
+        <artifactId>dbsyncer</artifactId>
+        <groupId>org.ghi</groupId>
+        <version>1.1.7-Beta</version>
+    </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-biz</artifactId>
 

+ 4 - 4
dbsyncer-cache/pom.xml

@@ -2,10 +2,10 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<parent>
-		<artifactId>dbsyncer</artifactId>
-		<groupId>org.ghi</groupId>
-		<version>1.1.6-Beta</version>
-	</parent>
+        <artifactId>dbsyncer</artifactId>
+        <groupId>org.ghi</groupId>
+        <version>1.1.7-Beta</version>
+    </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-cache</artifactId>
 

+ 1 - 1
dbsyncer-cluster/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.6-Beta</version>
+        <version>1.1.7-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-cluster</artifactId>

+ 1 - 1
dbsyncer-common/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.6-Beta</version>
+        <version>1.1.7-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-common</artifactId>

+ 6 - 1
dbsyncer-connector/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.6-Beta</version>
+        <version>1.1.7-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-connector</artifactId>
@@ -66,6 +66,11 @@
             <artifactId>kafka-clients</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-log4j2</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>

+ 71 - 62
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -31,8 +31,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    protected abstract String getTableSql(DatabaseConfig config);
-
     @Override
     public ConnectorMapper connect(DatabaseConfig config) {
         try {
@@ -59,16 +57,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return String.format("%s-%s", config.getUrl(), config.getUsername());
     }
 
-    @Override
-    public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
-        String sql = getTableSql(connectorMapper.getConfig());
-        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
-        if (!CollectionUtils.isEmpty(tableNames)) {
-            return tableNames.stream().map(name -> new Table(name)).collect(Collectors.toList());
-        }
-        return Collections.EMPTY_LIST;
-    }
-
     @Override
     public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
         String quotation = buildSqlWithQuotation();
@@ -222,6 +210,15 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return map;
     }
 
+    /**
+     * 健康检查
+     *
+     * @return
+     */
+    protected String getValidationQuery() {
+        return "select 1";
+    }
+
     /**
      * 查询语句表名和字段带上引号(默认不加)
      *
@@ -231,6 +228,21 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return "";
     }
 
+    /**
+     * 获取表列表
+     *
+     * @param connectorMapper
+     * @param sql
+     * @return
+     */
+    protected List<Table> getTable(DatabaseConnectorMapper connectorMapper, String sql) {
+        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
+        if (!CollectionUtils.isEmpty(tableNames)) {
+            return tableNames.stream().map(name -> new Table(name)).collect(Collectors.toList());
+        }
+        return Collections.EMPTY_LIST;
+    }
+
     /**
      * 获取查询条件SQL
      *
@@ -267,6 +279,37 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return sql.toString();
     }
 
+    /**
+     * 根据过滤条件获取查询SQL
+     *
+     * @param queryOperator and/or
+     * @param filter
+     * @return
+     */
+    private String getFilterSql(String queryOperator, List<Filter> filter) {
+        List<Filter> list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(list)) {
+            return "";
+        }
+
+        int size = list.size();
+        int end = size - 1;
+        StringBuilder sql = new StringBuilder();
+        sql.append("(");
+        Filter c = null;
+        String quotation = buildSqlWithQuotation();
+        for (int i = 0; i < size; i++) {
+            c = list.get(i);
+            // "USER" = 'zhangsan'
+            sql.append(quotation).append(c.getName()).append(quotation).append(c.getFilter()).append("'").append(c.getValue()).append("'");
+            if (i < end) {
+                sql.append(" ").append(queryOperator).append(" ");
+            }
+        }
+        sql.append(")");
+        return sql.toString();
+    }
+
     /**
      * 获取查询SQL
      *
@@ -319,15 +362,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return SqlBuilderEnum.getSqlBuilder(type).buildSql(config);
     }
 
-    /**
-     * 健康检查
-     *
-     * @return
-     */
-    protected String getValidationQuery() {
-        return "select 1";
-    }
-
     /**
      * 获取数据库表元数据信息
      *
@@ -399,34 +433,26 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
     }
 
     /**
-     * 根据过滤条件获取查询SQL
+     * 返回表主键
      *
-     * @param queryOperator and/or
-     * @param filter
+     * @param md
+     * @param tableName
      * @return
+     * @throws SQLException
      */
-    private String getFilterSql(String queryOperator, List<Filter> filter) {
-        List<Filter> list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList());
-        if (CollectionUtils.isEmpty(list)) {
-            return "";
-        }
-
-        int size = list.size();
-        int end = size - 1;
-        StringBuilder sql = new StringBuilder();
-        sql.append("(");
-        Filter c = null;
-        String quotation = buildSqlWithQuotation();
-        for (int i = 0; i < size; i++) {
-            c = list.get(i);
-            // "USER" = 'zhangsan'
-            sql.append(quotation).append(c.getName()).append(quotation).append(c.getFilter()).append("'").append(c.getValue()).append("'");
-            if (i < end) {
-                sql.append(" ").append(queryOperator).append(" ");
+    private List<String> findTablePrimaryKeys(DatabaseMetaData md, String tableName) throws SQLException {
+        //根据表名获得主键结果集
+        ResultSet rs = null;
+        List<String> primaryKeys = new ArrayList<>();
+        try {
+            rs = md.getPrimaryKeys(null, null, tableName);
+            while (rs.next()) {
+                primaryKeys.add(rs.getString("COLUMN_NAME"));
             }
+        } finally {
+            DatabaseUtil.close(rs);
         }
-        sql.append(")");
-        return sql.toString();
+        return primaryKeys;
     }
 
     /**
@@ -485,9 +511,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         try {
             // 2、设置参数
             int execute = connectorMapper.execute(databaseTemplate ->
-                    databaseTemplate.update(sql, (ps) ->
-                            batchRowsSetter(databaseTemplate.getConnection(), ps, fields, row)
-                    )
+                    databaseTemplate.update(sql, (ps) -> batchRowsSetter(databaseTemplate.getConnection(), ps, fields, row))
             );
             if (execute == 0) {
                 throw new ConnectorException(String.format("尝试执行[%s]失败", event));
@@ -517,19 +541,4 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return !CollectionUtils.isEmpty(pk) && pk.contains(name);
     }
 
-    private List<String> findTablePrimaryKeys(DatabaseMetaData md, String tableName) throws SQLException {
-        //根据表名获得主键结果集
-        ResultSet rs = null;
-        List<String> primaryKeys = new ArrayList<>();
-        try {
-            rs = md.getPrimaryKeys(null, null, tableName);
-            while (rs.next()) {
-                primaryKeys.add(rs.getString("COLUMN_NAME"));
-            }
-        } finally {
-            DatabaseUtil.close(rs);
-        }
-        return primaryKeys;
-    }
-
 }

+ 5 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleConnection.java

@@ -301,12 +301,15 @@ public class SimpleConnection implements Connection {
 
     @Override
     public <T> T unwrap(Class<T> iface) throws SQLException {
-        return null;
+        if (iface.isAssignableFrom(connection.getClass())) {
+            return iface.cast(connection);
+        }
+        throw new SQLException("Cannot unwrap to " + iface.getName());
     }
 
     @Override
     public boolean isWrapperFor(Class<?> iface) throws SQLException {
-        return false;
+        return iface.isAssignableFrom(connection.getClass());
     }
 
     public Connection getConnection() {

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BigintSetter.java

@@ -28,6 +28,11 @@ public class BigintSetter extends AbstractSetter<Long> {
             ps.setLong(i, bitInt.longValue());
             return;
         }
+        if (val instanceof Integer) {
+            Integer integer = (Integer) val;
+            ps.setLong(i, integer);
+            return;
+        }
         throw new ConnectorException(String.format("BigintSetter can not find type [%s], val [%s]", type, val));
     }
 }

+ 8 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -1,16 +1,14 @@
 package org.dbsyncer.connector.mysql;
 
-import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.PageSqlConfig;
+import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
+import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 
-public final class MysqlConnector extends AbstractDatabaseConnector {
+import java.util.List;
 
-    @Override
-    protected String getTableSql(DatabaseConfig config) {
-        return "show tables";
-    }
+public final class MysqlConnector extends AbstractDatabaseConnector {
 
     @Override
     public String getPageSql(PageSqlConfig config) {
@@ -22,4 +20,8 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
         return new Object[]{(pageIndex - 1) * pageSize, pageSize};
     }
 
+    @Override
+    public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
+        return super.getTable(connectorMapper, "show tables");
+    }
 }

+ 1 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.connector.oracle;
 
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.PageSqlConfig;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.constant.DatabaseConstant;
@@ -15,14 +14,9 @@ import java.util.stream.Collectors;
 
 public final class OracleConnector extends AbstractDatabaseConnector {
 
-    @Override
-    protected String getTableSql(DatabaseConfig config) {
-        return "SELECT TABLE_NAME,TABLE_TYPE FROM USER_TAB_COMMENTS";
-    }
-
     @Override
     public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
-        String sql = getTableSql(connectorMapper.getConfig());
+        final String sql = "SELECT TABLE_NAME,TABLE_TYPE FROM USER_TAB_COMMENTS";
         List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql));
         if (!CollectionUtils.isEmpty(list)) {
             return list.stream().map(r -> new Table(r.get("TABLE_NAME").toString(), r.get("TABLE_TYPE").toString())).collect(Collectors.toList());

+ 5 - 11
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -14,20 +14,18 @@ import java.util.List;
 
 public final class PostgreSQLConnector extends AbstractDatabaseConnector {
 
-    @Override
-    protected String getTableSql(DatabaseConfig config) {
-        return String.format("SELECT TABLENAME FROM PG_TABLES WHERE SCHEMANAME ='%s'", config.getSchema());
-    }
-
     @Override
     public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
         List<Table> list = new LinkedList<>();
         DatabaseConfig config = connectorMapper.getConfig();
-        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(getTableSql(config), String.class));
+        final String queryTableSql = String.format("SELECT TABLENAME FROM PG_TABLES WHERE SCHEMANAME ='%s'", config.getSchema());
+        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(queryTableSql, String.class));
         if (!CollectionUtils.isEmpty(tableNames)) {
             tableNames.forEach(name -> list.add(new Table(name, TableTypeEnum.TABLE.getCode())));
         }
-        List<String> tableViewNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(getTableViewSql(config), String.class));
+
+        final String queryTableViewSql = String.format("SELECT VIEWNAME FROM PG_VIEWS WHERE SCHEMANAME ='%s'", config.getSchema());
+        List<String> tableViewNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(queryTableViewSql, String.class));
         if (!CollectionUtils.isEmpty(tableViewNames)) {
             tableViewNames.forEach(name -> list.add(new Table(name, TableTypeEnum.VIEW.getCode())));
         }
@@ -48,8 +46,4 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
     protected String buildSqlWithQuotation() {
         return "\"";
     }
-
-    private String getTableViewSql(DatabaseConfig config) {
-        return String.format("SELECT VIEWNAME FROM PG_VIEWS WHERE SCHEMANAME ='%s'", config.getSchema());
-    }
 }

+ 5 - 13
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.connector.sql;
 
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
@@ -20,11 +19,6 @@ import java.util.Map;
  */
 public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
 
-    @Override
-    protected String getTableSql(DatabaseConfig config) {
-        throw new ConnectorException("Unsupported method.");
-    }
-
     @Override
     public List<Table> getTable(DatabaseConnectorMapper config) {
         DatabaseConfig cfg = config.getConfig();
@@ -50,10 +44,10 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
      * 获取DQL源配置
      *
      * @param commandConfig
-     * @param appendGroupByPK
+     * @param groupByPK
      * @return
      */
-    protected Map<String, String> getDqlSourceCommand(CommandConfig commandConfig, boolean appendGroupByPK) {
+    protected Map<String, String> getDqlSourceCommand(CommandConfig commandConfig, boolean groupByPK) {
         // 获取过滤SQL
         List<Filter> filter = commandConfig.getFilter();
         String queryFilterSql = getQueryFilterSql(filter);
@@ -73,12 +67,10 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
 
         // 获取查询总数SQL
         StringBuilder queryCount = new StringBuilder();
-        queryCount.append("SELECT COUNT(1) FROM (").append(table.getName());
-        if (StringUtil.isNotBlank(queryFilterSql)) {
-            queryCount.append(queryFilterSql);
-        }
+        queryCount.append("SELECT COUNT(1) FROM (").append(querySql);
+
         // Mysql
-        if (appendGroupByPK) {
+        if (groupByPK) {
             queryCount.append(" GROUP BY ").append(pk);
         }
         queryCount.append(") DBSYNCER_T");

+ 0 - 13
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java

@@ -2,11 +2,8 @@ package org.dbsyncer.connector.sql;
 
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.ConnectorMapper;
-import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.PageSqlConfig;
 import org.dbsyncer.connector.constant.DatabaseConstant;
-import org.dbsyncer.connector.sqlserver.SqlServerConnectorMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -14,16 +11,6 @@ public final class DQLSqlServerConnector extends AbstractDQLConnector {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    @Override
-    public ConnectorMapper connect(DatabaseConfig config) {
-        try {
-            return new SqlServerConnectorMapper(config);
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-            throw new ConnectorException(e.getMessage());
-        }
-    }
-
     @Override
     public String getPageSql(PageSqlConfig config) {
         if (StringUtil.isBlank(config.getPk())) {

+ 5 - 13
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -2,7 +2,6 @@ package org.dbsyncer.connector.sqlserver;
 
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.CommandConfig;
 import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.PageSqlConfig;
@@ -10,10 +9,12 @@ import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
+import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public final class SqlServerConnector extends AbstractDatabaseConnector {
@@ -21,18 +22,9 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Override
-    public ConnectorMapper connect(DatabaseConfig config) {
-        try {
-            return new SqlServerConnectorMapper(config);
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-            throw new ConnectorException(e.getMessage());
-        }
-    }
-
-    @Override
-    protected String getTableSql(DatabaseConfig config) {
-        return String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s') AND IS_MS_SHIPPED = 0", config.getSchema());
+    public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
+        DatabaseConfig config = connectorMapper.getConfig();
+        return super.getTable(connectorMapper, String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s') AND IS_MS_SHIPPED = 0", config.getSchema()));
     }
 
     @Override

+ 0 - 59
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnectorMapper.java

@@ -1,59 +0,0 @@
-package org.dbsyncer.connector.sqlserver;
-
-import org.dbsyncer.connector.ConnectorException;
-import org.dbsyncer.connector.config.DatabaseConfig;
-import org.dbsyncer.connector.database.DatabaseConnectorMapper;
-import org.dbsyncer.connector.database.DatabaseTemplate;
-import org.dbsyncer.connector.database.HandleCallback;
-import org.dbsyncer.connector.util.DatabaseUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.dao.EmptyResultDataAccessException;
-
-import java.sql.Connection;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-public final class SqlServerConnectorMapper extends DatabaseConnectorMapper {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-    private final Lock lock = new ReentrantLock(true);
-
-    public SqlServerConnectorMapper(DatabaseConfig config) {
-        super(config);
-    }
-
-    /**
-     * 使用连接时加锁(SqlServer 2008以下版本连接未释放问题)
-     *
-     * @param callback
-     * @return
-     */
-    @Override
-    public <T> T execute(HandleCallback callback) {
-        final Lock connectionLock = lock;
-        boolean locked = false;
-        Object apply = null;
-        Connection connection = null;
-        try {
-            locked = connectionLock.tryLock(60, TimeUnit.SECONDS);
-            if (locked) {
-                connection = getConnection();
-                apply = callback.apply(new DatabaseTemplate(connection));
-            }
-        } catch (EmptyResultDataAccessException e) {
-            throw e;
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-            throw new ConnectorException(e.getMessage());
-        } finally {
-            if (locked) {
-                DatabaseUtil.close(connection);
-                connectionLock.unlock();
-            }
-        }
-        return (T) apply;
-    }
-
-}

+ 7 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java

@@ -1,5 +1,6 @@
 package org.dbsyncer.connector.util;
 
+import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.ConnectorException;
 
 import java.sql.Connection;
@@ -12,10 +13,12 @@ public abstract class DatabaseUtil {
     }
 
     public static Connection getConnection(String driverClassName, String url, String username, String password) throws SQLException {
-        try {
-            Class.forName(driverClassName);
-        } catch (ClassNotFoundException e) {
-            throw new ConnectorException(e.getCause());
+        if (StringUtil.isNotBlank(driverClassName)) {
+            try {
+                Class.forName(driverClassName);
+            } catch (ClassNotFoundException e) {
+                throw new ConnectorException(e.getCause());
+            }
         }
         return DriverManager.getConnection(url, username, password);
     }

+ 68 - 0
dbsyncer-connector/src/main/test/SqlServerConnectionTest.java

@@ -0,0 +1,68 @@
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.database.DatabaseConnectorMapper;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.util.concurrent.*;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/4/11 20:19
+ */
+public class SqlServerConnectionTest {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Test
+    public void testConnection() throws InterruptedException {
+        DatabaseConfig config = new DatabaseConfig();
+        config.setUrl("jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test");
+        config.setUsername("sa");
+        config.setPassword("123");
+        config.setDriverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+        final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(config);
+
+        // 模拟并发
+        final int threadSize = 100;
+        final ExecutorService pool = Executors.newFixedThreadPool(threadSize);
+        final CyclicBarrier barrier = new CyclicBarrier(threadSize);
+        final CountDownLatch latch = new CountDownLatch(threadSize);
+        for (int i = 0; i < threadSize; i++) {
+            final int k = i + 3;
+            pool.submit(() -> {
+                try {
+                    barrier.await();
+
+                    // 模拟操作
+                    System.out.println(String.format("%s %s:%s", LocalDateTime.now(), Thread.currentThread().getName(), k));
+
+                    Object execute = connectorMapper.execute(tem -> tem.queryForObject("select 1", Integer.class));
+                    System.out.println(String.format("%s %s:%s execute=>%s", LocalDateTime.now(), Thread.currentThread().getName(), k, execute));
+
+                } catch (InterruptedException e) {
+                    logger.error(e.getMessage());
+                } catch (BrokenBarrierException e) {
+                    logger.error(e.getMessage());
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        try {
+            latch.await();
+            logger.info("try to shutdown");
+            pool.shutdown();
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        }
+
+        TimeUnit.SECONDS.sleep(3);
+        logger.info("test end");
+    }
+}

+ 1 - 1
dbsyncer-listener/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.6-Beta</version>
+        <version>1.1.7-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-listener</artifactId>

+ 8 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java

@@ -31,6 +31,10 @@ public enum ListenerEnum {
      * log_SqlServer
      */
     LOG_SQL_SERVER(ListenerTypeEnum.LOG.getType() + ConnectorEnum.SQL_SERVER.getType(), SqlServerExtractor.class),
+    /**
+     * log_PostgreSQL
+     */
+//    LOG_POSTGRE_SQL(ListenerTypeEnum.LOG.getType() + ConnectorEnum.POSTGRE_SQL.getType(), PostgreSQLExtractor.class),
     /**
      * log_Kafka
      */
@@ -47,6 +51,10 @@ public enum ListenerEnum {
      * timing_SqlServer
      */
     TIMING_SQL_SERVER(ListenerTypeEnum.TIMING.getType() + ConnectorEnum.SQL_SERVER.getType(), DatabaseQuartzExtractor.class),
+    /**
+     * timing_PostgreSQL
+     */
+    TIMING_POSTGRE_SQL(ListenerTypeEnum.TIMING.getType() + ConnectorEnum.POSTGRE_SQL.getType(), DatabaseQuartzExtractor.class),
     /**
      * timing_Elasticsearch
      */

+ 115 - 0
dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java

@@ -0,0 +1,115 @@
+package org.dbsyncer.listener.postgresql;
+
+import org.dbsyncer.connector.config.DatabaseConfig;
+import org.dbsyncer.connector.database.DatabaseConnectorMapper;
+import org.dbsyncer.connector.util.DatabaseUtil;
+import org.dbsyncer.listener.AbstractExtractor;
+import org.dbsyncer.listener.ListenerException;
+import org.postgresql.PGConnection;
+import org.postgresql.replication.PGReplicationStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/4/10 22:36
+ */
+public class PostgreSQLExtractor extends AbstractExtractor {
+
+    private static final String GET_VALIDATION = "SELECT 1";
+    private static final String GET_ROLE = "SELECT r.rolcanlogin AS rolcanlogin, r.rolreplication AS rolreplication, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rds_superuser') AS BOOL) IS TRUE AS aws_superuser, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rdsadmin') AS BOOL) IS TRUE AS aws_admin, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rdsrepladmin') AS BOOL) IS TRUE AS aws_repladmin FROM pg_roles r WHERE r.rolname = current_user";
+    private static final String GET_WAL_LEVEL = "SHOW WAL_LEVEL";
+    private static final String DEFAULT_WAL_LEVEL = "logical";
+    private static final String DEFAULT_SLOT_NAME = "DBSYNCER_SLOT";
+    private static final String DEFAULT_PLUGIN_NAME = "wal2json";
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+    private final Lock connectLock = new ReentrantLock();
+    private volatile boolean connected;
+    private Connection connection;
+    private PGReplicationStream stream;
+    private DatabaseConfig config;
+    private DatabaseConnectorMapper connectorMapper;
+
+    @Override
+    public void start() {
+        try {
+            connectLock.lock();
+            if (connected) {
+                logger.error("PostgreSQLExtractor is already started");
+                return;
+            }
+
+            connect();
+
+            connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_VALIDATION, Integer.class));
+            logger.info("Successfully tested connection for {} with user '{}'", config.getUrl(), config.getUsername());
+
+            final String walLevel = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_WAL_LEVEL, String.class));
+            if (!DEFAULT_WAL_LEVEL.equals(walLevel)) {
+                throw new ListenerException(String.format("Postgres server wal_level property must be \"%s\" but is: %s", DEFAULT_WAL_LEVEL, walLevel));
+            }
+
+            final boolean hasAuth = connectorMapper.execute(databaseTemplate -> {
+                Map rs = databaseTemplate.queryForObject(GET_ROLE, Map.class);
+                Boolean login = (Boolean) rs.getOrDefault("rolcanlogin", false);
+                Boolean replication = (Boolean) rs.getOrDefault("rolreplication", false);
+                Boolean superuser = (Boolean) rs.getOrDefault("aws_superuser", false);
+                Boolean admin = (Boolean) rs.getOrDefault("aws_admin", false);
+                Boolean replicationAdmin = (Boolean) rs.getOrDefault("aws_repladmin", false);
+                return login && (replication || superuser || admin || replicationAdmin);
+            });
+            if (!hasAuth) {
+                throw new ListenerException(String.format("Postgres roles LOGIN and REPLICATION are not assigned to user: %s", config.getUsername()));
+            }
+            connected = true;
+        } catch (Exception e) {
+            logger.error("启动失败:{}", e.getMessage());
+            throw new ListenerException(e);
+        } finally {
+            connectLock.unlock();
+            close();
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            connectLock.lock();
+            connected = false;
+            DatabaseUtil.close(stream);
+            DatabaseUtil.close(connection);
+        } catch (Exception e) {
+            logger.error("关闭失败:{}", e.getMessage());
+        } finally {
+            connectLock.unlock();
+        }
+    }
+
+    private void connect() throws SQLException {
+        if (connectorFactory.isAlive(connectorConfig)) {
+            config = (DatabaseConfig) connectorConfig;
+            connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(config);
+
+            connection = DatabaseUtil.getConnection(config.getDriverClassName(), config.getUrl(), config.getUsername(), config.getPassword());
+            PGConnection replConnection = connection.unwrap(PGConnection.class);
+            replConnection.getReplicationAPI()
+                    .createReplicationSlot()
+                    .logical()
+                    .withSlotName(DEFAULT_SLOT_NAME)
+                    .withOutputPlugin(DEFAULT_PLUGIN_NAME)
+                    .make();
+            stream = replConnection.getReplicationAPI()
+                    .replicationStream()
+                    .logical()
+                    .withSlotName(DEFAULT_SLOT_NAME)
+                    .start();
+        }
+    }
+}

+ 16 - 27
dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java

@@ -18,7 +18,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -48,12 +47,9 @@ public class SqlServerExtractor extends AbstractExtractor {
 
     private static final String LSN_POSITION = "position";
     private static final long DEFAULT_POLL_INTERVAL_MILLIS = 300;
-    private static final int PREPARED_STATEMENT_CACHE_CAPACITY = 500;
     private static final int OFFSET_COLUMNS = 4;
-    private final Map<String, PreparedStatement> preparedStatementCache = new ConcurrentHashMap<>(PREPARED_STATEMENT_CACHE_CAPACITY);
     private final Lock connectLock = new ReentrantLock();
     private volatile boolean connected;
-    private volatile boolean connectionClosed;
     private static Set<String> tables;
     private static Set<SqlServerChangeTable> changeTables;
     private DatabaseConnectorMapper connectorMapper;
@@ -103,8 +99,6 @@ public class SqlServerExtractor extends AbstractExtractor {
                 worker.interrupt();
                 worker = null;
             }
-            preparedStatementCache.values().forEach(this::close);
-            preparedStatementCache.clear();
             connected = false;
         }
     }
@@ -125,7 +119,6 @@ public class SqlServerExtractor extends AbstractExtractor {
             connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(cfg);
             serverName = cfg.getUrl();
             schema = cfg.getSchema();
-            connectionClosed = false;
         }
     }
 
@@ -296,35 +289,26 @@ public class SqlServerExtractor extends AbstractExtractor {
     }
 
     private <T> T query(String preparedQuerySql, StatementPreparer statementPreparer, ResultSetMapper<T> mapper) {
-        if (connectionClosed) {
-            connect();
-            return null;
-        }
         Object execute = connectorMapper.execute(databaseTemplate -> {
-            if (!preparedStatementCache.containsKey(preparedQuerySql)) {
-                preparedStatementCache.putIfAbsent(preparedQuerySql, databaseTemplate.getConnection().prepareStatement(preparedQuerySql));
-            }
-            PreparedStatement ps = preparedStatementCache.get(preparedQuerySql);
-            if (ps.getConnection().isClosed() || ps.isClosed()) {
-                preparedStatementCache.clear();
-                connectionClosed = true;
-                return null;
-            }
-            if (null != statementPreparer) {
-                statementPreparer.accept(ps);
-            }
+            PreparedStatement ps = null;
             ResultSet rs = null;
+            T apply = null;
             try {
+                ps = databaseTemplate.getConnection().prepareStatement(preparedQuerySql);
+                if (null != statementPreparer) {
+                    statementPreparer.accept(ps);
+                }
                 rs = ps.executeQuery();
-                return mapper.apply(rs);
+                apply = mapper.apply(rs);
             } catch (SQLServerException e) {
                 // 为过程或函数 cdc.fn_cdc_get_all_changes_ ...  提供的参数数目不足。
             } catch (Exception e) {
                 logger.error(e.getMessage());
             } finally {
                 close(rs);
+                close(ps);
             }
-            return null;
+            return apply;
         });
         return (T) execute;
     }
@@ -345,8 +329,13 @@ public class SqlServerExtractor extends AbstractExtractor {
 
                     lastLsn = stopLsn;
                     snapshot.put(LSN_POSITION, lastLsn.toString());
-                } catch (InterruptedException e) {
-                    break;
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                    try {
+                        TimeUnit.SECONDS.sleep(1);
+                    } catch (InterruptedException ex) {
+                        logger.error(ex.getMessage());
+                    }
                 }
             }
         }

+ 1 - 1
dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java

@@ -199,7 +199,7 @@ public class ChangeDataCaptureTest {
     public void start() throws SQLException {
         String username = "sa";
         String password = "123";
-        String url = "jdbc:sqlserver://127.0.0.1:1434;DatabaseName=test";
+        String url = "jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test";
         schema = "dbo";
         connection = DriverManager.getConnection(url, username, password);
         if (connection != null) {

+ 93 - 0
dbsyncer-listener/src/main/test/PGReplicationTest.java

@@ -0,0 +1,93 @@
+import org.dbsyncer.connector.util.DatabaseUtil;
+import org.junit.Test;
+import org.postgresql.PGConnection;
+import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.replication.PGReplicationStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2022/4/8 23:06
+ */
+public class PGReplicationTest {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+    private Connection connection;
+
+    @Test
+    public void testPG() throws SQLException, InterruptedException {
+        String url = "jdbc:postgresql://127.0.0.1:5432/postgres";
+        String driverClassNam = "org.postgresql.Driver";
+        String username = "postgres";
+        String password = "123456";
+        connection = DatabaseUtil.getConnection(driverClassNam, url, username, password);
+
+        LogSequenceNumber currentLSN = query("SELECT pg_current_wal_lsn()", rs -> LogSequenceNumber.valueOf(rs.getString(1)));
+
+        PGConnection replConnection = connection.unwrap(PGConnection.class);
+        PGReplicationStream stream = replConnection
+                .getReplicationAPI()
+                .replicationStream()
+                .logical()
+                .withSlotName("test_slot")
+                .withStartPosition(currentLSN)
+                .start();
+        while (true) {
+            //non blocking receive message
+            ByteBuffer msg = stream.readPending();
+
+            if (msg == null) {
+                TimeUnit.MILLISECONDS.sleep(10L);
+                continue;
+            }
+            int offset = msg.arrayOffset();
+            byte[] source = msg.array();
+            int length = source.length - offset;
+            System.out.println(new String(source, offset, length));
+        }
+
+    }
+
+    public <T> T query(String sql, ResultSetMapper mapper) {
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+        T apply = null;
+        try {
+            ps = connection.prepareStatement(sql);
+            rs = ps.executeQuery();
+            if (rs.next()) {
+                apply = (T) mapper.apply(rs);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        } finally {
+            close(rs);
+            close(ps);
+        }
+        return apply;
+    }
+
+    private void close(AutoCloseable closeable) {
+        if (null != closeable) {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                logger.error(e.getMessage());
+            }
+        }
+    }
+
+    public interface ResultSetMapper<T> {
+        T apply(ResultSet rs) throws SQLException;
+    }
+
+}

+ 1 - 1
dbsyncer-manager/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.6-Beta</version>
+        <version>1.1.7-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-manager</artifactId>

+ 1 - 1
dbsyncer-monitor/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.6-Beta</version>
+        <version>1.1.7-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-monitor</artifactId>

+ 1 - 1
dbsyncer-parser/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.6-Beta</version>
+        <version>1.1.7-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-parser</artifactId>

+ 1 - 1
dbsyncer-plugin/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.6-Beta</version>
+        <version>1.1.7-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-plugin</artifactId>

+ 1 - 1
dbsyncer-storage/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.6-Beta</version>
+        <version>1.1.7-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-storage</artifactId>

+ 2 - 0
dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java

@@ -194,6 +194,8 @@ public class Shard {
     private void execute(Object value, Callback callback) throws IOException {
         if (null != value && indexWriter.isOpen()) {
             callback.execute();
+            indexWriter.flush();
+            indexWriter.commit();
             return;
         }
         logger.error(value.toString());

+ 5 - 5
dbsyncer-storage/src/main/test/LuceneFactoryTest.java

@@ -49,7 +49,6 @@ public class LuceneFactoryTest {
 
     @After
     public void tearDown() throws IOException {
-        shard.close();
         shard.deleteAll();
     }
 
@@ -69,9 +68,10 @@ public class LuceneFactoryTest {
                     // 模拟操作
                     System.out.println(String.format("%s:%s", Thread.currentThread().getName(), k));
 
-                    Document update = ParamsUtil.convertData2Doc(createMap(k));
-                    IndexableField field = update.getField(ConfigConstant.CONFIG_MODEL_ID);
-                    shard.update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), update);
+                    Document data = ParamsUtil.convertData2Doc(createMap(k));
+                    //IndexableField field = data.getField(ConfigConstant.CONFIG_MODEL_ID);
+                    //shard.update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), data);
+                    shard.insert(data);
 
                 } catch (InterruptedException e) {
                     logger.error(e.getMessage());
@@ -114,7 +114,7 @@ public class LuceneFactoryTest {
         params.put(ConfigConstant.DATA_EVENT, ConnectorConstant.OPERTION_UPDATE);
         params.put(ConfigConstant.DATA_ERROR, "");
         Map<String, Object> row = new HashMap<>();
-        row.put("id", "1");
+        row.put("id", i);
         row.put("name", "中文");
         row.put("tel", "15800001234");
         row.put("update_time", System.currentTimeMillis());

+ 1 - 1
dbsyncer-web/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-		<version>1.1.6-Beta</version>
+        <version>1.1.7-Beta</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-web</artifactId>

+ 1 - 1
dbsyncer-web/src/main/resources/application.properties

@@ -25,7 +25,7 @@ management.endpoints.web.exposure.include=*
 management.endpoint.health.show-details=always
 management.health.elasticsearch.enabled=false
 info.app.name=DBSyncer
-info.app.version=1.1.6-Beta
+info.app.version=1.1.7-Beta
 info.app.copyright=&copy;2021 ${info.app.name}(${info.app.version})<footer>Designed By <a href='https://gitee.com/ghi/dbsyncer' target='_blank' >AE86</a></footer>
 
 #All < Trace < Debug < Info < Warn < Error < Fatal < OFF

+ 2 - 2
pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>org.ghi</groupId>
     <artifactId>dbsyncer</artifactId>
-	<version>1.1.6-Beta</version>
+    <version>1.1.7-Beta</version>
     <packaging>pom</packaging>
     <name>dbsyncer</name>
     <url>https://gitee.com/ghi/dbsyncer</url>
@@ -44,7 +44,7 @@
         <ojdbc6.version>11.2.0.4.0-atlassian-hosted</ojdbc6.version>
         <mysql.version>5.1.40</mysql.version>
         <mysql-binlog.version>0.21.0</mysql-binlog.version>
-        <mssql-jdbc.version>8.2.0.jre8</mssql-jdbc.version>
+        <mssql-jdbc.version>7.4.1.jre8</mssql-jdbc.version>
         <postgresql.version>42.3.3</postgresql.version>
         <kafka.version>0.9.0.0</kafka.version>
         <json.version>20090211</json.version>