find in path

End to end tests for KafkaListener

2019-11-18spring-kafkaend-to-end-testingtestcontainers

This post shows how the KafkaListener belonging to the spring-kafka library can be tested in an end-to-end fashion for both json and avro mesages.

The spring-kafka comes with a few testing utilities, but it doesn’t provide any utilities for testing the methods annotated with the KafkaListener annotation. Moreover, it makes use of an embedded Apache Kafka broker, instead of dockerized Apache Kafka container image artifacts.

This post concentrates on the concepts implemented in the project kafkalistener-e2e-test for dealing with end-to-end-testing for the methods annotated with the KafkaListener annotation.

The testcontainers library is employed for spawning before the tests a complete Confluent ecosystem of docker container images for artifacts related to Apache Kafka:

  • Apache Kafka
  • Apache Zookeeper
  • Confluent Schema Registry

By using versions for the container images that correspond to the Apache Kafka ecosystem from the production environment, there is simulated an environment which is very close to the one running in the production. This particularity gives a high relevance to the integration/ end-to-end tests for the kafka listener functionality.

It is very important to have the ability to perform end-to-end tests in a throwaway dockerized environment because there can be executed with a high certainty common scenarios that the kafka listener service is supposed to handle as part of its contract.

End to End Test setup

As mentioned previously, by employing the testcontainers library an entire Apache Kafka ecosystem will be spawned at the beginning of the tests. Check out the implementation related to testcontainers in the project kafkalistener-e2e-test for seeing how the Apache Kafka ecosystem artifacts are configured to work together for setting up the testing enviroment for the end to end tests.

//KafkaTestContainers.java

public KafkaTestContainers() throws IOException {
  this.network = Network.newNetwork();
  this.zookeeperContainer = new ZookeeperContainer()
      .withNetwork(network);
  this.kafkaContainer = new KafkaContainer(zookeeperContainer.getZookeeperConnect())
      .withNetwork(network);
  this.schemaRegistryContainer = new SchemaRegistryContainer(
      zookeeperContainer.getZookeeperConnect())
      .withNetwork(network);

  Startables
      .deepStart(Stream.of(zookeeperContainer, kafkaContainer, schemaRegistryContainer))
      .join();

}

Once the Apache Kafka ecosystem is up and running, the topics necessary for the end-to-end tests are created and the AVRO types are registered to Confluent Schema Registry docker container.

// KafkaDockerConfiguration.java

@Bean
public KafkaTestContainers kafkaTestContainers(
    @Value("${kafka.userBookmarkEventsJson.topic}") String userBookmarkEventJsonTopic,
    @Value("${kafka.userBookmarkEventsAvro.topic}") String userBookmarkEventAvroTopic
) throws Exception {
  var kafkaTestContainers = new KafkaTestContainers();

  createTopics(kafkaTestContainers, userBookmarkEventJsonTopic, userBookmarkEventAvroTopic);
  registerSchemaRegistryTypes(kafkaTestContainers.getSchemaRegistryContainer());
  return kafkaTestContainers;
}

After this setup, the rest of the spring beans from Spring’s dependency injection container (including the kafka listeners) are initialized and at this time there can be executed end-to-end tests.

// UserBookmarkEventJsonListenerTest.java

@MockBean
private UserBookmarkEventService userBookmarkEventService;

@Test
public void demo() {
  // GIVEN
  var userId = UUID.randomUUID().toString();
  var url = "https://findinpath.com";
  UserBookmarkEvent userBookmarkEvent = new UserBookmarkEvent(userId, url,
      Instant.now().toEpochMilli());

  // WHEN
  writeToTopic(userBookmarkEventJsonTopic, userBookmarkEvent);

  // THEN
  var argumentCaptor = ArgumentCaptor.forClass(UserBookmarkEvent.class);
  verify(userBookmarkEventService, timeout(10_000)).ingest(argumentCaptor.capture());
  UserBookmarkEvent capturedUserBookmarkEvent = argumentCaptor.getValue();
  assertThat(userBookmarkEvent, equalTo(capturedUserBookmarkEvent));
}

The demo test is quite straightforward, because it only concentrates to make sure that the service responsible of the business logic of handling the message is being called. Nevertheless, such a test ensures that the correct service is being called in the kafka listener and also that the message sent to the kafka topic is correctly deserialized.

Limitations

Compared to the tests in which the tests in which the KafkaConsumer can be manipulated directly in order to be able to reset the consumer offset after each test, spring-kafka hides the consumer instance inside the class org.springframework.kafka.listener.KafkaMessageListenerContainer.listenerConsumer with a private access. Even with extra motivation, when accessing the private field via Java Reflection, for resetting its offset, the operations on it will fail because multi-threaded access on the consumer is not supported (see org.apache.kafka.clients.consumer.KafkaConsumer.acquire method).

But even with the limitation of not being able to reset the consumer offset, it is still quite useful to ensure the fact that the right service is being called to handle Kafka message sent over the topic (otherwise said, regression test).

Source code

The proof of concept project kafkalistener-e2e-test offers two end-to-end sample tests:

  • com.findinpath.kafka.listener.UserBookmarkEventAvroListenerTest : for testing the consumption of messages serialized in AVRO format
  • com.findinpath.kafka.listener.UserBookmarkEventJsonListenerTest : for testing the consumption of messages serialized in JSON format

Run the command

mvn clean install

for executing the tests from this project.