使用Spring Embedded Kafka测试@KafkaListener

我正在尝试为我正在使用Spring Boot 2.x开发的Kafka听器编写一个单元测试。作为一个单元测试,我不想启动一个完整的Kafka服务器作为Zookeeper的实例。所以,我决定使用Spring Embedded Kafka。

我的听众的定义是非常基本的。

@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

此外,在收到消息后验证计数器是否等于零的测试也非常简单。latch

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

不幸的是,测试失败了,我不明白为什么。是否可以使用 的实例来测试标有注释的方法?KafkaEmbedded@KafkaListener

所有代码都共享在我的GitHub存储库kafka-listener中。

谢谢大家。


答案 1

您可能在为使用者分配主题/分区之前发送消息。设置属性...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

...它默认为 。latest

这就像与控制台使用者一起使用一样。--from-beginning

编辑

哦;您没有使用引导的属性。

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

编辑2

顺便说一句,您可能还应该对(a)的结果执行a,以断言发送成功。get(10L, TimeUnit.SECONDS)template.send()Future<>

编辑3

要仅覆盖测试的偏移重置,您可以执行与代理地址相同的操作:

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

但是,请记住,此属性仅在组首次使用时应用。若要在每次应用启动时始终从末尾开始,则必须在启动期间查找到末尾。

此外,我建议设置为,以便容器负责提交偏移量,而不仅仅是依赖于使用者客户端按时间计划执行此操作。enable.auto.commitfalse


答案 2

也许有人会发现这很有用。我遇到了类似的问题。本地测试正在运行(一些检查在 内执行),但在 Jenkins 管道中,测试失败。Awaitility.waitAtMost

解决方案是,就像在投票最多的答案中已经提到的那样,设置 。运行测试时,可以通过查看测试日志来检查是否正确设置了配置。适用于生产商和消费者的弹簧输出配置auto-offset-reset=earliest


推荐