AE86 3 years ago
parent
commit
f404cc2036
1 changed files with 5 additions and 5 deletions
  1. 5 5
      dbsyncer-listener/src/main/test/KafkaClientTest.java

+ 5 - 5
dbsyncer-listener/src/main/test/KafkaClientTest.java

@@ -26,8 +26,8 @@ public class KafkaClientTest {
 
     private String server = "192.168.100.100:9092";
     private String cKeyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
-    private String cValueSerializer = "org.apache.kafka.common.serialization.StringDeserializer";
-    private String pKeyDeserializer = "org.apache.kafka.common.serialization.StringSerializer";
+    private String cValueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+    private String pKeySerializer = "org.apache.kafka.common.serialization.StringSerializer";
     private String pValueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
     private String topic = "mytopic";
 
@@ -43,7 +43,7 @@ public class KafkaClientTest {
             props.put("session.timeout.ms", 10000);
             props.put("max.partition.fetch.bytes", 1048576);
             props.put("key.deserializer", cKeyDeserializer);
-            props.put("value.deserializer", cValueSerializer);
+            props.put("value.deserializer", cValueDeserializer);
             consumer = new KafkaConsumer<>(props);
             consumer.subscribe(Arrays.asList(topic));
         }
@@ -59,7 +59,7 @@ public class KafkaClientTest {
             props.put("retries", 3);
             props.put("max.block.ms", 60000);
             props.put("max.request.size", 1048576);
-            props.put("key.serializer", pKeyDeserializer);
+            props.put("key.serializer", pKeySerializer);
             props.put("value.serializer", pValueSerializer);
             producer = new KafkaProducer<>(props);
         }
@@ -79,7 +79,7 @@ public class KafkaClientTest {
     public void testProducerAndConsumer() throws Exception {
         logger.info("test begin");
         new Producer().start();
-//        new Consumer().start();
+        new Consumer().start();
         TimeUnit.SECONDS.sleep(60);
         logger.info("test end");
     }