AE86 committed 3 years ago
Commit 60c7e3344a

+ 75 - 19
dbsyncer-connector/src/main/java/org/dbsyncer/connector/kafka/KafkaClient.java

@@ -1,43 +1,107 @@
 package org.dbsyncer.connector.kafka;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.dbsyncer.common.util.CollectionUtils;
 import org.dbsyncer.common.util.StringUtil;
+import org.dbsyncer.connector.ConnectorException;
 import org.dbsyncer.connector.config.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 /**
- * // TODO implements Producer<?, ?>, Consumer<?>
+ * Kafka client integrating the consumer and producer APIs
  */
 public class KafkaClient {
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
     private static final String DEFAULT_TOPIC = "__consumer_offsets";
 
     private KafkaConsumer consumer;
     private KafkaProducer producer;
+    private NetworkClient networkClient;
+
+    public KafkaClient(KafkaConsumer consumer, KafkaProducer producer) {
+        this.consumer = consumer;
+        this.producer = producer;
+    }
 
     public List<Table> getTopics() {
-        Map<String, List<PartitionInfo>> topicMap = consumer.listTopics();
-        List<Table> topics = new ArrayList<>();
-        if (!CollectionUtils.isEmpty(topicMap)) {
-            topicMap.forEach((t, list) -> {
-                if (!StringUtil.equals(DEFAULT_TOPIC, t)) {
-                    topics.add(new Table(t));
-                }
-            });
+        try {
+            Map<String, List<PartitionInfo>> topicMap = consumer.listTopics();
+            List<Table> topics = new ArrayList<>();
+            if (!CollectionUtils.isEmpty(topicMap)) {
+                topicMap.forEach((t, list) -> {
+                    if (!StringUtil.equals(DEFAULT_TOPIC, t)) {
+                        topics.add(new Table(t));
+                    }
+                });
+            }
+            return topics;
+        } catch (Exception e) {
+            throw new ConnectorException("Failed to list topics: " + e.getMessage());
         }
-        return topics;
     }
 
     public boolean ping() {
-        return false;
+        return ping(consumer);
+    }
+
+    private boolean ping(Object client) {
+        if (null == networkClient) {
+            synchronized (this) {
+                if (null == networkClient) {
+                    try {
+                        // Unwrap the private "client" fields: KafkaConsumer -> ConsumerNetworkClient -> NetworkClient
+                        networkClient = (NetworkClient) invoke(invoke(client, "client"), "client");
+                    } catch (NoSuchFieldException | IllegalAccessException e) {
+                        logger.error(e.getMessage());
+                    }
+                }
+            }
+        }
+        if (null == networkClient) {
+            return false;
+        }
+        final Node node = networkClient.leastLoadedNode(0);
+        if (null == node) {
+            return false;
+        }
+        InetSocketAddress address = new InetSocketAddress(node.host(), node.port());
+        if (address.isUnresolved()) {
+            throw new ConnectorException(String.format("DNS resolution failed for url in %s %s:%s", CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, node.host(), node.port()));
+        }
+        return true;
     }
 
+    private Object invoke(Object obj, String declaredFieldName) throws NoSuchFieldException, IllegalAccessException {
+        final Field field = obj.getClass().getDeclaredField(declaredFieldName);
+        field.setAccessible(true);
+        return field.get(obj);
+    }
+
+//    private boolean isConnected(Object obj, String nodeId) throws NoSuchFieldException, InvocationTargetException, IllegalAccessException {
+//        final Field field = obj.getClass().getDeclaredField("connectionStates");
+//        field.setAccessible(true);
+//        Object connectionStates = field.get(obj);
+//
+//        Method[] methods = connectionStates.getClass().getDeclaredMethods();
+//        Method method = null;
+//        for (int i = 0; i < methods.length; i++) {
+//            if ("isConnected".equals(methods[i].getName())) {
+//                method = methods[i];
+//                method.setAccessible(true);
+//                break;
+//            }
+//        }
+//        return (boolean) method.invoke(connectionStates, nodeId);
+//    }
+
     public void close() {
         if (null != producer) {
             producer.close();
@@ -51,15 +115,7 @@ public class KafkaClient {
         return consumer;
     }
 
-    public void setConsumer(KafkaConsumer consumer) {
-        this.consumer = consumer;
-    }
-
     public KafkaProducer getProducer() {
         return producer;
     }
-
-    public void setProducer(KafkaProducer producer) {
-        this.producer = producer;
-    }
 }
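
Note: the reflection in ping() depends on internal field names of the Kafka client library. In the pre-2.x clients this code targets, KafkaConsumer holds a ConsumerNetworkClient in a private field named "client", which in turn holds the NetworkClient in a field of the same name; these are implementation details that later releases may rename. A self-contained sketch of the assumed chain (NetworkClientProbe is a hypothetical helper, not part of this commit):

import java.lang.reflect.Field;

import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NetworkClientProbe {

    // Read a private field by name, the same trick as KafkaClient#invoke above.
    private static Object readField(Object obj, String name) throws ReflectiveOperationException {
        Field field = obj.getClass().getDeclaredField(name);
        field.setAccessible(true);
        return field.get(obj);
    }

    // Assumed layout: KafkaConsumer.client (ConsumerNetworkClient) -> ConsumerNetworkClient.client (NetworkClient).
    public static NetworkClient unwrap(KafkaConsumer<?, ?> consumer) throws ReflectiveOperationException {
        Object consumerNetworkClient = readField(consumer, "client");
        return (NetworkClient) readField(consumerNetworkClient, "client");
    }
}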

+ 6 - 4
dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/KafkaUtil.java

@@ -10,8 +10,9 @@ import java.util.Properties;
 public abstract class KafkaUtil {
 
     public static KafkaClient getConnection(KafkaConfig config) {
-        KafkaClient client = new KafkaClient();
+
         // Consumer API
+        KafkaConsumer consumer;
         {
             Properties props = new Properties();
             props.put("bootstrap.servers", config.getBootstrapServers());
@@ -22,10 +23,11 @@ public abstract class KafkaUtil {
             props.put("max.partition.fetch.bytes", config.getMaxPartitionFetchBytes());
             props.put("key.deserializer", config.getConsumerKeyDeserializer());
             props.put("value.deserializer", config.getConsumerValueDeserializer());
-            client.setConsumer(new KafkaConsumer<>(props));
+            consumer = new KafkaConsumer<>(props);
         }
 
         // Producer API
+        KafkaProducer producer;
         {
             Properties props = new Properties();
             props.put("bootstrap.servers", config.getBootstrapServers());
@@ -38,9 +40,9 @@ public abstract class KafkaUtil {
             props.put("max.request.size", config.getMaxRequestSize());
             props.put("key.serializer", config.getProducerKeySerializer());
             props.put("value.serializer", config.getProducerValueSerializer());
-            client.setProducer(new KafkaProducer<>(props));
+            producer = new KafkaProducer<>(props);
         }
-        return client;
+        return new KafkaClient(consumer, producer);
     }
 
     public static void close(KafkaClient client) {

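Note: with the setters removed, KafkaClient is now constructed only through KafkaUtil.getConnection(KafkaConfig). A minimal usage sketch mirroring the test below (the broker address is a placeholder, and the KafkaConfig setters are the ones the test exercises):

KafkaConfig config = new KafkaConfig();
config.setBootstrapServers("localhost:9092"); // placeholder broker
config.setGroupId("example");
config.setConsumerKeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer");
config.setConsumerValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer");
config.setSessionTimeoutMs(10000);
config.setMaxPartitionFetchBytes(1048576);
config.setProducerKeySerializer("org.apache.kafka.common.serialization.StringSerializer");
config.setProducerValueSerializer("org.apache.kafka.common.serialization.StringSerializer");
config.setBufferMemory(33554432);
config.setBatchSize(32768);
config.setLingerMs(10);
config.setAcks("1");
config.setRetries(1);
config.setMaxRequestSize(1048576);

KafkaClient client = KafkaUtil.getConnection(config);
try {
    client.getTopics().forEach(t -> System.out.println(t.getName()));
} finally {
    KafkaUtil.close(client);
}
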
+ 55 - 63
dbsyncer-listener/src/main/test/KafkaClientTest.java

@@ -1,10 +1,10 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.PartitionInfo;
-import org.dbsyncer.common.util.CollectionUtils;
+import org.dbsyncer.connector.config.KafkaConfig;
+import org.dbsyncer.connector.config.Table;
+import org.dbsyncer.connector.kafka.KafkaClient;
+import org.dbsyncer.connector.util.KafkaUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -13,8 +13,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -25,72 +23,61 @@ import java.util.concurrent.TimeUnit;
 public class KafkaClientTest {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private KafkaConsumer consumer;
-    private KafkaProducer producer;
-
-    private String server = "192.168.1.100:9092";
-    private String cKeyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
-    private String cValueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
-    private String pKeySerializer = "org.apache.kafka.common.serialization.StringSerializer";
-    private String pValueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
-    private String topic = "mytopic";
+    private KafkaClient client;
 
     @Before
     public void init() {
-        // Consumer API
-        {
-            Properties props = new Properties();
-            props.put("bootstrap.servers", server);
-            props.put("group.id", "test");
-            props.put("enable.auto.commit", true);
-            props.put("auto.commit.interval.ms", 5000);
-            props.put("session.timeout.ms", 10000);
-            props.put("max.partition.fetch.bytes", 1048576);
-            props.put("key.deserializer", cKeyDeserializer);
-            props.put("value.deserializer", cValueDeserializer);
-            consumer = new KafkaConsumer<>(props);
-            consumer.subscribe(Arrays.asList(topic));
-        }
-
-        // Producer API
-        {
-            Properties props = new Properties();
-            props.put("bootstrap.servers", server);
-            props.put("buffer.memory", 33554432);
-            props.put("batch.size", 32768);
-            props.put("linger.ms", 10);
-            props.put("acks", "1");
-            props.put("retries", 1);
-            props.put("max.block.ms", 60000);
-            props.put("max.request.size", 1048576);
-            props.put("key.serializer", pKeySerializer);
-            props.put("value.serializer", pValueSerializer);
-            producer = new KafkaProducer<>(props);
-        }
+        String cKeyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+        String cValueDeserializer = cKeyDeserializer;
+        String pKeySerializer = "org.apache.kafka.common.serialization.StringSerializer";
+        String pValueSerializer = pKeySerializer;
+
+        KafkaConfig config = new KafkaConfig();
+        config.setBootstrapServers("192.168.1.100:9092");
+
+        config.setGroupId("test");
+        config.setConsumerKeyDeserializer(cKeyDeserializer);
+        config.setConsumerValueDeserializer(cValueDeserializer);
+        config.setSessionTimeoutMs(10000);
+        config.setMaxPartitionFetchBytes(1048576);
+
+        config.setProducerKeySerializer(pKeySerializer);
+        config.setProducerValueSerializer(pValueSerializer);
+        config.setBufferMemory(33554432);
+        config.setBatchSize(32768);
+        config.setLingerMs(10);
+        config.setAcks("1");
+        config.setRetries(1);
+        config.setMaxRequestSize(1048576);
+
+        client = KafkaUtil.getConnection(config);
     }
 
     @After
     public void close() {
-        if (null != producer) {
-            producer.close();
-        }
-        if (null != consumer) {
-            consumer.close();
-        }
+        client.close();
     }
 
     @Test
     public void testProducerAndConsumer() throws Exception {
         logger.info("test begin");
-        // __consumer_offsets
-        Map<String, List<PartitionInfo>> topics = consumer.listTopics();
-        if (!CollectionUtils.isEmpty(topics)) {
-            topics.forEach((t, list) -> logger.info("topic:{}", t));
+        List<Table> topics = client.getTopics();
+        topics.forEach(t -> logger.info(t.getName()));
+
+        logger.info("ping {}", client.ping());
+
+        String topic = "mytopic";
+        logger.info("Subscribed to topic {}", topic);
+        client.getConsumer().subscribe(Arrays.asList(topic));
+
+        // Simulate a producer sending one record
+        for (int i = 0; i < 1; i++) {
+            client.getProducer().send(new ProducerRecord<>(topic, Integer.toString(i), "test" + i));
         }
 
-        new Producer().start();
         new Consumer().start();
-        TimeUnit.SECONDS.sleep(60);
+        new Heartbeat().start();
+        TimeUnit.SECONDS.sleep(600);
         logger.info("test end");
     }
 
@@ -103,7 +90,7 @@ public class KafkaClientTest {
         @Override
         public void run() {
             while (true) {
-                ConsumerRecords<String, String> records = consumer.poll(100);
+                ConsumerRecords<String, String> records = client.getConsumer().poll(100);
                 for (ConsumerRecord<String, String> record : records) {
                     logger.info("收到消息:offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
                 }
@@ -111,16 +98,21 @@ public class KafkaClientTest {
         }
     }
 
-    class Producer extends Thread {
+    class Heartbeat extends Thread {
 
-        public Producer() {
-            setName("Producer-thread");
+        public Heartbeat() {
+            setName("Heartbeat-thread");
         }
 
         @Override
         public void run() {
-            for (int i = 0; i < 100; i++) {
-                producer.send(new ProducerRecord<>(topic, Integer.toString(i), "test" + i));
+            while (true) {
+                try {
+                    TimeUnit.SECONDS.sleep(3L);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                logger.info("ping {}", client.ping());
             }
         }
     }
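
Note: the Consumer and Heartbeat threads spin in while (true) and only stop when the ten-minute sleep elapses and the JVM exits. If a clean shutdown were wanted, KafkaConsumer#wakeup() is the one KafkaConsumer method documented as safe to call from another thread; it makes a blocked poll() throw WakeupException. A sketch under that assumption (requires org.apache.kafka.common.errors.WakeupException):

Thread consumerThread = new Thread(() -> {
    try {
        while (true) {
            ConsumerRecords<String, String> records = client.getConsumer().poll(100);
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Received message: offset = {}, key = {}, value = {}",
                        record.offset(), record.key(), record.value());
            }
        }
    } catch (WakeupException e) {
        // Expected on shutdown: wakeup() unblocks the pending poll().
    }
}, "Consumer-thread");
consumerThread.start();

TimeUnit.SECONDS.sleep(10);
client.getConsumer().wakeup(); // thread-safe interrupt for poll()
consumerThread.join();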