|
@@ -84,10 +84,11 @@ public class KafkaConnector extends AbstractConnector implements Connector<Kafka
|
|
|
|
|
|
Result result = new Result();
|
|
|
final KafkaConfig cfg = connectorMapper.getConfig();
|
|
|
- Field pkField = getPrimaryKeyField(config.getFields());
|
|
|
+ final List<Field> pkNames = getPrimaryKeys(config.getFields());
|
|
|
try {
|
|
|
String topic = cfg.getTopic();
|
|
|
- String pk = pkField.getName();
|
|
|
+ // 默认取第一个主键
|
|
|
+ final String pk = pkNames.get(0).getName();
|
|
|
data.forEach(row -> connectorMapper.getConnection().send(topic, String.valueOf(row.get(pk)), row));
|
|
|
result.addSuccessData(data);
|
|
|
} catch (Exception e) {
|