Sfoglia il codice sorgente

fix bug

Signed-off-by: AE86 <836391306@qq.com>
AE86 1 anno fa
parent
commit
98c1ae2706

+ 13 - 8
dbsyncer-connector/dbsyncer-connector-elasticsearch/src/main/java/org/dbsyncer/connector/elasticsearch/ElasticsearchConnector.java

@@ -86,7 +86,9 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
 
     private final String TYPE = "Elasticsearch";
 
-    public static final String INDEX_NAME = "_index";
+    public static final String SOURCE_INDEX_NAME = "_source_index";
+
+    public static final String TARGET_INDEX_NAME = "_target_index";
 
     public static final String INDEX_TYPE = "_doc";
 
@@ -210,7 +212,7 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
             }
             builder.from(0);
             builder.size(0);
-            SearchRequest request = new SearchRequest(new String[]{command.get(INDEX_NAME)}, builder);
+            SearchRequest request = new SearchRequest(new String[]{command.get(SOURCE_INDEX_NAME)}, builder);
             SearchResponse response = connectorInstance.getConnection().searchWithVersion(request, RequestOptions.DEFAULT);
             return response.getHits().getTotalHits().value;
         } catch (IOException e) {
@@ -237,7 +239,7 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
         }
 
         try {
-            SearchRequest rq = new SearchRequest(new String[]{config.getCommand().get(INDEX_NAME)}, builder);
+            SearchRequest rq = new SearchRequest(new String[]{config.getCommand().get(SOURCE_INDEX_NAME)}, builder);
             SearchResponse searchResponse = connectorInstance.getConnection().searchWithVersion(rq, RequestOptions.DEFAULT);
             SearchHits hits = searchResponse.getHits();
             SearchHit[] searchHits = hits.getHits();
@@ -266,9 +268,9 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
             BulkRequest request = new BulkRequest();
             // 默认取第一个主键
             final String pk = pkFields.get(0).getName();
-            String indexName = config.getCommand().get(INDEX_NAME);
+            String indexName = config.getCommand().get(TARGET_INDEX_NAME);
             // 6.x 版本
-            final String type = Version.V_7_0_0.after(connectorInstance.getVersion()) ? INDEX_TYPE : indexName;
+            final String type = Version.V_7_0_0.after(connectorInstance.getVersion()) ? INDEX_TYPE : null;
             data.forEach(row -> addRequest(request, indexName, type, config.getEvent(), String.valueOf(row.get(pk)), row));
 
             BulkResponse response = connectorInstance.getConnection().bulkWithVersion(request, RequestOptions.DEFAULT);
@@ -291,7 +293,7 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
         Map<String, String> command = new HashMap<>();
         // 查询字段
         Table table = commandConfig.getTable();
-        command.put(INDEX_NAME, table.getName());
+        command.put(SOURCE_INDEX_NAME, table.getName());
         List<Field> column = table.getColumn();
         if (!CollectionUtils.isEmpty(column)) {
             List<String> fieldNames = column.stream().map(c -> c.getName()).collect(Collectors.toList());
@@ -308,8 +310,11 @@ public final class ElasticsearchConnector extends AbstractConnector implements C
 
     @Override
     public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
-        PrimaryKeyUtil.findTablePrimaryKeys(commandConfig.getTable());
-        return Collections.EMPTY_MAP;
+        Table table = commandConfig.getTable();
+        PrimaryKeyUtil.findTablePrimaryKeys(table);
+        Map<String, String> command = new HashMap<>();
+        command.put(TARGET_INDEX_NAME, table.getName());
+        return command;
     }
 
     @Override