Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

RabbitMQ

The RabbitMQ module provides a pre-configured container for integration testing with the RabbitMQ message broker.

Quick Start

open Lwt.Syntax
open Testcontainers_rabbitmq

(** Smoke test: run a RabbitMQ container through [with_rabbitmq] and print
    the AMQP URL handed to the callback. *)
let test_rabbitmq () =
  (* The container handle is not needed here. The leading underscore marks it
     as intentionally unused, so the unused-variable warning (an error in
     dune's default dev profile) does not fire. *)
  Rabbitmq_container.with_rabbitmq (fun _container amqp_url ->
    Printf.printf "RabbitMQ running at: %s\n" amqp_url;
    Lwt.return_unit
  )

Installation

opam install testcontainers-rabbitmq

In your dune file:

(libraries testcontainers-rabbitmq)

Configuration

Basic Configuration

Rabbitmq_container.with_rabbitmq
  ~config:(fun c -> c
    |> Rabbitmq_container.with_username "myuser"
    |> Rabbitmq_container.with_password "mypass"
    |> Rabbitmq_container.with_vhost "/myapp")
  (fun container amqp_url ->
    (* amqp_url: amqp://myuser:mypass@127.0.0.1:5672/myapp *)
    ...
  )

Configuration Options

| Function | Default | Description |
|---|---|---|
| with_image | rabbitmq:3-management-alpine | Docker image |
| with_username | guest | Username |
| with_password | guest | Password |
| with_vhost | / | Virtual host |

Custom Image

Rabbitmq_container.with_rabbitmq
  ~config:(fun c -> c
    |> Rabbitmq_container.with_image "rabbitmq:3.12-alpine")
  (fun container amqp_url -> ...)

Connection Details

AMQP URL

amqp://username:password@host:port/vhost

Individual Components

Rabbitmq_container.with_rabbitmq (fun container amqp_url ->
  let* host = Rabbitmq_container.host container in
  let* amqp_port = Rabbitmq_container.amqp_port container in
  let* mgmt_port = Rabbitmq_container.management_port container in
  let config = Rabbitmq_container.create () in
  let username = Rabbitmq_container.username config in
  let password = Rabbitmq_container.password config in
  let vhost = Rabbitmq_container.vhost config in
  ...
)

Management UI

The default image includes the management plugin:

let* mgmt_port = Rabbitmq_container.management_port container in
Printf.printf "Management UI: http://127.0.0.1:%d\n" mgmt_port;
(* Default credentials: guest/guest *)

Manual Lifecycle

(** Manage the container lifecycle by hand: build a configuration, start the
    container, run the tests, then terminate the container.

    The test body runs inside [Lwt.finalize], so the container is terminated
    even when a test raises or its promise is rejected; without that, a
    failing test would leave the container running. *)
let run_tests () =
  let config =
    Rabbitmq_container.create ()
    |> Rabbitmq_container.with_username "admin"
    |> Rabbitmq_container.with_password "secret"
    |> Rabbitmq_container.with_vhost "/test"
  in

  let* container = Rabbitmq_container.start config in
  Lwt.finalize
    (fun () ->
      let* amqp_url = Rabbitmq_container.amqp_url config container in

      (* Run tests against [amqp_url]... *)

      Lwt.return_unit)
    (* Cleanup: always executed, whether the body above succeeded or failed. *)
    (fun () -> Testcontainers.Container.terminate container)

Queue and Exchange Setup

Using rabbitmqadmin

(** Declare the [events] topic exchange, the [user.events] queue, and the
    binding between them by running [rabbitmqadmin] inside the container.
    Each command must exit with code 0, otherwise an assertion fails. *)
let setup_queues container =
  (* Run one [rabbitmqadmin declare ...] command and insist on success. *)
  let declare args =
    let* (status, _) =
      Testcontainers.Container.exec container
        ("rabbitmqadmin" :: "declare" :: args)
    in
    assert (status = 0);
    Lwt.return_unit
  in
  (* Exchange first, then the queue, then the binding that links them. *)
  let* () =
    declare [ "exchange"; "name=events"; "type=topic"; "durable=true" ]
  in
  let* () = declare [ "queue"; "name=user.events"; "durable=true" ] in
  declare [
    "binding";
    "source=events"; "destination=user.events";
    "routing_key=user.*"
  ]

Using rabbitmqctl

(** Create the [appuser] user, grant it full permissions on the default
    vhost ["/"], and add the [myapp] vhost, using [rabbitmqctl] inside the
    container.

    Every step is checked: a non-zero exit code rejects the returned promise
    with [Failure] instead of being silently ignored, so a broken setup is
    reported where it happens rather than as a confusing failure later on. *)
let setup_with_rabbitmqctl container =
  (* Run one rabbitmqctl command and fail on a non-zero exit code. *)
  let rabbitmqctl args =
    let* (code, output) =
      Testcontainers.Container.exec container ("rabbitmqctl" :: args)
    in
    if code = 0 then Lwt.return_unit
    else
      (* Name only the subcommand: the full argument list may hold a
         password, which should not leak into error messages. *)
      let subcommand = match args with sub :: _ -> sub | [] -> "" in
      Lwt.fail_with
        (Printf.sprintf "rabbitmqctl %s failed with exit code %d: %s"
           subcommand code output)
  in

  (* Add user *)
  let* () = rabbitmqctl [ "add_user"; "appuser"; "apppass" ] in

  (* Set permissions *)
  let* () =
    rabbitmqctl [
      "set_permissions"; "-p"; "/";
      "appuser"; ".*"; ".*"; ".*"
    ]
  in

  (* Add vhost *)
  rabbitmqctl [ "add_vhost"; "myapp" ]

Publishing and Consuming Messages

Publish Test Message

(** Publish a fixed test payload through the default exchange with routing
    key [test.queue], using [rabbitmqadmin] inside the container.
    Resolves to [true] when the command exits with code 0. *)
let publish_message container =
  let command = [
    "rabbitmqadmin"; "publish";
    "exchange=amq.default";
    "routing_key=test.queue";
    "payload=Hello from test!"
  ] in
  (* Only the exit status matters here; the command output is dropped. *)
  Lwt.map
    (fun (status, _) -> status = 0)
    (Testcontainers.Container.exec container command)

Get Message from Queue

(** Run [rabbitmqadmin get] on [test.queue] inside the container, print the
    command output, and resolve to [true] when the exit code is 0. *)
let get_message container =
  Testcontainers.Container.exec container
    [ "rabbitmqadmin"; "get"; "queue=test.queue" ]
  |> Lwt.map (fun (status, body) ->
       Printf.printf "Message: %s\n" body;
       status = 0)

Check Queue Status

(** Report the message and consumer counts of [queue_name].

    Lists all queues with [rabbitmqadmin] and prints only the rows that
    belong to [queue_name] (previously the parameter was accepted but never
    used, and every queue was printed). Resolves to [true] when the
    [rabbitmqadmin] command exits with code 0; as before, the result does not
    depend on whether the queue was found. *)
let check_queue_status container queue_name =
  let* (code, output) = Testcontainers.Container.exec container [
    "rabbitmqadmin"; "list"; "queues";
    "name"; "messages"; "consumers"
  ] in
  (* NOTE(review): assumes rabbitmqadmin's default table output, where each
     row is a line of '|'-separated cells - confirm for the image in use.
     A row matches when one of its trimmed cells equals the queue name. *)
  let is_row_for_queue line =
    String.split_on_char '|' line
    |> List.exists (fun cell -> String.trim cell = queue_name)
  in
  let rows =
    String.split_on_char '\n' output |> List.filter is_row_for_queue
  in
  (match rows with
   | [] -> Printf.printf "Queue %s: not found\n" queue_name
   | _ ->
     Printf.printf "Queue %s:\n%s\n" queue_name (String.concat "\n" rows));
  Lwt.return (code = 0)

Complete Test Example

open Lwt.Syntax
open Testcontainers
open Testcontainers_rabbitmq

(** Test fixture: run [f container amqp_url] against a RabbitMQ container
    configured with the [testuser]/[testpass] credentials, after declaring a
    non-durable [test.queue] with [rabbitmqadmin]. The result of the queue
    declaration is ignored. *)
let with_test_rabbitmq f =
  let configure base =
    base
    |> Rabbitmq_container.with_username "testuser"
    |> Rabbitmq_container.with_password "testpass"
  in
  let declare_test_queue = [
    "rabbitmqadmin"; "-u"; "testuser"; "-p"; "testpass";
    "declare"; "queue"; "name=test.queue"; "durable=false"
  ] in
  Rabbitmq_container.with_rabbitmq ~config:configure
    (fun container amqp_url ->
      (* Set the queue up first, then hand control to the caller. *)
      Lwt.bind
        (Container.exec container declare_test_queue)
        (fun _ -> f container amqp_url))

(** Publish one message to [test.queue] via the default exchange and check
    that reading the queue returns that very payload.

    The check looks for the payload text in the [rabbitmqadmin get] output.
    Checking only for non-empty output is not enough: rabbitmqadmin still
    prints something for an empty queue (a "No items" notice), so such a
    check would pass even if the message had been lost. *)
let test_publish_consume _switch () =
  (* Plain substring search: is [sub] found anywhere in [s]? *)
  let contains ~sub s =
    let n = String.length sub and len = String.length s in
    let rec scan i =
      i + n <= len && (String.sub s i n = sub || scan (i + 1))
    in
    scan 0
  in
  let payload = "Test message content" in
  with_test_rabbitmq (fun container _amqp_url ->
    (* Publish message *)
    let* (code, _) = Container.exec container [
      "rabbitmqadmin"; "-u"; "testuser"; "-p"; "testpass";
      "publish"; "routing_key=test.queue";
      "payload=" ^ payload
    ] in
    Alcotest.(check int) "publish succeeds" 0 code;

    (* Consume message *)
    let* (code, output) = Container.exec container [
      "rabbitmqadmin"; "-u"; "testuser"; "-p"; "testpass";
      "get"; "queue=test.queue"; "ackmode=ack_requeue_false"
    ] in
    Alcotest.(check int) "get succeeds" 0 code;
    Alcotest.(check bool) "payload received" true
      (contains ~sub:payload output);

    Lwt.return_unit
  )

(** Check topic-exchange routing: a message published to [events] with
    routing key [user.created] must reach the [user.created] queue and must
    not reach the [user.deleted] queue.

    Every setup command is checked for exit code 0, so a broken setup is
    reported at the failing step. Delivery is verified by looking for the
    payload in the [rabbitmqadmin get] output; non-empty output alone proves
    nothing, because rabbitmqadmin also prints a notice ("No items") for an
    empty queue. *)
let test_exchange_routing _switch () =
  (* Plain substring search: is [sub] found anywhere in [s]? *)
  let contains ~sub s =
    let n = String.length sub and len = String.length s in
    let rec scan i =
      i + n <= len && (String.sub s i n = sub || scan (i + 1))
    in
    scan 0
  in
  with_test_rabbitmq (fun container _amqp_url ->
    (* Run rabbitmqadmin with the fixture's credentials. *)
    let admin args =
      Container.exec container
        ("rabbitmqadmin" :: "-u" :: "testuser" :: "-p" :: "testpass" :: args)
    in
    (* Run a setup command that must succeed. *)
    let setup label args =
      let* (code, _) = admin args in
      Alcotest.(check int) label 0 code;
      Lwt.return_unit
    in

    (* Create topic exchange *)
    let* () =
      setup "declare exchange"
        [ "declare"; "exchange"; "name=events"; "type=topic" ]
    in

    (* Create queues *)
    let* () =
      setup "declare user.created"
        [ "declare"; "queue"; "name=user.created" ]
    in
    let* () =
      setup "declare user.deleted"
        [ "declare"; "queue"; "name=user.deleted" ]
    in

    (* Bind queues, each under its own routing key *)
    let* () =
      setup "bind user.created" [
        "declare"; "binding";
        "source=events"; "destination=user.created";
        "routing_key=user.created"
      ]
    in
    let* () =
      setup "bind user.deleted" [
        "declare"; "binding";
        "source=events"; "destination=user.deleted";
        "routing_key=user.deleted"
      ]
    in

    (* Publish to exchange *)
    let* (code, _) = admin [
      "publish"; "exchange=events"; "routing_key=user.created";
      "payload={\"user_id\": 123}"
    ] in
    Alcotest.(check int) "publish succeeds" 0 code;

    (* Verify message routed to the matching queue... *)
    let* (code, output) = admin [ "get"; "queue=user.created" ] in
    Alcotest.(check int) "get succeeds" 0 code;
    Alcotest.(check bool) "message routed" true
      (contains ~sub:"user_id" output);

    (* ...and only to that queue *)
    let* (code, output) = admin [ "get"; "queue=user.deleted" ] in
    Alcotest.(check int) "get on other queue succeeds" 0 code;
    Alcotest.(check bool) "message not routed to user.deleted" false
      (contains ~sub:"user_id" output);

    Lwt.return_unit
  )

(* Entry point: register both messaging tests as slow cases and run the
   resulting Alcotest suite on the Lwt main loop. *)
let () =
  let messaging_cases = [
    Alcotest_lwt.test_case "publish/consume" `Slow test_publish_consume;
    Alcotest_lwt.test_case "exchange routing" `Slow test_exchange_routing;
  ] in
  Alcotest_lwt.run "RabbitMQ Tests" [ "messaging", messaging_cases ]
  |> Lwt_main.run

Wait Strategy

The RabbitMQ module waits for the broker to be fully started:

Wait_strategy.for_log "Server startup complete"

Ports

| Port | Description |
|---|---|
| 5672 | AMQP |
| 15672 | Management UI (HTTP) |
| 15692 | Prometheus metrics |

Troubleshooting

Connection Refused on Port 5672

Ensure RabbitMQ is fully started:

(* Use with_rabbitmq which handles waiting *)
Rabbitmq_container.with_rabbitmq (fun container amqp_url ->
  (* Safe to connect here *)
  ...
)

Authentication Failed

Check credentials match:

Rabbitmq_container.with_username "myuser"
|> Rabbitmq_container.with_password "mypass"

(* AMQP URL must use same credentials *)

Virtual Host Not Found

Create vhost or use default:

(* Default vhost *)
Rabbitmq_container.with_vhost "/"

(* Or create custom vhost via rabbitmqctl *)
Container.exec container ["rabbitmqctl"; "add_vhost"; "myapp"]

Slow Startup

RabbitMQ can take 10-30 seconds to start. If startup times out, raise the startup timeout:

Container_request.with_startup_timeout 60.0

Management Plugin Not Available

Use an image with management:

Rabbitmq_container.with_image "rabbitmq:3-management-alpine"