Kafka

The Kafka module provides a pre-configured Apache Kafka container using KRaft mode (no ZooKeeper required) for integration testing.

Quick Start

open Testcontainers_kafka

let test_kafka () =
  (* The container handle is not needed here, so it is bound as _container
     to avoid an unused-variable warning. *)
  Kafka_container.with_kafka (fun _container bootstrap_servers ->
    Printf.printf "Kafka running at: %s\n" bootstrap_servers;
    Lwt.return_unit
  )

Installation

opam install testcontainers-kafka

In your dune file:

(libraries testcontainers-kafka)

Configuration

Basic Usage

Kafka_container.with_kafka (fun container bootstrap_servers ->
  (* bootstrap_servers: "127.0.0.1:9092" *)
  ...
)

Configuration Options

Function     Default              Description
with_image   apache/kafka:3.7.0   Docker image

Custom Image

Kafka_container.with_kafka
  ~config:(fun c -> c
    |> Kafka_container.with_image "apache/kafka:3.6.0")
  (fun container bootstrap_servers -> ...)

Connection Details

Bootstrap Servers

127.0.0.1:9092
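
Some client libraries want the host and port as separate values, or accept a comma-separated list of brokers. The following is a minimal sketch of splitting a bootstrap string into (host, port) pairs using only the standard library. The name parse_bootstrap_servers is hypothetical and not part of Testcontainers_kafka.

```ocaml
(* Hypothetical helper, not part of the library.
   Splits "127.0.0.1:9092" or "host-a:9092,host-b:9093" into (host, port)
   pairs. Entries without a host or a numeric port are skipped. *)
let parse_bootstrap_servers (s : string) : (string * int) list =
  String.split_on_char ',' s
  |> List.filter_map (fun entry ->
         let entry = String.trim entry in
         (* Use the last ':' so the port is always the final component. *)
         match String.rindex_opt entry ':' with
         | None -> None
         | Some i ->
             let host = String.sub entry 0 i in
             let port =
               String.sub entry (i + 1) (String.length entry - i - 1)
             in
             (match int_of_string_opt port with
              | Some p when host <> "" -> Some (host, p)
              | _ -> None))
```

For a single-broker container the result is a one-element list, which should match the values returned by the individual accessors described next.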

Individual Components

Kafka_container.with_kafka (fun container bootstrap_servers ->
  let* host = Kafka_container.host container in  (* "127.0.0.1" *)
  let* port = Kafka_container.port container in  (* 9092 *)
  ...
)

Manual Lifecycle

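let run_tests () =
  let config = Kafka_container.create () in
  let* container = Kafka_container.start config in
  (* Lwt.finalize terminates the container even if a test raises. *)
  Lwt.finalize
    (fun () ->
      let* bootstrap = Kafka_container.bootstrap_servers config container in
      Printf.printf "Kafka running at: %s\n" bootstrap;

      (* Run tests... *)

      Lwt.return_unit)
    (fun () -> Testcontainers.Container.terminate container)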

Topic Management

Create a Topic

let create_topic container topic_name =
  let* (exit_code, _output) = Testcontainers.Container.exec container [
    "/opt/kafka/bin/kafka-topics.sh";
    "--create";
    "--topic"; topic_name;
    "--bootstrap-server"; "localhost:9092";
    "--partitions"; "1";
    "--replication-factor"; "1"
  ] in
  assert (exit_code = 0);
  Printf.printf "Created topic: %s\n" topic_name;
  Lwt.return_unit
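
Kafka rejects topic names that contain characters other than ASCII letters, digits, '.', '_' and '-', that are longer than 249 characters, or that are exactly "." or "..". Checking the name before calling exec gives a clearer failure than a non-zero exit code. The sketch below assumes nothing about the library; valid_topic_name is a hypothetical helper.

```ocaml
(* Hypothetical helper, not part of the library.
   Returns true when [name] satisfies Kafka's topic naming rules. *)
let valid_topic_name (name : string) : bool =
  let legal = function
    | 'a' .. 'z' | 'A' .. 'Z' | '0' .. '9' | '.' | '_' | '-' -> true
    | _ -> false
  in
  let all_legal =
    let ok = ref true in
    String.iter (fun c -> if not (legal c) then ok := false) name;
    !ok
  in
  let len = String.length name in
  len > 0 && len <= 249 && name <> "." && name <> ".." && all_legal
```

A test could call this first and fail early, for example with assert (valid_topic_name topic_name), before running kafka-topics.sh.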

List Topics

let list_topics container =
  let* (_exit_code, output) = Testcontainers.Container.exec container [
    "/opt/kafka/bin/kafka-topics.sh";
    "--list";
    "--bootstrap-server"; "localhost:9092"
  ] in
  Printf.printf "Topics: %s\n" output;
  Lwt.return_unit
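
kafka-topics.sh --list prints one topic name per line, so the output string can be turned into a list for assertions. A minimal sketch, assuming the output passed in contains only the tool's standard output; topics_of_output and topic_exists are hypothetical names.

```ocaml
(* Hypothetical helpers, not part of the library. *)

(* Split the raw output of `kafka-topics.sh --list` into topic names,
   dropping blank lines and surrounding whitespace. *)
let topics_of_output (output : string) : string list =
  String.split_on_char '\n' output
  |> List.map String.trim
  |> List.filter (fun line -> line <> "")

(* True when [name] appears as a whole line in the listing. *)
let topic_exists (output : string) (name : string) : bool =
  List.mem name (topics_of_output output)
```

Matching whole lines avoids the false positives of a substring search, where "orders" would also match "orders-archive".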

Producing and Consuming Messages

Produce Messages

let produce_message container topic message =
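  let* (exit_code, _) = Testcontainers.Container.exec container [
    "sh"; "-c";
    (* Filename.quote shell-escapes both values, so a message containing
       quotes or other shell metacharacters is passed through unchanged. *)
    Printf.sprintf "echo %s | /opt/kafka/bin/kafka-console-producer.sh --topic %s --bootstrap-server localhost:9092"
      (Filename.quote message) (Filename.quote topic)
  ] in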
  Lwt.return (exit_code = 0)
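
Each exec call starts a new producer process, so sending several messages in one call is noticeably cheaper than one call per message. The sketch below builds the argument list for such a call: the console producer treats each input line as one record, and printf '%s\n' emits one line per argument. producer_command is a hypothetical helper; the resulting list is meant to be passed to Testcontainers.Container.exec as in the example above.

```ocaml
(* Hypothetical helper, not part of the library.
   Builds an `sh -c` command that pipes every message, one per line,
   into a single kafka-console-producer.sh invocation. *)
let producer_command ~(topic : string) (messages : string list) : string list =
  (* With no arguments, printf would still emit one empty line, which the
     producer would send as an empty record. *)
  if messages = [] then invalid_arg "producer_command: no messages";
  (* Shell-escape each message so quotes and metacharacters survive. *)
  let args = String.concat " " (List.map Filename.quote messages) in
  [ "sh"; "-c";
    Printf.sprintf
      "printf '%%s\\n' %s | /opt/kafka/bin/kafka-console-producer.sh --topic %s --bootstrap-server localhost:9092"
      args (Filename.quote topic) ]
```

Messages that themselves contain newlines would be split into several records, so this approach suits single-line payloads only.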

Consume Messages

let consume_messages container topic =
  let* (_exit_code, output) = Testcontainers.Container.exec container [
    "/opt/kafka/bin/kafka-console-consumer.sh";
    "--topic"; topic;
    "--bootstrap-server"; "localhost:9092";
    "--from-beginning";
    "--timeout-ms"; "5000"
  ] in
  Printf.printf "Messages: %s\n" output;
  Lwt.return_unit
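
To assert on what was consumed, the raw output can be split into individual messages. The console consumer also prints a summary line of the form "Processed a total of N messages". Whether that line ends up in the output string returned by exec is an assumption here, so the sketch below filters it out defensively. messages_of_output is a hypothetical helper.

```ocaml
(* Hypothetical helpers, not part of the library. *)

let has_prefix ~(prefix : string) (s : string) : bool =
  let n = String.length prefix in
  String.length s >= n && String.sub s 0 n = prefix

(* Split consumer output into messages, one per line. Blank lines and the
   consumer's summary line are dropped. Leading and trailing whitespace is
   trimmed, so this is unsuitable for whitespace-sensitive payloads. *)
let messages_of_output (output : string) : string list =
  String.split_on_char '\n' output
  |> List.map String.trim
  |> List.filter (fun line ->
         line <> "" && not (has_prefix ~prefix:"Processed a total of" line))
```

A test can then compare the result against the expected list, for example with Alcotest.(check (list string)).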

Complete Test Example

open Lwt.Syntax
open Testcontainers
open Testcontainers_kafka

let test_kafka_messaging _switch () =
  Kafka_container.with_kafka (fun container _bootstrap ->
    (* Create topic *)
    let* (code, _) = Container.exec container [
      "/opt/kafka/bin/kafka-topics.sh";
      "--create"; "--topic"; "test-topic";
      "--bootstrap-server"; "localhost:9092";
      "--partitions"; "1";
      "--replication-factor"; "1"
    ] in
    Alcotest.(check int) "topic created" 0 code;

    (* Produce message *)
    let* (code, _) = Container.exec container [
      "sh"; "-c";
      "echo 'Hello Kafka!' | /opt/kafka/bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092"
    ] in
    Alcotest.(check int) "message produced" 0 code;

    Lwt.return_unit
  )

let () =
  Lwt_main.run (
    Alcotest_lwt.run "Kafka Tests" [
      "messaging", [
        Alcotest_lwt.test_case "produce" `Slow test_kafka_messaging;
      ];
    ]
  )

Wait Strategy

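The Kafka container uses a log-based wait strategy: startup is considered complete once the broker logs "Kafka Server started", and it fails if that line has not appeared within 60 seconds: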

Wait_strategy.for_log ~timeout:60.0 "Kafka Server started"
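
Conceptually, a log-based wait polls the container's logs until the expected text appears or the wait gives up. The following is an illustrative sketch of that idea using only the standard library. It is not the library's implementation: wait_for_log, read_logs, and pause are hypothetical names, and the sketch counts attempts instead of measuring elapsed time.

```ocaml
(* Illustrative sketch only; not how Wait_strategy.for_log is implemented. *)

(* Naive substring search, adequate for short log excerpts. *)
let contains ~(needle : string) (haystack : string) : bool =
  let n = String.length needle and h = String.length haystack in
  let rec go i =
    i + n <= h && (String.sub haystack i n = needle || go (i + 1))
  in
  go 0

(* Call [read_logs] up to [attempts] times, invoking [pause] between tries.
   Returns true as soon as the logs contain [needle], false otherwise. *)
let rec wait_for_log ~attempts ~pause ~read_logs (needle : string) : bool =
  if attempts <= 0 then false
  else if contains ~needle (read_logs ()) then true
  else begin
    pause ();
    wait_for_log ~attempts:(attempts - 1) ~pause ~read_logs needle
  end
```

In a real setting read_logs would fetch the container's log output and pause would sleep for a short interval. Both are passed in as functions so the sketch stays independent of any particular runtime.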

KRaft Mode

This module uses Apache Kafka in KRaft mode, which means:

  • No ZooKeeper dependency
  • Faster startup
  • Simpler configuration
  • Single container deployment

Troubleshooting

Connection Refused

Kafka needs time to start. Prefer with_kafka, which runs your function only after the broker is ready:

(* Good *)
Kafka_container.with_kafka (fun container bootstrap -> ...)

(* May fail if Kafka isn't ready *)
let* container = Container.start request in
(* immediate connection attempt *)

Topic Creation Fails

Ensure the broker is fully started before creating topics. with_kafka applies the wait strategy automatically, so topics can be created as soon as your function is called.

Consumer Timeout

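Without --timeout-ms, kafka-console-consumer.sh keeps waiting for new messages and the exec call does not return. In tests, set a timeout so the consumer exits once no message has arrived for that interval: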

"--timeout-ms"; "5000"  (* 5 seconds *)
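
When the expected number of messages is known, the consumer's --max-messages flag lets it exit as soon as they arrive instead of waiting out the timeout. The sketch below builds the argument list with both options. consumer_args is a hypothetical helper, and the script path and broker address are the ones used elsewhere on this page.

```ocaml
(* Hypothetical helper, not part of the library.
   Builds the argument list for kafka-console-consumer.sh.
   [timeout_ms] bounds how long the consumer waits without receiving a
   message; [max_messages], when given, stops it after that many records. *)
let consumer_args ?max_messages ~(topic : string) ~(timeout_ms : int) () :
    string list =
  let base =
    [ "/opt/kafka/bin/kafka-console-consumer.sh";
      "--topic"; topic;
      "--bootstrap-server"; "localhost:9092";
      "--from-beginning";
      "--timeout-ms"; string_of_int timeout_ms ]
  in
  match max_messages with
  | None -> base
  | Some n -> base @ [ "--max-messages"; string_of_int n ]
```

For example, consumer_args ~max_messages:1 ~topic:"test-topic" ~timeout_ms:5000 () returns after the first message, and the timeout only applies if that message never arrives.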