KafkaClientTest.java 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.apache.kafka.clients.consumer.ConsumerRecords;
  3. import org.dbsyncer.common.util.JsonUtil;
  4. import org.dbsyncer.connector.config.KafkaConfig;
  5. import org.dbsyncer.connector.enums.KafkaFieldTypeEnum;
  6. import org.dbsyncer.connector.kafka.KafkaClient;
  7. import org.dbsyncer.connector.kafka.serialization.JsonToMapDeserializer;
  8. import org.dbsyncer.connector.kafka.serialization.MapToJsonSerializer;
  9. import org.dbsyncer.connector.util.KafkaUtil;
  10. import org.dbsyncer.sdk.model.Field;
  11. import org.junit.After;
  12. import org.junit.Before;
  13. import org.junit.Test;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16. import java.util.ArrayList;
  17. import java.util.Arrays;
  18. import java.util.List;
  19. import java.util.concurrent.TimeUnit;
  20. /**
  21. * @author AE86
  22. * @version 1.0.0
  23. * @date 2021/11/23 23:13
  24. */
  25. public class KafkaClientTest {
  26. private final Logger logger = LoggerFactory.getLogger(getClass());
  27. private KafkaClient client;
  28. private KafkaConfig config;
  29. @Before
  30. public void init() {
  31. config = new KafkaConfig();
  32. config.setBootstrapServers("127.0.0.1:9092");
  33. config.setTopic("mytopic");
  34. config.setFields(getFields());
  35. config.setDeserializer(JsonToMapDeserializer.class.getName());
  36. config.setSerializer(MapToJsonSerializer.class.getName());
  37. config.setGroupId("test");
  38. config.setSessionTimeoutMs(10000);
  39. config.setMaxPartitionFetchBytes(1048576);
  40. config.setBufferMemory(33554432);
  41. config.setBatchSize(32768);
  42. config.setLingerMs(10);
  43. config.setAcks("1");
  44. config.setRetries(1);
  45. config.setMaxRequestSize(1048576);
  46. client = KafkaUtil.getConnection(config);
  47. }
  48. @After
  49. public void close() {
  50. client.close();
  51. }
  52. @Test
  53. public void testProducerAndConsumer() throws Exception {
  54. logger.info("test begin");
  55. logger.info("ping {}", client.ping());
  56. client.subscribe(Arrays.asList(config.getTopic()));
  57. // 模拟生产者
  58. // for (int i = 0; i < 5; i++) {
  59. // Map<String, Object> map = new HashMap<>();
  60. // map.put("id", i);
  61. // map.put("name", "张三" + i);
  62. // map.put("update_time", new Timestamp(System.currentTimeMillis()));
  63. // client.send(config.getTopic(), map.get("id").toString(), map);
  64. // }
  65. new Consumer().start();
  66. new Heartbeat().start();
  67. TimeUnit.SECONDS.sleep(6000);
  68. logger.info("test end");
  69. }
  70. private String getFields() {
  71. List<Field> fields = new ArrayList<>();
  72. fields.add(genField("id", KafkaFieldTypeEnum.STRING, true));
  73. fields.add(genField("name", KafkaFieldTypeEnum.STRING));
  74. fields.add(genField("age", KafkaFieldTypeEnum.INTEGER));
  75. fields.add(genField("count", KafkaFieldTypeEnum.LONG));
  76. fields.add(genField("type", KafkaFieldTypeEnum.SHORT));
  77. fields.add(genField("money", KafkaFieldTypeEnum.FLOAT));
  78. fields.add(genField("score", KafkaFieldTypeEnum.DOUBLE));
  79. fields.add(genField("status", KafkaFieldTypeEnum.BOOLEAN));
  80. fields.add(genField("create_date", KafkaFieldTypeEnum.DATE));
  81. fields.add(genField("time", KafkaFieldTypeEnum.TIME));
  82. fields.add(genField("update_time", KafkaFieldTypeEnum.TIMESTAMP));
  83. return JsonUtil.objToJson(fields);
  84. }
  85. private Field genField(String name, KafkaFieldTypeEnum typeEnum) {
  86. return genField(name, typeEnum, false);
  87. }
  88. private Field genField(String name, KafkaFieldTypeEnum typeEnum, boolean pk) {
  89. return new Field(name, typeEnum.getClazz().getSimpleName(), typeEnum.getType(), pk);
  90. }
  91. class Consumer extends Thread {
  92. public Consumer() {
  93. setName("Consumer-thread");
  94. }
  95. @Override
  96. public void run() {
  97. while (true) {
  98. ConsumerRecords<String, Object> records = client.poll(100);
  99. for (ConsumerRecord record : records) {
  100. logger.info("收到消息:offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
  101. }
  102. }
  103. }
  104. }
  105. class Heartbeat extends Thread {
  106. public Heartbeat() {
  107. setName("Heartbeat-thread");
  108. }
  109. @Override
  110. public void run() {
  111. while (true) {
  112. try {
  113. TimeUnit.SECONDS.sleep(3L);
  114. } catch (InterruptedException e) {
  115. e.printStackTrace();
  116. }
  117. logger.info("ping {}", client.ping());
  118. }
  119. }
  120. }
  121. }