|
@@ -1,20 +1,27 @@
|
|
|
-package org.dbsyncer.connector.es;
|
|
|
+/**
|
|
|
+ * DBSyncer Copyright 2020-2023 All Rights Reserved.
|
|
|
+ */
|
|
|
+package org.dbsyncer.connector.elasticsearch;
|
|
|
|
|
|
import org.dbsyncer.common.model.Result;
|
|
|
import org.dbsyncer.common.util.CollectionUtils;
|
|
|
import org.dbsyncer.common.util.JsonUtil;
|
|
|
import org.dbsyncer.common.util.StringUtil;
|
|
|
-import org.dbsyncer.connector.ConnectorException;
|
|
|
-import org.dbsyncer.connector.config.ESConfig;
|
|
|
-import org.dbsyncer.connector.enums.ESFieldTypeEnum;
|
|
|
-import org.dbsyncer.connector.enums.FilterEnum;
|
|
|
-import org.dbsyncer.connector.util.ESUtil;
|
|
|
+import org.dbsyncer.connector.elasticsearch.cdc.ESQuartzListener;
|
|
|
+import org.dbsyncer.connector.elasticsearch.config.ESConfig;
|
|
|
+import org.dbsyncer.connector.elasticsearch.enums.ESFieldTypeEnum;
|
|
|
+import org.dbsyncer.connector.elasticsearch.schema.ESDateValueMapper;
|
|
|
+import org.dbsyncer.connector.elasticsearch.schema.ESOtherValueMapper;
|
|
|
+import org.dbsyncer.connector.elasticsearch.util.ESUtil;
|
|
|
+import org.dbsyncer.connector.elasticsearch.validator.ESConfigValidator;
|
|
|
import org.dbsyncer.sdk.config.CommandConfig;
|
|
|
import org.dbsyncer.sdk.config.ReaderConfig;
|
|
|
import org.dbsyncer.sdk.config.WriterBatchConfig;
|
|
|
import org.dbsyncer.sdk.connector.AbstractConnector;
|
|
|
+import org.dbsyncer.sdk.connector.ConfigValidator;
|
|
|
import org.dbsyncer.sdk.connector.ConnectorInstance;
|
|
|
import org.dbsyncer.sdk.constant.ConnectorConstant;
|
|
|
+import org.dbsyncer.sdk.enums.FilterEnum;
|
|
|
import org.dbsyncer.sdk.enums.ListenerTypeEnum;
|
|
|
import org.dbsyncer.sdk.enums.OperationEnum;
|
|
|
import org.dbsyncer.sdk.listener.Listener;
|
|
@@ -24,6 +31,7 @@ import org.dbsyncer.sdk.model.MetaInfo;
|
|
|
import org.dbsyncer.sdk.model.Table;
|
|
|
import org.dbsyncer.sdk.spi.ConnectorService;
|
|
|
import org.dbsyncer.sdk.util.PrimaryKeyUtil;
|
|
|
+import org.elasticsearch.ElasticsearchException;
|
|
|
import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
@@ -49,7 +57,6 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
import org.elasticsearch.search.sort.SortOrder;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
import java.io.IOException;
|
|
@@ -62,8 +69,14 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
-@Component
|
|
|
-public final class ESConnector extends AbstractConnector implements ConnectorService<ESConnectorInstance, ESConfig> {
|
|
|
+/**
|
|
|
+ * ES连接器实现
|
|
|
+ *
|
|
|
+ * @Author AE86
|
|
|
+ * @Version 1.0.0
|
|
|
+ * @Date 2023-11-25 23:10
|
|
|
+ */
|
|
|
+public final class ElasticsearchConnector extends AbstractConnector implements ConnectorService<ESConnectorInstance, ESConfig> {
|
|
|
|
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
@@ -71,6 +84,8 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
|
|
|
|
|
|
private final Map<String, FilterMapper> filters = new LinkedHashMap<>();
|
|
|
|
|
|
+ private final ESConfigValidator configValidator = new ESConfigValidator();
|
|
|
+
|
|
|
@PostConstruct
|
|
|
private void init() {
|
|
|
VALUE_MAPPERS.put(Types.DATE, new ESDateValueMapper());
|
|
@@ -110,6 +125,11 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
|
|
|
return new ESConnectorInstance(config);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public ConfigValidator getConfigValidator() {
|
|
|
+ return configValidator;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void disconnect(ESConnectorInstance connectorInstance) {
|
|
|
connectorInstance.close();
|
|
@@ -122,7 +142,7 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
|
|
|
return client.ping(RequestOptions.DEFAULT);
|
|
|
} catch (IOException e) {
|
|
|
logger.error(e.getMessage());
|
|
|
- throw new ConnectorException(e.getMessage());
|
|
|
+ throw new ElasticsearchException(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -151,7 +171,7 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
|
|
|
return tables;
|
|
|
} catch (IOException e) {
|
|
|
logger.error(e.getMessage());
|
|
|
- throw new ConnectorException(e);
|
|
|
+ throw new ElasticsearchException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -175,7 +195,7 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
|
|
|
return new MetaInfo().setColumn(fields);
|
|
|
} catch (IOException e) {
|
|
|
logger.error(e.getMessage());
|
|
|
- throw new ConnectorException(e);
|
|
|
+ throw new ElasticsearchException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -196,7 +216,7 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
|
|
|
return response.getHits().getTotalHits().value;
|
|
|
} catch (IOException e) {
|
|
|
logger.error(e.getMessage());
|
|
|
- throw new ConnectorException(e);
|
|
|
+ throw new ElasticsearchException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -230,7 +250,7 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
|
|
|
return new Result(list);
|
|
|
} catch (IOException e) {
|
|
|
logger.error(e.getMessage());
|
|
|
- throw new ConnectorException(e.getMessage());
|
|
|
+ throw new ElasticsearchException(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -239,7 +259,7 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
|
|
|
List<Map> data = config.getData();
|
|
|
if (CollectionUtils.isEmpty(data) || CollectionUtils.isEmpty(config.getFields())) {
|
|
|
logger.error("writer data can not be empty.");
|
|
|
- throw new ConnectorException("writer data can not be empty.");
|
|
|
+ throw new ElasticsearchException("writer data can not be empty.");
|
|
|
}
|
|
|
|
|
|
final Result result = new Result();
|
|
@@ -254,7 +274,7 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
|
|
|
BulkResponse response = connectorInstance.getConnection().bulkWithVersion(request, RequestOptions.DEFAULT);
|
|
|
RestStatus restStatus = response.status();
|
|
|
if (restStatus.getStatus() != RestStatus.OK.getStatus()) {
|
|
|
- throw new ConnectorException(String.format("error code:%s", restStatus.getStatus()));
|
|
|
+ throw new ElasticsearchException(String.format("error code:%s", restStatus.getStatus()));
|
|
|
}
|
|
|
result.addSuccessData(data);
|
|
|
} catch (Exception e) {
|
|
@@ -302,7 +322,7 @@ public final class ESConnector extends AbstractConnector implements ConnectorSer
|
|
|
private void parseProperties(List<Field> fields, Map<String, Object> sourceMap) {
|
|
|
Map<String, Object> properties = (Map<String, Object>) sourceMap.get(ESUtil.PROPERTIES);
|
|
|
if (CollectionUtils.isEmpty(properties)) {
|
|
|
- throw new ConnectorException("查询字段不能为空.");
|
|
|
+ throw new ElasticsearchException("查询字段不能为空.");
|
|
|
}
|
|
|
properties.forEach((fieldName, c) -> {
|
|
|
Map fieldDesc = (Map) c;
|