|
@@ -219,136 +219,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
return map;
|
|
|
}
|
|
|
|
|
|
- private void forceUpdate(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField,
|
|
|
- Map row) {
|
|
|
- String event = config.getEvent();
|
|
|
- if (!config.isForceUpdate()) {
|
|
|
- result.getFailData().add(row);
|
|
|
- result.getError().append("SQL:").append(config.getCommand().get(event)).append(System.lineSeparator())
|
|
|
- .append("DATA:").append(row).append(System.lineSeparator())
|
|
|
- .append("ERROR:").append("Row data does not exist.").append(System.lineSeparator());
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // 不存在转insert
|
|
|
- if (isUpdate(event)) {
|
|
|
- String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
|
|
|
- if (!existRow(connectorMapper, queryCount, row.get(pkField.getName()))) {
|
|
|
- logger.warn("{}表执行{}失败, 尝试执行{}, {}", config.getTableName(), event, ConnectorConstant.OPERTION_INSERT, row);
|
|
|
- writer(result, connectorMapper, config, pkField, row, ConnectorConstant.OPERTION_INSERT);
|
|
|
- }
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // 存在转update
|
|
|
- if (isInsert(config.getEvent())) {
|
|
|
- logger.warn("{}表执行{}失败, 尝试执行{}, {}", config.getTableName(), event, ConnectorConstant.OPERTION_UPDATE, row);
|
|
|
- writer(result, connectorMapper, config, pkField, row, ConnectorConstant.OPERTION_UPDATE);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void writer(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField, Map row,
|
|
|
- String event) {
|
|
|
- // 1、获取 SQL
|
|
|
- String sql = config.getCommand().get(event);
|
|
|
-
|
|
|
- List<Field> fields = new ArrayList<>(config.getFields());
|
|
|
- // Update / Delete
|
|
|
- if (!isInsert(event)) {
|
|
|
- if (isDelete(event)) {
|
|
|
- fields.clear();
|
|
|
- }
|
|
|
- fields.add(pkField);
|
|
|
- }
|
|
|
-
|
|
|
- try {
|
|
|
- // 2、设置参数
|
|
|
- int execute = connectorMapper.execute(databaseTemplate ->
|
|
|
- databaseTemplate.update(sql, (ps) ->
|
|
|
- batchRowsSetter(databaseTemplate.getConnection(), ps, fields, row)
|
|
|
- )
|
|
|
- );
|
|
|
- if (execute == 0) {
|
|
|
- throw new ConnectorException(String.format("尝试执行[%s]失败", event));
|
|
|
- }
|
|
|
- result.getSuccessData().add(row);
|
|
|
- } catch (Exception e) {
|
|
|
- result.getFailData().add(row);
|
|
|
- result.getError().append("SQL:").append(sql).append(System.lineSeparator())
|
|
|
- .append("DATA:").append(row).append(System.lineSeparator())
|
|
|
- .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
|
|
|
- logger.error("执行{}失败: {}, DATA:{}", event, e.getMessage(), row);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取DQL表信息
|
|
|
- *
|
|
|
- * @param config
|
|
|
- * @return
|
|
|
- */
|
|
|
- protected List<Table> getDqlTable(DatabaseConnectorMapper config) {
|
|
|
- MetaInfo metaInfo = getDqlMetaInfo(config);
|
|
|
- Assert.notNull(metaInfo, "SQL解析异常.");
|
|
|
- DatabaseConfig cfg = config.getConfig();
|
|
|
- List<Table> tables = new ArrayList<>();
|
|
|
- tables.add(new Table(cfg.getSql()));
|
|
|
- return tables;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取DQL元信息
|
|
|
- *
|
|
|
- * @param config
|
|
|
- * @return
|
|
|
- */
|
|
|
- protected MetaInfo getDqlMetaInfo(DatabaseConnectorMapper config) {
|
|
|
- DatabaseConfig cfg = config.getConfig();
|
|
|
- String sql = cfg.getSql().toUpperCase();
|
|
|
- String queryMetaSql = StringUtil.contains(sql, " WHERE ") ? sql + " AND 1!=1 " : sql + " WHERE 1!=1 ";
|
|
|
- return config.execute(databaseTemplate -> getMetaInfo(databaseTemplate, queryMetaSql, cfg.getTable()));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取DQL源配置
|
|
|
- *
|
|
|
- * @param commandConfig
|
|
|
- * @param appendGroupByPK
|
|
|
- * @return
|
|
|
- */
|
|
|
- protected Map<String, String> getDqlSourceCommand(CommandConfig commandConfig, boolean appendGroupByPK) {
|
|
|
- // 获取过滤SQL
|
|
|
- List<Filter> filter = commandConfig.getFilter();
|
|
|
- String queryFilterSql = getQueryFilterSql(filter);
|
|
|
-
|
|
|
- // 获取查询SQL
|
|
|
- Table table = commandConfig.getTable();
|
|
|
- Map<String, String> map = new HashMap<>();
|
|
|
- String querySql = table.getName();
|
|
|
-
|
|
|
- // 存在条件
|
|
|
- if (StringUtil.isNotBlank(queryFilterSql)) {
|
|
|
- querySql += queryFilterSql;
|
|
|
- }
|
|
|
- String quotation = buildSqlWithQuotation();
|
|
|
- String pk = findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
|
|
|
- map.put(SqlBuilderEnum.QUERY.getName(), getPageSql(new PageSqlConfig(querySql, pk)));
|
|
|
-
|
|
|
- // 获取查询总数SQL
|
|
|
- StringBuilder queryCount = new StringBuilder();
|
|
|
- queryCount.append("SELECT COUNT(1) FROM (").append(table.getName());
|
|
|
- if (StringUtil.isNotBlank(queryFilterSql)) {
|
|
|
- queryCount.append(queryFilterSql);
|
|
|
- }
|
|
|
- // Mysql
|
|
|
- if (appendGroupByPK) {
|
|
|
- queryCount.append(" GROUP BY ").append(pk);
|
|
|
- }
|
|
|
- queryCount.append(") DBSYNCER_T");
|
|
|
- map.put(ConnectorConstant.OPERTION_QUERY_COUNT, queryCount.toString());
|
|
|
- return map;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 查询语句表名和字段带上引号(默认不加)
|
|
|
*
|
|
@@ -455,67 +325,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
return "select 1";
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 根据过滤条件获取查询SQL
|
|
|
- *
|
|
|
- * @param queryOperator and/or
|
|
|
- * @param filter
|
|
|
- * @return
|
|
|
- */
|
|
|
- private String getFilterSql(String queryOperator, List<Filter> filter) {
|
|
|
- List<Filter> list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList());
|
|
|
- if (CollectionUtils.isEmpty(list)) {
|
|
|
- return "";
|
|
|
- }
|
|
|
-
|
|
|
- int size = list.size();
|
|
|
- int end = size - 1;
|
|
|
- StringBuilder sql = new StringBuilder();
|
|
|
- sql.append("(");
|
|
|
- Filter c = null;
|
|
|
- String quotation = buildSqlWithQuotation();
|
|
|
- for (int i = 0; i < size; i++) {
|
|
|
- c = list.get(i);
|
|
|
- // "USER" = 'zhangsan'
|
|
|
- sql.append(quotation).append(c.getName()).append(quotation).append(c.getFilter()).append("'").append(c.getValue()).append("'");
|
|
|
- if (i < end) {
|
|
|
- sql.append(" ").append(queryOperator).append(" ");
|
|
|
- }
|
|
|
- }
|
|
|
- sql.append(")");
|
|
|
- return sql.toString();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * @param connection 连接
|
|
|
- * @param ps 参数构造器
|
|
|
- * @param fields 同步字段,例如[{name=ID, type=4}, {name=NAME, type=12}]
|
|
|
- * @param row 同步字段对应的值,例如{ID=123, NAME=张三11}
|
|
|
- */
|
|
|
- private void batchRowsSetter(Connection connection, PreparedStatement ps, List<Field> fields, Map row) {
|
|
|
- Field f = null;
|
|
|
- int type;
|
|
|
- Object val = null;
|
|
|
- int size = fields.size();
|
|
|
- for (int i = 0; i < size; i++) {
|
|
|
- // 取出字段和对应值
|
|
|
- f = fields.get(i);
|
|
|
- type = f.getType();
|
|
|
- val = row.get(f.getName());
|
|
|
- SetterEnum.getSetter(type).set(connection, ps, i + 1, type, val);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private boolean existRow(DatabaseConnectorMapper connectorMapper, String sql, Object value) {
|
|
|
- int rowNum = 0;
|
|
|
- try {
|
|
|
- rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, new Object[] {value}, Integer.class));
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, value);
|
|
|
- }
|
|
|
- return rowNum > 0;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 获取数据库表元数据信息
|
|
|
*
|
|
@@ -524,7 +333,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
* @param tableName 表名
|
|
|
* @return
|
|
|
*/
|
|
|
- private MetaInfo getMetaInfo(DatabaseTemplate databaseTemplate, String metaSql, String tableName) throws SQLException {
|
|
|
+ protected MetaInfo getMetaInfo(DatabaseTemplate databaseTemplate, String metaSql, String tableName) throws SQLException {
|
|
|
SqlRowSet sqlRowSet = databaseTemplate.queryForRowSet(metaSql);
|
|
|
ResultSetWrappingSqlRowSet rowSet = (ResultSetWrappingSqlRowSet) sqlRowSet;
|
|
|
SqlRowSetMetaData metaData = rowSet.getMetaData();
|
|
@@ -569,7 +378,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
* @param quotation
|
|
|
* @return
|
|
|
*/
|
|
|
- private String findTablePrimaryKey(Table table, String quotation) {
|
|
|
+ protected String findTablePrimaryKey(Table table, String quotation) {
|
|
|
if (null != table) {
|
|
|
List<Field> column = table.getColumn();
|
|
|
if (!CollectionUtils.isEmpty(column)) {
|
|
@@ -586,6 +395,129 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 根据过滤条件获取查询SQL
|
|
|
+ *
|
|
|
+ * @param queryOperator and/or
|
|
|
+ * @param filter
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private String getFilterSql(String queryOperator, List<Filter> filter) {
|
|
|
+ List<Filter> list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList());
|
|
|
+ if (CollectionUtils.isEmpty(list)) {
|
|
|
+ return "";
|
|
|
+ }
|
|
|
+
|
|
|
+ int size = list.size();
|
|
|
+ int end = size - 1;
|
|
|
+ StringBuilder sql = new StringBuilder();
|
|
|
+ sql.append("(");
|
|
|
+ Filter c = null;
|
|
|
+ String quotation = buildSqlWithQuotation();
|
|
|
+ for (int i = 0; i < size; i++) {
|
|
|
+ c = list.get(i);
|
|
|
+ // "USER" = 'zhangsan'
|
|
|
+ sql.append(quotation).append(c.getName()).append(quotation).append(c.getFilter()).append("'").append(c.getValue()).append("'");
|
|
|
+ if (i < end) {
|
|
|
+ sql.append(" ").append(queryOperator).append(" ");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ sql.append(")");
|
|
|
+ return sql.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param connection 连接
|
|
|
+ * @param ps 参数构造器
|
|
|
+ * @param fields 同步字段,例如[{name=ID, type=4}, {name=NAME, type=12}]
|
|
|
+ * @param row 同步字段对应的值,例如{ID=123, NAME=张三11}
|
|
|
+ */
|
|
|
+ private void batchRowsSetter(Connection connection, PreparedStatement ps, List<Field> fields, Map row) {
|
|
|
+ Field f = null;
|
|
|
+ int type;
|
|
|
+ Object val = null;
|
|
|
+ int size = fields.size();
|
|
|
+ for (int i = 0; i < size; i++) {
|
|
|
+ // 取出字段和对应值
|
|
|
+ f = fields.get(i);
|
|
|
+ type = f.getType();
|
|
|
+ val = row.get(f.getName());
|
|
|
+ SetterEnum.getSetter(type).set(connection, ps, i + 1, type, val);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void forceUpdate(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField,
|
|
|
+ Map row) {
|
|
|
+ String event = config.getEvent();
|
|
|
+ if (!config.isForceUpdate()) {
|
|
|
+ result.getFailData().add(row);
|
|
|
+ result.getError().append("SQL:").append(config.getCommand().get(event)).append(System.lineSeparator())
|
|
|
+ .append("DATA:").append(row).append(System.lineSeparator())
|
|
|
+ .append("ERROR:").append("Row data does not exist.").append(System.lineSeparator());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 不存在转insert
|
|
|
+ if (isUpdate(event)) {
|
|
|
+ String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
|
|
|
+ if (!existRow(connectorMapper, queryCount, row.get(pkField.getName()))) {
|
|
|
+ logger.warn("{}表执行{}失败, 尝试执行{}, {}", config.getTableName(), event, ConnectorConstant.OPERTION_INSERT, row);
|
|
|
+ writer(result, connectorMapper, config, pkField, row, ConnectorConstant.OPERTION_INSERT);
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 存在转update
|
|
|
+ if (isInsert(config.getEvent())) {
|
|
|
+ logger.warn("{}表执行{}失败, 尝试执行{}, {}", config.getTableName(), event, ConnectorConstant.OPERTION_UPDATE, row);
|
|
|
+ writer(result, connectorMapper, config, pkField, row, ConnectorConstant.OPERTION_UPDATE);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void writer(Result result, DatabaseConnectorMapper connectorMapper, WriterBatchConfig config, Field pkField, Map row,
|
|
|
+ String event) {
|
|
|
+ // 1、获取 SQL
|
|
|
+ String sql = config.getCommand().get(event);
|
|
|
+
|
|
|
+ List<Field> fields = new ArrayList<>(config.getFields());
|
|
|
+ // Update / Delete
|
|
|
+ if (!isInsert(event)) {
|
|
|
+ if (isDelete(event)) {
|
|
|
+ fields.clear();
|
|
|
+ }
|
|
|
+ fields.add(pkField);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 2、设置参数
|
|
|
+ int execute = connectorMapper.execute(databaseTemplate ->
|
|
|
+ databaseTemplate.update(sql, (ps) ->
|
|
|
+ batchRowsSetter(databaseTemplate.getConnection(), ps, fields, row)
|
|
|
+ )
|
|
|
+ );
|
|
|
+ if (execute == 0) {
|
|
|
+ throw new ConnectorException(String.format("尝试执行[%s]失败", event));
|
|
|
+ }
|
|
|
+ result.getSuccessData().add(row);
|
|
|
+ } catch (Exception e) {
|
|
|
+ result.getFailData().add(row);
|
|
|
+ result.getError().append("SQL:").append(sql).append(System.lineSeparator())
|
|
|
+ .append("DATA:").append(row).append(System.lineSeparator())
|
|
|
+ .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
|
|
|
+ logger.error("执行{}失败: {}, DATA:{}", event, e.getMessage(), row);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean existRow(DatabaseConnectorMapper connectorMapper, String sql, Object value) {
|
|
|
+ int rowNum = 0;
|
|
|
+ try {
|
|
|
+ rowNum = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(sql, new Object[]{value}, Integer.class));
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error("检查数据行存在异常:{},SQL:{},参数:{}", e.getMessage(), sql, value);
|
|
|
+ }
|
|
|
+ return rowNum > 0;
|
|
|
+ }
|
|
|
+
|
|
|
private boolean isPk(Map<String, List<String>> tables, String tableName, String name) {
|
|
|
List<String> pk = tables.get(tableName);
|
|
|
return !CollectionUtils.isEmpty(pk) && pk.contains(name);
|