Skip to content

Rabbit Mq Basics

RabbitMQ#

  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.

Hello World - Message Queue#

Message queue - communication between systems. A simple example.

python send.py

#!/usr/bin/env python
import pika

# Establish a connection with rabbitMQ - a broker on the local machine
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Make sure the recipient queue exists - otherwise it will be dropped
channel.queue_declare(queue='hello')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange
# The default exchange is an empty string
# queue name must be specified in the routing_key
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='Hello World!'
)
print(" [x] Sent 'Hello World!'")

# Ensure network buffers flushed and message sent
connection.close()

python receive.py

#!/usr/bin/env python
import pika

# Establish a connection with rabbitMQ - a broker on the local machine
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Make sure the recipient queue exists - otherwise it will be dropped
# We are not sure that the queue exists already
channel.queue_declare(queue='hello')

# Receiving messages on the queue requires subscribing a callback function to a queue
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# Tell RabbitMQ that this particular callback function should receive messages from our hello queue
channel.basic_consume(
    queue='hello',
    auto_ack=True,
    on_message_callback=callback
)

# finally, we enter a never-ending loop that waits for data and runs callbacks whenever necessary
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Sleep - Task Queue#

Work queue - distributing long running tasks among workers

Real tasks here would be: images to be resized, pdf files to be rendered or emails to be sent.

Round-robin dispatching - easily parallelise work - add more workers and that way, scale easily.

We can run more than 1 worker.py at a time

By default, RabbitMQ will send each message to the next consumer, in sequence

python worker.py
python worker.py
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....

Messages are distributed to workers in a round robin

CLI#

Add it to you path on homebrew with:

export PATH=/usr/local/Cellar/rabbitmq/3.7.16/sbin:$PATH

View available queues

sudo rabbitmqctl list_queues

List exchanges on a server

sudo rabbitmqctl list_exchanges

List existing bindings

rabbitmqctl list_bindings

Message Acknowledgement#

What if a consumer dies with a task only halfway done?

Currently rabbitMQ will mark a message for deletion as it is delivered

But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.

For this RabbitMQ supports message acknowledgements. An ack tells RabbitMQ that it is free to delete a task

Manually message acknowledgements are enabled by default. We turn that off with:

channel.basic_consume(
    queue='hello',
    auto_ack=True,
    on_message_callback=callback
)

We should remove the auto_ack=True and send a real acknowledgement once the task is complete. You do that with:

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(
    queue='hello',
    on_message_callback=callback
)

It is common to forget acknowledgements, so use:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Message durability#

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to

Tell rabbitmq it is durable:

channel.queue_declare(queue='hello', durable=True)

Ensure the queue name os different

You also need to mark the messages as persistent with delivery_mode = 2:

channel.basic_publish(
    exchange='',
    routing_key="task_queue",
    body=message,
    properties=pika.BasicProperties(
        delivery_mode = 2, # make message persistent
    )
)

Fair Displatch#

This tells RabbitMQ not to give more than one message to a worker at a time

channel.basic_qos(prefetch_count=1)

If all the workers are busy, your queue can fill up

Pub Sub#

The assumption behind a work queue is that each task is delivered to exactly one worker.

You can also deliver a message to multiple consumers, known as “publish/subscribe”

the producer can only send messages to an exchange

An exchange receives messages from producers and pushes them to queues

What should the exchange do:

  • Append to a particular queue
  • Append to many queues
  • Should it get discarded

The exchange type define the rules:

  • direct
  • topic
  • headers
  • fanout - broadcasts all messages it receives to all the queues it knows

Lets crate a fanout exchange:

channel.exchange_declare(exchange='logs',
                        exchange_type='fanout')

We can now publish to this names exchange with:

channel.basic_publish(exchange='logs',
                  routing_key='',
                  body=message)

Create a fresh empty queue with:

result = channel.queue_declare(queue='')

Once a consumer connection is closed the queue should be deleted:

result = channel.queue_declare(queue='', exclusive=True)

Bindings#

The relationship between the exchange and the queue