1
0
AE86 3 жил өмнө
parent
commit
f38803ae5a

+ 0 - 4
dbsyncer-biz/src/main/java/org/dbsyncer/biz/checker/impl/connector/KafkaConfigChecker.java

@@ -27,13 +27,10 @@ public class KafkaConfigChecker implements ConnectorConfigChecker<KafkaConfig> {
         Assert.hasText(valueSerializer, "valueSerializer is empty.");
 
         String groupId = params.get("groupId");
-        Assert.hasText(groupId, "groupId is empty.");
         boolean enableAutoCommit = BooleanUtil.toBoolean(params.get("enableAutoCommit"));
         long autoCommitIntervalMs = NumberUtil.toLong(params.get("autoCommitIntervalMs"));
         long maxPartitionFetchBytes = NumberUtil.toLong(params.get("maxPartitionFetchBytes"));
 
-        String topic = params.get("topic");
-        Assert.hasText(topic, "topic is empty.");
         long bufferMemory = NumberUtil.toLong(params.get("bufferMemory"));
         long batchSize = NumberUtil.toLong(params.get("batchSize"));
         long lingerMs = NumberUtil.toLong(params.get("lingerMs"));
@@ -51,7 +48,6 @@ public class KafkaConfigChecker implements ConnectorConfigChecker<KafkaConfig> {
         connectorConfig.setAutoCommitIntervalMs(autoCommitIntervalMs);
         connectorConfig.setMaxPartitionFetchBytes(maxPartitionFetchBytes);
 
-        connectorConfig.setTopic(topic);
         connectorConfig.setBufferMemory(bufferMemory);
         connectorConfig.setBatchSize(batchSize);
         connectorConfig.setLingerMs(lingerMs);

+ 0 - 9
dbsyncer-connector/src/main/java/org/dbsyncer/connector/config/KafkaConfig.java

@@ -19,7 +19,6 @@ public class KafkaConfig extends ConnectorConfig {
     private long maxPartitionFetchBytes;
 
     // 生产者
-    private String topic;
     private long bufferMemory;
     private long batchSize;
     private long lingerMs;
@@ -84,14 +83,6 @@ public class KafkaConfig extends ConnectorConfig {
         this.maxPartitionFetchBytes = maxPartitionFetchBytes;
     }
 
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
     public long getBufferMemory() {
         return bufferMemory;
     }

+ 2 - 2
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaConnector.java

@@ -11,7 +11,7 @@ import java.util.Map;
 public class KafkaConnector implements Connector<KafkaConnectorMapper, KafkaConfig> {
     @Override
     public ConnectorMapper connect(KafkaConfig config) {
-        return null;
+        return new KafkaConnectorMapper(config, new KafkaClient());
     }
 
     @Override
@@ -26,7 +26,7 @@ public class KafkaConnector implements Connector<KafkaConnectorMapper, KafkaConf
 
     @Override
     public String getConnectorMapperCacheKey(KafkaConfig config) {
-        return String.format("%s-%s-%s", config.getBootstrapServers(), config.getGroupId(), config.getTopic());
+        return String.format("%s-%s", config.getBootstrapServers(), config.getGroupId());
     }
 
     @Override

+ 31 - 32
dbsyncer-web/src/main/resources/public/connector/addKafka.html

@@ -32,17 +32,9 @@
                 </div>
                 <div class="panel-body">
                     <div class="form-group">
-                        <label class="col-sm-3 control-label">group.id <strong class="driverVerifcateRequired">*</strong></label>
-                        <div class="col-sm-3">
-                            <input class="form-control" name="groupId" type="text" maxlength="64" dbsyncer-valid="require" th:value="${connector?.config?.groupId}"/>
-                        </div>
-                        <label class="col-sm-3 control-label">session.timeout.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="在使用Kafka的组管理时,用于检测消费者故障超时"></i></label>
-                        <div class="col-sm-3">
-                            <select class="form-control select-control" name="sessionTimeoutMs">
-                                <option value="10000" th:selected="${connector?.config?.sessionTimeoutMs eq '10000'}">10秒</option>
-                                <option value="15000" th:selected="${connector?.config?.sessionTimeoutMs eq '15000'}">15秒</option>
-                                <option value="20000" th:selected="${connector?.config?.sessionTimeoutMs eq '20000'}">20秒</option>
-                            </select>
+                        <label class="col-sm-3 control-label">group.id </label>
+                        <div class="col-sm-9">
+                            <input class="form-control" name="groupId" type="text" maxlength="64" th:value="${connector?.config?.groupId}"/>
                         </div>
                     </div>
                     <div class="form-group">
@@ -63,9 +55,17 @@
                         </div>
                     </div>
                     <div class="form-group">
+                        <label class="col-sm-3 control-label">session.timeout.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="在使用Kafka的组管理时,用于检测消费者故障超时"></i></label>
+                        <div class="col-sm-3">
+                            <select class="form-control select-control" name="sessionTimeoutMs">
+                                <option value="10000" th:selected="${connector?.config?.sessionTimeoutMs eq '10000'}">10秒</option>
+                                <option value="15000" th:selected="${connector?.config?.sessionTimeoutMs eq '15000'}">15秒</option>
+                                <option value="20000" th:selected="${connector?.config?.sessionTimeoutMs eq '20000'}">20秒</option>
+                            </select>
+                        </div>
                         <label class="col-sm-3 control-label">max.partition.fetch.bytes <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="每次接收Kafka服务器订阅消息的最大大小,与max.request.size参数保持一致"></i></label>
                         <div class="col-sm-3">
-                            <select class="form-control select-control" name="maxPartitionFetchBytes">
+                            <select id="maxPartitionFetchBytesSelect" class="form-control select-control" name="maxPartitionFetchBytes">
                                 <option value="1048576" th:selected="${connector?.config?.maxPartitionFetchBytes eq '1048576'}">1MB</option>
                                 <option value="2097152" th:selected="${connector?.config?.maxPartitionFetchBytes eq '2097152'}">2MB</option>
                                 <option value="3145728" th:selected="${connector?.config?.maxPartitionFetchBytes eq '3145728'}">3MB</option>
@@ -87,20 +87,6 @@
                     <h3 class="panel-title">生产者配置</h3>
                 </div>
                 <div class="panel-body">
-                    <div class="form-group">
-                        <label class="col-sm-3 control-label">topic <strong class="driverVerifcateRequired">*</strong></label>
-                        <div class="col-sm-3">
-                            <input class="form-control" name="topic" type="text" maxlength="64" dbsyncer-valid="require" th:value="${connector?.config?.topic}"/>
-                        </div>
-                        <label class="col-sm-3 control-label">acks <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="0:客户端发送出去为成功;1:写入磁盘为成功;all:Leader保持同步Follower结束为成功"></i></label>
-                        <div class="col-sm-3">
-                            <select class="form-control select-control" name="acks">
-                                <option value="1" th:selected="${connector?.config?.acks eq '1'}">1</option>
-                                <option value="0" th:selected="${connector?.config?.acks eq '0'}">0</option>
-                                <option value="all" th:selected="${connector?.config?.acks eq 'all'}">all</option>
-                            </select>
-                        </div>
-                    </div>
                     <div class="form-group">
                         <label class="col-sm-3 control-label">buffer.memory <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="Kafka的客户端发送数据到服务器,不是来一条就发一条,而是经过缓冲,通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker。buffer.memory用来约束KafkaProducer能够使用的内存缓冲的大小,默认值32MB。
 如果buffer.memory设置的太小,可能导致的问题是:消息快速的写入内存缓冲里,但Sender线程来不及把Request发送到Kafka服务器,会造成内存缓冲很快就被写满。而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了。
@@ -136,12 +122,12 @@ linger.ms配合batch.size一起来设置,可避免一个Batch迟迟凑不满
                                 <option value="30" th:selected="${connector?.config?.lingerMs eq '20'}">20毫秒</option>
                             </select>
                         </div>
-                        <label class="col-sm-3 control-label">max.block.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。"></i></label>
+                        <label class="col-sm-3 control-label">acks <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="0:客户端发送出去为成功;1:写入磁盘为成功;all:Leader保持同步Follower结束为成功"></i></label>
                         <div class="col-sm-3">
-                            <select class="form-control select-control" name="maxBlockMs">
-                                <option value="60000" th:selected="${connector?.config?.maxBlockMs eq '60000'}">60秒</option>
-                                <option value="30000" th:selected="${connector?.config?.maxBlockMs eq '30000'}">30</option>
-                                <option value="15000" th:selected="${connector?.config?.maxBlockMs eq '15000'}">15秒</option>
+                            <select class="form-control select-control" name="acks">
+                                <option value="1" th:selected="${connector?.config?.acks eq '1'}">1</option>
+                                <option value="0" th:selected="${connector?.config?.acks eq '0'}">0</option>
+                                <option value="all" th:selected="${connector?.config?.acks eq 'all'}">all</option>
                             </select>
                         </div>
                     </div>
@@ -164,9 +150,17 @@ linger.ms配合batch.size一起来设置,可避免一个Batch迟迟凑不满
                         </div>
                     </div>
                     <div class="form-group">
+                        <label class="col-sm-3 control-label">max.block.ms <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。"></i></label>
+                        <div class="col-sm-3">
+                            <select class="form-control select-control" name="maxBlockMs">
+                                <option value="60000" th:selected="${connector?.config?.maxBlockMs eq '60000'}">60秒</option>
+                                <option value="30000" th:selected="${connector?.config?.maxBlockMs eq '30000'}">30秒</option>
+                                <option value="15000" th:selected="${connector?.config?.maxBlockMs eq '15000'}">15秒</option>
+                            </select>
+                        </div>
                         <label class="col-sm-3 control-label">max.request.size <i class="fa fa-question-circle fa_gray" aria-hidden="true" title="每次发送给Kafka服务器请求消息的最大大小,与max.partition.fetch.bytes保持一致"></i></label>
                         <div class="col-sm-3">
-                            <select class="form-control select-control" name="maxRequestSize">
+                            <select id="maxRequestSizeSelect" class="form-control select-control" name="maxRequestSize">
                                 <option value="1048576" th:selected="${connector?.config?.maxRequestSize eq '1048576'}">1MB</option>
                                 <option value="2097152" th:selected="${connector?.config?.maxRequestSize eq '2097152'}">2MB</option>
                                 <option value="3145728" th:selected="${connector?.config?.maxRequestSize eq '3145728'}">3MB</option>
@@ -184,6 +178,11 @@ linger.ms配合batch.size一起来设置,可避免一个Batch迟迟凑不满
     $(function () {
         // 初始化select插件
         initSelectIndex($(".select-control"), 1);
+
+        // 绑定下拉联动切换事件
+        $("#maxPartitionFetchBytesSelect").on('changed.bs.select',function(e){
+            $("#maxRequestSizeSelect").selectpicker('val', $(this).selectpicker('val'));
+        });
     })
 </script>
 </html>

BIN
dbsyncer-web/src/main/resources/static/img/Kafka.png