|
@@ -2,8 +2,11 @@ package org.dbsyncer.storage.support;
|
|
|
|
|
|
import org.apache.commons.dbcp.DelegatingDatabaseMetaData;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
+import org.dbsyncer.common.util.CollectionUtils;
|
|
|
import org.dbsyncer.connector.config.DatabaseConfig;
|
|
|
+import org.dbsyncer.connector.constant.DatabaseConstant;
|
|
|
import org.dbsyncer.connector.database.Database;
|
|
|
+import org.dbsyncer.connector.database.sqlbuilder.SqlBuilderQuery;
|
|
|
import org.dbsyncer.connector.enums.SqlBuilderEnum;
|
|
|
import org.dbsyncer.connector.mysql.MysqlConnector;
|
|
|
import org.dbsyncer.connector.util.DatabaseUtil;
|
|
@@ -12,6 +15,7 @@ import org.dbsyncer.storage.AbstractStorageService;
|
|
|
import org.dbsyncer.storage.StorageException;
|
|
|
import org.dbsyncer.storage.constant.ConfigConstant;
|
|
|
import org.dbsyncer.storage.enums.StorageEnum;
|
|
|
+import org.dbsyncer.storage.query.Param;
|
|
|
import org.dbsyncer.storage.query.Query;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -28,10 +32,14 @@ import java.io.*;
|
|
|
import java.lang.reflect.Field;
|
|
|
import java.sql.Connection;
|
|
|
import java.sql.DatabaseMetaData;
|
|
|
+import java.sql.Timestamp;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* @author AE86
|
|
@@ -45,9 +53,12 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
|
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
- private static final String PREFIX_TABLE = "dbsyncer_";
|
|
|
- private static final String SHOW_TABLE = "show tables where Tables_in_%s = \"%s\"";
|
|
|
- private static final String PK = ConfigConstant.CONFIG_MODEL_ID;
|
|
|
+ private static final String PREFIX_TABLE = "dbsyncer_";
|
|
|
+ private static final String SHOW_TABLE = "show tables where Tables_in_%s = \"%s\"";
|
|
|
+ private static final String TRUNCATE_TABLE = "TRUNCATE TABLE \"%s\"";
|
|
|
+ private static final String PK = ConfigConstant.CONFIG_MODEL_ID;
|
|
|
+ private static final String TABLE_CREATE_TIME = "create_time";
|
|
|
+ private static final String TABLE_UPDATE_TIME = "update_time";
|
|
|
|
|
|
private Map<String, Executor> tables = new ConcurrentHashMap<>();
|
|
|
|
|
@@ -96,44 +107,58 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
|
|
|
@Override
|
|
|
public List<Map> select(Query query) {
|
|
|
- getExecutor(query.getType(), query.getCollection());
|
|
|
-
|
|
|
- return null;
|
|
|
+ Executor executor = getExecutor(query.getType(), query.getCollection());
|
|
|
+ List<Object> args = new ArrayList<>();
|
|
|
+ String sql = buildQuerySql(query, executor, args);
|
|
|
+
|
|
|
+ List<Map> result = new ArrayList<>();
|
|
|
+ List<Map<String, Object>> list = jdbcTemplate.queryForList(sql, args.toArray());
|
|
|
+ result.addAll(list);
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void insert(StorageEnum type, String table, Map params) {
|
|
|
- getExecutor(type.getType(), table);
|
|
|
-
|
|
|
+ Executor executor = getExecutor(type, table);
|
|
|
+ String sql = executor.getInsert();
|
|
|
+ List<Object> args = getParams(executor, params);
|
|
|
+ int insert = jdbcTemplate.update(sql, args.toArray());
|
|
|
+ Assert.isTrue(insert > 0, "insert failed");
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void update(StorageEnum type, String table, Map params) {
|
|
|
- getExecutor(type.getType(), table);
|
|
|
-
|
|
|
+ Executor executor = getExecutor(type, table);
|
|
|
+ String sql = executor.getUpdate();
|
|
|
+ List<Object> args = getParams(executor, params);
|
|
|
+ args.add(params.get(ConfigConstant.CONFIG_MODEL_ID));
|
|
|
+ int update = jdbcTemplate.update(sql, args.toArray());
|
|
|
+ Assert.isTrue(update > 0, "update failed");
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void delete(StorageEnum type, String table, String id) {
|
|
|
- getExecutor(type.getType(), table);
|
|
|
-
|
|
|
+ Executor executor = getExecutor(type, table);
|
|
|
+ String sql = executor.getDelete();
|
|
|
+ int delete = jdbcTemplate.update(sql, new Object[] {id});
|
|
|
+ Assert.isTrue(delete > 0, "delete failed");
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void deleteAll(StorageEnum type, String table) {
|
|
|
- getExecutor(type.getType(), table);
|
|
|
-
|
|
|
+ String sql = String.format(TRUNCATE_TABLE, table);
|
|
|
+ jdbcTemplate.execute(sql);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void insertLog(StorageEnum type, String table, Map<String, Object> params) {
|
|
|
- getExecutor(type.getType(), table);
|
|
|
+ getExecutor(type, table);
|
|
|
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void insertData(StorageEnum type, String table, List<Map> list) {
|
|
|
- getExecutor(type.getType(), table);
|
|
|
+ getExecutor(type, table);
|
|
|
|
|
|
}
|
|
|
|
|
@@ -147,9 +172,35 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
return "_";
|
|
|
}
|
|
|
|
|
|
- private Executor getExecutor(String group, String table) {
|
|
|
+ private List<Object> getParams(Executor executor, Map params) {
|
|
|
+ return executor.getFieldPairs().stream().map(p -> p.convert(params.get(p.labelName))).collect(Collectors.toList());
|
|
|
+ }
|
|
|
+
|
|
|
+ private String buildQuerySql(Query query, Executor executor, List<Object> args) {
|
|
|
+ StringBuilder sql = new StringBuilder(executor.getQuery());
|
|
|
+ List<Param> params = query.getParams();
|
|
|
+ if (!CollectionUtils.isEmpty(params)) {
|
|
|
+ sql.append(" WHERE ");
|
|
|
+ AtomicBoolean flag = new AtomicBoolean();
|
|
|
+ params.forEach(p -> {
|
|
|
+ if (flag.get()) {
|
|
|
+ sql.append(" AND ");
|
|
|
+ }
|
|
|
+ // name=?
|
|
|
+ sql.append(p.getKey()).append("=").append("?");
|
|
|
+ args.add(p.getValue());
|
|
|
+ flag.compareAndSet(false, true);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ sql.append(DatabaseConstant.MYSQL_PAGE_SQL);
|
|
|
+ args.add((query.getPageNum() - 1) * query.getPageSize());
|
|
|
+ args.add(query.getPageSize());
|
|
|
+ return sql.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ private Executor getExecutor(StorageEnum type, String table) {
|
|
|
// 获取模板
|
|
|
- Executor executor = tables.get(group);
|
|
|
+ Executor executor = tables.get(type.getType());
|
|
|
Assert.notNull(executor, "未知的存储类型");
|
|
|
|
|
|
synchronized (tables) {
|
|
@@ -170,29 +221,29 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
|
|
|
private void initTable() {
|
|
|
// 配置
|
|
|
- List<String> configFields = Arrays.asList(
|
|
|
- ConfigConstant.CONFIG_MODEL_ID,
|
|
|
- ConfigConstant.CONFIG_MODEL_NAME,
|
|
|
- ConfigConstant.CONFIG_MODEL_TYPE,
|
|
|
- ConfigConstant.CONFIG_MODEL_CREATE_TIME,
|
|
|
- ConfigConstant.CONFIG_MODEL_UPDATE_TIME,
|
|
|
- ConfigConstant.CONFIG_MODEL_JSON);
|
|
|
+ List<FieldPair> configFields = Arrays.asList(
|
|
|
+ new FieldPair(ConfigConstant.CONFIG_MODEL_ID),
|
|
|
+ new FieldPair(ConfigConstant.CONFIG_MODEL_NAME),
|
|
|
+ new FieldPair(ConfigConstant.CONFIG_MODEL_TYPE),
|
|
|
+ new TimestampFieldPair(ConfigConstant.CONFIG_MODEL_CREATE_TIME, TABLE_CREATE_TIME),
|
|
|
+ new TimestampFieldPair(ConfigConstant.CONFIG_MODEL_UPDATE_TIME, TABLE_UPDATE_TIME),
|
|
|
+ new FieldPair(ConfigConstant.CONFIG_MODEL_JSON));
|
|
|
// 日志
|
|
|
- List<String> logFields = Arrays.asList(
|
|
|
- ConfigConstant.CONFIG_MODEL_ID,
|
|
|
- ConfigConstant.CONFIG_MODEL_NAME,
|
|
|
- ConfigConstant.CONFIG_MODEL_TYPE,
|
|
|
- ConfigConstant.CONFIG_MODEL_CREATE_TIME,
|
|
|
- ConfigConstant.CONFIG_MODEL_JSON);
|
|
|
+ List<FieldPair> logFields = Arrays.asList(
|
|
|
+ new FieldPair(ConfigConstant.CONFIG_MODEL_ID),
|
|
|
+ new FieldPair(ConfigConstant.CONFIG_MODEL_NAME),
|
|
|
+ new FieldPair(ConfigConstant.CONFIG_MODEL_TYPE),
|
|
|
+ new TimestampFieldPair(ConfigConstant.CONFIG_MODEL_CREATE_TIME, TABLE_CREATE_TIME),
|
|
|
+ new FieldPair(ConfigConstant.CONFIG_MODEL_JSON));
|
|
|
// 数据
|
|
|
- List<String> dataFields = Arrays.asList(
|
|
|
- ConfigConstant.CONFIG_MODEL_ID,
|
|
|
- ConfigConstant.CONFIG_MODEL_NAME,
|
|
|
- ConfigConstant.DATA_SUCCESS,
|
|
|
- ConfigConstant.DATA_EVENT,
|
|
|
- ConfigConstant.DATA_ERROR,
|
|
|
- ConfigConstant.CONFIG_MODEL_CREATE_TIME,
|
|
|
- ConfigConstant.CONFIG_MODEL_JSON);
|
|
|
+ List<FieldPair> dataFields = Arrays.asList(
|
|
|
+ new FieldPair(ConfigConstant.CONFIG_MODEL_ID),
|
|
|
+ new FieldPair(ConfigConstant.CONFIG_MODEL_NAME),
|
|
|
+ new FieldPair(ConfigConstant.DATA_SUCCESS),
|
|
|
+ new FieldPair(ConfigConstant.DATA_EVENT),
|
|
|
+ new FieldPair(ConfigConstant.DATA_ERROR),
|
|
|
+ new TimestampFieldPair(ConfigConstant.CONFIG_MODEL_CREATE_TIME, TABLE_CREATE_TIME),
|
|
|
+ new FieldPair(ConfigConstant.CONFIG_MODEL_JSON));
|
|
|
tables.putIfAbsent(StorageEnum.CONFIG.getType(), new Executor(StorageEnum.CONFIG, configFields));
|
|
|
tables.putIfAbsent(StorageEnum.LOG.getType(), new Executor(StorageEnum.LOG, logFields));
|
|
|
tables.putIfAbsent(StorageEnum.DATA.getType(), new Executor(StorageEnum.DATA, dataFields, true));
|
|
@@ -221,8 +272,8 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
jdbcTemplate.execute(ddl);
|
|
|
}
|
|
|
|
|
|
- List<String> fieldName = executor.getFieldName();
|
|
|
- String query = SqlBuilderEnum.QUERY.getSqlBuilder().buildSql(table, PK, fieldName, "", "", connector);
|
|
|
+ List<String> fieldName = executor.getFieldPairs().stream().map(p -> p.columnName).collect(Collectors.toList());
|
|
|
+ String query = new SqlBuilderQuery().buildStandardSql(table, PK, fieldName, "", "");
|
|
|
String insert = SqlBuilderEnum.INSERT.getSqlBuilder().buildSql(table, PK, fieldName, "", "", connector);
|
|
|
String update = SqlBuilderEnum.UPDATE.getSqlBuilder().buildSql(table, PK, fieldName, "", "", connector);
|
|
|
String delete = SqlBuilderEnum.DELETE.getSqlBuilder().buildSql(table, PK, fieldName, "", "", connector);
|
|
@@ -266,26 +317,63 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
this.config = config;
|
|
|
}
|
|
|
|
|
|
+ interface FieldHandler {
|
|
|
+ Object convert(Object val);
|
|
|
+ }
|
|
|
+
|
|
|
+ class FieldPair {
|
|
|
+ String labelName;
|
|
|
+ String columnName;
|
|
|
+ FieldHandler handler;
|
|
|
+
|
|
|
+ public FieldPair(String labelName) {
|
|
|
+ this.labelName = labelName;
|
|
|
+ this.columnName = labelName;
|
|
|
+ this.handler = val -> val;
|
|
|
+ }
|
|
|
+
|
|
|
+ public FieldPair(String labelName, String columnName) {
|
|
|
+ this.labelName = labelName;
|
|
|
+ this.columnName = columnName;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Object convert(Object val) {
|
|
|
+ return val;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ class TimestampFieldPair extends FieldPair {
|
|
|
+
|
|
|
+ public TimestampFieldPair(String labelName, String columnName) {
|
|
|
+ super(labelName, columnName);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object convert(Object val) {
|
|
|
+ return new Timestamp((Long) val);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
class Executor {
|
|
|
- private String query;
|
|
|
- private String insert;
|
|
|
- private String update;
|
|
|
- private String delete;
|
|
|
- private StorageEnum group;
|
|
|
- private List<String> fieldName;
|
|
|
- private boolean dynamicTableName;
|
|
|
+ private String query;
|
|
|
+ private String insert;
|
|
|
+ private String update;
|
|
|
+ private String delete;
|
|
|
+ private StorageEnum group;
|
|
|
+ private List<FieldPair> fieldPairs;
|
|
|
+ private boolean dynamicTableName;
|
|
|
|
|
|
public Executor() {
|
|
|
}
|
|
|
|
|
|
- public Executor(StorageEnum group, List<String> fieldName) {
|
|
|
+ public Executor(StorageEnum group, List<FieldPair> fieldPairs) {
|
|
|
this.group = group;
|
|
|
- this.fieldName = fieldName;
|
|
|
+ this.fieldPairs = fieldPairs;
|
|
|
}
|
|
|
|
|
|
- public Executor(StorageEnum group, List<String> fieldName, boolean dynamicTableName) {
|
|
|
+ public Executor(StorageEnum group, List<FieldPair> fieldPairs, boolean dynamicTableName) {
|
|
|
this.group = group;
|
|
|
- this.fieldName = fieldName;
|
|
|
+ this.fieldPairs = fieldPairs;
|
|
|
this.dynamicTableName = dynamicTableName;
|
|
|
}
|
|
|
|
|
@@ -329,8 +417,8 @@ public class MysqlStorageServiceImpl extends AbstractStorageService {
|
|
|
return group;
|
|
|
}
|
|
|
|
|
|
- public List<String> getFieldName() {
|
|
|
- return fieldName;
|
|
|
+ public List<FieldPair> getFieldPairs() {
|
|
|
+ return fieldPairs;
|
|
|
}
|
|
|
|
|
|
public boolean isDynamicTableName() {
|