|
@@ -67,6 +67,7 @@ import java.sql.Types;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
+import java.util.Iterator;
|
|
import java.util.LinkedHashMap;
|
|
import java.util.LinkedHashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
@@ -188,8 +189,19 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
|
|
// 6.x 版本
|
|
// 6.x 版本
|
|
if (Version.V_7_0_0.after(connectorInstance.getVersion())) {
|
|
if (Version.V_7_0_0.after(connectorInstance.getVersion())) {
|
|
Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
|
|
Map<String, Object> sourceMap = XContentHelper.convertToMap(mappingMetaData.source().compressedReference(), true, null).v2();
|
|
- parseProperties(fields, (Map) sourceMap.get(INDEX_TYPE));
|
|
|
|
- return new MetaInfo().setColumn(fields);
|
|
|
|
|
|
+ if (CollectionUtils.isEmpty(sourceMap)) {
|
|
|
|
+ throw new ElasticsearchException("未获取到索引配置");
|
|
|
|
+ }
|
|
|
|
+ Iterator<String> iterator = sourceMap.keySet().iterator();
|
|
|
|
+ String indexType = null;
|
|
|
|
+ if (iterator.hasNext()) {
|
|
|
|
+ indexType = iterator.next();
|
|
|
|
+ parseProperties(fields, (Map) sourceMap.get(indexType));
|
|
|
|
+ }
|
|
|
|
+ if (StringUtil.isBlank(indexType)) {
|
|
|
|
+ throw new ElasticsearchException("索引type为空");
|
|
|
|
+ }
|
|
|
|
+ return new MetaInfo().setColumn(fields).setIndexType(indexType);
|
|
}
|
|
}
|
|
|
|
|
|
// 7.x 版本以上
|
|
// 7.x 版本以上
|
|
@@ -265,13 +277,11 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
|
|
final Result result = new Result();
|
|
final Result result = new Result();
|
|
final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
|
|
final List<Field> pkFields = PrimaryKeyUtil.findConfigPrimaryKeyFields(config);
|
|
try {
|
|
try {
|
|
- BulkRequest request = new BulkRequest();
|
|
|
|
- // 默认取第一个主键
|
|
|
|
|
|
+ final BulkRequest request = new BulkRequest();
|
|
final String pk = pkFields.get(0).getName();
|
|
final String pk = pkFields.get(0).getName();
|
|
- String indexName = config.getCommand().get(TARGET_INDEX_NAME);
|
|
|
|
- // 6.x 版本
|
|
|
|
- final String type = Version.V_7_0_0.after(connectorInstance.getVersion()) ? INDEX_TYPE : null;
|
|
|
|
- data.forEach(row -> addRequest(request, indexName, type, config.getEvent(), String.valueOf(row.get(pk)), row));
|
|
|
|
|
|
+ final String indexName = config.getCommand().get(TARGET_INDEX_NAME);
|
|
|
|
+ final String indexType = config.getCommand().get(INDEX_TYPE);
|
|
|
|
+ data.forEach(row -> addRequest(request, indexName, indexType, config.getEvent(), String.valueOf(row.get(pk)), row));
|
|
|
|
|
|
BulkResponse response = connectorInstance.getConnection().bulkWithVersion(request, RequestOptions.DEFAULT);
|
|
BulkResponse response = connectorInstance.getConnection().bulkWithVersion(request, RequestOptions.DEFAULT);
|
|
RestStatus restStatus = response.status();
|
|
RestStatus restStatus = response.status();
|
|
@@ -314,6 +324,7 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
|
|
PrimaryKeyUtil.findTablePrimaryKeys(table);
|
|
PrimaryKeyUtil.findTablePrimaryKeys(table);
|
|
Map<String, String> command = new HashMap<>();
|
|
Map<String, String> command = new HashMap<>();
|
|
command.put(TARGET_INDEX_NAME, table.getName());
|
|
command.put(TARGET_INDEX_NAME, table.getName());
|
|
|
|
+ command.put(INDEX_TYPE, table.getIndexType());
|
|
return command;
|
|
return command;
|
|
}
|
|
}
|
|
|
|
|