Ver Fonte

fix config

AE86 há 3 anos atrás
pai
commit
941eca6344

+ 10 - 15
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/KafkaConfigChecker.java

@@ -1,7 +1,6 @@
 package org.dbsyncer.biz.checker.impl.connector;
 
 import org.dbsyncer.biz.checker.ConnectorConfigChecker;
-import org.dbsyncer.common.util.BooleanUtil;
 import org.dbsyncer.common.util.NumberUtil;
 import org.dbsyncer.connector.config.KafkaConfig;
 import org.springframework.stereotype.Component;
@@ -27,29 +26,26 @@ public class KafkaConfigChecker implements ConnectorConfigChecker<KafkaConfig> {
         String consumerValueDeserializer = params.get("consumerValueDeserializer");
         Assert.hasText(consumerKeyDeserializer, "consumerKeyDeserializer is empty.");
         Assert.hasText(consumerValueDeserializer, "consumerValueDeserializer is empty.");
-        boolean enableAutoCommit = BooleanUtil.toBoolean(params.get("enableAutoCommit"));
-        long autoCommitIntervalMs = NumberUtil.toLong(params.get("autoCommitIntervalMs"));
-        long maxPartitionFetchBytes = NumberUtil.toLong(params.get("maxPartitionFetchBytes"));
+        int sessionTimeoutMs = NumberUtil.toInt(params.get("sessionTimeoutMs"));
+        int maxPartitionFetchBytes = NumberUtil.toInt(params.get("maxPartitionFetchBytes"));
 
         String producerKeySerializer = params.get("producerKeySerializer");
         String producerValueSerializer = params.get("producerValueSerializer");
+        String acks = params.get("acks");
         Assert.hasText(producerKeySerializer, "producerKeySerializer is empty.");
         Assert.hasText(producerValueSerializer, "producerValueSerializer is empty.");
-        long bufferMemory = NumberUtil.toLong(params.get("bufferMemory"));
-        long batchSize = NumberUtil.toLong(params.get("batchSize"));
-        long lingerMs = NumberUtil.toLong(params.get("lingerMs"));
-        long maxBlockMs = NumberUtil.toLong(params.get("maxBlockMs"));
-        long retries = NumberUtil.toLong(params.get("retries"));
-        long retriesBackoffMs = NumberUtil.toLong(params.get("retriesBackoffMs"));
-        long maxRequestSize = NumberUtil.toLong(params.get("maxRequestSize"));
+        int bufferMemory = NumberUtil.toInt(params.get("bufferMemory"));
+        int batchSize = NumberUtil.toInt(params.get("batchSize"));
+        int lingerMs = NumberUtil.toInt(params.get("lingerMs"));
+        int retries = NumberUtil.toInt(params.get("retries"));
+        int maxRequestSize = NumberUtil.toInt(params.get("maxRequestSize"));
 
         connectorConfig.setBootstrapServers(bootstrapServers);
 
         connectorConfig.setGroupId(groupId);
         connectorConfig.setConsumerKeyDeserializer(consumerKeyDeserializer);
         connectorConfig.setConsumerValueDeserializer(consumerValueDeserializer);
-        connectorConfig.setEnableAutoCommit(enableAutoCommit);
-        connectorConfig.setAutoCommitIntervalMs(autoCommitIntervalMs);
+        connectorConfig.setSessionTimeoutMs(sessionTimeoutMs);
         connectorConfig.setMaxPartitionFetchBytes(maxPartitionFetchBytes);
 
         connectorConfig.setProducerKeySerializer(producerKeySerializer);
@@ -57,9 +53,8 @@ public class KafkaConfigChecker implements ConnectorConfigChecker<KafkaConfig> {
         connectorConfig.setBufferMemory(bufferMemory);
         connectorConfig.setBatchSize(batchSize);
         connectorConfig.setLingerMs(lingerMs);
-        connectorConfig.setMaxBlockMs(maxBlockMs);
+        connectorConfig.setAcks(acks);
         connectorConfig.setRetries(retries);
-        connectorConfig.setRetriesBackoffMs(retriesBackoffMs);
         connectorConfig.setMaxRequestSize(maxRequestSize);
     }
 

+ 28 - 46
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/KafkaConfig.java

@@ -14,20 +14,18 @@ public class KafkaConfig extends ConnectorConfig {
     private String groupId;
     private String consumerKeyDeserializer;
     private String consumerValueDeserializer;
-    private boolean enableAutoCommit;
-    private long autoCommitIntervalMs;
-    private long maxPartitionFetchBytes;
+    private int sessionTimeoutMs;
+    private int maxPartitionFetchBytes;
 
     // 生产者
     private String producerKeySerializer;
     private String producerValueSerializer;
-    private long bufferMemory;
-    private long batchSize;
-    private long lingerMs;
-    private long maxBlockMs;
-    private long retries;
-    private long retriesBackoffMs;
-    private long maxRequestSize;
+    private int bufferMemory;
+    private int batchSize;
+    private int lingerMs;
+    private String acks;
+    private int retries;
+    private int maxRequestSize;
 
     public String getBootstrapServers() {
         return bootstrapServers;
@@ -61,27 +59,19 @@ public class KafkaConfig extends ConnectorConfig {
         this.consumerValueDeserializer = consumerValueDeserializer;
     }
 
-    public boolean isEnableAutoCommit() {
-        return enableAutoCommit;
+    public int getSessionTimeoutMs() {
+        return sessionTimeoutMs;
     }
 
-    public void setEnableAutoCommit(boolean enableAutoCommit) {
-        this.enableAutoCommit = enableAutoCommit;
+    public void setSessionTimeoutMs(int sessionTimeoutMs) {
+        this.sessionTimeoutMs = sessionTimeoutMs;
     }
 
-    public long getAutoCommitIntervalMs() {
-        return autoCommitIntervalMs;
-    }
-
-    public void setAutoCommitIntervalMs(long autoCommitIntervalMs) {
-        this.autoCommitIntervalMs = autoCommitIntervalMs;
-    }
-
-    public long getMaxPartitionFetchBytes() {
+    public int getMaxPartitionFetchBytes() {
         return maxPartitionFetchBytes;
     }
 
-    public void setMaxPartitionFetchBytes(long maxPartitionFetchBytes) {
+    public void setMaxPartitionFetchBytes(int maxPartitionFetchBytes) {
         this.maxPartitionFetchBytes = maxPartitionFetchBytes;
     }
 
@@ -101,59 +91,51 @@ public class KafkaConfig extends ConnectorConfig {
         this.producerValueSerializer = producerValueSerializer;
     }
 
-    public long getBufferMemory() {
+    public int getBufferMemory() {
         return bufferMemory;
     }
 
-    public void setBufferMemory(long bufferMemory) {
+    public void setBufferMemory(int bufferMemory) {
         this.bufferMemory = bufferMemory;
     }
 
-    public long getBatchSize() {
+    public int getBatchSize() {
         return batchSize;
     }
 
-    public void setBatchSize(long batchSize) {
+    public void setBatchSize(int batchSize) {
         this.batchSize = batchSize;
     }
 
-    public long getLingerMs() {
+    public int getLingerMs() {
         return lingerMs;
     }
 
-    public void setLingerMs(long lingerMs) {
+    public void setLingerMs(int lingerMs) {
         this.lingerMs = lingerMs;
     }
 
-    public long getMaxBlockMs() {
-        return maxBlockMs;
+    public String getAcks() {
+        return acks;
     }
 
-    public void setMaxBlockMs(long maxBlockMs) {
-        this.maxBlockMs = maxBlockMs;
+    public void setAcks(String acks) {
+        this.acks = acks;
     }
 
-    public long getRetries() {
+    public int getRetries() {
         return retries;
     }
 
-    public void setRetries(long retries) {
+    public void setRetries(int retries) {
         this.retries = retries;
     }
 
-    public long getRetriesBackoffMs() {
-        return retriesBackoffMs;
-    }
-
-    public void setRetriesBackoffMs(long retriesBackoffMs) {
-        this.retriesBackoffMs = retriesBackoffMs;
-    }
-
-    public long getMaxRequestSize() {
+    public int getMaxRequestSize() {
         return maxRequestSize;
     }
 
-    public void setMaxRequestSize(long maxRequestSize) {
+    public void setMaxRequestSize(int maxRequestSize) {
         this.maxRequestSize = maxRequestSize;
     }
 }

+ 58 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaClient.java

@@ -1,7 +1,65 @@
 package org.dbsyncer.connector.kafka;
 
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.PartitionInfo;
+import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.config.Table;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 /**
  * // TODO implements Producer<?, ?>, Consumer<?>
  */
 public class KafkaClient {
+
+    private static final String DEFAULT_TOPIC = "__consumer_offsets";
+
+    private KafkaConsumer consumer;
+    private KafkaProducer producer;
+
+    public List<Table> getTopics() {
+        Map<String, List<PartitionInfo>> topicMap = consumer.listTopics();
+        List<Table> topics = new ArrayList<>();
+        if (!CollectionUtils.isEmpty(topicMap)) {
+            topicMap.forEach((t, list) -> {
+                if (!StringUtil.equals(DEFAULT_TOPIC, t)) {
+                    topics.add(new Table(t));
+                }
+            });
+        }
+        return topics;
+    }
+
+    public boolean ping() {
+        return false;
+    }
+
+    public void close() {
+        if (null != producer) {
+            producer.close();
+        }
+        if (null != consumer) {
+            consumer.close();
+        }
+    }
+
+    public KafkaConsumer getConsumer() {
+        return consumer;
+    }
+
+    public void setConsumer(KafkaConsumer consumer) {
+        this.consumer = consumer;
+    }
+
+    public KafkaProducer getProducer() {
+        return producer;
+    }
+
+    public void setProducer(KafkaProducer producer) {
+        this.producer = producer;
+    }
 }

+ 5 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -4,6 +4,7 @@ import org.dbsyncer.common.model.Result;
 import org.dbsyncer.connector.Connector;
 import org.dbsyncer.connector.ConnectorMapper;
 import org.dbsyncer.connector.config.*;
+import org.dbsyncer.connector.util.KafkaUtil;
 
 import java.util.List;
 import java.util.Map;
@@ -11,17 +12,17 @@ import java.util.Map;
 public class KafkaConnector implements Connector<KafkaConnectorMapper, KafkaConfig> {
     @Override
     public ConnectorMapper connect(KafkaConfig config) {
-        return new KafkaConnectorMapper(config, new KafkaClient());
+        return new KafkaConnectorMapper(config, KafkaUtil.getConnection(config));
     }
 
     @Override
     public void disconnect(KafkaConnectorMapper connectorMapper) {
-
+        KafkaUtil.close(connectorMapper.getConnection());
     }
 
     @Override
     public boolean isAlive(KafkaConnectorMapper connectorMapper) {
-        return false;
+        return connectorMapper.getConnection().ping();
     }
 
     @Override
@@ -31,7 +32,7 @@ public class KafkaConnector implements Connector<KafkaConnectorMapper, KafkaConf
 
     @Override
     public List<Table> getTable(KafkaConnectorMapper connectorMapper) {
-        return null;
+        return connectorMapper.getConnection().getTopics();
     }
 
     @Override

+ 52 - 0
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/KafkaUtil.java

@@ -0,0 +1,52 @@
+package org.dbsyncer.connector.util;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.dbsyncer.connector.config.KafkaConfig;
+import org.dbsyncer.connector.kafka.KafkaClient;
+
+import java.util.Properties;
+
+public abstract class KafkaUtil {
+
+    public static KafkaClient getConnection(KafkaConfig config) {
+        KafkaClient client = new KafkaClient();
+        // Consumer API
+        {
+            Properties props = new Properties();
+            props.put("bootstrap.servers", config.getBootstrapServers());
+            props.put("group.id", config.getGroupId());
+            props.put("enable.auto.commit", true);
+            props.put("auto.commit.interval.ms", 5000);
+            props.put("session.timeout.ms", config.getSessionTimeoutMs());
+            props.put("max.partition.fetch.bytes", config.getMaxPartitionFetchBytes());
+            props.put("key.deserializer", config.getConsumerKeyDeserializer());
+            props.put("value.deserializer", config.getConsumerValueDeserializer());
+            client.setConsumer(new KafkaConsumer<>(props));
+        }
+
+        // Producer API
+        {
+            Properties props = new Properties();
+            props.put("bootstrap.servers", config.getBootstrapServers());
+            props.put("buffer.memory", config.getBufferMemory());
+            props.put("batch.size", config.getBatchSize());
+            props.put("linger.ms", config.getLingerMs());
+            props.put("acks", config.getAcks());
+            props.put("retries", config.getRetries());
+            props.put("max.block.ms", 60000);
+            props.put("max.request.size", config.getMaxRequestSize());
+            props.put("key.serializer", config.getProducerKeySerializer());
+            props.put("value.serializer", config.getProducerValueSerializer());
+            client.setProducer(new KafkaProducer<>(props));
+        }
+        return client;
+    }
+
+    public static void close(KafkaClient client) {
+        if (null != client) {
+            client.close();
+        }
+    }
+
+}

+ 14 - 4
dbsyncer-listener/src/main/test/KafkaClientTest.java

@@ -3,6 +3,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.dbsyncer.common.util.CollectionUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -10,6 +12,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -21,8 +25,8 @@ import java.util.concurrent.TimeUnit;
 public class KafkaClientTest {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private KafkaConsumer<String, String> consumer;
-    private KafkaProducer<String, String> producer;
+    private KafkaConsumer consumer;
+    private KafkaProducer producer;
 
     private String server = "192.168.1.100:9092";
     private String cKeyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
@@ -39,7 +43,7 @@ public class KafkaClientTest {
             props.put("bootstrap.servers", server);
             props.put("group.id", "test");
             props.put("enable.auto.commit", true);
-            props.put("auto.commit.interval.ms", 1000);
+            props.put("auto.commit.interval.ms", 5000);
             props.put("session.timeout.ms", 10000);
             props.put("max.partition.fetch.bytes", 1048576);
             props.put("key.deserializer", cKeyDeserializer);
@@ -56,7 +60,7 @@ public class KafkaClientTest {
             props.put("batch.size", 32768);
             props.put("linger.ms", 10);
             props.put("acks", "1");
-            props.put("retries", 3);
+            props.put("retries", 1);
             props.put("max.block.ms", 60000);
             props.put("max.request.size", 1048576);
             props.put("key.serializer", pKeySerializer);
@@ -78,6 +82,12 @@ public class KafkaClientTest {
     @Test
     public void testProducerAndConsumer() throws Exception {
         logger.info("test begin");
+        // __consumer_offsets
+        Map<String, List<PartitionInfo>> topics = consumer.listTopics();
+        if (!CollectionUtils.isEmpty(topics)) {
+            topics.forEach((t, list) -> logger.info("topic:{}", t));
+        }
+
         new Producer().start();
         new Consumer().start();
         TimeUnit.SECONDS.sleep(60);

+ 2 - 29
dbsyncer-web/src/main/resources/public/connector/addKafka.html

@@ -37,23 +37,6 @@
                             <input class="form-control" name="consumerValueDeserializer" type="text" dbsyncer-valid="require" th:value="${connector?.config?.consumerValueDeserializer?:'org.apache.kafka.common.serialization.StringDeserializer'}"/>
                         </div>
                     </div>
-                    <div class="form-group">
-                        <label class="col-sm-3 control-label">enable.auto.commit <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="如果为true,消费者的偏移量将在后台定期提交"></i></label>
-                        <div class="col-sm-3">
-                            <select class="form-control select-control" name="enableAutoCommit">
-                                <option value="false" th:selected="${connector?.config?.enableAutoCommit eq 'false'}">false</option>
-                                <option value="true" th:selected="${connector?.config?.enableAutoCommit eq 'true'}">true</option>
-                            </select>
-                        </div>
-                        <label class="col-sm-3 control-label">auto.commit.interval.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="enable.auto.commit为true才生效"></i></label>
-                        <div class="col-sm-3">
-                            <select class="form-control select-control" name="autoCommitIntervalMs">
-                                <option value="1000" th:selected="${connector?.config?.autoCommitIntervalMs eq '1000'}">1秒</option>
-                                <option value="1500" th:selected="${connector?.config?.autoCommitIntervalMs eq '1500'}">1.5秒</option>
-                                <option value="2000" th:selected="${connector?.config?.autoCommitIntervalMs eq '2000'}">2秒</option>
-                            </select>
-                        </div>
-                    </div>
                     <div class="form-group">
                         <label class="col-sm-3 control-label">session.timeout.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="在使用Kafka的组管理时,用于检测消费者故障超时"></i></label>
                         <div class="col-sm-3">
@@ -147,21 +130,11 @@ linger.ms配合batch.size一起来设置,可避免一个Batch迟迟凑不满
                         <label class="col-sm-3 control-label">retries <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="发送失败重试次数"></i></label>
                         <div class="col-sm-3">
                             <select class="form-control select-control" name="retries">
-                                <option value="3" th:selected="${connector?.config?.retries eq '3'}">3</option>
-                                <option value="2" th:selected="${connector?.config?.retries eq '2'}">2</option>
                                 <option value="1" th:selected="${connector?.config?.retries eq '1'}">1</option>
+                                <option value="2" th:selected="${connector?.config?.retries eq '2'}">2</option>
+                                <option value="3" th:selected="${connector?.config?.retries eq '3'}">3</option>
                             </select>
                         </div>
-                        <label class="col-sm-3 control-label">max.block.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。"></i></label>
-                        <div class="col-sm-3">
-                            <select class="form-control select-control" name="maxBlockMs">
-                                <option value="60000" th:selected="${connector?.config?.maxBlockMs eq '60000'}">60秒</option>
-                                <option value="30000" th:selected="${connector?.config?.maxBlockMs eq '30000'}">30秒</option>
-                                <option value="15000" th:selected="${connector?.config?.maxBlockMs eq '15000'}">15秒</option>
-                            </select>
-                        </div>
-                    </div>
-                    <div class="form-group">
                         <label class="col-sm-3 control-label">max.request.size <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="每次发送给Kafka服务器请求消息的最大大小,与max.partition.fetch.bytes保持一致"></i></label>
                         <div class="col-sm-3">
                             <select id="maxRequestSizeSelect" class="form-control select-control" name="maxRequestSize">