|
@@ -63,9 +63,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
@Override
|
|
|
public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) {
|
|
|
String quotation = buildSqlWithQuotation();
|
|
|
- StringBuilder queryMetaSql = new StringBuilder("SELECT * FROM ").append(quotation).append(tableName).append(quotation).append(
|
|
|
- " WHERE 1 != 1");
|
|
|
- return connectorMapper.execute(databaseTemplate -> getMetaInfo(databaseTemplate, queryMetaSql.toString(), tableName));
|
|
|
+ DatabaseConfig config = connectorMapper.getConfig();
|
|
|
+ String queryMetaSql = new StringBuilder("SELECT * FROM ").append(getSchema(config, quotation)).append(quotation).append(tableName)
|
|
|
+ .append(quotation).append(" WHERE 1!=1").toString();
|
|
|
+
|
|
|
+ return connectorMapper.execute(databaseTemplate -> getMetaInfo(databaseTemplate, queryMetaSql, config.getSchema(), tableName));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -92,7 +94,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
Collections.addAll(config.getArgs(), getPageArgs(config.getPageIndex(), config.getPageSize()));
|
|
|
|
|
|
// 3、执行SQL
|
|
|
- List<Map<String, Object>> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
|
|
|
+ List<Map<String, Object>> list = connectorMapper.execute(
|
|
|
+ databaseTemplate -> databaseTemplate.queryForList(querySql, config.getArgs().toArray()));
|
|
|
|
|
|
// 4、返回结果集
|
|
|
return new Result(list);
|
|
@@ -120,6 +123,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
if (!isInsert(event)) {
|
|
|
if (isDelete(event)) {
|
|
|
fields.clear();
|
|
|
+ } else if (isUpdate(event)) {
|
|
|
+ fields.remove(pkField);
|
|
|
}
|
|
|
fields.add(pkField);
|
|
|
}
|
|
@@ -142,19 +147,15 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
})
|
|
|
);
|
|
|
} catch (Exception e) {
|
|
|
- result.addFailData(data);
|
|
|
- result.getError().append(e.getMessage());
|
|
|
+ logger.error(e.getMessage());
|
|
|
+ data.forEach(row -> forceUpdate(result, connectorMapper, config, pkField, row));
|
|
|
}
|
|
|
|
|
|
if (null != execute) {
|
|
|
int batchSize = execute.length;
|
|
|
for (int i = 0; i < batchSize; i++) {
|
|
|
if (execute[i] == 0) {
|
|
|
- if (config.isForceUpdate()) {
|
|
|
- forceUpdate(result, connectorMapper, config, pkField, data.get(i));
|
|
|
- } else {
|
|
|
- result.getFailData().add(data.get(i));
|
|
|
- }
|
|
|
+ forceUpdate(result, connectorMapper, config, pkField, data.get(i));
|
|
|
continue;
|
|
|
}
|
|
|
result.getSuccessData().add(data.get(i));
|
|
@@ -166,47 +167,38 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
@Override
|
|
|
public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
|
|
|
// 获取过滤SQL
|
|
|
- List<Filter> filter = commandConfig.getFilter();
|
|
|
- String queryFilterSql = getQueryFilterSql(filter);
|
|
|
+ final String queryFilterSql = getQueryFilterSql(commandConfig.getFilter());
|
|
|
+ final String quotation = buildSqlWithQuotation();
|
|
|
|
|
|
// 获取查询SQL
|
|
|
- Table table = commandConfig.getTable();
|
|
|
Map<String, String> map = new HashMap<>();
|
|
|
-
|
|
|
- String query = ConnectorConstant.OPERTION_QUERY;
|
|
|
- map.put(query, buildSql(query, commandConfig, queryFilterSql));
|
|
|
-
|
|
|
+ String schema = getSchema((DatabaseConfig) commandConfig.getConnectorConfig(), quotation);
|
|
|
+ map.put(ConnectorConstant.OPERTION_QUERY, buildSql(ConnectorConstant.OPERTION_QUERY, commandConfig, schema, queryFilterSql));
|
|
|
// 获取查询总数SQL
|
|
|
- String quotation = buildSqlWithQuotation();
|
|
|
- String pk = findOriginalTablePrimaryKey(commandConfig, quotation);
|
|
|
- StringBuilder queryCount = new StringBuilder();
|
|
|
- queryCount.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(quotation).append(table.getName()).append(quotation);
|
|
|
- if (StringUtil.isNotBlank(queryFilterSql)) {
|
|
|
- queryCount.append(queryFilterSql);
|
|
|
- }
|
|
|
- queryCount.append(" GROUP BY ").append(pk).append(") DBSYNCER_T");
|
|
|
- map.put(ConnectorConstant.OPERTION_QUERY_COUNT, queryCount.toString());
|
|
|
+ map.put(ConnectorConstant.OPERTION_QUERY_COUNT, getQueryCountSql(commandConfig, schema, quotation, queryFilterSql));
|
|
|
return map;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
|
|
|
+ String quotation = buildSqlWithQuotation();
|
|
|
+ String schema = getSchema((DatabaseConfig) commandConfig.getConnectorConfig(), quotation);
|
|
|
+
|
|
|
// 获取增删改SQL
|
|
|
Map<String, String> map = new HashMap<>();
|
|
|
String insert = SqlBuilderEnum.INSERT.getName();
|
|
|
- map.put(insert, buildSql(insert, commandConfig, null));
|
|
|
+ map.put(insert, buildSql(insert, commandConfig, schema, null));
|
|
|
|
|
|
String update = SqlBuilderEnum.UPDATE.getName();
|
|
|
- map.put(update, buildSql(update, commandConfig, null));
|
|
|
+ map.put(update, buildSql(update, commandConfig, schema, null));
|
|
|
|
|
|
String delete = SqlBuilderEnum.DELETE.getName();
|
|
|
- map.put(delete, buildSql(delete, commandConfig, null));
|
|
|
+ map.put(delete, buildSql(delete, commandConfig, schema, null));
|
|
|
|
|
|
// 获取查询数据行是否存在
|
|
|
- String quotation = buildSqlWithQuotation();
|
|
|
String pk = findOriginalTablePrimaryKey(commandConfig, quotation);
|
|
|
- StringBuilder queryCount = new StringBuilder().append("SELECT COUNT(1) FROM ").append(quotation).append(commandConfig.getTable().getName()).append(
|
|
|
- quotation).append(" WHERE ").append(pk).append(" = ?");
|
|
|
+ StringBuilder queryCount = new StringBuilder().append("SELECT COUNT(1) FROM ").append(schema).append(quotation).append(
|
|
|
+ commandConfig.getTable().getName()).append(quotation).append(" WHERE ").append(pk).append(" = ?");
|
|
|
String queryCountExist = ConnectorConstant.OPERTION_QUERY_COUNT_EXIST;
|
|
|
map.put(queryCountExist, queryCount.toString());
|
|
|
return map;
|
|
@@ -230,6 +222,21 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 获取架构名
|
|
|
+ *
|
|
|
+ * @param config
|
|
|
+ * @param quotation
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ protected String getSchema(DatabaseConfig config, String quotation) {
|
|
|
+ StringBuilder schema = new StringBuilder();
|
|
|
+ if (StringUtil.isNotBlank(config.getSchema())) {
|
|
|
+ schema.append(quotation).append(config.getSchema()).append(quotation).append(".");
|
|
|
+ }
|
|
|
+ return schema.toString();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 获取表列表
|
|
|
*
|
|
@@ -245,6 +252,27 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
return Collections.EMPTY_LIST;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 获取查询总数SQL
|
|
|
+ *
|
|
|
+ * @param commandConfig
|
|
|
+ * @param schema
|
|
|
+ * @param quotation
|
|
|
+ * @param queryFilterSql
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ protected String getQueryCountSql(CommandConfig commandConfig, String schema, String quotation, String queryFilterSql) {
|
|
|
+ String table = commandConfig.getTable().getName();
|
|
|
+ String pk = findOriginalTablePrimaryKey(commandConfig, quotation);
|
|
|
+ StringBuilder queryCount = new StringBuilder();
|
|
|
+ queryCount.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(schema).append(quotation).append(table).append(quotation);
|
|
|
+ if (StringUtil.isNotBlank(queryFilterSql)) {
|
|
|
+ queryCount.append(queryFilterSql);
|
|
|
+ }
|
|
|
+ queryCount.append(" GROUP BY ").append(pk).append(") DBSYNCER_T");
|
|
|
+ return queryCount.toString();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 获取查询条件SQL
|
|
|
*
|
|
@@ -317,10 +345,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
*
|
|
|
* @param type {@link SqlBuilderEnum}
|
|
|
* @param commandConfig
|
|
|
+ * @param schema
|
|
|
* @param queryFilterSQL
|
|
|
* @return
|
|
|
*/
|
|
|
- protected String buildSql(String type, CommandConfig commandConfig, String queryFilterSQL) {
|
|
|
+ protected String buildSql(String type, CommandConfig commandConfig, String schema, String queryFilterSQL) {
|
|
|
Table table = commandConfig.getTable();
|
|
|
if (null == table) {
|
|
|
logger.error("Table can not be null.");
|
|
@@ -359,7 +388,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
pk = findOriginalTablePrimaryKey(commandConfig, "");
|
|
|
}
|
|
|
|
|
|
- SqlBuilderConfig config = new SqlBuilderConfig(this, tableName, pk, fields, queryFilterSQL, buildSqlWithQuotation());
|
|
|
+ SqlBuilderConfig config = new SqlBuilderConfig(this, schema, tableName, pk, fields, queryFilterSQL, buildSqlWithQuotation());
|
|
|
return SqlBuilderEnum.getSqlBuilder(type).buildSql(config);
|
|
|
}
|
|
|
|
|
@@ -368,10 +397,11 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
*
|
|
|
* @param databaseTemplate
|
|
|
* @param metaSql 查询元数据
|
|
|
+ * @param schema 架构名
|
|
|
* @param tableName 表名
|
|
|
* @return
|
|
|
*/
|
|
|
- protected MetaInfo getMetaInfo(DatabaseTemplate databaseTemplate, String metaSql, String tableName) throws SQLException {
|
|
|
+ protected MetaInfo getMetaInfo(DatabaseTemplate databaseTemplate, String metaSql, String schema, String tableName) throws SQLException {
|
|
|
SqlRowSet sqlRowSet = databaseTemplate.queryForRowSet(metaSql);
|
|
|
ResultSetWrappingSqlRowSet rowSet = (ResultSetWrappingSqlRowSet) sqlRowSet;
|
|
|
SqlRowSetMetaData metaData = rowSet.getMetaData();
|
|
@@ -384,7 +414,10 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
List<Field> fields = new ArrayList<>(columnCount);
|
|
|
Map<String, List<String>> tables = new HashMap<>();
|
|
|
try {
|
|
|
- DatabaseMetaData md = databaseTemplate.getConnection().getMetaData();
|
|
|
+ Connection connection = databaseTemplate.getConnection();
|
|
|
+ DatabaseMetaData md = connection.getMetaData();
|
|
|
+ final String catalog = connection.getCatalog();
|
|
|
+ schema = StringUtil.isNotBlank(schema) ? schema : null;
|
|
|
String name = null;
|
|
|
String label = null;
|
|
|
String typeName = null;
|
|
@@ -394,7 +427,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
for (int i = 1; i <= columnCount; i++) {
|
|
|
table = StringUtil.isNotBlank(tableName) ? tableName : metaData.getTableName(i);
|
|
|
if (null == tables.get(table)) {
|
|
|
- tables.putIfAbsent(table, findTablePrimaryKeys(md, table));
|
|
|
+ tables.putIfAbsent(table, findTablePrimaryKeys(md, catalog, schema, table));
|
|
|
}
|
|
|
name = metaData.getColumnName(i);
|
|
|
label = metaData.getColumnLabel(i);
|
|
@@ -441,16 +474,18 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
* 返回表主键
|
|
|
*
|
|
|
* @param md
|
|
|
+ * @param catalog
|
|
|
+ * @param schema
|
|
|
* @param tableName
|
|
|
* @return
|
|
|
* @throws SQLException
|
|
|
*/
|
|
|
- private List<String> findTablePrimaryKeys(DatabaseMetaData md, String tableName) throws SQLException {
|
|
|
+ private List<String> findTablePrimaryKeys(DatabaseMetaData md, String catalog, String schema, String tableName) throws SQLException {
|
|
|
//根据表名获得主键结果集
|
|
|
ResultSet rs = null;
|
|
|
List<String> primaryKeys = new ArrayList<>();
|
|
|
try {
|
|
|
- rs = md.getPrimaryKeys(null, null, tableName);
|
|
|
+ rs = md.getPrimaryKeys(catalog, schema, tableName);
|
|
|
while (rs.next()) {
|
|
|
primaryKeys.add(rs.getString("COLUMN_NAME"));
|
|
|
}
|
|
@@ -509,6 +544,8 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
if (!isInsert(event)) {
|
|
|
if (isDelete(event)) {
|
|
|
fields.clear();
|
|
|
+ } else if (isUpdate(event)) {
|
|
|
+ fields.remove(pkField);
|
|
|
}
|
|
|
fields.add(pkField);
|
|
|
}
|
|
@@ -534,7 +571,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
private boolean existRow(DatabaseConnectorMapper connectorMapper, String sql, Object value) {
|
|
|
int rowNum = 0;
|
|
|
try {
|
|
|
- rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, new Object[]{value}, Integer.class));
|
|
|
+ rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, new Object[] {value}, Integer.class));
|
|
|
} catch (Exception e) {
|
|
|
logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, value);
|
|
|
}
|