|
@@ -22,16 +22,18 @@ import org.elasticsearch.client.RestHighLevelClient;
|
|
import org.elasticsearch.client.indices.GetIndexRequest;
|
|
import org.elasticsearch.client.indices.GetIndexRequest;
|
|
import org.elasticsearch.client.indices.GetIndexResponse;
|
|
import org.elasticsearch.client.indices.GetIndexResponse;
|
|
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
|
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
|
|
|
+import org.elasticsearch.common.unit.TimeValue;
|
|
import org.elasticsearch.common.xcontent.StatusToXContentObject;
|
|
import org.elasticsearch.common.xcontent.StatusToXContentObject;
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
import org.elasticsearch.rest.RestStatus;
|
|
import org.elasticsearch.rest.RestStatus;
|
|
|
|
+import org.elasticsearch.search.SearchHit;
|
|
|
|
+import org.elasticsearch.search.SearchHits;
|
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
-import java.util.Collections;
|
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
|
|
@@ -105,16 +107,6 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
|
|
return new MetaInfo().setColumn(fields);
|
|
return new MetaInfo().setColumn(fields);
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
|
|
|
|
- return Collections.EMPTY_MAP;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
|
|
|
|
- return Collections.EMPTY_MAP;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Override
|
|
@Override
|
|
public long getCount(ESConnectorMapper connectorMapper, Map<String, String> command) {
|
|
public long getCount(ESConnectorMapper connectorMapper, Map<String, String> command) {
|
|
try {
|
|
try {
|
|
@@ -123,7 +115,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
|
|
sourceBuilder.trackTotalHits(true);
|
|
sourceBuilder.trackTotalHits(true);
|
|
sourceBuilder.from(0);
|
|
sourceBuilder.from(0);
|
|
sourceBuilder.size(0);
|
|
sourceBuilder.size(0);
|
|
- SearchRequest request = new SearchRequest(new String[]{config.getIndex()}, sourceBuilder);
|
|
|
|
|
|
+ SearchRequest request = new SearchRequest(new String[] {config.getIndex()}, sourceBuilder);
|
|
SearchResponse response = connectorMapper.getConnection().search(request, RequestOptions.DEFAULT);
|
|
SearchResponse response = connectorMapper.getConnection().search(request, RequestOptions.DEFAULT);
|
|
return response.getHits().getTotalHits();
|
|
return response.getHits().getTotalHits();
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -134,7 +126,31 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Result reader(ESConnectorMapper connectorMapper, ReaderConfig config) {
|
|
public Result reader(ESConnectorMapper connectorMapper, ReaderConfig config) {
|
|
- return null;
|
|
|
|
|
|
+ ESConfig cfg = connectorMapper.getConfig();
|
|
|
|
+ SearchSourceBuilder builder = new SearchSourceBuilder();
|
|
|
|
+ // 1、过滤条件
|
|
|
|
+ //sourceBuilder.query(q);
|
|
|
|
+ builder.from((config.getPageIndex() - 1) * config.getPageSize());
|
|
|
|
+ builder.size(config.getPageSize());
|
|
|
|
+ builder.timeout(TimeValue.timeValueMillis(10));
|
|
|
|
+ // 2、过滤字段
|
|
|
|
+ //sourceBuilder.fetchSource(new String[]{"id", "name"}, null);
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ // 3、查询结果
|
|
|
|
+ SearchRequest rq = new SearchRequest(new String[]{cfg.getIndex()}, builder);
|
|
|
|
+ SearchResponse searchResponse = connectorMapper.getConnection().search(rq, RequestOptions.DEFAULT);
|
|
|
|
+ SearchHits hits = searchResponse.getHits();
|
|
|
|
+ SearchHit[] searchHits = hits.getHits();
|
|
|
|
+ List<Map<String, Object>> list = new ArrayList<>();
|
|
|
|
+ for (SearchHit hit : searchHits) {
|
|
|
|
+ list.add(hit.getSourceAsMap());
|
|
|
|
+ }
|
|
|
|
+ return new Result(new ArrayList<>(list));
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ logger.error(e.getMessage());
|
|
|
|
+ throw new ConnectorException(e.getMessage());
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -197,7 +213,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
- throw new ConnectorException("Unsupported event");
|
|
|
|
|
|
+ throw new ConnectorException(String.format("Unsupported event: %s", config.getEvent()));
|
|
}
|
|
}
|
|
|
|
|
|
private Result execute(ESConnectorMapper connectorMapper, Map<String, Object> data, String id, RequestMapper mapper) {
|
|
private Result execute(ESConnectorMapper connectorMapper, Map<String, Object> data, String id, RequestMapper mapper) {
|