Selaa lähdekoodia

统一批量写入

AE86 3 vuotta sitten
vanhempi
säilyke
1abf92646c

+ 0 - 8
dbsyncer-connector/src/main/java/org/dbsyncer/connector/Connector.java

@@ -90,14 +90,6 @@ public interface Connector<M, C> {
      */
     Result writer(M connectorMapper, WriterBatchConfig config);
 
-    /**
-     * 写入目标源数据
-     *
-     * @param config
-     * @return
-     */
-    Result writer(M connectorMapper, WriterSingleConfig config);
-
     /**
      * 获取数据源同步参数
      *

+ 0 - 6
dbsyncer-connector/src/main/java/org/dbsyncer/connector/ConnectorFactory.java

@@ -137,12 +137,6 @@ public class ConnectorFactory implements DisposableBean {
         return result;
     }
 
-    public Result writer(ConnectorMapper connectorMapper, WriterSingleConfig config) {
-        Result result = getConnector(connectorMapper).writer(connectorMapper, config);
-        Assert.notNull(result, "Connector writer single result can not null");
-        return result;
-    }
-
     public Connector getConnector(ConnectorMapper connectorMapper) {
         return getConnector(connectorMapper.getConnectorType());
     }

+ 0 - 1
dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java

@@ -168,7 +168,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
         return result;
     }
 
-    @Override
     public Result writer(DatabaseConnectorMapper connectorMapper, WriterSingleConfig config) {
         String event = config.getEvent();
         List<Field> fields = config.getFields();

+ 20 - 68
dbsyncer-connector/src/main/java/org/dbsyncer/connector/es/ESConnector.java

@@ -14,7 +14,6 @@ import org.dbsyncer.connector.enums.ESFieldTypeEnum;
 import org.dbsyncer.connector.enums.FilterEnum;
 import org.dbsyncer.connector.enums.OperationEnum;
 import org.dbsyncer.connector.util.ESUtil;
-import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -175,16 +174,13 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
             throw new ConnectorException("writer data can not be empty.");
         }
 
-        Result result = new Result();
+        final Result result = new Result();
         final ESConfig cfg = connectorMapper.getConfig();
-        Field pkField = getPrimaryKeyField(config.getFields());
+        final Field pkField = getPrimaryKeyField(config.getFields());
+        final String primaryKeyName = pkField.getName();
         try {
             BulkRequest request = new BulkRequest();
-            data.forEach(row -> {
-                IndexRequest r = new IndexRequest(cfg.getIndex(), cfg.getType(), String.valueOf(row.get(pkField.getName())));
-                r.source(row, XContentType.JSON);
-                request.add(r);
-            });
+            data.forEach(row -> addRequest(request, cfg.getIndex(), cfg.getType(), config.getEvent(), String.valueOf(row.get(primaryKeyName)), row));
 
             BulkResponse response = connectorMapper.getConnection().bulk(request, RequestOptions.DEFAULT);
             RestStatus restStatus = response.status();
@@ -200,46 +196,6 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         return result;
     }
 
-    @Override
-    public Result writer(ESConnectorMapper connectorMapper, WriterSingleConfig config) {
-        Map<String, Object> data = config.getData();
-        Field pkField = getPrimaryKeyField(config.getFields());
-        String pk = String.valueOf(data.get(pkField.getName()));
-
-        if (isUpdate(config.getEvent())) {
-            return execute(connectorMapper, data, pk, (index, type, id) -> {
-                try {
-                    UpdateRequest request = new UpdateRequest(index, type, id);
-                    request.doc(data, XContentType.JSON);
-                    connectorMapper.getConnection().update(request, RequestOptions.DEFAULT);
-                } catch (ElasticsearchStatusException e) {
-                    // 数据不存在则写入
-                    if (RestStatus.NOT_FOUND.getStatus() == e.status().getStatus()) {
-                        IndexRequest r = new IndexRequest(index, type, id);
-                        r.source(data, XContentType.JSON);
-                        connectorMapper.getConnection().index(r, RequestOptions.DEFAULT);
-                        return;
-                    }
-                    throw new ConnectorException(e.getMessage());
-                }
-            });
-        }
-        if (isInsert(config.getEvent())) {
-            return execute(connectorMapper, data, pk, (index, type, id) -> {
-                IndexRequest request = new IndexRequest(index, type, id);
-                request.source(data, XContentType.JSON);
-                connectorMapper.getConnection().index(request, RequestOptions.DEFAULT);
-            });
-        }
-        if (isDelete(config.getEvent())) {
-            return execute(connectorMapper, data, pk, (index, type, id) ->
-                    connectorMapper.getConnection().delete(new DeleteRequest(index, type, id), RequestOptions.DEFAULT)
-            );
-        }
-
-        throw new ConnectorException(String.format("Unsupported event: %s", config.getEvent()));
-    }
-
     @Override
     public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
         Map<String, String> command = new HashMap<>();
@@ -262,7 +218,7 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
     @Override
     public Map<String, String> getTargetCommand(CommandConfig commandConfig) {
         Table table = commandConfig.getTable();
-        if(!CollectionUtils.isEmpty(table.getColumn())){
+        if (!CollectionUtils.isEmpty(table.getColumn())) {
             getPrimaryKeyField(table.getColumn());
         }
         return Collections.EMPTY_MAP;
@@ -323,26 +279,22 @@ public final class ESConnector extends AbstractConnector implements Connector<ES
         }
     }
 
-    private Result execute(ESConnectorMapper connectorMapper, Map<String, Object> data, String id, RequestMapper mapper) {
-        Result result = new Result();
-        final ESConfig config = connectorMapper.getConfig();
-        try {
-            mapper.apply(config.getIndex(), config.getType(), id);
-        } catch (Exception e) {
-            // 记录错误数据
-            result.getFailData().add(data);
-            result.getError().append("INDEX:").append(config.getIndex()).append(System.lineSeparator())
-                    .append("TYPE:").append(config.getType()).append(System.lineSeparator())
-                    .append("ID:").append(id).append(System.lineSeparator())
-                    .append("DATA:").append(data).append(System.lineSeparator())
-                    .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
-            logger.error("INDEX:{}, TYPE:{}, ID:{}, DATA:{}, ERROR:{}", config.getIndex(), config.getType(), id, data, e.getMessage());
+    private void addRequest(BulkRequest request, String index, String type, String event, String id, Map data) {
+        if (isUpdate(event)) {
+            UpdateRequest req = new UpdateRequest(index, type, id);
+            req.doc(data, XContentType.JSON);
+            request.add(req);
+            return;
+        }
+        if (isInsert(event)) {
+            IndexRequest req = new IndexRequest(index, type, id);
+            req.source(data, XContentType.JSON);
+            request.add(req);
+            return;
+        }
+        if (isDelete(event)) {
+            request.add(new DeleteRequest(index, type, id));
         }
-        return result;
-    }
-
-    private interface RequestMapper {
-        void apply(String index, String type, String id) throws IOException;
     }
 
     private interface FilterMapper {

+ 0 - 17
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -92,23 +92,6 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
         return result;
     }
 
-    @Override
-    public Result writer(KafkaConnectorMapper connectorMapper, WriterSingleConfig config) {
-        Map<String, Object> data = config.getData();
-        Result result = new Result();
-        final KafkaConfig cfg = connectorMapper.getConfig();
-        Field pkField = getPrimaryKeyField(config.getFields());
-        try {
-            connectorMapper.getConnection().send(cfg.getTopic(), String.valueOf(data.get(pkField.getName())), data);
-        } catch (Exception e) {
-            // 记录错误数据
-            result.getFailData().add(data);
-            result.getError().append(e.getMessage()).append(System.lineSeparator());
-            logger.error(e.getMessage());
-        }
-        return result;
-    }
-
     @Override
     public Map<String, String> getSourceCommand(CommandConfig commandConfig) {
         return Collections.EMPTY_MAP;