6

Interacting with Other Services

In the previous chapter, our monolithic application was split up into several microservices, and consequently, more network interactions between the different parts were introduced.

More interactions with other components can lead to complications of their own, however, such as a high volume of messages or large data sizes delaying responses, or long-running tasks taking up valuable resources. Since many of our useful tasks involve interacting with third-party services, the techniques to manage these changes are useful both inside our application and for communicating outside of it. Having the ability to loosely couple different parts of the system using some asynchronous messages is useful to prevent blockages and unwanted dependency entanglements.

In any case, the bottom line is that we need to interact with other services through the network, both synchronously and asynchronously. These interactions need to be efficient, and when something goes wrong, we need to have a plan.

The other problem introduced by adding more network connections is testing: how do we test a microservice in isolation that also needs to call other microservices to function? In this chapter, we will explore this in detail:

  • How one service can call another using synchronous and asynchronous libraries, and how to make these calls more efficient
  • How a service can use messages to make asynchronous calls and communicate with other services via events
  • How to test services that have network dependencies

Calling other web resources

As we have seen in the previous chapters, synchronous interactions between microservices can be done via HTTP APIs using JSON payloads. This is by far the pattern most often used, because both HTTP and JSON are common standards. If your web service implements an HTTP API that accepts JSON, any developer using any programming language will be able to use it. Most of these interfaces are also RESTful, meaning that they follow the Representational State Transfer (REST) architecture principles of being stateless—with each interaction containing all the information needed instead of relying on previous exchanges—as well as cacheable and having a well-defined interface.

Following a RESTful scheme is not a requirement, however, and some projects implement Remote Procedure Call (RPC) APIs, which focus on the action being performed and abstract away the network requests from the code that handles the messages. In REST, the focus is on the resource, and actions are defined by HTTP methods. Some projects are a mix of both and don't strictly follow a given standard. The most important thing is that your service behavior should be consistent and well-documented. This book leans on REST rather than RPC, but is not strict about it, and recognizes that different situations have different solutions.

Sending and receiving JSON payloads is the simplest way for a microservice to interact with others, and only requires microservices to know the entry points and parameters to pass using HTTP requests.

To do this, you just need to use an HTTP client. Python has one as part of the http.client module, and in a synchronous Python environment, the Requests library is rightfully popular: https://docs.python-requests.org.

As we are in an asynchronous environment, we will use aiohttp, which has a clean way of creating asynchronous web requests and offers built-in features that make it easier to perform multiple simultaneous asynchronous requests: https://docs.aiohttp.org/en/stable/.

HTTP requests in the aiohttp library are built around the concept of a session, and the best way to use it is to create a ClientSession object that can be reused every time you interact with any service.

A Session object can hold authentication information and some default headers you may want to set for all requests that your application will make. It can also control default error handling behavior, storing cookies, and what timeouts to use. In the following example, the call to ClientSession will create an object with the right Content-Type headers:

# clientsession.py
import asyncio
import aiohttp
 
async def make_request(url):
    headers = {
        "Content-Type": "application/json",
    }
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get(url) as response:
            print(await response.text())
 
 
url = "http://localhost:5000/api"
loop = asyncio.get_event_loop()
loop.run_until_complete(make_request(url))

If we need to limit how many concurrent requests are made to an external endpoint, there are two main approaches. aiohttp has a concept of connectors, and we can set options to control how many outgoing TCP connections a session can operate at once, as well as limiting that number for a single destination:

conn = aiohttp.TCPConnector(limit=300, limit_per_host=10)
session = aiohttp.ClientSession(connector=conn)

This might be enough for our needs; however, if we make several outgoing connections to complete one request, we could end up in a situation where each piece of work keeps blocking partway through as we hit the limit. Ideally, we would like a discrete chunk of work to continue until it's done, and for that we can use a semaphore. A semaphore is a simple token that gives code permission to perform a task. If we add a semaphore with three slots, then the first three tasks that try to acquire the semaphore will take a slot each and carry on. Any other task that requests the semaphore will have to wait until one of the slots is free.

Since the most common way to acquire a semaphore is inside an async with block, the semaphore is released as soon as the context of that block ends—inside the semaphore object's __aexit__ method:

# clientsession_list.py
import asyncio

import aiohttp


async def make_request(url, session, semaphore):
    async with semaphore, session.get(url) as response:
        print(f"Fetching {url}")
        await asyncio.sleep(1)  # Pretend there is real work happening
        return await response.text()


async def organise_requests(url_list):
    semaphore = asyncio.Semaphore(3)
    tasks = list()
    async with aiohttp.ClientSession() as session:
        for url in url_list:
            tasks.append(make_request(url, session, semaphore))
        await asyncio.gather(*tasks)
 
urls = [
    "https://www.google.com",
    "https://developer.mozilla.org/en-US/",
    "https://www.packtpub.com/",
    "https://aws.amazon.com/",
]
loop = asyncio.get_event_loop()
loop.run_until_complete(organise_requests(urls))

Let us now see how we can generalize this pattern in a Quart app that needs to interact with other services.

This naive implementation is based on the hypothesis that everything will go smoothly, but real life is rarely so easy. We can set up error-handling options in a ClientSession, such as timeouts and raising exceptions for bad response codes, and we only need to set them up in that one place.
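
For instance, here is a minimal sketch of a session with a global timeout, assuming a 10-second budget per request and that we want bad status codes to raise exceptions; the URL is illustrative:

# timeout_session.py: a sketch; the URL and limits are illustrative
import asyncio

import aiohttp


async def fetch(url):
    # Give up after 10 seconds, and raise ClientResponseError on 4xx/5xx answers
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout, raise_for_status=True) as session:
        async with session.get(url) as response:
            return await response.text()


loop = asyncio.get_event_loop()
print(loop.run_until_complete(fetch("http://localhost:5000/api")))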

Finding out where to go

When we make a web request to a service, we need to know which Uniform Resource Locator (URL) to use. Most of the examples in this book use hardcoded URLs—that is, they are written into the source code. This is nice and easy to read for an example, but can be a problem when maintaining software. What happens when a service gets a new URI, and its hostname or IP address changes? It might move between AWS regions due to a failure or be migrated from Google Cloud Platform to Microsoft Azure. An API update can make the path to a resource change, even if the hostname or IP address has not updated.

We want to pass in data about which URLs to use as configuration to our application. There are several options to manage more configuration options without adding them directly to the code, such as environment variables and service discovery.

Environment variables

Container-based environments are common these days, and we will discuss them in more detail in Chapter 10, Deploying on AWS. The most common approach to get configuration options into a container is to pass the container some environment variables. This has the advantage of being straightforward, since the code just needs to examine the environment when processing its configuration:

import os
from quart import Quart

def create_app(name=__name__, blueprints=None, settings=None):
    app = Quart(name)
    app.config["REMOTE_URL"] = os.environ.get("OTHER_SERVICE_URL", "https://default.url/here")
    return app

The downside to this approach is that if the URL changes, then we need to restart the application—and sometimes redeploy it—with the new environment. If you don't expect the configuration to change very often, environment variables are still a good idea due to their simplicity, although we must be careful to not record any secrets that are in environment variables when we log messages.

Service discovery

But what if we did not need to tell our service about all its options when we deploy it? Service discovery is an approach that involves configuring an application with just a few pieces of information: where to ask for configuration and how to identify the right questions to ask.

Services such as etcd (https://etcd.io/) provide a reliable key-value store in which to keep this configuration data. For example, let's use etcd to store the URL of the production and development RabbitMQ instances:

$ etcdctl put myservice/production/rabbitmq/url https://my.rabbitmq.url/
OK
$ etcdctl get myservice/production/rabbitmq/url
myservice/production/rabbitmq/url
https://my.rabbitmq.url/

When an application starts up, it can check to see whether it is running in production or in a local development environment and ask etcd for the right value—either myservice/production/rabbitmq/url or myservice/development/rabbitmq/url. With a single option in a deployment, it is possible to change a whole number of configuration options, use different external URLs, bind to different ports, or any other piece of configuration you might think of.
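
As a sketch, assuming the third-party python-etcd3 client library and a local etcd instance, fetching that value at startup could look like this:

# etcd_config.py: a sketch, assuming the python-etcd3 client library is installed
import os

import etcd3


def get_rabbitmq_url(environment="development"):
    client = etcd3.client(host="localhost", port=2379)
    value, _metadata = client.get(f"myservice/{environment}/rabbitmq/url")
    # etcd returns bytes, or None when the key does not exist
    return value.decode("utf-8") if value is not None else None


rabbitmq_url = get_rabbitmq_url(os.environ.get("DEPLOY_ENV", "development"))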

It's also possible to update the values in etcd, and when your application next checks for a new value, it will update and use that instead. Deploying a new version of RabbitMQ can now be done alongside the old version, and the swap will be a value change in etcd—or a change back if it goes wrong.

This approach does add complexity, both as an extra service to run and in terms of updating these values within your application, but it can be a valuable approach in more dynamic environments. We will discuss service discovery more in Chapter 10, Deploying on AWS, when we cover deploying an application on containers and in the cloud.

Transferring data

JSON is a human-readable data format. There is a long history of human-readable data transfer on the internet—a good example would be email, as you can quite happily type out the protocol needed to send an email as a human author. This readability is useful for determining exactly what is happening in your code and its connections, especially as JSON maps directly onto Python data structures.

The downside to this readability is the size of the data. Sending HTTP requests and responses with JSON payloads can add some bandwidth overhead in the long run, and serializing and deserializing data from Python objects to JSON structures also adds a bit of CPU overhead.

There are other ways to transfer data that involve caching, compression, binary payloads, or RPC, however.

HTTP cache headers

In the HTTP protocol, there are a few cache mechanisms that can be used to indicate to a client that a page that it's trying to fetch has not changed since its last visit. Caching is something we can do in our microservices on all the read-only API endpoints, such as GETs and HEADs.

The simplest way to implement it is to return, along with a result, an ETag header in the response. An ETag value is a string that can be considered as a version for the resource the client is trying to get. It can be a timestamp, an incremental version, or a hash. It's up to the server to decide what to put in it, but the idea is that it should be unique to the value of the response.

Like web browsers, when the client fetches a response that contains such a header, it can build a local dictionary cache that stores the response bodies and ETags as its values, and the URLs as its keys.

When making a new request, the client can look in its local cache and pass along a stored ETag value in the If-None-Match header. If the server sends back a 304 status code, it means that the response has not changed, and the client can use the previously stored one.

This mechanism can greatly reduce the response times from the server, since it can immediately return an empty 304 response when the content has not changed. If it has changed, the client gets the full message in the usual way.
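
To make this concrete, here is a sketch of such a client-side cache built with aiohttp; it assumes the server sets an ETag header on its responses, and it keeps a plain dictionary keyed by URL:

# etag_client.py: a sketch of a client-side ETag cache
import asyncio

import aiohttp

cache = {}  # url -> (etag, body)


async def cached_get(session, url):
    headers = {}
    if url in cache:
        # Send the stored ETag so the server can answer with a 304
        headers["If-None-Match"] = cache[url][0]
    async with session.get(url, headers=headers) as response:
        if response.status == 304:
            return cache[url][1]  # unchanged: reuse the stored body
        body = await response.text()
        etag = response.headers.get("ETag")
        if etag:
            cache[url] = (etag, body)
        return body


async def main():
    async with aiohttp.ClientSession() as session:
        print(await cached_get(session, "http://127.0.0.1:5000/api/user/1"))
        print(await cached_get(session, "http://127.0.0.1:5000/api/user/1"))


asyncio.get_event_loop().run_until_complete(main())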

Of course, this means the services that you are calling should implement this caching behavior by adding the proper ETag support. It's not possible to implement a generic solution for this because the cache logic depends on the nature of the data your service is managing. The rule of thumb is to version each resource and change that version every time the data changes. In the following example, the Quart app uses the current server time to create ETag values associated with users' entries. The ETag value is the current timestamp in ISO 8601 format, and is stored in the modified field.

The get_user() view returns a user entry from _USERS and sets the ETag value with response.set_etag. When it receives a request, it also looks at the If-None-Match header, compares it to the user's modified field, and returns a 304 response if they match:

# quart_etag.py
from datetime import datetime

from quart import Quart, Response, abort, jsonify, request

app = Quart(__name__)


def _time2etag():
    return datetime.now().isoformat()


_USERS = {"1": {"name": "Simon", "modified": _time2etag()}}


@app.route("/api/user/<user_id>")
async def get_user(user_id):
    if user_id not in _USERS:
        return abort(404)
    user = _USERS[user_id]
    # returning 304 if If-None-Match matches
    if user["modified"] in request.if_none_match:
        return Response("Not modified", status=304)
    resp = jsonify(user)
    # setting the ETag
    resp.set_etag(user["modified"])
    return resp


if __name__ == "__main__":
    app.run()
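
The full application also needs a way to update users. A minimal sketch of such a change_user() view, assuming a JSON body containing the new user data and reusing the helpers above, could look like this:

# a sketch of the update view, to be added to quart_etag.py
@app.route("/api/user/<user_id>", methods=["POST"])
async def change_user(user_id):
    if user_id not in _USERS:
        return abort(404)
    user = await request.get_json()
    # Refresh the version so clients holding the old ETag fetch fresh data
    user["modified"] = _time2etag()
    _USERS[user_id] = user
    resp = jsonify(user)
    resp.set_etag(user["modified"])
    return resp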

The change_user() view sets a new modified value whenever a client modifies a user. In the following client session, we fetch the user and then confirm that we get a 304 response when providing the returned ETag value:

$ curl -v http://127.0.0.1:5000/api/user/1
*   Trying 127.0.0.1...
...
< HTTP/1.1 200
< content-type: application/json
< content-length: 56
< etag: "2021-06-29T21:32:25.685907"
< date: Tue, 29 Jun 2021 20:32:30 GMT
< server: hypercorn-h11
<
* Connection #0 to host 127.0.0.1 left intact
{"modified":"2021-06-29T21:32:25.685907","name":"Simon"}
$ curl -v -H 'If-None-Match: 2021-06-29T21:32:25.685907' http://127.0.0.1:5000/api/user/1
...
< HTTP/1.1 304
...

This demonstration is a toy implementation that might not work well in production; relying on a server clock for ETag values means you must be sure that the clock is never set back in time, and that if you have several servers, their clocks are all kept synchronized with an NTP service.

There is also the problem of race conditions if two requests change the same entry at almost the same time. Depending on your app, this may not be an issue; but if it is, it may be a big one. A cleaner option is to have the modified field handled by your database system directly, and to make sure its changes are done in serialized transactions. Sending the ETag with a POST request is also a good precaution against a race between concurrent updates—the server can use the ETag to verify which version of the data the client wants to update from, and if that version doesn't match, it is probably unsafe to update the data, as someone else has changed it first.

Some developers use hash functions for their ETag value because it's easy to compute in a distributed architecture, and it doesn't introduce any of the problems timestamps have. But calculating a hash has a CPU cost, and it means you need to pull the whole entry to do it—so it might be as slow as if you were sending back the actual data. That said, with a dedicated table in your database for all your hashes, you can probably come up with a solution that makes your 304 responses fast to return.
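
As a small sketch, assuming the resource can be serialized to JSON, a hash-based ETag could be computed like this:

# hash_etag.py: a sketch of a hash-based ETag value
import hashlib
import json


def compute_etag(resource):
    # Sort the keys so that the same data always produces the same hash
    serialized = json.dumps(resource, sort_keys=True).encode("utf-8")
    return hashlib.sha256(serialized).hexdigest()


print(compute_etag({"name": "Simon"}))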

As we said earlier, there is no generic solution to implement an efficient HTTP cache logic—but it's worth implementing one if your client is doing a lot of reads on your service. When you have no choice but to send some data back, there are several ways to make it as efficient as possible, as we will see in the next section.

GZIP compression

Compression is an overarching term for reducing the size of data in such a way that the original data can be recovered. There are many different compression algorithms—some of them are general-purpose algorithms that can be used on any sort of data, while some of them are specialized to particular data formats and achieve very good results due to them making assumptions about how the data is structured.

There are trade-offs to make between the size of the compressed data, the speed of compression and decompression, and how widely implemented the compression algorithm is. It might be acceptable to spend a few minutes compressing a large data file if it spends most of its time being stored, as the space savings outweigh the access time taken, but for data that is short-lived or regularly accessed, then the overhead of compression and decompression is more important. For our purposes, we need a compression algorithm that is widely understood by different environments, even if it doesn't always achieve the smallest end result.

GZIP compression is available on almost every single system, and web servers such as Apache or nginx provide native support to compress responses that pass through them—which is far better than implementing your own ad hoc compression at the level of Python. It's important to remember that while this will save network bandwidth, it will use more CPU, and so experimenting with metrics collection activated will let us see the results—and decide whether this option is a good idea.

For example, this nginx configuration will enable GZIP compression for any response produced by the Quart app on port 5000, with an application/json content type:

http {
    gzip  on;
    gzip_types application/json;
    gzip_proxied      any;
    gzip_vary on;

    server {
        listen       8080;
        server_name  localhost;

        location / {
            proxy_pass http://localhost:5000;
        }
    }
}

From the client side, making an HTTP request to the nginx server at localhost:8080, proxying for the application at localhost:5000 with an Accept-Encoding: gzip header, will trigger the compression:

$ curl http://localhost:8080/api -H "Accept-Encoding: gzip" 
<some binary output> 

In Python, requests made using the aiohttp and requests libraries will automatically decompress responses that are GZIP-encoded, so you don't have to worry about doing this when your service is calling another service.

Decompressing the data adds some processing, but Python's gzip module relies on zlib (http://www.zlib.net/), which is very fast. To accept compressed responses to HTTP queries, we just need to add a header indicating that we can deal with a GZIP-encoded response:

import asyncio
import aiohttp
async def make_request():
    url = "http://127.0.0.1:5000/api"
    headers = {
        "Accept-Encoding": "gzip",
    }
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get(url) as response:
            print(await response.text())
loop = asyncio.get_event_loop()
loop.run_until_complete(make_request())

To compress the data that you are sending to the server, you can use the gzip module and specify a Content-Encoding header:

import asyncio
import gzip
import json
import aiohttp
async def make_request():
    url = "http://127.0.0.1:8080/api_post"
    headers = {
        "Content-Encoding": "gzip",
    }
    data = {"Hello": "World!", "result": "OK"}
    data = bytes(json.dumps(data), "utf8")
    data = gzip.compress(data)
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.post(url, data=data) as response:
            print(await response.text())
loop = asyncio.get_event_loop()
loop.run_until_complete(make_request())

In that case, however, you will get the zipped content in your Quart application, and you will need to decompress it in your Python code. Alternatively, if you are using an nginx proxy that handles incoming web connections, nginx can decompress the requests for you; we discuss nginx in more detail in Chapter 10, Deploying on AWS. To summarize, setting up GZIP compression for all your service responses is a low-effort change with nginx, and your Python client can benefit from it by setting the right header. Sending compressed data is a little more complicated, because the work isn't done for you, but it may still have benefits for large data transfers.
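
A minimal sketch of that manual decompression in a Quart view, assuming the client sends gzip-compressed JSON as in the previous example and nothing has decompressed it on the way in, might look like this:

# gzip_view.py: a sketch; the /api_post route mirrors the client example above
import gzip
import json

from quart import Quart, jsonify, request

app = Quart(__name__)


@app.route("/api_post", methods=["POST"])
async def api_post():
    body = await request.get_data()
    # Only decompress when the client declared a gzip-encoded body
    if request.headers.get("Content-Encoding") == "gzip":
        body = gzip.decompress(body)
    data = json.loads(body)
    return jsonify({"received": data})


if __name__ == "__main__":
    app.run()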

If you want to further reduce the size of HTTP request/response payloads, another option is to switch from JSON to binary payloads. That way, you do not have to deal with compression, and processing the data may be faster, but the message size reduction is not as good.

Protocol Buffers

While it is usually not relevant, if your microservice deals with a lot of data, using an alternative format can be an attractive option to increase performance, and decrease the required network bandwidth without having to use extra processing power and time compressing and decompressing the data. Two widely used binary formats are Protocol Buffers (protobuf) (https://developers.google.com/protocol-buffers) and MessagePack.

Protocol Buffers requires you to describe the data being exchanged in a schema that will be used to index the binary content. The schemas add some work, because all data that is transferred will need to be described in a schema, and you will need to learn a new Domain-Specific Language (DSL). In a typed language, such as Rust, C++, or Go, defining these structures is something that already has to be done, so the overhead is far less.

However, the advantages are that the messages are well defined and can be easily validated before either end of the network conversation attempts to use the information. It is also possible to generate code for various languages—including Python—that let you construct the data in a way that is more suitable for the language being used. The following example is taken from the protobuf documentation:

syntax = "proto2";
package tutorial; 
message Person { 
  required string name = 1; 
  required int32 id = 2; 
  optional string email = 3; 
  enum PhoneType { 
    MOBILE = 0; 
    HOME = 1; 
    WORK = 2; 
  } 
  message PhoneNumber { 
    required string number = 1; 
    optional PhoneType type = 2 [default = HOME]; 
  } 
  repeated PhoneNumber phones = 4; 
} 
message AddressBook { 
  repeated Person people = 1; 
} 

The schema is not very Pythonic, as it is intended to support multiple languages and environments. If you interact with statically typed languages or would like a feature to do basic syntax checking on data for you, then a definition like this may be helpful.

Using Protocol Buffers with a framework such as gRPC (https://grpc.io/) can abstract away the network interaction from your application, and instead provide a client with a function call in Python and little need to consider how it generates its return value.
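
As a quick illustration, assuming the schema above is saved as addressbook.proto and compiled with protoc --python_out=. addressbook.proto, the generated module could be used like this:

# protobuf_example.py: a sketch, assuming protoc generated addressbook_pb2.py
import addressbook_pb2

person = addressbook_pb2.Person()
person.name = "Simon"
person.id = 1234
person.email = "simon@example.com"
phone = person.phones.add()
phone.number = "555-0199"

# SerializeToString() produces the compact binary payload sent over the wire
payload = person.SerializeToString()

# The receiving side rebuilds the object from the bytes
received = addressbook_pb2.Person()
received.ParseFromString(payload)
print(received.name, received.email)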

MessagePack

Unlike Protocol Buffers, MessagePack (http://msgpack.org/) is schemaless and can serialize your data by just calling a function. It's a simple alternative to JSON, and has implementations in most languages. The msgpack Python library (installed using the pip install msgpack command) offers the same level of integration as JSON:

>>> import msgpack
>>> data = {"this": "is", "some": "data"}
>>> msgpack.packb(data, use_bin_type=True)
b'\x82\xa4this\xa2is\xa4some\xa4data'
>>> msgpack.unpackb(msgpack.packb(data, use_bin_type=True))
{'this': 'is', 'some': 'data'}

Using MessagePack is simple compared to protobuf, but which one is faster and provides the best compression ratio depends a lot on your data. In some rare cases, plain JSON might be even quicker to serialize than a binary format.

In terms of compression, you can expect 10% to 20% compression with MessagePack, but if your JSON contains a lot of strings—which is often the case in microservices—GZIP will perform much better.

In the following example, a huge JSON payload of 48 KB that contains a lot of strings is converted using MessagePack and JSON and then GZIPped in both cases:

>>> sys.getsizeof(json.dumps(data))
35602
>>> sys.getsizeof(msgpack.packb(data))
30777
>>> sys.getsizeof(gzip.compress(bytes(json.dumps(data), 'utf8')))
3138
>>> sys.getsizeof(gzip.compress(msgpack.packb(data)))
3174

Using MessagePack reduces the size of the payload by approximately 14%, but GZIP is making it 11 times smaller with both JSON and MessagePack payloads!

It's clear that whatever format you are using, the best way to reduce the payload sizes is to use GZIP—and if your web server does not deal with decompression, it's straightforward in Python thanks to gzip.decompress().

Message serialization often only supports basic data types, as they must remain unaware of what environment is running in both the source and destination. This means that they cannot encode data that might be commonly used in Python, such as datetime objects to represent time. While other languages have date and time representation, it is not done in the same way, and so data like this and other Python objects need to be converted into a serializable form that other platforms can understand. For date and time, common options include an integer representing epoch time (the number of seconds since 1st January 1970) or a string in ISO8601 format, such as 2021-03-01T13:31:03+00:00.
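
As a short sketch, one common approach is to convert datetime objects to ISO 8601 strings when dumping JSON, using the default hook:

# datetime_json.py: a sketch of serializing datetimes as ISO 8601 strings
import json
from datetime import datetime, timezone


def encode_datetime(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Cannot serialize {type(obj)}")


payload = {"created_at": datetime.now(timezone.utc)}
print(json.dumps(payload, default=encode_datetime))
# e.g. {"created_at": "2021-03-01T13:31:03.000123+00:00"}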

In any case, in a world of microservices where JSON is the most accepted standard, taking care of dates is a minor annoyance to stick with a universally adopted standard.

Unless all your services are in Python with well-defined structures, and you need to speed up the serialization steps as much as possible, it is probably simpler to stick with JSON.

Putting it together

Before moving on, we will quickly recall what we have covered so far:

  • Implementing HTTP cache headers is a great way to speed up repeated requests for data
  • GZIP compression is an efficient way to lessen the size of requests and responses and is easy to set up
  • Binary protocols are an attractive alternative to plain JSON, but whether they help depends on the situation

The next section will focus on asynchronous calls; everything your microservice can do that goes beyond the request/response pattern.

Asynchronous messages

In a microservice architecture, asynchronous calls play a fundamental role when a process that used to be performed in a single application now involves several microservices. We touched on this briefly in the previous chapter with our change to the Jeeves application, which now communicates with its workers using an asynchronous message queue. To make the best use of these tools, we will investigate them in more depth.

Asynchronous calls can be as simple as a separate thread or process within a microservice app that is receiving some work to be done, and performs it without interfering with the HTTP request/response round trips that are happening at the same time.

But doing everything directly from the same Python process is not very robust. What happens if the process crashes and gets restarted? How do we scale background tasks if they are built like that?

It's much more reliable to send a message that gets picked up by another program, and let the microservice focus on its primary goal, which is to serve responses to clients. If a web request does not need an immediate answer, an endpoint in our service can become code that accepts an HTTP request, processes it, and passes it on; its response to the client then indicates whether our service has successfully received the request, rather than whether the request has been processed.

In the previous chapter, we looked at how Celery could be used to build a microservice that gets some work from a message broker like RabbitMQ. In that design, the Celery worker blocks—that is, it halts operation while it is waiting—until a new message is added to the RabbitMQ queue.

Message queue reliability

As with any distributed system, there are considerations with regard to reliability and consistency. Ideally, we would like to add a message to the queue and have it delivered, and acted upon, exactly once. In practice, this is almost impossible to achieve in a distributed system, as components fail, experience high latency, or lose packets, while all sorts of complex interactions occur.

We have two real choices, encoded in RabbitMQ's delivery strategies: "at-most-once" and "at-least-once."

A strategy to deliver a message at most once will not account for any unreliability in the message delivery system or failures in a worker. Once a worker has accepted the message, that is it: the message queue forgets about it. If the worker then suffers a failure and does not complete the chunk of work it has been given, that is something the wider system needs to cope with.

With a promise to deliver a message at least once, in the case of any failures the deliveries will be attempted again until a worker both accepts the message and acknowledges that it has acted upon it. This ensures that no data is lost, but it does mean that there are situations where the message can be delivered to more than one worker, and so some sort of universally unique identifier (UUID) is a good idea, so that while some work may be duplicated, it can be deduplicated when it is written to any database or storage. A wider discussion of distributed system reliability and consensus protocols like PAXOS would require a book of its own.
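
A tiny sketch of that deduplication idea, assuming each message carries a unique identifier chosen by the sender, could look like this:

# dedupe_sketch.py: a sketch of idempotent message handling
processed_ids = set()  # in production, a database or cache shared by workers


def handle_message(message_id, body):
    """Return True if the work was done, False if it was a duplicate delivery."""
    if message_id in processed_ids:
        return False  # at-least-once delivery sent this twice; skip it
    print(f"Processing: {body}")
    processed_ids.add(message_id)
    return True


handle_message("d6a44b4e-3ca2-4dde-9e7b-0f28a8a7a2d1", "build the Android app")
handle_message("d6a44b4e-3ca2-4dde-9e7b-0f28a8a7a2d1", "build the Android app")  # ignored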

Basic queues

The pattern used by Celery workers is a push-pull task queue. One service pushes messages into a specific queue, and some workers pick them up from the other end and perform an action on them. Each task goes to a single worker. Consider the following diagram, shown in Figure 6.1.

Figure 6.1: Tasks passing through a message queue

There is no bidirectional communication—the sender merely deposits a message in the queue and leaves. The next available worker gets the next message. This blind, unidirectional message passing is perfect when you want to perform some asynchronous parallel tasks, which makes it easy to scale.

In addition, message brokers such as RabbitMQ can offer message persistence once the sender has confirmed that the message was added to the broker. In other words, if all workers go offline, we don't lose the messages that are in the queue.

Topic exchanges and queues

Topics are a way of filtering and classifying messages that travel through the queue. When using topics, each message is sent with an extra label that helps to identify what sort of message it is, and our workers can subscribe to specific topics, or patterns that match several topics.

Let's imagine a scenario where we are releasing a mobile app to the Android Play Store and the Apple App Store. When our automation tasks finish building the Android app, we can send a message with a routing key of publish.playstore, so that RabbitMQ can route this message to the right queues. The reason there is a difference between a routing key and a topic is that a topic can match a pattern. The worker that is capable of publishing files to the Play Store can subscribe to the topic publish.playstore and get its workload from those messages, but we could also have a queue for messages matching publish.* and a worker that sends notifications whenever something is about to be uploaded to the Play Store, the App Store, or any other place you might publish software.

In our microservices, this means we can have specialized workers that all register to the same messaging broker and get a subset of the messages that are added to it.

Figure 6.2: Tasks of different types passing through a message queue

This sort of behavior exists in most message queue services, in slightly different forms. Let's look at how to set this up in RabbitMQ.

To install a RabbitMQ broker, you can look at the download page at http://www.rabbitmq.com/download.html.

RabbitMQ implements the Advanced Message Queuing Protocol (AMQP). This protocol, described at http://www.amqp.org/, is a complete standard that has been developed over years by a group of companies working together. Running RabbitMQ in a container should be enough for any local experiments.
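
For example, assuming Docker is available locally, the official image with the management plugin can be started like this:

$ docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management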

AMQP is organized into three concepts: queues, exchanges, and bindings:

  • A queue is a container that holds messages and waits for consumers to pick them up
  • An exchange is an entry point for publishers to add new messages to the system
  • A binding defines how messages are routed from exchanges to queues

For our topic queue, we need to set up one exchange, so that RabbitMQ accepts new messages, and all the queues that workers will pick messages from. Between those two ends, we route the messages to the different queues, depending on the topics, using bindings.

Let's look at how we would set up our app publishing example from earlier. We will assume we have two workers: one that publishes Android applications, and the other that sends notifications, such as updating a website or sending an email. Using the rabbitmqadmin command line that gets installed with RabbitMQ, we can create all the necessary parts. If the admin command does not come installed, you can find instructions on installing it at https://www.rabbitmq.com/management-cli.html:

$ rabbitmqadmin declare exchange name=incoming type=topic 
exchange declared 
 
$ rabbitmqadmin declare queue name=playstore 
queue declared 
 
$ rabbitmqadmin declare queue name=notifications
queue declared 
 
$ rabbitmqadmin declare binding source="incoming" destination_type="queue" destination="playstore" routing_key="publish.playstore" 
binding declared 
 
$ rabbitmqadmin declare binding source="incoming" destination_type="queue" destination="notifications" routing_key="publish.*" 
binding declared 

In this setup, whenever a message is sent to RabbitMQ with a routing key that starts with publish., it will be sent to the notifications queue; and if the routing key is publish.playstore, it will end up in both the notifications and playstore queues. Messages with any other routing keys will be discarded.

To interact with RabbitMQ in the code, we can use Pika. This is a Python client that implements the AMQP 0-9-1 protocol spoken by RabbitMQ: https://pika.readthedocs.io.

Everything we do with Pika can be done on the command line using rabbitmqadmin. You can directly get the status of all parts of the system, send and receive messages, and check what's in a queue. It is an excellent way to experiment with your messaging setup.

The following script shows how to publish two messages in RabbitMQ in the incoming exchange. One concerns a new app being published, and the other is about a newsletter:

from pika import BlockingConnection, BasicProperties

# assuming there's a working local RabbitMQ server with a working
# guest/guest account
def message(topic, message):
    connection = BlockingConnection()
    try:
        channel = connection.channel()
        props = BasicProperties(content_type="text/plain", delivery_mode=1)
        channel.basic_publish("incoming", topic, message, props)
    finally:
        connection.close()


message("publish.playstore", "We are publishing an Android App!")
message("publish.newsletter", "We are publishing a newsletter!")

These calls will each add one message to the incoming topic exchange. The first message will be routed to both the playstore and notifications queues, since its routing key matches both bindings, while the second will only end up in the notifications queue. A worker script that waits for work that needs to be published to the Play Store would look like this:

import pika
def on_message(channel, method_frame, header_frame, body):
    print(f"Now publishing to the play store: {body}!")
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume("playstore", on_message)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
connection.close()

Notice that Pika sends back an ACK to RabbitMQ about the message, so it can be safely removed from the queue once the worker has succeeded. This is the at-least-once delivery strategy. The notifications receiver can be identical apart from the queue it subscribes to and what it does with the message body:

$ python ./playstore_receiver.py
Now publishing to the play store: b'We are publishing an Android App!'!
$ python ./publish_receiver.py
We have some news! b'We are publishing an Android App!'!
We have some news! b'We are publishing a newsletter!'!

AMQP offers many patterns that you can investigate to exchange messages. The tutorial page has many examples, and they are all implemented using Python and Pika: http://www.rabbitmq.com/getstarted.html.

To integrate these examples in our microservices, the publisher phase is straightforward. Your Quart application can create a connection to RabbitMQ using pika.BlockingConnection and send messages through it. Projects such as pika-pool (https://github.com/bninja/pika-pool) implement simple connection pools so you can manage RabbitMQ channels without having to connect/disconnect every time you are sending something through RPC.

The consumers, on the other hand, are trickier to integrate into microservices. Pika can be embedded into an event loop running in the same process as the Quart application, and trigger a function when a message is received. It will simply be another entry point into the same code, and could be run alongside a RESTful API if that's also required.

Publish/subscribe

The previous pattern has workers that handle the specific topics of messages, and the messages consumed by a worker are completely gone from the queue. We even added code to acknowledge that the message was consumed.

When you want a message to be published to several workers, however, the Publish/Subscribe (pubsub) pattern needs to be used.

This pattern is the basis for building a general event system and is implemented exactly like the previous one, in which there is one exchange and several queues. The difference is that the exchange part has a fanout type.

In that setup, every queue that you bind to a fanout exchange will receive the same message. With pubsub in place, you can broadcast messages to all your microservices if necessary.
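
As a minimal sketch, assuming a local broker and two illustrative queue names, a fanout exchange can be declared and bound with Pika like this:

# fanout_example.py: a sketch; the exchange and queue names are illustrative
import pika

connection = pika.BlockingConnection()
channel = connection.channel()

# A fanout exchange copies every message to all queues bound to it
channel.exchange_declare(exchange="events", exchange_type="fanout")

for queue in ("dataservice-events", "tokendealer-events"):
    channel.queue_declare(queue=queue)
    # No routing key is needed: the binding alone is enough for fanout
    channel.queue_bind(queue=queue, exchange="events")

channel.basic_publish("events", "", "Something happened!")
connection.close()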

Putting it together

In this section, we have covered the following about asynchronous messaging:

  • Non-blocking calls should be used every time a microservice can execute some work out of band. There's no good reason to block a request if what you are doing is not utilized in the response.
  • Service-to-service communication is not always limited to task queues.
  • Sending events through a message queue is a good way to prevent tightly coupled components.
  • We can build a full event system around a broker—such as RabbitMQ—to make our microservices interact with each other via messages.
  • RabbitMQ can be used to coordinate all the message passing, with messages sent using Pika.

Testing

As we learned in Chapter 3, Coding, Testing, and Documentation: the Virtuous Cycle, the biggest challenge when writing functional tests for a service that calls other services is to isolate all network calls. In this section, we'll see how we can mock asynchronous calls made using aiohttp.

Testing aiohttp and its outgoing web requests involves a different approach to traditional synchronous tests. The aioresponses project (https://github.com/pnuckowski/aioresponses) allows you to easily create mocked responses to web requests made using an aiohttp ClientSession:

# test_aiohttp_fixture.py
import aiohttp
import pytest
from aioresponses import aioresponses


@pytest.fixture
def mock_aioresponse():
    with aioresponses() as m:
        yield m


@pytest.mark.asyncio
async def test_ctx(mock_aioresponse):
    async with aiohttp.ClientSession() as session:
        mock_aioresponse.get("http://test.example.com", payload={"foo": "bar"})
        resp = await session.get("http://test.example.com")
        data = await resp.json()
    assert {"foo": "bar"} == data

In this example, we tell aioresponses that any GET request made to http://test.example.com should return the data we specify. This way, we can easily provide mocked responses for several URLs, and even for the same URL, by registering it more than once to create multiple responses for the same endpoint.
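
Here is a sketch of that behavior, where the registered responses are consumed in order for repeated requests to the same URL; like the previous example, it assumes pytest-asyncio is installed:

# test_multiple_responses.py: a sketch of queuing several mocked responses
import aiohttp
import pytest
from aioresponses import aioresponses


@pytest.mark.asyncio
async def test_multiple_responses():
    with aioresponses() as mocked:
        mocked.get("http://test.example.com", payload={"status": "first"})
        mocked.get("http://test.example.com", payload={"status": "second"})
        async with aiohttp.ClientSession() as session:
            first = await (await session.get("http://test.example.com")).json()
            second = await (await session.get("http://test.example.com")).json()
    assert first == {"status": "first"}
    assert second == {"status": "second"}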

If you are using Requests to perform all the calls—or you are using a library that is based on Requests that does not customize it too much—this isolation work is also easy to do thanks to the requests-mock project (https://requests-mock.readthedocs.io), which implements mocked calls in a similar way, and likely inspired aioresponses.

That said, mocking responses from other services is still a fair amount of work, and can be difficult to maintain. It means that an eye needs to be kept on how the other services are evolving over time, so your tests are not based on a mock that's no longer a reflection of the real API.

Using mocks is encouraged to build good functional test coverage, but make sure you are also doing integration tests, where the service is tested in a deployment in which it calls other services for real.

Using OpenAPI

The OpenAPI Specification (https://www.openapis.org/), previously known as Swagger, is a standard way of describing a set of HTTP endpoints, how they are used, and the structure of the data that is sent and received. By describing an API using a JSON or YAML file, it allows the intent to become machine-readable—this means that with an OpenAPI Specification, you can use a code generator to produce a client library in a language of your choosing, or to automatically validate data as it enters or leaves the system.

OpenAPI has the same goal that WSDL (https://www.w3.org/TR/2001/NOTE-wsdl-20010315) had back in the XML web services era, but it's much lighter and straight to the point.

The following example is a minimal OpenAPI description file that defines a single /api/user_ids endpoint and supports the GET method to retrieve a list of user IDs:

---
openapi: "3.0.0"
info:
  title: Data Service
  description: returns info about users
  license:
    name: APLv2
    url: https://www.apache.org/licenses/LICENSE-2.0.html
  version: 0.1.0
servers:
  - url: /api
paths:
  /user_ids:
    get:
      operationId: getUserIds
      description: Returns a list of ids
      responses:
        '200':
          description: List of Ids
          content:
            application/json:
              schema:
                type: array
                items:
                  type: integer

The full OpenAPI Specification can be found on GitHub; it is very detailed and will let you describe metadata about the API, its endpoints, and the data types it uses: https://github.com/OAI/OpenAPI-Specification.

The data types described in the schema sections follow the JSON Schema specification (http://json-schema.org/latest/json-schema-core.html). Here, we are describing that the /user_ids endpoint returns an array of integers.

You can provide a lot of detail about your API in that specification—things such as which headers should be present in your requests, or what the content type of responses will be, can all be added to it.

Describing your HTTP endpoints with OpenAPI offers some excellent possibilities:

  • There are a plethora of OpenAPI clients that can consume your description and do something useful with it, such as building functional tests against your service, or validating data that is sent to it.
  • It provides a standard, language-agnostic documentation for your API
  • The server can check that the requests and responses follow the spec

Some web frameworks even use the specification to create all the routing and I/O data checks for your microservices; for instance, Connexion (https://github.com/zalando/connexion) does this for Flask. Support for this within Quart is limited at the time of writing, but the situation is always improving. For this reason, we won't be using OpenAPI a great deal in the examples presented here.

There are two schools of thought when people are building HTTP APIs with OpenAPI:

  • Specification-first, where you create a Swagger specification file and then create your app on top of it, using all the information provided in that specification. That's the principle behind Connexion.
  • Specification-extracted, where it is your code that generates the Swagger specification file. Some toolkits out there will do this by reading your view docstrings, for instance.

Summary

In this chapter, we've looked at how a service can interact with other services synchronously, by using an aiohttp client session, and asynchronously, by using Celery workers or more advanced messaging patterns based on RabbitMQ.

We've also looked at some ways to test a service in isolation by mocking other services, but without mocking the message brokers themselves.

Testing each service in isolation is useful, but when something goes wrong, it's hard to know what happened, particularly if the bug happens in a series of asynchronous calls.

In that case, tracking what's going on with a centralized logging system helps a lot. The next chapter will explain how we can tool our microservices to follow their activities.
