AE86 %!s(int64=3) %!d(string=hai) anos
pai
achega
577fea87b4

+ 6 - 0
dbsyncer-common/src/main/java/org/dbsyncer/common/util/JsonUtil.java

@@ -3,6 +3,8 @@ package org.dbsyncer.common.util;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 
+import java.util.List;
+
 public abstract class JsonUtil {
 
     public static String objToJson(Object obj) {
@@ -13,4 +15,8 @@ public abstract class JsonUtil {
         return JSON.parseObject(json, valueType);
     }
 
+    public static <T> List<T> jsonToArray(String json, Class<T> valueType) {
+        return JSON.parseArray(json, valueType);
+    }
+
 }

+ 5 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/constant/ConnectorConstant.java

@@ -27,6 +27,11 @@ public class ConnectorConstant {
      */
     public static final String OPERTION_QUERY = "QUERY";
 
+    /**
+     * 查询过滤条件
+     */
+    public static final String OPERTION_QUERY_FILTER = "QUERY_FILTER";
+
     /**
      * 查询总数
      */

+ 52 - 52
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -68,58 +68,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return connectorMapper.execute(databaseTemplate -> DatabaseUtil.getMetaInfo(databaseTemplate, queryMetaSql.toString(), tableName));
     }
 
-    @Override
-    public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
-        // 获取过滤SQL
-        List<Filter> filter = commandConfig.getFilter();
-        String queryFilterSql = getQueryFilterSql(filter);
-
-        // 获取查询SQL
-        Table table = commandConfig.getTable();
-        Map<String, String> map = new HashMap<>();
-
-        String query = ConnectorConstant.OPERTION_QUERY;
-        map.put(query, buildSql(query, table, commandConfig.getOriginalTable(), queryFilterSql));
-
-        // 获取查询总数SQL
-        String quotation = buildSqlWithQuotation();
-        String pk = DatabaseUtil.findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
-        StringBuilder queryCount = new StringBuilder();
-        queryCount.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(quotation).append(table.getName()).append(quotation);
-        if (StringUtil.isNotBlank(queryFilterSql)) {
-            queryCount.append(queryFilterSql);
-        }
-        queryCount.append(" GROUP BY ").append(pk).append(") DBSYNCER_T");
-        map.put(ConnectorConstant.OPERTION_QUERY_COUNT, queryCount.toString());
-        return map;
-    }
-
-    @Override
-    public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
-        // 获取增删改SQL
-        Map<String, String> map = new HashMap<>();
-        Table table = commandConfig.getTable();
-        Table originalTable = commandConfig.getOriginalTable();
-
-        String insert = SqlBuilderEnum.INSERT.getName();
-        map.put(insert, buildSql(insert, table, originalTable, null));
-
-        String update = SqlBuilderEnum.UPDATE.getName();
-        map.put(update, buildSql(update, table, originalTable, null));
-
-        String delete = SqlBuilderEnum.DELETE.getName();
-        map.put(delete, buildSql(delete, table, originalTable, null));
-
-        // 获取查询数据行是否存在
-        String quotation = buildSqlWithQuotation();
-        String pk = DatabaseUtil.findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
-        StringBuilder queryCount = new StringBuilder().append("SELECT COUNT(1) FROM ").append(quotation).append(table.getName()).append(
-                quotation).append(" WHERE ").append(pk).append(" = ?");
-        String queryCountExist = ConnectorConstant.OPERTION_QUERY_COUNT_EXIST;
-        map.put(queryCountExist, queryCount.toString());
-        return map;
-    }
-
     @Override
     public long getCount(DatabaseConnectorMapper connectorMapper, Map<String, String> command) {
         // 1、获取select SQL
@@ -254,6 +202,58 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return result;
     }
 
+    @Override
+    public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
+        // 获取过滤SQL
+        List<Filter> filter = commandConfig.getFilter();
+        String queryFilterSql = getQueryFilterSql(filter);
+
+        // 获取查询SQL
+        Table table = commandConfig.getTable();
+        Map<String, String> map = new HashMap<>();
+
+        String query = ConnectorConstant.OPERTION_QUERY;
+        map.put(query, buildSql(query, table, commandConfig.getOriginalTable(), queryFilterSql));
+
+        // 获取查询总数SQL
+        String quotation = buildSqlWithQuotation();
+        String pk = DatabaseUtil.findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
+        StringBuilder queryCount = new StringBuilder();
+        queryCount.append("SELECT COUNT(1) FROM (SELECT 1 FROM ").append(quotation).append(table.getName()).append(quotation);
+        if (StringUtil.isNotBlank(queryFilterSql)) {
+            queryCount.append(queryFilterSql);
+        }
+        queryCount.append(" GROUP BY ").append(pk).append(") DBSYNCER_T");
+        map.put(ConnectorConstant.OPERTION_QUERY_COUNT, queryCount.toString());
+        return map;
+    }
+
+    @Override
+    public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
+        // 获取增删改SQL
+        Map<String, String> map = new HashMap<>();
+        Table table = commandConfig.getTable();
+        Table originalTable = commandConfig.getOriginalTable();
+
+        String insert = SqlBuilderEnum.INSERT.getName();
+        map.put(insert, buildSql(insert, table, originalTable, null));
+
+        String update = SqlBuilderEnum.UPDATE.getName();
+        map.put(update, buildSql(update, table, originalTable, null));
+
+        String delete = SqlBuilderEnum.DELETE.getName();
+        map.put(delete, buildSql(delete, table, originalTable, null));
+
+        // 获取查询数据行是否存在
+        String quotation = buildSqlWithQuotation();
+        String pk = DatabaseUtil.findTablePrimaryKey(commandConfig.getOriginalTable(), quotation);
+        StringBuilder queryCount = new StringBuilder().append("SELECT COUNT(1) FROM ").append(quotation).append(table.getName()).append(
+                quotation).append(" WHERE ").append(pk).append(" = ?");
+        String queryCountExist = ConnectorConstant.OPERTION_QUERY_COUNT_EXIST;
+        map.put(queryCountExist, queryCount.toString());
+        return map;
+    }
+
     /**
      * 获取DQL表信息
      *

+ 8 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/enums/OperationEnum.java

@@ -25,6 +25,14 @@ public enum OperationEnum {
         this.name = name;
     }
 
+    public static boolean isAnd(String name) {
+        return AND.getName().equals(name);
+    }
+
+    public static boolean isOr(String name) {
+        return OR.getName().equals(name);
+    }
+
     public String getName() {
         return name;
     }

+ 92 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -2,13 +2,17 @@ package org.dbsyncer.connector.es;
 
 import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
 import org.dbsyncer.connector.AbstractConnector;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.constant.ConnectorConstant;
 import org.dbsyncer.connector.enums.ESFieldTypeEnum;
+import org.dbsyncer.connector.enums.FilterEnum;
+import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.util.ESUtil;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -25,6 +29,8 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.StatusToXContentObject;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
@@ -32,15 +38,30 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.PostConstruct;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 public final class ESConnector extends AbstractConnector implements Connector<ESConnectorMapper, ESConfig> {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
+    private static Map<String, FilterMapper> map = new ConcurrentHashMap();
+
+    static {
+        map.putIfAbsent(FilterEnum.EQUAL.getName(), (builder, k, v) -> builder.must(QueryBuilders.matchQuery(k, v)));
+        map.putIfAbsent(FilterEnum.NOT_EQUAL.getName(), (builder, k, v) -> builder.mustNot(QueryBuilders.matchQuery(k, v)));
+        map.putIfAbsent(FilterEnum.GT.getName(), (builder, k, v) -> builder.filter(QueryBuilders.rangeQuery(k).gt(v)));
+        map.putIfAbsent(FilterEnum.LT.getName(), (builder, k, v) -> builder.filter(QueryBuilders.rangeQuery(k).lt(v)));
+        map.putIfAbsent(FilterEnum.GT_AND_EQUAL.getName(), (builder, k, v) -> builder.filter(QueryBuilders.rangeQuery(k).gte(v)));
+        map.putIfAbsent(FilterEnum.LT_AND_EQUAL.getName(), (builder, k, v) -> builder.filter(QueryBuilders.rangeQuery(k).lte(v)));
+    }
+
     @Override
     public ConnectorMapper connect(ESConfig config) {
         return new ESConnectorMapper(config, ESUtil.getConnection(config));
@@ -115,7 +136,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             sourceBuilder.trackTotalHits(true);
             sourceBuilder.from(0);
             sourceBuilder.size(0);
-            SearchRequest request = new SearchRequest(new String[] {config.getIndex()}, sourceBuilder);
+            SearchRequest request = new SearchRequest(new String[]{config.getIndex()}, sourceBuilder);
             SearchResponse response = connectorMapper.getConnection().search(request, RequestOptions.DEFAULT);
             return response.getHits().getTotalHits();
         } catch (IOException e) {
@@ -128,16 +149,12 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
     public Result reader(ESConnectorMapper connectorMapper, ReaderConfig config) {
         ESConfig cfg = connectorMapper.getConfig();
         SearchSourceBuilder builder = new SearchSourceBuilder();
-        // 1、过滤条件
-        //sourceBuilder.query(q);
+        genSearchSourceBuilder(builder, config.getCommand());
         builder.from((config.getPageIndex() - 1) * config.getPageSize());
         builder.size(config.getPageSize());
         builder.timeout(TimeValue.timeValueMillis(10));
-        // 2、过滤字段
-        //sourceBuilder.fetchSource(new String[]{"id", "name"}, null);
 
         try {
-            // 3、查询结果
             SearchRequest rq = new SearchRequest(new String[]{cfg.getIndex()}, builder);
             SearchResponse searchResponse = connectorMapper.getConnection().search(rq, RequestOptions.DEFAULT);
             SearchHits hits = searchResponse.getHits();
@@ -216,6 +233,71 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         throw new ConnectorException(String.format("Unsupported event: %s", config.getEvent()));
     }
 
+    @Override
+    public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
+        Map<String, String> command = new HashMap<>();
+        // 查询字段
+        Table table = commandConfig.getTable();
+        List<Field> column = table.getColumn();
+        if (!CollectionUtils.isEmpty(column)) {
+            List<String> fieldNames = column.stream().map(c -> c.getName()).collect(Collectors.toList());
+            command.put(ConnectorConstant.OPERTION_QUERY, StringUtil.join(fieldNames, ","));
+        }
+
+        // 过滤条件
+        List<Filter> filter = commandConfig.getFilter();
+        if (!CollectionUtils.isEmpty(filter)) {
+            command.put(ConnectorConstant.OPERTION_QUERY_FILTER, JsonUtil.objToJson(filter));
+        }
+        return command;
+    }
+
+    private void genSearchSourceBuilder(SearchSourceBuilder builder, Map<String, String> command) {
+        // 查询字段
+        String fieldNamesJson = command.get(ConnectorConstant.OPERTION_QUERY);
+        if (!StringUtil.isBlank(fieldNamesJson)) {
+            builder.fetchSource(StringUtil.split(fieldNamesJson, ","), null);
+        }
+
+        // 过滤条件
+        String filterJson = command.get(ConnectorConstant.OPERTION_QUERY_FILTER);
+        if (StringUtil.isBlank(filterJson)) {
+            return;
+        }
+        List<Filter> filters = JsonUtil.jsonToArray(filterJson, Filter.class);
+        if (CollectionUtils.isEmpty(filters)) {
+            return;
+        }
+        // where (id = 1 and name = 'tom') or id = 2
+        BoolQueryBuilder q = QueryBuilders.boolQuery();
+        BoolQueryBuilder and = QueryBuilders.boolQuery();
+        for (Filter f : filters) {
+            if (OperationEnum.isAnd(f.getOperation())) {
+                genBoolQuery(and, f);
+                continue;
+            }
+
+            if (OperationEnum.isOr(f.getOperation())) {
+                genBoolQuery(q, f);
+            }
+        }
+
+        if (q.hasClauses() && and.hasClauses()) {
+            q.should(and);
+        } else {
+            q.filter(and);
+        }
+
+        builder.query(q);
+
+    }
+
+    private void genBoolQuery(BoolQueryBuilder builder, Filter f) {
+        if (map.containsKey(f.getFilter())) {
+            map.get(f.getFilter()).apply(builder, f.getName(), f.getValue());
+        }
+    }
+
     private Result execute(ESConnectorMapper connectorMapper, Map<String, Object> data, String id, RequestMapper mapper) {
         Result result = new Result();
         final ESConfig config = connectorMapper.getConfig();
@@ -242,4 +324,8 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
     private interface RequestMapper {
         StatusToXContentObject apply(String index, String type, String id) throws IOException;
     }
+
+    private interface FilterMapper {
+        void apply(BoolQueryBuilder builder, String key, String value);
+    }
 }