Ver Fonte

!192 merge
Merge pull request !192 from AE86/ae86_dev

AE86 há 1 ano atrás
pai
commit
4e0092d862

+ 1 - 1
dbsyncer-biz/pom.xml

@@ -5,7 +5,7 @@
 	<parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.7</version>
+        <version>1.2.7_1103</version>
     </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-biz</artifactId>

+ 1 - 1
dbsyncer-cache/pom.xml

@@ -4,7 +4,7 @@
 	<parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.7</version>
+        <version>1.2.7_1103</version>
     </parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dbsyncer-cache</artifactId>

+ 1 - 1
dbsyncer-cluster/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.7</version>
+        <version>1.2.7_1103</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-cluster</artifactId>

+ 1 - 1
dbsyncer-common/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.7</version>
+        <version>1.2.7_1103</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-common</artifactId>

+ 1 - 1
dbsyncer-connector/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.7</version>
+        <version>1.2.7_1103</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-connector</artifactId>

+ 5 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -36,6 +36,7 @@ import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.client.indices.GetIndexResponse;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -114,7 +115,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             List<Table> tables = new ArrayList<>();
             // 6.x 版本
             if (Version.V_7_0_0.after(connectorMapper.getVersion())) {
-                Map<String, Object> sourceMap = mappingMetaData.sourceAsMap();
+                Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
                 sourceMap.keySet().forEach(tableName -> tables.add(new Table(tableName)));
                 return tables;
             }
@@ -136,15 +137,15 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             GetIndexRequest request = new GetIndexRequest(config.getIndex());
             GetIndexResponse indexResponse = connectorMapper.getConnection().indices().get(request, RequestOptions.DEFAULT);
             MappingMetadata mappingMetaData = indexResponse.getMappings().get(config.getIndex());
-            Map<String, Object> sourceMap = mappingMetaData.sourceAsMap();
             // 6.x 版本
             if (Version.V_7_0_0.after(connectorMapper.getVersion())) {
+                Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
                 parseProperties(fields, (Map) sourceMap.get(tableName));
                 return new MetaInfo().setColumn(fields);
             }
 
             // 7.x 版本以上
-            parseProperties(fields, sourceMap);
+            parseProperties(fields, mappingMetaData.sourceAsMap());
             return new MetaInfo().setColumn(fields);
         } catch (IOException e) {
             logger.error(e.getMessage());
@@ -224,7 +225,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             final String pk = pkFields.get(0).getName();
             data.forEach(row -> addRequest(request, cfg.getIndex(), config.getTableName(), config.getEvent(), String.valueOf(row.get(pk)), row));
 
-            BulkResponse response = connectorMapper.getConnection().bulk(request, RequestOptions.DEFAULT);
+            BulkResponse response = connectorMapper.getConnection().bulkWithVersion(request, RequestOptions.DEFAULT);
             RestStatus restStatus = response.status();
             if (restStatus.getStatus() != RestStatus.OK.getStatus()) {
                 throw new ConnectorException(String.format("error code:%s", restStatus.getStatus()));

+ 13 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/EasyRestHighLevelClient.java

@@ -2,6 +2,8 @@ package org.dbsyncer.connector.es;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Cancellable;
@@ -36,6 +38,17 @@ public class EasyRestHighLevelClient extends RestHighLevelClient {
         super(restClient, doClose, namedXContentEntries);
     }
 
+    /**
+     * Executes a bulk request using the Bulk API.
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API on elastic.co</a>
+     * @param bulkRequest the request
+     * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return the response
+     */
+    public final BulkResponse bulkWithVersion(BulkRequest bulkRequest, RequestOptions options) throws IOException {
+        return performRequestAndParseEntity(bulkRequest, RequestConverters::bulk, options, BulkResponse::fromXContent, emptySet());
+    }
+
     /**
      * Executes a search request using the Search API.
      * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on elastic.co</a>

+ 2 - 3
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/RequestConverters.java

@@ -157,10 +157,9 @@ final class RequestConverters {
                     if (Strings.hasLength(action.index())) {
                         metadata.field("_index", action.index());
                     }
+                    // 为了兼容6.x版本,必须传入
                     if (Strings.hasLength(action.type())) {
-                        if (MapperService.SINGLE_MAPPING_NAME.equals(action.type()) == false) {
-                            metadata.field("_type", action.type());
-                        }
+                        metadata.field("_type", action.type());
                     }
                     if (Strings.hasLength(action.id())) {
                         metadata.field("_id", action.id());

+ 1 - 1
dbsyncer-listener/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.7</version>
+        <version>1.2.7_1103</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-listener</artifactId>

+ 1 - 1
dbsyncer-manager/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.7</version>
+        <version>1.2.7_1103</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-manager</artifactId>

+ 1 - 1
dbsyncer-monitor/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.7</version>
+        <version>1.2.7_1103</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-monitor</artifactId>

+ 1 - 1
dbsyncer-parser/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.7</version>
+        <version>1.2.7_1103</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-parser</artifactId>

+ 1 - 1
dbsyncer-plugin/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.7</version>
+        <version>1.2.7_1103</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-plugin</artifactId>

+ 1 - 1
dbsyncer-storage/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.7</version>
+        <version>1.2.7_1103</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-storage</artifactId>

+ 1 - 1
dbsyncer-web/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>dbsyncer</artifactId>
         <groupId>org.ghi</groupId>
-        <version>1.2.7</version>
+        <version>1.2.7_1103</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>dbsyncer-web</artifactId>

+ 1 - 1
pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>org.ghi</groupId>
     <artifactId>dbsyncer</artifactId>
-    <version>1.2.7</version>
+    <version>1.2.7_1103</version>
     <packaging>pom</packaging>
     <name>dbsyncer</name>
     <url>https://gitee.com/ghi/dbsyncer</url>