AE86 3 yıl önce
ebeveyn
işleme
2462f705bd

+ 71 - 62
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -31,8 +31,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    protected abstract String getTableSql(DatabaseConfig config);
-
     @Override
     public ConnectorMapper connect(DatabaseConfig config) {
         try {
@@ -59,16 +57,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return String.format("%s-%s", config.getUrl(), config.getUsername());
     }
 
-    @Override
-    public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
-        String sql = getTableSql(connectorMapper.getConfig());
-        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
-        if (!CollectionUtils.isEmpty(tableNames)) {
-            return tableNames.stream().map(name -> new Table(name)).collect(Collectors.toList());
-        }
-        return Collections.EMPTY_LIST;
-    }
-
     @Override
     public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
         String quotation = buildSqlWithQuotation();
@@ -222,6 +210,15 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return map;
     }
 
+    /**
+     * 健康检查
+     *
+     * @return
+     */
+    protected String getValidationQuery() {
+        return "select 1";
+    }
+
     /**
      * 查询语句表名和字段带上引号(默认不加)
      *
@@ -231,6 +228,21 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return "";
     }
 
+    /**
+     * 获取表列表
+     *
+     * @param connectorMapper
+     * @param sql
+     * @return
+     */
+    protected List<Table> getTable(DatabaseConnectorMapper connectorMapper, String sql) {
+        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
+        if (!CollectionUtils.isEmpty(tableNames)) {
+            return tableNames.stream().map(name -> new Table(name)).collect(Collectors.toList());
+        }
+        return Collections.EMPTY_LIST;
+    }
+
     /**
      * 获取查询条件SQL
      *
@@ -267,6 +279,37 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return sql.toString();
     }
 
+    /**
+     * 根据过滤条件获取查询SQL
+     *
+     * @param queryOperator and/or
+     * @param filter
+     * @return
+     */
+    private String getFilterSql(String queryOperator, List<Filter> filter) {
+        List<Filter> list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(list)) {
+            return "";
+        }
+
+        int size = list.size();
+        int end = size - 1;
+        StringBuilder sql = new StringBuilder();
+        sql.append("(");
+        Filter c = null;
+        String quotation = buildSqlWithQuotation();
+        for (int i = 0; i < size; i++) {
+            c = list.get(i);
+            // "USER" = 'zhangsan'
+            sql.append(quotation).append(c.getName()).append(quotation).append(c.getFilter()).append("'").append(c.getValue()).append("'");
+            if (i < end) {
+                sql.append(" ").append(queryOperator).append(" ");
+            }
+        }
+        sql.append(")");
+        return sql.toString();
+    }
+
     /**
      * 获取查询SQL
      *
@@ -319,15 +362,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return SqlBuilderEnum.getSqlBuilder(type).buildSql(config);
     }
 
-    /**
-     * 健康检查
-     *
-     * @return
-     */
-    protected String getValidationQuery() {
-        return "select 1";
-    }
-
     /**
      * 获取数据库表元数据信息
      *
@@ -399,34 +433,26 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
     }
 
     /**
-     * 根据过滤条件获取查询SQL
+     * 返回表主键
      *
-     * @param queryOperator and/or
-     * @param filter
+     * @param md
+     * @param tableName
      * @return
+     * @throws SQLException
      */
-    private String getFilterSql(String queryOperator, List<Filter> filter) {
-        List<Filter> list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList());
-        if (CollectionUtils.isEmpty(list)) {
-            return "";
-        }
-
-        int size = list.size();
-        int end = size - 1;
-        StringBuilder sql = new StringBuilder();
-        sql.append("(");
-        Filter c = null;
-        String quotation = buildSqlWithQuotation();
-        for (int i = 0; i < size; i++) {
-            c = list.get(i);
-            // "USER" = 'zhangsan'
-            sql.append(quotation).append(c.getName()).append(quotation).append(c.getFilter()).append("'").append(c.getValue()).append("'");
-            if (i < end) {
-                sql.append(" ").append(queryOperator).append(" ");
+    private List<String> findTablePrimaryKeys(DatabaseMetaData md, String tableName) throws SQLException {
+        //根据表名获得主键结果集
+        ResultSet rs = null;
+        List<String> primaryKeys = new ArrayList<>();
+        try {
+            rs = md.getPrimaryKeys(null, null, tableName);
+            while (rs.next()) {
+                primaryKeys.add(rs.getString("COLUMN_NAME"));
             }
+        } finally {
+            DatabaseUtil.close(rs);
         }
-        sql.append(")");
-        return sql.toString();
+        return primaryKeys;
     }
 
     /**
@@ -485,9 +511,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         try {
             // 2、设置参数
             int execute = connectorMapper.execute(databaseTemplate ->
-                    databaseTemplate.update(sql, (ps) ->
-                            batchRowsSetter(databaseTemplate.getConnection(), ps, fields, row)
-                    )
+                    databaseTemplate.update(sql, (ps) -> batchRowsSetter(databaseTemplate.getConnection(), ps, fields, row))
             );
             if (execute == 0) {
                 throw new ConnectorException(String.format("尝试执行[%s]失败", event));
@@ -517,19 +541,4 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
         return !CollectionUtils.isEmpty(pk) && pk.contains(name);
     }
 
-    private List<String> findTablePrimaryKeys(DatabaseMetaData md, String tableName) throws SQLException {
-        //根据表名获得主键结果集
-        ResultSet rs = null;
-        List<String> primaryKeys = new ArrayList<>();
-        try {
-            rs = md.getPrimaryKeys(null, null, tableName);
-            while (rs.next()) {
-                primaryKeys.add(rs.getString("COLUMN_NAME"));
-            }
-        } finally {
-            DatabaseUtil.close(rs);
-        }
-        return primaryKeys;
-    }
-
 }

+ 8 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java

@@ -1,16 +1,14 @@
 package org.dbsyncer.connector.mysql;
 
-import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.PageSqlConfig;
+import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
+import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 
-public final class MysqlConnector extends AbstractDatabaseConnector {
+import java.util.List;
 
-    @Override
-    protected String getTableSql(DatabaseConfig config) {
-        return "show tables";
-    }
+public final class MysqlConnector extends AbstractDatabaseConnector {
 
     @Override
     public String getPageSql(PageSqlConfig config) {
@@ -22,4 +20,8 @@ public final class MysqlConnector extends AbstractDatabaseConnector {
         return new Object[]{(pageIndex - 1) * pageSize, pageSize};
     }
 
+    @Override
+    public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
+        return super.getTable(connectorMapper, "show tables");
+    }
 }

+ 1 - 7
dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.connector.oracle;
 
 import org.dbsyncer.common.util.CollectionUtils;
-import org.dbsyncer.connector.config.DatabaseConfig;
 import org.dbsyncer.connector.config.PageSqlConfig;
 import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.constant.DatabaseConstant;
@@ -15,14 +14,9 @@ import java.util.stream.Collectors;
 
 public final class OracleConnector extends AbstractDatabaseConnector {
 
-    @Override
-    protected String getTableSql(DatabaseConfig config) {
-        return "SELECT TABLE_NAME,TABLE_TYPE FROM USER_TAB_COMMENTS";
-    }
-
     @Override
     public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
-        String sql = getTableSql(connectorMapper.getConfig());
+        final String sql = "SELECT TABLE_NAME,TABLE_TYPE FROM USER_TAB_COMMENTS";
         List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql));
         if (!CollectionUtils.isEmpty(list)) {
             return list.stream().map(r -> new Table(r.get("TABLE_NAME").toString(), r.get("TABLE_TYPE").toString())).collect(Collectors.toList());

+ 5 - 11
dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java

@@ -14,20 +14,18 @@ import java.util.List;
 
 public final class PostgreSQLConnector extends AbstractDatabaseConnector {
 
-    @Override
-    protected String getTableSql(DatabaseConfig config) {
-        return String.format("SELECT TABLENAME FROM PG_TABLES WHERE SCHEMANAME ='%s'", config.getSchema());
-    }
-
     @Override
     public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
         List<Table> list = new LinkedList<>();
         DatabaseConfig config = connectorMapper.getConfig();
-        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(getTableSql(config), String.class));
+        final String queryTableSql = String.format("SELECT TABLENAME FROM PG_TABLES WHERE SCHEMANAME ='%s'", config.getSchema());
+        List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(queryTableSql, String.class));
         if (!CollectionUtils.isEmpty(tableNames)) {
             tableNames.forEach(name -> list.add(new Table(name, TableTypeEnum.TABLE.getCode())));
         }
-        List<String> tableViewNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(getTableViewSql(config), String.class));
+
+        final String queryTableViewSql = String.format("SELECT VIEWNAME FROM PG_VIEWS WHERE SCHEMANAME ='%s'", config.getSchema());
+        List<String> tableViewNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(queryTableViewSql, String.class));
         if (!CollectionUtils.isEmpty(tableViewNames)) {
             tableViewNames.forEach(name -> list.add(new Table(name, TableTypeEnum.VIEW.getCode())));
         }
@@ -48,8 +46,4 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector {
     protected String buildSqlWithQuotation() {
         return "\"";
     }
-
-    private String getTableViewSql(DatabaseConfig config) {
-        return String.format("SELECT VIEWNAME FROM PG_VIEWS WHERE SCHEMANAME ='%s'", config.getSchema());
-    }
 }

+ 0 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.connector.sql;
 
 import org.dbsyncer.common.util.StringUtil;
-import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.*;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
@@ -20,11 +19,6 @@ import java.util.Map;
  */
 public abstract class AbstractDQLConnector extends AbstractDatabaseConnector {
 
-    @Override
-    protected String getTableSql(DatabaseConfig config) {
-        throw new ConnectorException("Unsupported method.");
-    }
-
     @Override
     public List<Table> getTable(DatabaseConnectorMapper config) {
         DatabaseConfig cfg = config.getConfig();

+ 5 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java

@@ -10,10 +10,12 @@ import org.dbsyncer.connector.config.Table;
 import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.constant.DatabaseConstant;
 import org.dbsyncer.connector.database.AbstractDatabaseConnector;
+import org.dbsyncer.connector.database.DatabaseConnectorMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public final class SqlServerConnector extends AbstractDatabaseConnector {
@@ -31,8 +33,9 @@ public final class SqlServerConnector extends AbstractDatabaseConnector {
     }
 
     @Override
-    protected String getTableSql(DatabaseConfig config) {
-        return String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s') AND IS_MS_SHIPPED = 0", config.getSchema());
+    public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
+        DatabaseConfig config = connectorMapper.getConfig();
+        return super.getTable(connectorMapper, String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s') AND IS_MS_SHIPPED = 0", config.getSchema()));
     }
 
     @Override