|
@@ -27,7 +27,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
|
|
|
|
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
- protected abstract String getTableSql(DatabaseConfig config);
|
|
|
+ protected abstract String getTableSql();
|
|
|
|
|
|
@Override
|
|
|
public ConnectorMapper connect(DatabaseConfig config) {
|
|
@@ -56,9 +56,13 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public List<String> getTable(DatabaseConnectorMapper connectorMapper) {
|
|
|
- String sql = getTableSql(connectorMapper.getConfig());
|
|
|
- return connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
|
|
|
+ public List<Table> getTable(DatabaseConnectorMapper connectorMapper) {
|
|
|
+ String sql = getTableSql();
|
|
|
+ List<String> tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class));
|
|
|
+ if(!CollectionUtils.isEmpty(tableNames)){
|
|
|
+ return tableNames.stream().map(name -> new Table(name)).collect(Collectors.toList());
|
|
|
+ }
|
|
|
+ return Collections.EMPTY_LIST;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -153,10 +157,9 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
|
|
|
throw new ConnectorException("writer data can not be empty.");
|
|
|
}
|
|
|
|
|
|
- Field pkField = null;
|
|
|
+ Field pkField = getPrimaryKeyField(fields);
|
|
|
// Update / Delete
|
|
|
if (isUpdate(event) || isDelete(event)) {
|
|
|
- pkField = getPrimaryKeyField(fields);
|
|
|
if (isDelete(event)) {
|
|
|
fields.clear();
|
|
|
}
|
|
@@ -165,10 +168,10 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
|
|
|
|
|
|
int size = fields.size();
|
|
|
Result result = new Result();
|
|
|
- int update = 0;
|
|
|
+ int execute = 0;
|
|
|
try {
|
|
|
// 2、设置参数
|
|
|
- update = connectorMapper.execute(databaseTemplate ->
|
|
|
+ execute = connectorMapper.execute(databaseTemplate ->
|
|
|
databaseTemplate.update(sql, (ps) -> {
|
|
|
Field f = null;
|
|
|
for (int i = 0; i < size; i++) {
|
|
@@ -179,25 +182,36 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
|
|
|
);
|
|
|
} catch (Exception e) {
|
|
|
// 记录错误数据
|
|
|
- result.getFailData().add(data);
|
|
|
- result.getFail().set(1);
|
|
|
- result.getError().append("SQL:").append(sql).append(System.lineSeparator())
|
|
|
- .append("DATA:").append(data).append(System.lineSeparator())
|
|
|
- .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
|
|
|
- logger.error("SQL:{}, DATA:{}, ERROR:{}", sql, data, e.getMessage());
|
|
|
+ if(!config.isUpdateRowIfInsertFailed()){
|
|
|
+ result.getFailData().add(data);
|
|
|
+ result.getFail().set(1);
|
|
|
+ result.getError().append("SQL:").append(sql).append(System.lineSeparator())
|
|
|
+ .append("DATA:").append(data).append(System.lineSeparator())
|
|
|
+ .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
|
|
|
+ logger.error("SQL:{}, DATA:{}, ERROR:{}", sql, data, e.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- // 更新失败尝试插入
|
|
|
- if (0 == update && isUpdate(event) && null != pkField && !config.isRetry()) {
|
|
|
- // 插入前检查有无数据
|
|
|
- String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
|
|
|
- if (!existRow(connectorMapper, queryCount, data.get(pkField.getName()))) {
|
|
|
- fields.remove(fields.size() - 1);
|
|
|
- config.setEvent(ConnectorConstant.OPERTION_INSERT);
|
|
|
+ if (0 == execute && !config.isRetry() && null != pkField) {
|
|
|
+ // 不存在转insert
|
|
|
+ if(isUpdate(event)){
|
|
|
+ String queryCount = config.getCommand().get(ConnectorConstant.OPERTION_QUERY_COUNT_EXIST);
|
|
|
+ if(!existRow(connectorMapper, queryCount, data.get(pkField.getName()))){
|
|
|
+ fields.remove(fields.size() - 1);
|
|
|
+ config.setEvent(ConnectorConstant.OPERTION_INSERT);
|
|
|
+ config.setRetry(true);
|
|
|
+ logger.warn("{}表执行{}失败, 尝试执行{}", config.getTable(), event, config.getEvent());
|
|
|
+ return writer(connectorMapper, config);
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ // 存在转update
|
|
|
+ if(isInsert(event)){
|
|
|
+ config.setEvent(ConnectorConstant.OPERTION_UPDATE);
|
|
|
config.setRetry(true);
|
|
|
- logger.warn("{}表执行{}失败, 尝试执行{}", config.getTable(), event, config.getEvent());
|
|
|
- result = writer(connectorMapper, config);
|
|
|
+ return writer(connectorMapper, config);
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
return result;
|
|
|
}
|
|
@@ -223,7 +237,10 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
|
|
|
if (StringUtil.isNotBlank(queryFilterSql)) {
|
|
|
queryCount.append(queryFilterSql);
|
|
|
}
|
|
|
- queryCount.append(" GROUP BY ").append(pk).append(") DBSYNCER_T");
|
|
|
+ if(!StringUtil.isBlank(pk)){
|
|
|
+ queryCount.append(" GROUP BY ").append(pk);
|
|
|
+ }
|
|
|
+ queryCount.append(") DBSYNCER_T");
|
|
|
map.put(ConnectorConstant.OPERTION_QUERY_COUNT, queryCount.toString());
|
|
|
return map;
|
|
|
}
|
|
@@ -260,12 +277,12 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
|
|
|
* @param config
|
|
|
* @return
|
|
|
*/
|
|
|
- protected List<String> getDqlTable(DatabaseConnectorMapper config) {
|
|
|
+ protected List<Table> getDqlTable(DatabaseConnectorMapper config) {
|
|
|
MetaInfo metaInfo = getDqlMetaInfo(config);
|
|
|
Assert.notNull(metaInfo, "SQL解析异常.");
|
|
|
DatabaseConfig cfg = config.getConfig();
|
|
|
- List<String> tables = new ArrayList<>();
|
|
|
- tables.add(cfg.getSql());
|
|
|
+ List<Table> tables = new ArrayList<>();
|
|
|
+ tables.add(new Table(cfg.getSql()));
|
|
|
return tables;
|
|
|
}
|
|
|
|