Browse Source

!146 merge
Merge pull request !146 from AE86/V_1.0.0_RC

AE86 1 year ago
parent
commit
9c8c0d97a1

+ 5 - 5
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -216,7 +216,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             BulkRequest request = new BulkRequest();
             // 默认取第一个主键
             final String pk = pkFields.get(0).getName();
-            data.forEach(row -> addRequest(request, cfg.getIndex(), config.getEvent(), String.valueOf(row.get(pk)), row));
+            data.forEach(row -> addRequest(request, cfg.getIndex(), config.getTableName(), config.getEvent(), String.valueOf(row.get(pk)), row));
 
             BulkResponse response = connectorMapper.getConnection().bulk(request, RequestOptions.DEFAULT);
             RestStatus restStatus = response.status();
@@ -325,21 +325,21 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         }
     }
 
-    private void addRequest(BulkRequest request, String index, String event, String id, Map data) {
+    private void addRequest(BulkRequest request, String index, String type, String event, String id, Map data) {
         if (isUpdate(event)) {
-            UpdateRequest req = new UpdateRequest(index, id);
+            UpdateRequest req = new UpdateRequest(index, type, id);
             req.doc(data, XContentType.JSON);
             request.add(req);
             return;
         }
         if (isInsert(event)) {
-            IndexRequest req = new IndexRequest(index);
+            IndexRequest req = new IndexRequest(index, type, id);
             req.source(data, XContentType.JSON);
             request.add(req);
             return;
         }
         if (isDelete(event)) {
-            request.add(new DeleteRequest(index, id));
+            request.add(new DeleteRequest(index, type, id));
         }
     }