Topic queues

A variation of the task queue pattern is the topic pattern. In this case, instead of workers blindly picking up every message that is added to one or several queues, they subscribe to specific topics. A topic is just a label on a message, and workers can filter the messages they pick from the queue so that they match that topic.

In our microservices, this means we can have specialized workers that all register to the same messaging broker and get a subset of the messages that are added to it.

Celery is an excellent tool for building task queues; however, for more complex messaging, we need another tool.

The good news is that we can implement complex messaging patterns with a RabbitMQ message broker, which still works with Celery and which we can also interact with directly through another library.

To install a RabbitMQ broker, you can look at the download page at http://www.rabbitmq.com/download.html and get started from there. A RabbitMQ broker is a TCP server that manages queues internally and dispatches messages from publishers to subscribers via RPC calls. Using it with Celery is just a small portion of what this system can offer.

RabbitMQ implements the Advanced Message Queuing Protocol (AMQP). This protocol, described at http://www.amqp.org/, is a complete standard that has been developed over the years by a majority of the companies in the industry.

AMQP is organized into three concepts: queues, exchanges, and bindings:

  • A queue is a container that holds messages and waits for consumers to pick them up
  • An exchange is an entry point for publishers to add new messages to the system
  • A binding defines how messages are routed from exchanges to queues
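
To make these three concepts concrete before touching a real broker, here is a toy, in-memory sketch of a topic exchange. The class and method names are made up for illustration, and the AMQP # wildcard (which matches several words) is left out for brevity:

```python
class ToyExchange:
    """In-memory stand-in for an AMQP topic exchange (illustrative only)."""
    def __init__(self):
        self.bindings = []   # (pattern, queue name) pairs
        self.queues = {}

    def declare_queue(self, name):
        self.queues[name] = []

    def bind(self, pattern, queue_name):
        self.bindings.append((pattern, queue_name))

    def publish(self, routing_key, message):
        # route the message to every queue whose binding pattern matches
        for pattern, queue_name in self.bindings:
            if self._match(pattern, routing_key):
                self.queues[queue_name].append(message)

    @staticmethod
    def _match(pattern, routing_key):
        # AMQP-style '*' matches exactly one dot-separated word
        p_words = pattern.split('.')
        k_words = routing_key.split('.')
        return len(p_words) == len(k_words) and all(
            pw in ('*', kw) for pw, kw in zip(p_words, k_words))

exchange = ToyExchange()
exchange.declare_queue('race')
exchange.bind('race.*', 'race')
exchange.publish('race.34', 'We have some results!')
print(exchange.queues['race'])   # ['We have some results!']
```

A real broker does the same routing work, just over the network and with persistence and acknowledgment semantics on top.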

For our topic queue, we need to set up one exchange, so that RabbitMQ accepts new messages, along with all the queues we want workers to pick messages from. In between, we want to route the messages to the different queues depending on their topics, using bindings.

Let's say we have two workers, one that wants to receive messages about races and another one about training plans.

Every time a message is about a race, it gets labeled race.id, where race is a fixed prefix and id is a unique ID for the race. Similarly, for training plans, it is training.id.

Using the rabbitmqadmin command-line tool that comes with RabbitMQ's management plugin, we can create all the necessary parts:

$ rabbitmqadmin declare exchange name=incoming type=topic 
exchange declared

$ rabbitmqadmin declare queue name=race
queue declared

$ rabbitmqadmin declare queue name=training
queue declared

$ rabbitmqadmin declare binding source="incoming" destination_type="queue" destination="race" routing_key="race.*"
binding declared

$ rabbitmqadmin declare binding source="incoming" destination_type="queue" destination="training" routing_key="training.*"
binding declared

In this setup, every message is sent to RabbitMQ through the incoming exchange; if its topic starts with race., it is pushed into the race queue, and the training. ones end up in the training queue.

To interact with RabbitMQ in the code, we can use Pika (https://pika.readthedocs.io), a Python RPC client that implements all the RPC endpoints a RabbitMQ service publishes.

Everything we do with Pika can be done on the command line using rabbitmqadmin. You can directly get the status of all parts of the system, send and receive messages, and check what's in a queue. It's an excellent way to experiment with your messaging setup.

The following script shows how to publish two messages in RabbitMQ in the incoming exchange, one about Race 34 and one about Training 12:

from pika import BlockingConnection, BasicProperties

# assuming there's a working local RabbitMQ server with a working
# guest/guest account
def message(topic, message):
    connection = BlockingConnection()
    try:
        channel = connection.channel()
        props = BasicProperties(content_type='text/plain',
                                delivery_mode=1)
        channel.basic_publish('incoming', topic, message, props)
    finally:
        connection.close()

# sending a message about race 34
message('race.34', 'We have some results!')

# training 12
message('training.12', "It's time to do your long run")

These RPC calls will end up adding one message respectively in the race and training queues. A Race worker script that waits for news about races would look like this:

import pika

def on_message(channel, method_frame, header_frame, body):
    race_id = method_frame.routing_key.split('.')[-1]
    print('Race #%s: %s' % (race_id, body))
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)

print("Race NEWS!")
connection = pika.BlockingConnection()
channel = connection.channel()
# pika 1.x signature; older versions took the callback first
channel.basic_consume(queue='race', on_message_callback=on_message)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

connection.close()

Notice that the worker sends an ACK back to RabbitMQ for each message, so the message can be safely removed from the queue once the worker has succeeded.

An example of the output is as follows:

$ bin/python pika_worker.py 
Race NEWS!
Race #34: b'We have some results!'

AMQP offers many patterns you can investigate to exchange messages. The tutorial page at http://www.rabbitmq.com/getstarted.html has many examples, and they are all implemented using Python and Pika.

To integrate these examples in our microservices, the publisher part is straightforward: your Flask application can create a synchronous connection to RabbitMQ using pika.BlockingConnection and send messages through it. Projects such as pika-pool (https://github.com/bninja/pika-pool) implement simple connection pools, so you can manage RabbitMQ channels without having to connect and disconnect every time you send something through RPC.
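
The idea behind such a pool can be sketched without pika, using a stand-in connection factory. Everything below is hypothetical and simplified; pika-pool's real API differs:

```python
import queue

class ConnectionPool:
    """A minimal fixed-size pool: borrow a connection, give it back."""
    def __init__(self, factory, size=4):
        self._pool = queue.Queue()
        for _ in range(size):
            self._pool.put(factory())

    def acquire(self, timeout=None):
        # blocks until a connection is free
        return self._pool.get(timeout=timeout)

    def release(self, conn):
        self._pool.put(conn)

# stand-in for something like pika.BlockingConnection
class FakeConnection:
    def publish(self, topic, body):
        return (topic, body)

pool = ConnectionPool(FakeConnection, size=2)
conn = pool.acquire()
try:
    result = conn.publish('race.34', 'We have some results!')
finally:
    pool.release(conn)
print(result)   # ('race.34', 'We have some results!')
```

The try/finally guarantees the connection returns to the pool even if publishing raises, which is the whole point of pooling in a request handler.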

The consumers, on the other hand, are trickier to integrate into microservices.

Pika can be embedded into an event loop running in the same process as the Flask application and trigger a function when a message is received. That would be fine in an asynchronous framework, but for a Flask application, you will need to execute the code that uses the Pika client in a separate thread or process. The reason is that the event loop would be blocked every time a request is received in Flask.
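
The separate-thread approach can be sketched as follows, with a plain list standing in for Pika's consuming loop (the names here are illustrative, not part of any library):

```python
import queue
import threading

# messages received from the broker land here, to be drained by Flask
incoming = queue.Queue()

def consumer_loop(messages):
    # in a real service, this loop would be Pika's start_consuming(),
    # pushing each delivered body into the shared queue
    for body in messages:
        incoming.put(body)

worker = threading.Thread(target=consumer_loop,
                          args=(['We have some results!'],),
                          daemon=True)
worker.start()
worker.join()

# the Flask process can drain the queue whenever convenient,
# without its request handling ever blocking on the broker
msg = incoming.get_nowait()
print(msg)   # We have some results!
```

Because queue.Queue is thread-safe, the Flask side never touches the broker connection directly, which is exactly what keeps the request path unblocked.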

The most reliable way to use a Pika client to interact with RabbitMQ is to have a standalone Python application that consumes messages on behalf of your Flask microservice and performs synchronous HTTP calls to it. This adds yet another intermediary, but it gives us the ability to acknowledge that a message was successfully received, and with all the Requests tricks we learned earlier in this chapter, we can build a reliable bridge:

import pika
import requests
from requests.exceptions import ReadTimeout, ConnectionError

FLASK_ENDPOINT = 'http://localhost:5000/event'

def on_message(channel, method_frame, header_frame, body):
    # body is bytes; decode it so it can be serialized as JSON
    message = {'delivery_tag': method_frame.delivery_tag,
               'message': body.decode('utf-8')}
    try:
        res = requests.post(FLASK_ENDPOINT, json=message,
                            timeout=1.)
    except (ReadTimeout, ConnectionError):
        print('Failed to connect to %s.' % FLASK_ENDPOINT)
        # need to implement a retry here
        return

    if res.status_code == 200:
        print('Message forwarded to Flask')
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)

connection = pika.BlockingConnection()
channel = connection.channel()
# pika 1.x signature; older versions took the callback first
channel.basic_consume(queue='race', on_message_callback=on_message)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

connection.close()

This script will perform HTTP calls on Flask with the messages delivered in the queue.
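
The retry that is left as a comment in the script could be filled in with a small backoff helper. The sketch below wraps a generic callable and is not pika- or Requests-specific; the function names and parameters are made up for illustration:

```python
import time

def call_with_retries(func, attempts=3, base_delay=0.1):
    """Call func(), retrying on ConnectionError with exponential backoff."""
    for attempt in range(attempts):
        try:
            return func()
        except ConnectionError:
            if attempt == attempts - 1:
                raise   # give up after the last attempt
            time.sleep(base_delay * (2 ** attempt))

# a stand-in for the HTTP POST to Flask that fails twice, then succeeds
calls = []

def flaky_post():
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError('bridge hiccup')
    return 200

status = call_with_retries(flaky_post)
print(status, len(calls))   # 200 3
```

Only acknowledge the RabbitMQ message once the call finally succeeds; if all attempts fail, leaving the message unacked lets the broker redeliver it later.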

There's also a RabbitMQ plugin that does something similar by pushing messages to HTTP endpoints, but isolating this bridge into our little script offers more potential if we need to add logic-specific code. From a robustness and performance point of view, it's also probably better to avoid integrating HTTP pushes inside RabbitMQ.

In Flask, the /event endpoint can be a classical view:

from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/event', methods=['POST'])
def event_received():
    message = request.json['message']
    # do something...
    return jsonify({'status': 'OK'})

if __name__ == '__main__':
    app.run()