TL;DR: Use pytest-docker to create a test fixture that starts a Kafka container.

Problem: I want to test my Kafka client, but I don’t have a Kafka cluster

At work, we need to consume and produce messages through some queue, and one of the tools already available is Kafka.

Before integrating with the existing Kafka cluster, I want to test my client code. I want to ensure that it can consume and produce messages correctly.

I have an existing BaseQueueService class like this:

from abc import ABC, abstractmethod


class BaseQueueService(ABC):
    @abstractmethod
    def publish(self, message: str) -> None:
        pass

    @abstractmethod
    def consume(self) -> str | None:
        pass

with existing implementations for Azure Service Bus and an InMemoryQueue for testing business logic.

So I want to create a KafkaQueueService class that implements this interface. And I want to test it, but I don’t have a Kafka cluster available.

Solution: Use Docker to start a Kafka container for testing

I can use pytest-docker to create a test fixture that starts a Kafka container. This way, I can test my KafkaQueueService class without needing a Kafka cluster.

This is how I did it:

A docker-compose.yml file to start a Kafka container:

services:
  zookeeper:
    image: 'confluentinc/cp-zookeeper:latest'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: 'confluentinc/cp-kafka:latest'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    ports:
      - "9092:9092"
    expose:
      - "29092"

  init-kafka:
    image: 'confluentinc/cp-kafka:latest'
    depends_on:
      - kafka
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka:29092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic testtopic --replication-factor 1 --partitions 1
      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic input_test_topic --replication-factor 1 --partitions 1
      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic output_test_topic --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server kafka:29092 --list
      "

A conftest.py file to create a test fixture that starts the Kafka container:

import pytest


def check_kafka_ready(required_topics, host="localhost", port=9092):
    from confluent_kafka import KafkaException
    from confluent_kafka.admin import AdminClient

    try:
        admin = AdminClient({"bootstrap.servers": f"{host}:{port}"})
        metadata = admin.list_topics(timeout=5)
        # All required topics must already exist
        return all(topic in metadata.topics for topic in required_topics)
    except KafkaException:
        return False


@pytest.fixture(scope="session")
def kafka_url(docker_services):
    """Start kafka service and return the url."""
    port = docker_services.port_for("kafka", 9092)
    required_topics = ["testtopic", "input_test_topic", "output_test_topic"]
    docker_services.wait_until_responsive(
        check=lambda: check_kafka_ready(port=port, required_topics=required_topics),
        timeout=30.0,
        pause=0.1,
    )
    return f"localhost:{port}"

And finally, a test file to test the KafkaQueueService class:

import uuid

import pytest


@pytest.mark.kafka
def test_kafka_queue_can_publish_and_consume(kafka_url):
    kafka_queue_service = KafkaQueueService(
        broker=kafka_url,
        topic="testtopic",
        group_id="testgroup",
    )
    clear_messages_from_queue(kafka_queue_service)

    unique_message = "hello" + str(uuid.uuid4())
    kafka_queue_service.publish(unique_message)

    received_message = kafka_queue_service.consume()
    assert received_message == unique_message
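
The clear_messages_from_queue helper isn't shown above; a minimal sketch that works with anything exposing the BaseQueueService interface is to drain until consume() returns None:

```python
def clear_messages_from_queue(queue_service) -> None:
    """Drain the queue so the test starts from a known-empty state.

    Works with any object exposing the BaseQueueService interface.
    """
    while queue_service.consume() is not None:
        pass
```

This keeps leftover messages from a previous test run from breaking the assertion on the unique message.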

Now I can test my KafkaQueueService class without needing a Kafka cluster. This even works in my Azure DevOps CI/CD pipeline.

NOTE: The docker_services fixture starts ALL the services defined in the docker-compose.yml file, not just the one you request a port for.
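
Since the test is tagged with @pytest.mark.kafka, the marker should be registered to silence pytest's unknown-marker warnings — for example in a pytest.ini (assuming that's where your pytest configuration lives):

```ini
[pytest]
markers =
    kafka: tests that need the Dockerized Kafka stack from docker-compose.yml
```

This also lets you select or skip the slow integration tests with `pytest -m kafka` or `pytest -m "not kafka"`.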

Bonus: The passing implementation of KafkaQueueService

This passes the test above (and a few other tests I wrote):

import logging

from confluent_kafka import Consumer, KafkaError, Producer

logger = logging.getLogger(__name__)

class KafkaQueueService(BaseQueueService):
    def __init__(self, broker: str, topic: str, group_id: str):
        # Configuration for the producer and consumer
        self.topic = topic
        self.producer: Producer = Producer({"bootstrap.servers": broker})
        self.consumer: Consumer = Consumer(
            {
                "bootstrap.servers": broker,
                "group.id": group_id,
                "auto.offset.reset": "earliest",
                "enable.partition.eof": "true",
            }
        )
        self.consumer.subscribe([self.topic])

    def publish(self, message: str) -> None:
        """Publish a message to the Kafka topic."""
        logger.debug(f"Publishing message to topic {self.topic}: {message}")

        self.producer.produce(self.topic, message.encode("utf-8"))
        self.producer.flush()

    def consume(self) -> str | None:
        """Consume a single message from the Kafka topic."""
        logger.debug(f"Consuming message from topic {self.topic}")

        # Get the next message
        message = self.consumer.poll(timeout=20)
        if message is None:
            logger.debug("Consumer poll timeout")
            return None
        # No new message
        if message.error() is not None and message.error().code() == KafkaError._PARTITION_EOF:
            logger.debug("No new messages in topic")
            return None
        # Check for errors
        if message.error() is not None:
            raise Exception(f"Consumer error: {message.error()}")
        self.consumer.commit(message, asynchronous=False)
        return message.value().decode("utf-8")

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(topic={self.topic})"