|
@@ -2,11 +2,11 @@ package org.dbsyncer.connector.database;
|
|
|
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.dbsyncer.common.util.CollectionUtils;
|
|
|
-import org.dbsyncer.connector.template.CommandTemplate;
|
|
|
import org.dbsyncer.connector.ConnectorException;
|
|
|
import org.dbsyncer.connector.config.*;
|
|
|
import org.dbsyncer.connector.enums.OperationEnum;
|
|
|
import org.dbsyncer.connector.enums.SqlBuilderEnum;
|
|
|
+import org.dbsyncer.connector.template.CommandTemplate;
|
|
|
import org.dbsyncer.connector.util.DatabaseUtil;
|
|
|
import org.dbsyncer.connector.util.JDBCUtil;
|
|
|
import org.slf4j.Logger;
|
|
@@ -17,10 +17,7 @@ import org.springframework.util.Assert;
|
|
|
import java.sql.Connection;
|
|
|
import java.sql.PreparedStatement;
|
|
|
import java.sql.SQLException;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
+import java.util.*;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
public abstract class AbstractDatabaseConnector implements Database {
|
|
@@ -78,7 +75,7 @@ public abstract class AbstractDatabaseConnector implements Database {
|
|
|
String metaSql = getMetaSql(cfg, tableName);
|
|
|
metaInfo = DatabaseUtil.getMetaInfo(jdbcTemplate, metaSql);
|
|
|
} catch (Exception e) {
|
|
|
- logger.error("getMetaInfo failed", e);
|
|
|
+ logger.error(e.getMessage());
|
|
|
} finally {
|
|
|
// 释放连接
|
|
|
this.close(jdbcTemplate);
|
|
@@ -88,12 +85,34 @@ public abstract class AbstractDatabaseConnector implements Database {
|
|
|
|
|
|
@Override
|
|
|
public Map<String, String> getSourceCommand(CommandTemplate commandTemplate) {
|
|
|
- return null;
|
|
|
+ // 获取过滤SQL
|
|
|
+ List<Filter> filter = commandTemplate.getFilter();
|
|
|
+ String queryFilterSql = getQueryFilterSql(filter);
|
|
|
+
|
|
|
+ // 获取查询SQL
|
|
|
+ Table table = commandTemplate.getTable();
|
|
|
+ String type = SqlBuilderEnum.QUERY.getName();
|
|
|
+ String querySql = getQuerySql(type, table, queryFilterSql);
|
|
|
+ Map<String, String> map = new HashMap<>();
|
|
|
+ map.put(type, querySql);
|
|
|
+ return map;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Map<String, String> getTargetCommand(CommandTemplate commandTemplate) {
|
|
|
- return null;
|
|
|
+ // 获取增删改SQL
|
|
|
+ Map<String, String> map = new HashMap<>();
|
|
|
+ Table table = commandTemplate.getTable();
|
|
|
+
|
|
|
+ String insert = SqlBuilderEnum.INSERT.getName();
|
|
|
+ map.put(insert, getQuerySql(insert, table, null));
|
|
|
+
|
|
|
+ String update = SqlBuilderEnum.UPDATE.getName();
|
|
|
+ map.put(update, getQuerySql(update, table, null));
|
|
|
+
|
|
|
+ String delete = SqlBuilderEnum.DELETE.getName();
|
|
|
+ map.put(delete, getQuerySql(delete, table, null));
|
|
|
+ return map;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -110,40 +129,51 @@ public abstract class AbstractDatabaseConnector implements Database {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public String getQueryFilterSql(List<Filter> filter) {
|
|
|
- if (CollectionUtils.isEmpty(filter)) {
|
|
|
- return "";
|
|
|
- }
|
|
|
- // 过滤条件SQL
|
|
|
- StringBuilder condition = new StringBuilder();
|
|
|
-
|
|
|
- // 拼接并且SQL
|
|
|
- String addSql = getFilterSql(OperationEnum.AND.getName(), filter);
|
|
|
- // 如果Add条件存在
|
|
|
- if (StringUtils.isNotBlank(addSql)) {
|
|
|
- condition.append(addSql);
|
|
|
- }
|
|
|
-
|
|
|
- // 拼接或者SQL
|
|
|
- String orSql = getFilterSql(OperationEnum.OR.getName(), filter);
|
|
|
- // 如果Or条件和Add条件都存在
|
|
|
- if (StringUtils.isNotBlank(orSql) && StringUtils.isNotBlank(addSql)) {
|
|
|
- condition.append(" OR ").append(orSql);
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * 获取DQL表信息
|
|
|
+ *
|
|
|
+ * @param config
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ protected List<String> getDqlTable(ConnectorConfig config) {
|
|
|
+ MetaInfo metaInfo = getDqlMetaInfo(config);
|
|
|
+ Assert.notNull(metaInfo, "SQL解析异常.");
|
|
|
+ DatabaseConfig cfg = (DatabaseConfig) config;
|
|
|
+ return Arrays.asList(cfg.getSql());
|
|
|
+ }
|
|
|
|
|
|
- // 如果有条件加上 WHERE
|
|
|
- StringBuilder queryFilterSql = new StringBuilder();
|
|
|
- if (StringUtils.isNotBlank(condition.toString())) {
|
|
|
- // WHERE (USER.USERNAME = 'zhangsan' AND USER.AGE='20') OR (USER.TEL='18299996666')
|
|
|
- queryFilterSql.insert(0, " WHERE ").append(condition);
|
|
|
+ /**
|
|
|
+ * 获取DQl元信息
|
|
|
+ *
|
|
|
+ * @param config
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ protected MetaInfo getDqlMetaInfo(ConnectorConfig config) {
|
|
|
+ DatabaseConfig cfg = (DatabaseConfig) config;
|
|
|
+ JdbcTemplate jdbcTemplate = null;
|
|
|
+ MetaInfo metaInfo = null;
|
|
|
+ try {
|
|
|
+ jdbcTemplate = getJdbcTemplate(cfg);
|
|
|
+ metaInfo = DatabaseUtil.getMetaInfo(jdbcTemplate, cfg.getSql());
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getMessage());
|
|
|
+ } finally {
|
|
|
+ // 释放连接
|
|
|
+ this.close(jdbcTemplate);
|
|
|
}
|
|
|
- return queryFilterSql.toString();
|
|
|
+ return metaInfo;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public String getJdbcSql(String opertion, DatabaseConfig config, Table table, String queryFilterSQL) {
|
|
|
- if(null == table){
|
|
|
+ /**
|
|
|
+ * 获取查询SQL
|
|
|
+ *
|
|
|
+ * @param type {@link SqlBuilderEnum}
|
|
|
+ * @param table
|
|
|
+ * @param queryFilterSQL
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private String getQuerySql(String type, Table table, String queryFilterSQL) {
|
|
|
+ if (null == table) {
|
|
|
logger.error("Table can not be null.");
|
|
|
throw new ConnectorException("Table can not be null.");
|
|
|
}
|
|
@@ -155,9 +185,9 @@ public abstract class AbstractDatabaseConnector implements Database {
|
|
|
// 获取主键
|
|
|
String pk = null;
|
|
|
// 去掉重复的查询字段
|
|
|
- List<String> filedNames = new ArrayList<String>();
|
|
|
+ List<String> filedNames = new ArrayList<>();
|
|
|
for (Field c : column) {
|
|
|
- if(c.isPk()){
|
|
|
+ if (c.isPk()) {
|
|
|
pk = c.getName();
|
|
|
}
|
|
|
String name = c.getName();
|
|
@@ -166,10 +196,6 @@ public abstract class AbstractDatabaseConnector implements Database {
|
|
|
filedNames.add(name);
|
|
|
}
|
|
|
}
|
|
|
- if(StringUtils.isBlank(pk)){
|
|
|
- logger.error("Table primary key can not be empty.");
|
|
|
- throw new ConnectorException("Table primary key can not be empty.");
|
|
|
- }
|
|
|
if (CollectionUtils.isEmpty(filedNames)) {
|
|
|
logger.error("The filedNames can not be empty.");
|
|
|
throw new ConnectorException("The filedNames can not be empty.");
|
|
@@ -179,92 +205,43 @@ public abstract class AbstractDatabaseConnector implements Database {
|
|
|
logger.error("Table name can not be empty.");
|
|
|
throw new ConnectorException("Table name can not be empty.");
|
|
|
}
|
|
|
- return SqlBuilderEnum.getSqlBuilder(opertion).buildSql(config, tableName, pk, filedNames, queryFilterSQL, this);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String getJdbcSqlQuartzRange(String tableName, String quartzFiled, String queryFilterSQL) {
|
|
|
- quartzFiled = tableName + "." + quartzFiled;
|
|
|
- StringBuilder f = new StringBuilder();
|
|
|
- // 如果没有加过滤条件就拼接WHERE语法
|
|
|
- // TB_USER.LASTDATE > ? AND TB_USER.LASTDATE <= ?
|
|
|
- f.append(StringUtils.isBlank(queryFilterSQL) ? " WHERE " : " AND ");
|
|
|
- // LASTDATE > '2017-11-10 11:07:41' AND LASTDATE <= '2017-11-10 11:30:01' ORDER BY LASTDATE
|
|
|
- f.append(quartzFiled).append(" > ?").append(" AND ").append(quartzFiled).append(" <= ?").append(" ORDER BY ").append(quartzFiled);
|
|
|
- return f.toString();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String getJdbcSqlQuartzAll(String tableName, String quartzFiled, String queryFilterSQL) {
|
|
|
- StringBuilder f = new StringBuilder();
|
|
|
- // 如果没有加过滤条件就拼接WHERE语法
|
|
|
- f.append(StringUtils.isBlank(queryFilterSQL) ? " WHERE " : " AND ");
|
|
|
- // TB_USER.LASTDATE <= ?
|
|
|
- f.append(tableName).append(".").append(quartzFiled).append(" <= ?").append(" ORDER BY ").append(tableName).append(".").append(quartzFiled);
|
|
|
- // TB_USER.LASTDATE <= '2017-11-10 11:07:41'
|
|
|
- return f.toString();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String getJdbcSqlQuartzMax(String tableName, String quartzFiled) {
|
|
|
- StringBuilder f = new StringBuilder();
|
|
|
- // SELECT MAX(USER.LASTDATE) FROM TB_USER
|
|
|
- f.append("SELECT MAX(").append(tableName).append(".").append(quartzFiled).append(") FROM ").append(tableName);
|
|
|
- return f.toString();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void batchRowsSetter(PreparedStatement ps, List<Field> fields, Map<String, Object> row) {
|
|
|
- if (CollectionUtils.isEmpty(fields)) {
|
|
|
- logger.error("Rows fields can not be empty.");
|
|
|
- throw new ConnectorException(String.format("Rows fields can not be empty."));
|
|
|
- }
|
|
|
- int fieldSize = fields.size();
|
|
|
- Field f = null;
|
|
|
- int type;
|
|
|
- Object val = null;
|
|
|
- for (int i = 0; i < fieldSize; i++) {
|
|
|
- // 取出字段和对应值
|
|
|
- f = fields.get(i);
|
|
|
- type = f.getType();
|
|
|
- val = row.get(f.getName());
|
|
|
- DatabaseUtil.preparedStatementSetter(ps, i + 1, type, val);
|
|
|
- }
|
|
|
+ return SqlBuilderEnum.getSqlBuilder(type).buildSql(tableName, pk, filedNames, queryFilterSQL, this);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 获取DQL表信息
|
|
|
+ * 获取查询条件SQL
|
|
|
*
|
|
|
- * @param config
|
|
|
+ * @param filter
|
|
|
* @return
|
|
|
*/
|
|
|
- protected List<String> getDqlTable(ConnectorConfig config) {
|
|
|
- MetaInfo metaInfo = getDqlMetaInfo(config);
|
|
|
- Assert.notNull(metaInfo, "SQL解析异常.");
|
|
|
- DatabaseConfig cfg = (DatabaseConfig) config;
|
|
|
- return Arrays.asList(cfg.getSql());
|
|
|
- }
|
|
|
+ private String getQueryFilterSql(List<Filter> filter) {
|
|
|
+ if (CollectionUtils.isEmpty(filter)) {
|
|
|
+ return "";
|
|
|
+ }
|
|
|
+ // 过滤条件SQL
|
|
|
+ StringBuilder condition = new StringBuilder();
|
|
|
|
|
|
- /**
|
|
|
- * 获取DQl元信息
|
|
|
- *
|
|
|
- * @param config
|
|
|
- * @return
|
|
|
- */
|
|
|
- protected MetaInfo getDqlMetaInfo(ConnectorConfig config) {
|
|
|
- DatabaseConfig cfg = (DatabaseConfig) config;
|
|
|
- JdbcTemplate jdbcTemplate = null;
|
|
|
- MetaInfo metaInfo = null;
|
|
|
- try {
|
|
|
- jdbcTemplate = getJdbcTemplate(cfg);
|
|
|
- metaInfo = DatabaseUtil.getMetaInfo(jdbcTemplate, cfg.getSql());
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error(e.getMessage());
|
|
|
- } finally {
|
|
|
- // 释放连接
|
|
|
- this.close(jdbcTemplate);
|
|
|
+ // 拼接并且SQL
|
|
|
+ String addSql = getFilterSql(OperationEnum.AND.getName(), filter);
|
|
|
+ // 如果Add条件存在
|
|
|
+ if (StringUtils.isNotBlank(addSql)) {
|
|
|
+ condition.append(addSql);
|
|
|
}
|
|
|
- return metaInfo;
|
|
|
+
|
|
|
+ // 拼接或者SQL
|
|
|
+ String orSql = getFilterSql(OperationEnum.OR.getName(), filter);
|
|
|
+ // 如果Or条件和Add条件都存在
|
|
|
+ if (StringUtils.isNotBlank(orSql) && StringUtils.isNotBlank(addSql)) {
|
|
|
+ condition.append(" OR ").append(orSql);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果有条件加上 WHERE
|
|
|
+ StringBuilder sql = new StringBuilder();
|
|
|
+ if (StringUtils.isNotBlank(condition.toString())) {
|
|
|
+ // WHERE (USER.USERNAME = 'zhangsan' AND USER.AGE='20') OR (USER.TEL='18299996666')
|
|
|
+ sql.insert(0, " WHERE ").append(condition);
|
|
|
+ }
|
|
|
+ return sql.toString();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -276,7 +253,7 @@ public abstract class AbstractDatabaseConnector implements Database {
|
|
|
*/
|
|
|
private String getFilterSql(String queryOperator, List<Filter> filter) {
|
|
|
List<Filter> list = filter.stream().filter(f -> StringUtils.equals(f.getOperation(), queryOperator)).collect(Collectors.toList());
|
|
|
- if(CollectionUtils.isEmpty(list)){
|
|
|
+ if (CollectionUtils.isEmpty(list)) {
|
|
|
return "";
|
|
|
}
|
|
|
|
|
@@ -297,4 +274,27 @@ public abstract class AbstractDatabaseConnector implements Database {
|
|
|
return sql.toString();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * @param ps 参数构造器
|
|
|
+ * @param fields 同步字段,例如[{name=ID, type=4}, {name=NAME, type=12}]
|
|
|
+ * @param row 同步字段对应的值,例如{ID=123, NAME=张三11}
|
|
|
+ */
|
|
|
+ private void batchRowsSetter(PreparedStatement ps, List<Field> fields, Map<String, Object> row) {
|
|
|
+ if (CollectionUtils.isEmpty(fields)) {
|
|
|
+ logger.error("Rows fields can not be empty.");
|
|
|
+ throw new ConnectorException(String.format("Rows fields can not be empty."));
|
|
|
+ }
|
|
|
+ int fieldSize = fields.size();
|
|
|
+ Field f = null;
|
|
|
+ int type;
|
|
|
+ Object val = null;
|
|
|
+ for (int i = 0; i < fieldSize; i++) {
|
|
|
+ // 取出字段和对应值
|
|
|
+ f = fields.get(i);
|
|
|
+ type = f.getType();
|
|
|
+ val = row.get(f.getName());
|
|
|
+ DatabaseUtil.preparedStatementSetter(ps, i + 1, type, val);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|