AE86 3 years ago
parent
commit
39706e623e

+ 12 - 6
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/KafkaConfigChecker.java

@@ -20,17 +20,21 @@ public class KafkaConfigChecker implements ConnectorConfigChecker<KafkaConfig> {
     @Override
     public void modify(KafkaConfig connectorConfig, Map<String, String> params) {
         String bootstrapServers = params.get("bootstrapServers");
-        String keyDeserializer = params.get("keyDeserializer");
-        String valueSerializer = params.get("valueSerializer");
         Assert.hasText(bootstrapServers, "bootstrapServers is empty.");
-        Assert.hasText(keyDeserializer, "keyDeserializer is empty.");
-        Assert.hasText(valueSerializer, "valueSerializer is empty.");
 
         String groupId = params.get("groupId");
+        String consumerKeyDeserializer = params.get("consumerKeyDeserializer");
+        String consumerValueDeserializer = params.get("consumerValueDeserializer");
+        Assert.hasText(consumerKeyDeserializer, "consumerKeyDeserializer is empty.");
+        Assert.hasText(consumerValueDeserializer, "consumerValueDeserializer is empty.");
         boolean enableAutoCommit = BooleanUtil.toBoolean(params.get("enableAutoCommit"));
         long autoCommitIntervalMs = NumberUtil.toLong(params.get("autoCommitIntervalMs"));
         long maxPartitionFetchBytes = NumberUtil.toLong(params.get("maxPartitionFetchBytes"));
 
+        String producerKeySerializer = params.get("producerKeySerializer");
+        String producerValueSerializer = params.get("producerValueSerializer");
+        Assert.hasText(producerKeySerializer, "producerKeySerializer is empty.");
+        Assert.hasText(producerValueSerializer, "producerValueSerializer is empty.");
         long bufferMemory = NumberUtil.toLong(params.get("bufferMemory"));
         long batchSize = NumberUtil.toLong(params.get("batchSize"));
         long lingerMs = NumberUtil.toLong(params.get("lingerMs"));
@@ -40,14 +44,16 @@ public class KafkaConfigChecker implements ConnectorConfigChecker<KafkaConfig> {
         long maxRequestSize = NumberUtil.toLong(params.get("maxRequestSize"));
 
         connectorConfig.setBootstrapServers(bootstrapServers);
-        connectorConfig.setKeyDeserializer(keyDeserializer);
-        connectorConfig.setValueSerializer(valueSerializer);
 
         connectorConfig.setGroupId(groupId);
+        connectorConfig.setConsumerKeyDeserializer(consumerKeyDeserializer);
+        connectorConfig.setConsumerValueDeserializer(consumerValueDeserializer);
         connectorConfig.setEnableAutoCommit(enableAutoCommit);
         connectorConfig.setAutoCommitIntervalMs(autoCommitIntervalMs);
         connectorConfig.setMaxPartitionFetchBytes(maxPartitionFetchBytes);
 
+        connectorConfig.setProducerKeySerializer(producerKeySerializer);
+        connectorConfig.setProducerValueSerializer(producerValueSerializer);
         connectorConfig.setBufferMemory(bufferMemory);
         connectorConfig.setBatchSize(batchSize);
         connectorConfig.setLingerMs(lingerMs);
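
Net effect of the checker change above: the consumer and producer no longer share a single keyDeserializer/valueSerializer pair; each side validates and stores its own (de)serializer classes. As a minimal sketch of how the split fields might feed the two Kafka clients (the mapper class and its placement are hypothetical; the ConsumerConfig/ProducerConfig constants are real kafka-clients keys, and numeric values are passed as strings because Kafka's config parser accepts strings for any type):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

// Hypothetical helper: builds per-client Properties from the split KafkaConfig fields.
public final class KafkaPropertiesMapper {

    public static Properties consumerProperties(KafkaConfig config) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
        // Consumers take deserializers...
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getConsumerKeyDeserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getConsumerValueDeserializer());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(config.isEnableAutoCommit()));
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(config.getAutoCommitIntervalMs()));
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(config.getMaxPartitionFetchBytes()));
        return props;
    }

    public static Properties producerProperties(KafkaConfig config) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        // ...while producers take serializers; before this commit one shared pair served both roles.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getProducerKeySerializer());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getProducerValueSerializer());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, String.valueOf(config.getBufferMemory()));
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(config.getBatchSize()));
        props.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(config.getLingerMs()));
        return props;
    }
}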

+ 32 - 14
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/KafkaConfig.java

@@ -9,16 +9,18 @@ package org.dbsyncer.connector.config;
 public class KafkaConfig extends ConnectorConfig {
 
     private String bootstrapServers;
-    private String keyDeserializer;
-    private String valueSerializer;
 
     // Consumer
     private String groupId;
+    private String consumerKeyDeserializer;
+    private String consumerValueDeserializer;
     private boolean enableAutoCommit;
     private long autoCommitIntervalMs;
     private long maxPartitionFetchBytes;
 
     // Producer
+    private String producerKeySerializer;
+    private String producerValueSerializer;
     private long bufferMemory;
     private long batchSize;
     private long lingerMs;
@@ -35,28 +37,28 @@ public class KafkaConfig extends ConnectorConfig {
         this.bootstrapServers = bootstrapServers;
     }
 
-    public String getKeyDeserializer() {
-        return keyDeserializer;
+    public String getGroupId() {
+        return groupId;
     }
 
-    public void setKeyDeserializer(String keyDeserializer) {
-        this.keyDeserializer = keyDeserializer;
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
     }
 
-    public String getValueSerializer() {
-        return valueSerializer;
+    public String getConsumerKeyDeserializer() {
+        return consumerKeyDeserializer;
     }
 
-    public void setValueSerializer(String valueSerializer) {
-        this.valueSerializer = valueSerializer;
+    public void setConsumerKeyDeserializer(String consumerKeyDeserializer) {
+        this.consumerKeyDeserializer = consumerKeyDeserializer;
     }
 
-    public String getGroupId() {
-        return groupId;
+    public String getConsumerValueDeserializer() {
+        return consumerValueDeserializer;
     }
 
-    public void setGroupId(String groupId) {
-        this.groupId = groupId;
+    public void setConsumerValueDeserializer(String consumerValueDeserializer) {
+        this.consumerValueDeserializer = consumerValueDeserializer;
     }
 
     public boolean isEnableAutoCommit() {
@@ -83,6 +85,22 @@ public class KafkaConfig extends ConnectorConfig {
         this.maxPartitionFetchBytes = maxPartitionFetchBytes;
     }
 
+    public String getProducerKeySerializer() {
+        return producerKeySerializer;
+    }
+
+    public void setProducerKeySerializer(String producerKeySerializer) {
+        this.producerKeySerializer = producerKeySerializer;
+    }
+
+    public String getProducerValueSerializer() {
+        return producerValueSerializer;
+    }
+
+    public void setProducerValueSerializer(String producerValueSerializer) {
+        this.producerValueSerializer = producerValueSerializer;
+    }
+
     public long getBufferMemory() {
         return bufferMemory;
     }

+ 118 - 0
dbsyncer-listener/src/main/test/KafkaClientTest.java

@@ -0,0 +1,118 @@
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author AE86
+ * @version 1.0.0
+ * @date 2021/11/23 23:13
+ */
+public class KafkaClientTest {
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private KafkaConsumer<String, String> consumer;
+    private KafkaProducer<String, String> producer;
+
+    private String server = "192.168.100.100:9092";
+    private String cKeyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+    private String cValueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+    private String pKeySerializer = "org.apache.kafka.common.serialization.StringSerializer";
+    private String pValueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
+    private String topic = "mytopic";
+
+    @Before
+    public void init() {
+        // Consumer API
+        {
+            Properties props = new Properties();
+            props.put("bootstrap.servers", server);
+            props.put("group.id", "test");
+            props.put("enable.auto.commit", true);
+            props.put("auto.commit.interval.ms", 1000);
+            props.put("session.timeout.ms", 10000);
+            props.put("max.partition.fetch.bytes", 1048576);
+            props.put("key.deserializer", cKeyDeserializer);
+            props.put("value.deserializer", cValueSerializer);
+            consumer = new KafkaConsumer<>(props);
+            consumer.subscribe(Arrays.asList(topic));
+        }
+
+        // Producer API
+        {
+            Properties props = new Properties();
+            props.put("bootstrap.servers", server);
+            props.put("buffer.memory", 33554432);
+            props.put("batch.size", 32768);
+            props.put("linger.ms", 10);
+            props.put("acks", "1");
+            props.put("retries", 3);
+            props.put("max.block.ms", 60000);
+            props.put("max.request.size", 1048576);
+            props.put("key.serializer", pKeyDeserializer);
+            props.put("value.serializer", pValueSerializer);
+            producer = new KafkaProducer<>(props);
+        }
+    }
+
+    @After
+    public void close() {
+        if (null != producer) {
+            producer.close();
+        }
+        if (null != consumer) {
+            consumer.close();
+        }
+    }
+
+    @Test
+    public void testProducerAndConsumer() throws Exception {
+        logger.info("test begin");
+        new Producer().start();
+//        new Consumer().start();
+        TimeUnit.SECONDS.sleep(60);
+        logger.info("test end");
+    }
+
+    class Consumer extends Thread {
+
+        public Consumer() {
+            setName("Consumer-thread");
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                ConsumerRecords<String, String> records = consumer.poll(100);
+                for (ConsumerRecord<String, String> record : records) {
+                    logger.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
+                }
+            }
+        }
+    }
+
+    class Producer extends Thread {
+
+        public Producer() {
+            setName("Producer-thread");
+        }
+
+        @Override
+        public void run() {
+            for (int i = 0; i < 100; i++) {
+                producer.send(new ProducerRecord<>(topic, Integer.toString(i), Integer.toString(i)));
+            }
+        }
+    }
+
+}
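
One caveat on the test above: KafkaConsumer.poll(long) has been deprecated since kafka-clients 2.0 in favour of the Duration overload. The diff does not show which client version the project pins; if it is 2.0 or later, the consumer loop could be written as:

import java.time.Duration;

// Equivalent consumer loop using the non-deprecated overload (kafka-clients 2.0+).
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        logger.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
    }
}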

+ 26 - 22
dbsyncer-web/src/main/resources/public/connector/addKafka.html

@@ -9,18 +9,6 @@
             <textarea name="bootstrapServers" class="form-control" maxlength="1024" dbsyncer-valid="require" rows="2" th:text="${connector?.config?.bootstrapServers}?:'127.0.0.1:9092'"></textarea>
         </div>
     </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">key序列化<strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-10">
-            <input class="form-control" name="keyDeserializer" type="text" dbsyncer-valid="require" th:value="${connector?.config?.keyDeserializer?:'org.apache.kafka.common.serialization.StringSerializer'}"/>
-        </div>
-    </div>
-    <div class="form-group">
-        <label class="col-sm-2 control-label">value序列化<strong class="driverVerifcateRequired">*</strong></label>
-        <div class="col-sm-10">
-            <input class="form-control" name="valueSerializer" type="text" dbsyncer-valid="require" th:value="${connector?.config?.valueSerializer?:'org.apache.kafka.common.serialization.StringSerializer'}"/>
-        </div>
-    </div>
 
     <!-- Consumer settings -->
     <div class="row">
@@ -37,6 +25,18 @@
                             <input class="form-control" name="groupId" type="text" maxlength="64" th:value="${connector?.config?.groupId}"/>
                         </div>
                     </div>
+                    <div class="form-group">
+                        <label class="col-sm-3 control-label">key序列化<strong class="driverVerifcateRequired">*</strong></label>
+                        <div class="col-sm-9">
+                            <input class="form-control" name="consumerKeyDeserializer" type="text" dbsyncer-valid="require" th:value="${connector?.config?.consumerKeyDeserializer?:'org.apache.kafka.common.serialization.StringDeserializer'}"/>
+                        </div>
+                    </div>
+                    <div class="form-group">
+                        <label class="col-sm-3 control-label">value序列化<strong class="driverVerifcateRequired">*</strong></label>
+                        <div class="col-sm-9">
+                            <input class="form-control" name="consumerValueDeserializer" type="text" dbsyncer-valid="require" th:value="${connector?.config?.consumerValueDeserializer?:'org.apache.kafka.common.serialization.StringDeserializer'}"/>
+                        </div>
+                    </div>
                     <div class="form-group">
                         <label class="col-sm-3 control-label">enable.auto.commit <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="如果为true,消费者的偏移量将在后台定期提交"></i></label>
                         <div class="col-sm-3">
@@ -87,6 +87,18 @@
                     <h3 class="panel-title">生产者配置</h3>
                 </div>
                 <div class="panel-body">
+                    <div class="form-group">
+                        <label class="col-sm-3 control-label">key序列化<strong class="driverVerifcateRequired">*</strong></label>
+                        <div class="col-sm-9">
+                            <input class="form-control" name="producerKeySerializer" type="text" dbsyncer-valid="require" th:value="${connector?.config?.producerKeySerializer?:'org.apache.kafka.common.serialization.StringSerializer'}"/>
+                        </div>
+                    </div>
+                    <div class="form-group">
+                        <label class="col-sm-3 control-label">value序列化<strong class="driverVerifcateRequired">*</strong></label>
+                        <div class="col-sm-9">
+                            <input class="form-control" name="producerValueSerializer" type="text" dbsyncer-valid="require" th:value="${connector?.config?.producerValueSerializer?:'org.apache.kafka.common.serialization.StringSerializer'}"/>
+                        </div>
+                    </div>
                     <div class="form-group">
                         <label class="col-sm-3 control-label">buffer.memory <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="Kafka的客户端发送数据到服务器,不是来一条就发一条,而是经过缓冲,通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker。buffer.memory用来约束KafkaProducer能够使用的内存缓冲的大小,默认值32MB。
 如果buffer.memory设置的太小,可能导致的问题是:消息快速的写入内存缓冲里,但Sender线程来不及把Request发送到Kafka服务器,会造成内存缓冲很快就被写满。而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了。
@@ -140,16 +152,6 @@ linger.ms works together with batch.size to avoid a batch that never fills up
                                 <option value="1" th:selected="${connector?.config?.retries eq '1'}">1</option>
                             </select>
                         </div>
-                        <label class="col-sm-3 control-label">retries.backoff.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="失败重试间隔"></i></label>
-                        <div class="col-sm-3">
-                            <select class="form-control select-control" name="retriesBackoffMs">
-                                <option value="100" th:selected="${connector?.config?.retriesBackoffMs eq '100'}">0.1秒</option>
-                                <option value="300" th:selected="${connector?.config?.retriesBackoffMs eq '300'}">0.3秒</option>
-                                <option value="500" th:selected="${connector?.config?.retriesBackoffMs eq '500'}">0.5秒</option>
-                            </select>
-                        </div>
-                    </div>
-                    <div class="form-group">
                         <label class="col-sm-3 control-label">max.block.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。"></i></label>
                         <div class="col-sm-3">
                             <select class="form-control select-control" name="maxBlockMs">
@@ -158,6 +160,8 @@ linger.ms配合batch.size一起来设置,可避免一个Batch迟迟凑不满
                                 <option value="15000" th:selected="${connector?.config?.maxBlockMs eq '15000'}">15秒</option>
                             </select>
                         </div>
+                    </div>
+                    <div class="form-group">
                         <label class="col-sm-3 control-label">max.request.size <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="每次发送给Kafka服务器请求消息的最大大小,与max.partition.fetch.bytes保持一致"></i></label>
                         <div class="col-sm-3">
                             <select id="maxRequestSizeSelect" class="form-control select-control" name="maxRequestSize">