使用Spring Embedded Kafka测试@KafkaListener
我正在尝试为我正在使用Spring Boot 2.x开发的Kafka听器编写一个单元测试。作为一个单元测试,我不想启动一个完整的Kafka服务器作为Zookeeper的实例。所以,我决定使用Spring Embedded Kafka。
我的听众的定义是非常基本的。
@Component
public class Listener {
private final CountDownLatch latch;
@Autowired
public Listener(CountDownLatch latch) {
this.latch = latch;
}
@KafkaListener(topics = "sample-topic")
public void listen(String message) {
latch.countDown();
}
}
此外,在收到消息后验证计数器是否等于零的测试也非常简单。latch
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {
@Autowired
private KafkaEmbedded embeddedKafka;
@Autowired
private CountDownLatch latch;
private KafkaTemplate<Integer, String> producer;
@Before
public void setUp() {
this.producer = buildKafkaTemplate();
this.producer.setDefaultTopic("sample-topic");
}
private KafkaTemplate<Integer, String> buildKafkaTemplate() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
return new KafkaTemplate<>(pf);
}
@Test
public void listenerShouldConsumeMessages() throws InterruptedException {
// Given
producer.sendDefault(1, "Hello world");
// Then
assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
}
}
不幸的是,测试失败了,我不明白为什么。是否可以使用 的实例来测试标有注释的方法?KafkaEmbedded
@KafkaListener
所有代码都共享在我的GitHub存储库kafka-listener中。
谢谢大家。