فهرست منبع

支持ES版本8以上

AE86 6 ماه پیش
والد
کامیت
5dc13e98a4
12فایلهای تغییر یافته به همراه864 افزوده شده و 37 حذف شده
  1. 1 1
      README.md
  2. 25 27
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/ElasticsearchConnector.java
  3. 3 3
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/EasyRestHighLevelClient.java
  4. 13 0
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/EasyVersion.java
  5. 8 4
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/RequestConverters.java
  6. 544 0
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/bulk/BulkItemResponse.java
  7. 183 0
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/bulk/BulkResponse.java
  8. 20 0
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/index/EasyDeleteRequest.java
  9. 25 0
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/index/EasyIndexRequest.java
  10. 25 0
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/index/EasyUpdateRequest.java
  11. 15 0
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/index/RemoveTypeRequest.java
  12. 2 2
      dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/cdc/ESQuartzListener.java

+ 1 - 1
README.md

@@ -24,7 +24,7 @@
 | Oracle     | ✔ |  ✔ | 11g-19c |
 | SqlServer  | ✔ |  ✔ | 2008以上 |
 | PostgreSQL | ✔ |  ✔ | 9.5.25以上 |
-| ES         | ✔ |  ✔ | 6.x-7.x |
+| ES         | ✔ |  ✔ | 6.0.0-8.15.3 |
 | Kafka      | 开发中 |  ✔ | 2.10-0.9.0.0以上 |
 | File       | ✔ |  ✔ | *.txt, *.unl |
 | SQL        | ✔ |  | 支持以上关系型数据库 |

+ 25 - 27
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/ElasticsearchConnector.java

@@ -7,6 +7,11 @@ import org.dbsyncer.common.model.Result;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.JsonUtil;
 import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.elasticsearch.api.EasyVersion;
+import org.dbsyncer.connector.elasticsearch.api.bulk.BulkResponse;
+import org.dbsyncer.connector.elasticsearch.api.index.EasyDeleteRequest;
+import org.dbsyncer.connector.elasticsearch.api.index.EasyIndexRequest;
+import org.dbsyncer.connector.elasticsearch.api.index.EasyUpdateRequest;
 import org.dbsyncer.connector.elasticsearch.cdc.ESQuartzListener;
 import org.dbsyncer.connector.elasticsearch.config.ESConfig;
 import org.dbsyncer.connector.elasticsearch.enums.ESFieldTypeEnum;
@@ -32,15 +37,10 @@ import org.dbsyncer.sdk.model.Table;
 import org.dbsyncer.sdk.spi.ConnectorService;
 import org.dbsyncer.sdk.util.PrimaryKeyUtil;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.GetAliasesResponse;
 import org.elasticsearch.client.IndicesClient;
 import org.elasticsearch.client.RequestOptions;
@@ -87,11 +87,11 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
 
     private final String TYPE = "Elasticsearch";
 
-    public static final String SOURCE_INDEX_NAME = "_source_index";
+    public static final String _SOURCE_INDEX = "_source_index";
 
-    public static final String TARGET_INDEX_NAME = "_target_index";
+    public static final String _TARGET_INDEX = "_target_index";
 
-    public static final String INDEX_TYPE = "_doc";
+    public static final String _TYPE = "_type";
 
     private final Map<String, FilterMapper> filters = new LinkedHashMap<>();
 
@@ -186,8 +186,8 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
             GetIndexRequest request = new GetIndexRequest(index);
             GetIndexResponse indexResponse = connectorInstance.getConnection().indices().get(request, RequestOptions.DEFAULT);
             MappingMetadata mappingMetaData = indexResponse.getMappings().get(index);
-            // 6.x 版本
-            if (Version.V_7_0_0.after(connectorInstance.getVersion())) {
+            // 低于7.x 版本
+            if (EasyVersion.V_7_0_0.after(connectorInstance.getVersion())) {
                 Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
                 if (CollectionUtils.isEmpty(sourceMap)) {
                     throw new ElasticsearchException("未获取到索引配置");
@@ -219,12 +219,12 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
             SearchSourceBuilder builder = new SearchSourceBuilder();
             genSearchSourceBuilder(builder, command);
             // 7.x 版本以上
-            if (Version.V_7_0_0.onOrBefore(connectorInstance.getVersion())) {
+            if (EasyVersion.V_7_0_0.onOrBefore(connectorInstance.getVersion())) {
                 builder.trackTotalHits(true);
             }
             builder.from(0);
             builder.size(0);
-            SearchRequest request = new SearchRequest(new String[]{command.get(SOURCE_INDEX_NAME)}, builder);
+            SearchRequest request = new SearchRequest(new String[]{command.get(_SOURCE_INDEX)}, builder);
             SearchResponse response = connectorInstance.getConnection().searchWithVersion(request, RequestOptions.DEFAULT);
             return response.getHits().getTotalHits().value;
         } catch (IOException e) {
@@ -251,7 +251,7 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
         }
 
         try {
-            SearchRequest rq = new SearchRequest(new String[]{config.getCommand().get(SOURCE_INDEX_NAME)}, builder);
+            SearchRequest rq = new SearchRequest(new String[]{config.getCommand().get(_SOURCE_INDEX)}, builder);
             SearchResponse searchResponse = connectorInstance.getConnection().searchWithVersion(rq, RequestOptions.DEFAULT);
             SearchHits hits = searchResponse.getHits();
             SearchHit[] searchHits = hits.getHits();
@@ -282,9 +282,11 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
         try {
             final BulkRequest request = new BulkRequest();
             final String pk = pkFields.get(0).getName();
-            final String indexName = config.getCommand().get(TARGET_INDEX_NAME);
-            final String indexType = config.getCommand().get(INDEX_TYPE);
-            data.forEach(row -> addRequest(request, indexName, indexType, config.getEvent(), String.valueOf(row.get(pk)), row));
+            final String indexName = config.getCommand().get(_TARGET_INDEX);
+            final String type = config.getCommand().get(_TYPE);
+            // 8.x 版本以上废弃Type
+            boolean removeType = EasyVersion.V_8_0_0.onOrBefore(connectorInstance.getVersion());
+            data.forEach(row -> addRequest(request, indexName, type, removeType, config.getEvent(), String.valueOf(row.get(pk)), row));
 
             BulkResponse response = connectorInstance.getConnection().bulkWithVersion(request, RequestOptions.DEFAULT);
             RestStatus restStatus = response.status();
@@ -306,7 +308,7 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
         Map<String, String> command = new HashMap<>();
         // 查询字段
         Table table = commandConfig.getTable();
-        command.put(SOURCE_INDEX_NAME, table.getName());
+        command.put(_SOURCE_INDEX, table.getName());
         List<Field> column = table.getColumn();
         if (!CollectionUtils.isEmpty(column)) {
             List<String> fieldNames = column.stream().map(c -> c.getName()).collect(Collectors.toList());
@@ -326,8 +328,8 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
         Table table = commandConfig.getTable();
         PrimaryKeyUtil.findTablePrimaryKeys(table);
         Map<String, String> command = new HashMap<>();
-        command.put(TARGET_INDEX_NAME, table.getName());
-        command.put(INDEX_TYPE, table.getIndexType());
+        command.put(_TARGET_INDEX, table.getName());
+        command.put(_TYPE, table.getIndexType());
         return command;
     }
 
@@ -416,21 +418,17 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
         }
     }
 
-    private void addRequest(BulkRequest request, String index, String type, String event, String id, Map data) {
+    private void addRequest(BulkRequest request, String index, String type, boolean removeType, String event, String id, Map data) {
         if (isUpdate(event)) {
-            UpdateRequest req = new UpdateRequest(index, type, id);
-            req.doc(data, XContentType.JSON);
-            request.add(req);
+            request.add(new EasyUpdateRequest(index, type, removeType, id, data, XContentType.JSON));
             return;
         }
         if (isInsert(event)) {
-            IndexRequest req = new IndexRequest(index, type, id);
-            req.source(data, XContentType.JSON);
-            request.add(req);
+            request.add(new EasyIndexRequest(index, type, removeType, id, data, XContentType.JSON));
             return;
         }
         if (isDelete(event)) {
-            request.add(new DeleteRequest(index, type, id));
+            request.add(new EasyDeleteRequest(index, type, removeType, id));
         }
     }
 

+ 3 - 3
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/EasyRestHighLevelClient.java

@@ -3,10 +3,10 @@
  */
 package org.dbsyncer.connector.elasticsearch.api;
 
+import org.dbsyncer.connector.elasticsearch.api.bulk.BulkResponse;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Cancellable;
@@ -56,7 +56,7 @@ public final class EasyRestHighLevelClient extends RestHighLevelClient {
      * @return the response
      */
     public final BulkResponse bulkWithVersion(BulkRequest bulkRequest, RequestOptions options) throws IOException {
-        return performRequestAndParseEntity(bulkRequest, RequestConverters::bulk, options, BulkResponse::fromXContent, emptySet());
+        return performRequest(bulkRequest, RequestConverters::bulk, options, response -> parseEntity(response.getEntity(), BulkResponse::fromXContent), emptySet());
     }
 
     /**
@@ -105,4 +105,4 @@ public final class EasyRestHighLevelClient extends RestHighLevelClient {
     public void setVersion(Version version) {
         this.version = version;
     }
-}
+}

+ 13 - 0
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/EasyVersion.java

@@ -0,0 +1,13 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.connector.elasticsearch.api;
+
+import org.elasticsearch.Version;
+
+public class EasyVersion {
+
+    public static final Version V_7_0_0 = Version.V_7_0_0;
+    public static final Version V_8_0_0 = Version.fromId(8000099);
+
+}

+ 8 - 4
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/RequestConverters.java

@@ -12,6 +12,7 @@ import org.apache.http.client.methods.HttpPut;
 import org.apache.http.entity.ContentType;
 import org.apache.http.nio.entity.NByteArrayEntity;
 import org.apache.lucene.util.BytesRef;
+import org.dbsyncer.connector.elasticsearch.api.index.RemoveTypeRequest;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
@@ -167,9 +168,12 @@ final class RequestConverters {
                     if (Strings.hasLength(action.index())) {
                         metadata.field("_index", action.index());
                     }
-                    // 为了兼容6.x版本,必须传入
-                    if (Strings.hasLength(action.type())) {
-                        metadata.field("_type", action.type());
+                    if (action instanceof RemoveTypeRequest) {
+                        RemoveTypeRequest req = (RemoveTypeRequest) action;
+                        // 为了兼容6.x版本,必须传入
+                        if (!req.isRemoveType() && Strings.hasLength(action.type())) {
+                            metadata.field("_type", action.type());
+                        }
                     }
                     if (Strings.hasLength(action.id())) {
                         metadata.field("_id", action.id());
@@ -1252,4 +1256,4 @@ final class RequestConverters {
             }
         }
     }
-}
+}

+ 544 - 0
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/bulk/BulkItemResponse.java

@@ -0,0 +1,544 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.connector.elasticsearch.api.bulk;
+
+import org.dbsyncer.common.util.StringUtil;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.DocWriteRequest.OpType;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkItemRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.StatusToXContentObject;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.rest.RestStatus;
+
+import java.io.IOException;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
+import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField;
+
+/**
+ * Represents a single item response for an action executed as part of the bulk API. Holds the index/type/id
+ * of the relevant action, and if it has failed or not (with the failure message in case it failed).
+ */
+public class BulkItemResponse implements Writeable, StatusToXContentObject {
+
+    private static final String _INDEX = "_index";
+    private static final String _TYPE = "_type";
+    private static final String _ID = "_id";
+    private static final String STATUS = "status";
+    private static final String ERROR = "error";
+
+    @Override
+    public RestStatus status() {
+        return failure == null ? response.status() : failure.getStatus();
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.startObject(opType.getLowercase());
+        if (failure == null) {
+            response.innerToXContent(builder, params);
+            builder.field(STATUS, response.status().getStatus());
+        } else {
+            builder.field(_INDEX, failure.getIndex());
+            builder.field(_TYPE, failure.getType());
+            builder.field(_ID, failure.getId());
+            builder.field(STATUS, failure.getStatus().getStatus());
+            builder.startObject(ERROR);
+            ElasticsearchException.generateThrowableXContent(builder, params, failure.getCause());
+            builder.endObject();
+        }
+        builder.endObject();
+        builder.endObject();
+        return builder;
+    }
+
+    /**
+     * Reads a {@link BulkItemResponse} from a {@link XContentParser}.
+     *
+     * @param parser the {@link XContentParser}
+     * @param id the id to assign to the parsed {@link BulkItemResponse}. It is usually the index of
+     *           the item in the {@link BulkResponse#getItems} array.
+     */
+    public static BulkItemResponse fromXContent(XContentParser parser, int id) throws IOException {
+        ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
+
+        XContentParser.Token token = parser.nextToken();
+        ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser);
+
+        String currentFieldName = parser.currentName();
+        token = parser.nextToken();
+
+        final OpType opType = OpType.fromString(currentFieldName);
+        ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);
+
+        DocWriteResponse.Builder builder = null;
+        CheckedConsumer<XContentParser, IOException> itemParser = null;
+
+        if (opType == OpType.INDEX || opType == OpType.CREATE) {
+            final IndexResponse.Builder indexResponseBuilder = new IndexResponse.Builder();
+            builder = indexResponseBuilder;
+            itemParser = (indexParser) -> IndexResponse.parseXContentFields(indexParser, indexResponseBuilder);
+
+        } else if (opType == OpType.UPDATE) {
+            final UpdateResponse.Builder updateResponseBuilder = new UpdateResponse.Builder();
+            builder = updateResponseBuilder;
+            itemParser = (updateParser) -> UpdateResponse.parseXContentFields(updateParser, updateResponseBuilder);
+
+        } else if (opType == OpType.DELETE) {
+            final DeleteResponse.Builder deleteResponseBuilder = new DeleteResponse.Builder();
+            builder = deleteResponseBuilder;
+            itemParser = (deleteParser) -> DeleteResponse.parseXContentFields(deleteParser, deleteResponseBuilder);
+        } else {
+            throwUnknownField(currentFieldName, parser.getTokenLocation());
+        }
+
+        RestStatus status = null;
+        ElasticsearchException exception = null;
+        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+            if (token == XContentParser.Token.FIELD_NAME) {
+                currentFieldName = parser.currentName();
+            }
+
+            if (ERROR.equals(currentFieldName)) {
+                if (token == XContentParser.Token.START_OBJECT) {
+                    exception = ElasticsearchException.fromXContent(parser);
+                }
+            } else if (STATUS.equals(currentFieldName)) {
+                if (token == XContentParser.Token.VALUE_NUMBER) {
+                    status = RestStatus.fromCode(parser.intValue());
+                }
+            } else {
+                itemParser.accept(parser);
+            }
+        }
+
+        ensureExpectedToken(XContentParser.Token.END_OBJECT, token, parser);
+        token = parser.nextToken();
+        ensureExpectedToken(XContentParser.Token.END_OBJECT, token, parser);
+
+        BulkItemResponse bulkItemResponse;
+        if (exception != null) {
+            BulkItemResponse.Failure failure = new BulkItemResponse.Failure(builder.getShardId().getIndexName(), builder.getType(), builder.getId(), exception, status);
+            bulkItemResponse = new BulkItemResponse(id, opType, failure);
+        } else {
+            builder.setType(builder.getType() == null ? StringUtil.EMPTY : builder.getType());
+            bulkItemResponse = new BulkItemResponse(id, opType, builder.build());
+        }
+        return bulkItemResponse;
+    }
+
+    /**
+     * Represents a failure.
+     */
+    public static class Failure implements Writeable, ToXContentFragment {
+        public static final String INDEX_FIELD = "index";
+        public static final String TYPE_FIELD = "type";
+        public static final String ID_FIELD = "id";
+        public static final String CAUSE_FIELD = "cause";
+        public static final String STATUS_FIELD = "status";
+
+        private final String index;
+        private final String type;
+        private final String id;
+        private final Exception cause;
+        private final RestStatus status;
+        private final long seqNo;
+        private final long term;
+        private final boolean aborted;
+
+        public static final ConstructingObjectParser<BulkItemResponse.Failure, Void> PARSER =
+                new ConstructingObjectParser<>(
+                        "bulk_failures",
+                        true,
+                        a ->
+                                new BulkItemResponse.Failure(
+                                        (String)a[0], (String)a[1], (String)a[2], (Exception)a[3], RestStatus.fromCode((int)a[4])
+                                )
+                );
+        static {
+            PARSER.declareString(constructorArg(), new ParseField(INDEX_FIELD));
+            PARSER.declareString(constructorArg(), new ParseField(TYPE_FIELD));
+            PARSER.declareString(optionalConstructorArg(), new ParseField(ID_FIELD));
+            PARSER.declareObject(constructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), new ParseField(CAUSE_FIELD));
+            PARSER.declareInt(constructorArg(), new ParseField(STATUS_FIELD));
+        }
+
+        /**
+         * For write failures before operation was assigned a sequence number.
+         *
+         * use @{link {@link #Failure(String, String, String, Exception, long, long)}}
+         * to record operation sequence no with failure
+         */
+        public Failure(String index, String type, String id, Exception cause) {
+            this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbers.UNASSIGNED_SEQ_NO,
+                    SequenceNumbers.UNASSIGNED_PRIMARY_TERM, false);
+        }
+
+        public Failure(String index, String type, String id, Exception cause, boolean aborted) {
+            this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbers.UNASSIGNED_SEQ_NO,
+                    SequenceNumbers.UNASSIGNED_PRIMARY_TERM, aborted);
+        }
+
+        public Failure(String index, String type, String id, Exception cause, RestStatus status) {
+            this(index, type, id, cause, status, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, false);
+        }
+
+        /** For write failures after operation was assigned a sequence number. */
+        public Failure(String index, String type, String id, Exception cause, long seqNo, long term) {
+            this(index, type, id, cause, ExceptionsHelper.status(cause), seqNo, term, false);
+        }
+
+        private Failure(String index, String type, String id, Exception cause, RestStatus status, long seqNo, long term, boolean aborted) {
+            this.index = index;
+            this.type = type;
+            this.id = id;
+            this.cause = cause;
+            this.status = status;
+            this.seqNo = seqNo;
+            this.term = term;
+            this.aborted = aborted;
+        }
+
+        /**
+         * Read from a stream.
+         */
+        public Failure(StreamInput in) throws IOException {
+            index = in.readString();
+            type = in.readString();
+            id = in.readOptionalString();
+            cause = in.readException();
+            status = ExceptionsHelper.status(cause);
+            seqNo = in.readZLong();
+            if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
+                term = in.readVLong();
+            } else {
+                term = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
+            }
+            aborted = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeString(index);
+            out.writeString(type);
+            out.writeOptionalString(id);
+            out.writeException(cause);
+            out.writeZLong(seqNo);
+            if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
+                out.writeVLong(term);
+            }
+            out.writeBoolean(aborted);
+        }
+
+        /**
+         * The index name of the action.
+         */
+        public String getIndex() {
+            return this.index;
+        }
+
+        /**
+         * The type of the action.
+         */
+        public String getType() {
+            return type;
+        }
+
+        /**
+         * The id of the action.
+         */
+        public String getId() {
+            return id;
+        }
+
+        /**
+         * The failure message.
+         */
+        public String getMessage() {
+            return this.cause.toString();
+        }
+
+        /**
+         * The rest status.
+         */
+        public RestStatus getStatus() {
+            return this.status;
+        }
+
+        /**
+         * The actual cause of the failure.
+         */
+        public Exception getCause() {
+            return cause;
+        }
+
+        /**
+         * The operation sequence number generated by primary
+         * NOTE: {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
+         * indicates sequence number was not generated by primary
+         */
+        public long getSeqNo() {
+            return seqNo;
+        }
+
+        /**
+         * The operation primary term of the primary
+         * NOTE: {@link SequenceNumbers#UNASSIGNED_PRIMARY_TERM}
+         * indicates primary term was not assigned by primary
+         */
+        public long getTerm() {
+            return term;
+        }
+
+        /**
+         * Whether this failure is the result of an <em>abort</em>.
+         * If {@code true}, the request to which this failure relates should never be retried, regardless of the {@link #getCause() cause}.
+         * @see BulkItemRequest#abort(String, Exception)
+         */
+        public boolean isAborted() {
+            return aborted;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.field(INDEX_FIELD, index);
+            builder.field(TYPE_FIELD, type);
+            if (id != null) {
+                builder.field(ID_FIELD, id);
+            }
+            builder.startObject(CAUSE_FIELD);
+            ElasticsearchException.generateThrowableXContent(builder, params, cause);
+            builder.endObject();
+            builder.field(STATUS_FIELD, status.getStatus());
+            return builder;
+        }
+
+        public static BulkItemResponse.Failure fromXContent(XContentParser parser) {
+            return PARSER.apply(parser, null);
+        }
+
+        @Override
+        public String toString() {
+            return Strings.toString(this);
+        }
+    }
+
+    private int id;
+
+    private OpType opType;
+
+    private DocWriteResponse response;
+
+    private BulkItemResponse.Failure failure;
+
+    BulkItemResponse() {}
+
+    BulkItemResponse(ShardId shardId, StreamInput in) throws IOException {
+        id = in.readVInt();
+        opType = OpType.fromId(in.readByte());
+
+        byte type = in.readByte();
+        if (type == 0) {
+            response = new IndexResponse(shardId, in);
+        } else if (type == 1) {
+            response = new DeleteResponse(shardId, in);
+        } else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
+            response = new UpdateResponse(shardId, in);
+        } else if (type != 2) {
+            throw new IllegalArgumentException("Unexpected type [" + type + "]");
+        }
+
+        if (in.readBoolean()) {
+            failure = new BulkItemResponse.Failure(in);
+        }
+    }
+
+    BulkItemResponse(StreamInput in) throws IOException {
+        id = in.readVInt();
+        opType = OpType.fromId(in.readByte());
+
+        byte type = in.readByte();
+        if (type == 0) {
+            response = new IndexResponse(in);
+        } else if (type == 1) {
+            response = new DeleteResponse(in);
+        } else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
+            response = new UpdateResponse(in);
+        } else if (type != 2) {
+            throw new IllegalArgumentException("Unexpected type [" + type + "]");
+        }
+
+        if (in.readBoolean()) {
+            failure = new BulkItemResponse.Failure(in);
+        }
+    }
+
+    public BulkItemResponse(int id, OpType opType, DocWriteResponse response) {
+        this.id = id;
+        this.response = response;
+        this.opType = opType;
+    }
+
+    public BulkItemResponse(int id, OpType opType, BulkItemResponse.Failure failure) {
+        this.id = id;
+        this.opType = opType;
+        this.failure = failure;
+    }
+
+    /**
+     * The numeric order of the item matching the same request order in the bulk request.
+     */
+    public int getItemId() {
+        return id;
+    }
+
+    /**
+     * The operation type ("index", "create" or "delete").
+     */
+    public OpType getOpType() {
+        return this.opType;
+    }
+
+    /**
+     * The index name of the action.
+     */
+    public String getIndex() {
+        if (failure != null) {
+            return failure.getIndex();
+        }
+        return response.getIndex();
+    }
+
+    /**
+     * The type of the action.
+     */
+    public String getType() {
+        if (failure != null) {
+            return failure.getType();
+        }
+        return response.getType();
+    }
+
+    /**
+     * The id of the action.
+     */
+    public String getId() {
+        if (failure != null) {
+            return failure.getId();
+        }
+        return response.getId();
+    }
+
+    /**
+     * The version of the action.
+     */
+    public long getVersion() {
+        if (failure != null) {
+            return -1;
+        }
+        return response.getVersion();
+    }
+
+    /**
+     * The actual response ({@link IndexResponse} or {@link DeleteResponse}). {@code null} in
+     * case of failure.
+     */
+    public <T extends DocWriteResponse> T getResponse() {
+        return (T) response;
+    }
+
+    /**
+     * Is this a failed execution of an operation.
+     */
+    public boolean isFailed() {
+        return failure != null;
+    }
+
+    /**
+     * The failure message, {@code null} if it did not fail.
+     */
+    public String getFailureMessage() {
+        if (failure != null) {
+            return failure.getMessage();
+        }
+        return null;
+    }
+
+    /**
+     * The actual failure object if there was a failure.
+     */
+    public BulkItemResponse.Failure getFailure() {
+        return this.failure;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeVInt(id);
+        out.writeByte(opType.getId());
+
+        if (response == null) {
+            out.writeByte((byte) 2);
+        } else {
+            writeResponseType(out);
+            response.writeTo(out);
+        }
+        if (failure == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            failure.writeTo(out);
+        }
+    }
+
+    public void writeThin(StreamOutput out) throws IOException {
+        out.writeVInt(id);
+        out.writeByte(opType.getId());
+
+        if (response == null) {
+            out.writeByte((byte) 2);
+        } else {
+            writeResponseType(out);
+            response.writeThin(out);
+        }
+        if (failure == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            failure.writeTo(out);
+        }
+    }
+
+    private void writeResponseType(StreamOutput out) throws IOException {
+        if (response instanceof IndexResponse) {
+            out.writeByte((byte) 0);
+        } else if (response instanceof DeleteResponse) {
+            out.writeByte((byte) 1);
+        } else if (response instanceof UpdateResponse) {
+            out.writeByte((byte) 3); // make 3 instead of 2, because 2 is already in use for 'no responses'
+        } else {
+            throw new IllegalStateException("Unexpected response type found [" + response.getClass() + "]");
+        }
+    }
+}

+ 183 - 0
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/bulk/BulkResponse.java

@@ -0,0 +1,183 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.connector.elasticsearch.api.bulk;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.StatusToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.rest.RestStatus;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
+import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField;
+import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownToken;
+
+/**
+ * A response of a bulk execution. Holding a response for each item responding (in order) of the
+ * bulk requests. Each item holds the index/type/id is operated on, and if it failed or not (with the
+ * failure message).
+ */
+public class BulkResponse extends ActionResponse implements Iterable<BulkItemResponse>, StatusToXContentObject {
+
+    private static final String ITEMS = "items";
+    private static final String ERRORS = "errors";
+    private static final String TOOK = "took";
+    private static final String INGEST_TOOK = "ingest_took";
+
+    public static final long NO_INGEST_TOOK = -1L;
+
+    private final BulkItemResponse[] responses;
+    private final long tookInMillis;
+    private final long ingestTookInMillis;
+
+    public BulkResponse(StreamInput in) throws IOException {
+        super(in);
+        responses = in.readArray(BulkItemResponse::new, BulkItemResponse[]::new);
+        tookInMillis = in.readVLong();
+        ingestTookInMillis = in.readZLong();
+    }
+
+    public BulkResponse(BulkItemResponse[] responses, long tookInMillis) {
+        this(responses, tookInMillis, NO_INGEST_TOOK);
+    }
+
+    public BulkResponse(BulkItemResponse[] responses, long tookInMillis, long ingestTookInMillis) {
+        this.responses = responses;
+        this.tookInMillis = tookInMillis;
+        this.ingestTookInMillis = ingestTookInMillis;
+    }
+
+    /**
+     * How long the bulk execution took. Excluding ingest preprocessing.
+     */
+    public TimeValue getTook() {
+        return new TimeValue(tookInMillis);
+    }
+
+    /**
+     * If ingest is enabled returns the bulk ingest preprocessing time, otherwise 0 is returned.
+     */
+    public TimeValue getIngestTook() {
+        return new TimeValue(ingestTookInMillis);
+    }
+
+    /**
+     * If ingest is enabled returns the bulk ingest preprocessing time. in milliseconds, otherwise -1 is returned.
+     */
+    public long getIngestTookInMillis() {
+        return ingestTookInMillis;
+    }
+
+    /**
+     * Has anything failed with the execution.
+     */
+    public boolean hasFailures() {
+        for (BulkItemResponse response : responses) {
+            if (response.isFailed()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public String buildFailureMessage() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("failure in bulk execution:");
+        for (int i = 0; i < responses.length; i++) {
+            BulkItemResponse response = responses[i];
+            if (response.isFailed()) {
+                sb.append("\n[").append(i)
+                        .append("]: index [").append(response.getIndex()).append("], type [")
+                        .append(response.getType()).append("], id [").append(response.getId())
+                        .append("], message [").append(response.getFailureMessage()).append("]");
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * The items representing each action performed in the bulk operation (in the same order!).
+     */
+    public BulkItemResponse[] getItems() {
+        return responses;
+    }
+
+    @Override
+    public Iterator<BulkItemResponse> iterator() {
+        return Arrays.stream(responses).iterator();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeArray(responses);
+        out.writeVLong(tookInMillis);
+        out.writeZLong(ingestTookInMillis);
+    }
+
+    @Override
+    public RestStatus status() {
+        return RestStatus.OK;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(TOOK, tookInMillis);
+        if (ingestTookInMillis != BulkResponse.NO_INGEST_TOOK) {
+            builder.field(INGEST_TOOK, ingestTookInMillis);
+        }
+        builder.field(ERRORS, hasFailures());
+        builder.startArray(ITEMS);
+        for (BulkItemResponse item : this) {
+            item.toXContent(builder, params);
+        }
+        builder.endArray();
+        builder.endObject();
+        return builder;
+    }
+
+    public static BulkResponse fromXContent(XContentParser parser) throws IOException {
+        XContentParser.Token token = parser.nextToken();
+        ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);
+
+        long took = -1L;
+        long ingestTook = NO_INGEST_TOOK;
+        List<BulkItemResponse> items = new ArrayList<>();
+
+        String currentFieldName = parser.currentName();
+        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+            if (token == XContentParser.Token.FIELD_NAME) {
+                currentFieldName = parser.currentName();
+            } else if (token.isValue()) {
+                if (TOOK.equals(currentFieldName)) {
+                    took = parser.longValue();
+                } else if (INGEST_TOOK.equals(currentFieldName)) {
+                    ingestTook = parser.longValue();
+                } else if (ERRORS.equals(currentFieldName) == false) {
+                    throwUnknownField(currentFieldName, parser.getTokenLocation());
+                }
+            } else if (token == XContentParser.Token.START_ARRAY) {
+                if (ITEMS.equals(currentFieldName)) {
+                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
+                        items.add(BulkItemResponse.fromXContent(parser, items.size()));
+                    }
+                } else {
+                    throwUnknownField(currentFieldName, parser.getTokenLocation());
+                }
+            } else {
+                throwUnknownToken(token, parser.getTokenLocation());
+            }
+        }
+        return new BulkResponse(items.toArray(new BulkItemResponse[items.size()]), took, ingestTook);
+    }
+}

+ 20 - 0
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/index/EasyDeleteRequest.java

@@ -0,0 +1,20 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.connector.elasticsearch.api.index;
+
+import org.elasticsearch.action.delete.DeleteRequest;
+
+public class EasyDeleteRequest extends DeleteRequest implements RemoveTypeRequest {
+    private boolean removeType;
+
+    public EasyDeleteRequest(String index, String type, boolean removeType, String id) {
+        super(index, type, id);
+        this.removeType = removeType;
+    }
+
+    @Override
+    public boolean isRemoveType() {
+        return removeType;
+    }
+}

+ 25 - 0
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/index/EasyIndexRequest.java

@@ -0,0 +1,25 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.connector.elasticsearch.api.index;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.util.Map;
+
+public class EasyIndexRequest extends IndexRequest implements RemoveTypeRequest {
+
+    private boolean removeType;
+
+    public EasyIndexRequest(String index, String type, boolean removeType, String id, Map data, XContentType xContentType) {
+        super(index, type, id);
+        source(data, xContentType);
+        this.removeType = removeType;
+    }
+
+    @Override
+    public boolean isRemoveType() {
+        return removeType;
+    }
+}

+ 25 - 0
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/index/EasyUpdateRequest.java

@@ -0,0 +1,25 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.connector.elasticsearch.api.index;
+
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.util.Map;
+
+public class EasyUpdateRequest extends UpdateRequest implements RemoveTypeRequest {
+
+    private boolean removeType;
+
+    public EasyUpdateRequest(String index, String type, boolean removeType, String id, Map data, XContentType xContentType) {
+        super(index, type, id);
+        doc(data, xContentType);
+        this.removeType = removeType;
+    }
+
+    @Override
+    public boolean isRemoveType() {
+        return removeType;
+    }
+}

+ 15 - 0
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/api/index/RemoveTypeRequest.java

@@ -0,0 +1,15 @@
+/**
+ * DBSyncer Copyright 2020-2024 All Rights Reserved.
+ */
+package org.dbsyncer.connector.elasticsearch.api.index;
+
+public interface RemoveTypeRequest {
+
+    /**
+     * 8.x 版本以上废弃Type
+     *
+     * @return
+     */
+    boolean isRemoveType();
+
+}

+ 2 - 2
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/cdc/ESQuartzListener.java

@@ -80,7 +80,7 @@ public final class ESQuartzListener extends AbstractQuartzListener {
         }
         point.setCommand(ConnectorConstant.OPERTION_QUERY, command.get(ConnectorConstant.OPERTION_QUERY));
         point.setCommand(ConnectorConstant.OPERTION_QUERY_FILTER, JsonUtil.objToJson(filters));
-        point.setCommand(ElasticsearchConnector.SOURCE_INDEX_NAME, command.get(ElasticsearchConnector.SOURCE_INDEX_NAME));
+        point.setCommand(ElasticsearchConnector._SOURCE_INDEX, command.get(ElasticsearchConnector._SOURCE_INDEX));
         return point;
     }
-}
+}