|
@@ -16,6 +16,7 @@ import org.dbsyncer.connector.enums.SqlBuilderEnum;
|
|
|
import org.dbsyncer.connector.model.Field;
|
|
|
import org.dbsyncer.connector.util.DatabaseUtil;
|
|
|
import org.dbsyncer.storage.AbstractStorageService;
|
|
|
+import org.dbsyncer.storage.NullExecutorException;
|
|
|
import org.dbsyncer.storage.constant.ConfigConstant;
|
|
|
import org.dbsyncer.storage.enums.StorageEnum;
|
|
|
import org.dbsyncer.storage.query.AbstractFilter;
|
|
@@ -102,6 +103,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
protected Paging select(String sharding, Query query) {
|
|
|
Paging paging = new Paging(query.getPageNum(), query.getPageSize());
|
|
|
Executor executor = getExecutor(query.getType(), sharding);
|
|
|
+ if (executor == null) {
|
|
|
+ return paging;
|
|
|
+ }
|
|
|
List<Object> queryCountArgs = new ArrayList<>();
|
|
|
String queryCountSql = buildQueryCountSql(query, executor, queryCountArgs);
|
|
|
Long total = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(queryCountSql, queryCountArgs.toArray(), Long.class));
|
|
@@ -122,6 +126,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
@Override
|
|
|
protected void delete(String sharding, Query query) {
|
|
|
Executor executor = getExecutor(query.getType(), sharding);
|
|
|
+ if (executor == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
StringBuilder sql = new StringBuilder("DELETE FROM `").append(executor.getTable()).append("`");
|
|
|
List<Object> params = new ArrayList<>();
|
|
|
buildQuerySqlWithParams(query, params, sql, new ArrayList<>());
|
|
@@ -172,6 +179,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
@Override
|
|
|
protected void batchDelete(StorageEnum type, String sharding, List<String> ids) {
|
|
|
final Executor executor = getExecutor(type, sharding);
|
|
|
+ if (executor == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
final String sql = executor.getDelete();
|
|
|
final List<Object[]> args = ids.stream().map(id -> new Object[]{id}).collect(Collectors.toList());
|
|
|
connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, args));
|
|
@@ -188,6 +198,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
}
|
|
|
|
|
|
final Executor executor = getExecutor(type, sharding);
|
|
|
+ if (executor == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
final String sql = mapper.getSql(executor);
|
|
|
final List<Object[]> args = list.stream().map(row -> mapper.getArgs(executor, row)).collect(Collectors.toList());
|
|
|
connectorMapper.execute(databaseTemplate -> databaseTemplate.batchUpdate(sql, args));
|
|
@@ -196,7 +209,9 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
private Executor getExecutor(StorageEnum type, String sharding) {
|
|
|
return tables.computeIfAbsent(sharding, (table) -> {
|
|
|
Executor executor = tables.get(type.getType());
|
|
|
- Assert.notNull(executor, "未知的存储类型");
|
|
|
+ if (executor == null) {
|
|
|
+ throw new NullExecutorException("未知的存储类型");
|
|
|
+ }
|
|
|
|
|
|
Executor newExecutor = new Executor(executor.getType(), executor.getFields(), executor.isSystemTable(), executor.isOrderByUpdateTime());
|
|
|
return createTableIfNotExist(table, newExecutor);
|