Browse Source

兼容ES6.x客户端API, 允许使用type=_doc类型

AE86 1 year ago
parent
commit
a8a9cf38c3

+ 5 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -36,6 +36,7 @@ import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.client.indices.GetIndexResponse;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -114,7 +115,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             List<Table> tables = new ArrayList<>();
             // 6.x 版本
             if (Version.V_7_0_0.after(connectorMapper.getVersion())) {
-                Map<String, Object> sourceMap = mappingMetaData.sourceAsMap();
+                Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
                 sourceMap.keySet().forEach(tableName -> tables.add(new Table(tableName)));
                 return tables;
             }
@@ -136,15 +137,15 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             GetIndexRequest request = new GetIndexRequest(config.getIndex());
             GetIndexResponse indexResponse = connectorMapper.getConnection().indices().get(request, RequestOptions.DEFAULT);
             MappingMetadata mappingMetaData = indexResponse.getMappings().get(config.getIndex());
-            Map<String, Object> sourceMap = mappingMetaData.sourceAsMap();
             // 6.x 版本
             if (Version.V_7_0_0.after(connectorMapper.getVersion())) {
+                Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
                 parseProperties(fields, (Map) sourceMap.get(tableName));
                 return new MetaInfo().setColumn(fields);
             }
 
             // 7.x 版本以上
-            parseProperties(fields, sourceMap);
+            parseProperties(fields, mappingMetaData.sourceAsMap());
             return new MetaInfo().setColumn(fields);
         } catch (IOException e) {
             logger.error(e.getMessage());
@@ -224,7 +225,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             final String pk = pkFields.get(0).getName();
             data.forEach(row -> addRequest(request, cfg.getIndex(), config.getTableName(), config.getEvent(), String.valueOf(row.get(pk)), row));
 
-            BulkResponse response = connectorMapper.getConnection().bulk(request, RequestOptions.DEFAULT);
+            BulkResponse response = connectorMapper.getConnection().bulkWithVersion(request, RequestOptions.DEFAULT);
             RestStatus restStatus = response.status();
             if (restStatus.getStatus() != RestStatus.OK.getStatus()) {
                 throw new ConnectorException(String.format("error code:%s", restStatus.getStatus()));

+ 13 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/EasyRestHighLevelClient.java

@@ -2,6 +2,8 @@ package org.dbsyncer.connector.es;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Cancellable;
@@ -36,6 +38,17 @@ public class EasyRestHighLevelClient extends RestHighLevelClient {
         super(restClient, doClose, namedXContentEntries);
     }
 
+    /**
+     * Executes a bulk request using the Bulk API.
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API on elastic.co</a>
+     * @param bulkRequest the request
+     * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return the response
+     */
+    public final BulkResponse bulkWithVersion(BulkRequest bulkRequest, RequestOptions options) throws IOException {
+        return performRequestAndParseEntity(bulkRequest, RequestConverters::bulk, options, BulkResponse::fromXContent, emptySet());
+    }
+
     /**
      * Executes a search request using the Search API.
      * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on elastic.co</a>

+ 2 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/RequestConverters.java

@@ -157,10 +157,9 @@ final class RequestConverters {
                     if (Strings.hasLength(action.index())) {
                         metadata.field("_index", action.index());
                     }
+                    // 为了兼容6.x版本,必须传入
                     if (Strings.hasLength(action.type())) {
-                        if (MapperService.SINGLE_MAPPING_NAME.equals(action.type()) == false) {
-                            metadata.field("_type", action.type());
-                        }
+                        metadata.field("_type", action.type());
                     }
                     if (Strings.hasLength(action.id())) {
                         metadata.field("_id", action.id());