Multithreading

Developers often consider threading a complex topic. While this is true, Python provides high-level classes and functions that ease the usage of threads. CPython's implementation of threads comes with some inconvenient details that make them less useful than in other languages. They are still completely fine for some sets of problems that you may want to solve, but not for as many as in C or Java. In this section, we will discuss the limitations of multithreading in CPython, as well as the common concurrency problems where Python threads are a viable solution.

What is multithreading?

Thread is short for a thread of execution. A programmer can split their work into threads that run simultaneously and share the same memory context. Unless your code depends on third-party resources, multithreading will not speed it up on a single-core processor, and will even add some overhead for thread management. Multithreading benefits from a multiprocessor or multicore machine, where threads can execute in parallel on separate CPU cores, thus making the program faster. Note that this is a general rule that should hold true for most programming languages. In Python, the performance benefit from multithreading on multicore CPUs has some limits, but we will discuss that later. For simplicity, let's assume for now that this statement is true.

The fact that the same context is shared among threads means you must protect data from concurrent access. If two threads update the same data without any protection, a race condition occurs. This is also called a race hazard: unexpected results may happen because the code run by each thread makes false assumptions about the state of the data.
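
A minimal sketch of protecting shared data, assuming a hypothetical shared counter (the names counter, counter_lock, increment, and run are illustrative and not part of the original text). Each update is a read-modify-write sequence, so it is wrapped in a threading.Lock:

```python
import threading

counter = 0
counter_lock = threading.Lock()


def increment(times):
    """Add one to the shared counter `times` times."""
    global counter
    for _ in range(times):
        # "counter += 1" reads, modifies, and writes the value; holding
        # the lock keeps another thread from interleaving with these steps.
        with counter_lock:
            counter += 1


def run(n_threads=4, times=10_000):
    """Start n_threads incrementing threads and return the final counter."""
    global counter
    counter = 0
    threads = [
        threading.Thread(target=increment, args=(times,))
        for _ in range(n_threads)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return counter
```

With the lock in place, run(4, 10_000) should always return 40000. Without it, the result may occasionally be lower, depending on the interpreter version and on thread scheduling.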

Lock mechanisms help in protecting data, and thread programming has always been a matter of making sure that the resources are accessed by threads in a safe way. This can be quite hard and thread programming often leads to bugs that are hard to debug, since they are hard to reproduce. The worst problem occurs when, due to poor code design, two threads lock a resource and try to get the resource that the other thread has locked. They will wait for each other forever. This is called a deadlock and is quite hard to debug. Reentrant locks help a bit in this by making sure a thread doesn't get locked by attempting to lock a resource twice.
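
The two ideas above can be sketched with a hypothetical Account class (an assumption made for illustration only). A reentrant lock (threading.RLock) lets a thread re-acquire a lock it already holds, and taking several locks in one fixed order is a common way to avoid the deadlock described above:

```python
import threading


class Account:
    """Hypothetical shared resource guarded by a reentrant lock."""

    def __init__(self, balance):
        self.lock = threading.RLock()
        self.balance = balance

    def deposit(self, amount):
        with self.lock:
            self.balance += amount

    def deposit_twice(self, amount):
        # The thread already holds the lock when deposit() acquires it
        # again. A plain Lock would block forever here; RLock does not.
        with self.lock:
            self.deposit(amount)
            self.deposit(amount)


def transfer(source, target, amount):
    """Move money between accounts without risking a deadlock."""
    # Every thread locks the two accounts in the same order (by id()),
    # so two opposite transfers cannot each hold one lock and wait
    # for the other.
    first, second = sorted((source, target), key=id)
    with first.lock, second.lock:
        source.balance -= amount
        target.balance += amount
```

Ordering by id() is only one possible convention; any total order that all threads agree on would serve the same purpose.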

Nevertheless, when threads are used for isolated needs with tools that were built for them, they might increase the speed of the program.

Multithreading is usually supported at the system kernel level. When the machine has one single processor with a single core, the system uses a timeslicing mechanism. Here, the CPU switches from one thread to another so fast that there is an illusion of threads running simultaneously. This is done at the process level as well. Parallelism without multiple processing units is obviously virtual and there is no performance gain from running multiple threads on such hardware. Still, it is sometimes useful to implement code with threads even if it has to execute on a single core, and we will see a possible use case later.

Everything changes when your execution environment has multiple processors or multiple CPU cores at its disposal. Even if timeslicing is used, processes and threads are distributed among CPUs, providing the ability to run your program faster.

How Python deals with threads

Unlike some other languages, Python uses multiple kernel-level threads that can each run any of the interpreter-level threads. But the standard implementation of the language, CPython, comes with a major limitation that renders threads less usable in many contexts. All threads accessing Python objects are serialized by one global lock. This is done because many of the interpreter's internal structures, as well as third-party C code, are not thread-safe and need to be protected.

This mechanism is called the Global Interpreter Lock (GIL) and its implementation details on the Python/C API level were already discussed in the Releasing GIL section of Chapter 7, Python Extensions in Other Languages. The removal of the GIL is a topic that occasionally appears on the python-dev mailing list and has been proposed by developers multiple times. Sadly, at the time of writing, no one has managed to provide a reasonable and simple solution that would allow us to get rid of this limitation. It is highly improbable that we will see any progress in this area soon. It is safer to assume that the GIL will stay in CPython forever. So we need to learn how to live with it.

So what is the point of multithreading in Python?

When threads contain only pure Python code, there is little point in using threads to speed up the program since the GIL will serialize it. But remember that the GIL just enforces that only one thread can execute Python code at any time. In practice, the global interpreter lock is released on a number of blocking system calls and can be released in sections of C extensions that do not use any Python/C API functions. This means that multiple threads can do I/O operations or execute C code in certain third-party extensions in parallel.

For nonpure code blocks where external resources are used or C code is involved, multithreading is useful for waiting for a third-party resource to return results. This is because a sleeping thread that has explicitly released the GIL can stand by and wake up when the results are back. Last, whenever a program needs to provide a responsive interface, multithreading is the answer even if it uses timeslicing. The program can interact with the user while doing some heavy computing in the so-called background.
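
A small sketch of this effect, using time.sleep() as a stand-in for a blocking call such as a socket read (the function names are illustrative assumptions). CPython releases the GIL while a thread sleeps, so the waits overlap instead of adding up:

```python
import threading
import time


def blocking_task(delay):
    # time.sleep() blocks without holding the GIL, much like
    # waiting for a response from an external resource.
    time.sleep(delay)


def timed_run(n_threads=4, delay=0.2):
    """Return the wall-clock time needed to run n_threads blocking tasks."""
    started = time.perf_counter()
    threads = [
        threading.Thread(target=blocking_task, args=(delay,))
        for _ in range(n_threads)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - started
```

Four threads that each wait 0.2 seconds should finish in roughly 0.2 seconds in total, rather than the 0.8 seconds a sequential loop would need. If the tasks performed CPU-bound pure Python work instead, the GIL would serialize them and no such gain should be expected.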

Note that the GIL does not exist in every implementation of the Python language. It is a limitation of CPython, Stackless Python, and PyPy, but does not exist in Jython and IronPython (see Chapter 1, Current Status of Python). There is, however, some development of a GIL-free version of PyPy, but at the time of writing this book, it is still at an experimental stage and the documentation is lacking. It is based on Software Transactional Memory and is called PyPy-STM. It is really hard to say when (or if) it will be officially released as a production-ready interpreter. Everything seems to indicate that it won't happen soon.

When should threading be used?

Despite the GIL limitation, threads can be really useful in some cases. They can help in:

  • Building responsive interfaces
  • Delegating work
  • Building multiuser applications

Building responsive interfaces

Let's say you ask your system to copy files from one folder to another through a graphical user interface. The task will possibly be pushed into the background and the interface window will be constantly refreshed by the main thread. This way you get live feedback on the progress of the whole process. You will also be able to cancel the operation. This is less irritating than a raw cp or copy shell command that does not provide any feedback until all work is finished.

A responsive interface also allows a user to work on several tasks at the same time. For instance, GIMP will let you play around with a picture while another one is being filtered, since the two tasks are independent.

When trying to achieve such responsive interfaces, a good approach is to try to push long running tasks into the background, or at least try to provide constant feedback to the user. The easiest way to achieve that is to use threads. In such a scenario, they are not intended to increase performance, but only to make sure that the user can still operate the interface even if it needs to process some data for a longer period of time.
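
The idea can be sketched without any GUI toolkit. In the following example, BackgroundCopy and run_with_feedback are hypothetical names, and time.sleep() stands in for copying a single file. The main thread stays free to report progress, and a threading.Event is used to request cancellation:

```python
import threading
import time


class BackgroundCopy:
    """Hypothetical long-running task with progress and cancellation."""

    def __init__(self, total_steps, step_delay=0.01):
        self.total_steps = total_steps
        self.step_delay = step_delay
        self.done_steps = 0  # written only by the background thread
        self._cancelled = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        for _ in range(self.total_steps):
            if self._cancelled.is_set():
                break
            time.sleep(self.step_delay)  # stands in for copying one file
            self.done_steps += 1

    def start(self):
        self._thread.start()

    def cancel(self):
        self._cancelled.set()

    def is_running(self):
        return self._thread.is_alive()

    def wait(self, timeout=None):
        self._thread.join(timeout)


def run_with_feedback(task, poll_interval=0.02):
    """Main-thread loop that records progress while the task runs."""
    task.start()
    updates = []
    while task.is_running():
        # A real interface would redraw a progress bar here.
        updates.append(task.done_steps)
        time.sleep(poll_interval)
    updates.append(task.done_steps)
    return updates
```

A real application would replace the polling loop with the event loop of its GUI toolkit, but the division of labor stays the same: the background thread does the work and the main thread handles the user.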

In case such background tasks perform a lot of I/O operations, you are able to still get some benefit from multicore CPUs. Then it's a win-win situation.

Delegating work

If your process depends on third-party resources, threads might really speed up everything.

Let's consider the case of a function that indexes files in a folder and pushes the built indexes into a database. Depending on the type of file, the function calls a different external program. For example, one is specialized in PDFs and another one in OpenOffice files.

Instead of treating each file in a sequence, by executing the right program and then storing the result into the database, your function can set up a thread for each converter and push jobs to be done to each one of them through a queue. The overall time taken by the function will be closer to the processing time of the slowest converter than to the sum of all the work.

Converter threads can be initialized from the start and the code in charge of pushing the result into the database can also be a thread that consumes available results in the queue.
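
A rough sketch of this layout follows. The converter functions, the STOP sentinel, and the list standing in for a database are all assumptions made for illustration; real converters would call the external programs mentioned above:

```python
import os
import queue
import threading


def index_pdf(path):
    """Hypothetical converter; a real one would run an external program."""
    return ("pdf", path)


def index_odt(path):
    """Hypothetical converter for OpenOffice text documents."""
    return ("odt", path)


CONVERTERS = {".pdf": index_pdf, ".odt": index_odt}
STOP = object()  # sentinel that tells a thread to finish


def converter_worker(convert, jobs, results):
    while True:
        path = jobs.get()
        if path is STOP:
            break
        results.put(convert(path))


def store_worker(results, database):
    while True:
        item = results.get()
        if item is STOP:
            break
        database.append(item)  # stands in for a database insert


def index_files(paths):
    results = queue.Queue()
    database = []
    job_queues = {ext: queue.Queue() for ext in CONVERTERS}
    converters = [
        threading.Thread(
            target=converter_worker,
            args=(CONVERTERS[ext], job_queues[ext], results),
        )
        for ext in CONVERTERS
    ]
    store = threading.Thread(target=store_worker, args=(results, database))
    for thread in converters + [store]:
        thread.start()

    # Route every file to the queue of the matching converter;
    # files with unknown extensions are skipped.
    for path in paths:
        ext = os.path.splitext(path)[1]
        if ext in job_queues:
            job_queues[ext].put(path)

    # Shut down the converters first, then the storing thread.
    for jobs in job_queues.values():
        jobs.put(STOP)
    for thread in converters:
        thread.join()
    results.put(STOP)
    store.join()
    return database
```

The order of items in the returned list depends on thread scheduling, so callers should not rely on it.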

Note that such an approach is somewhat a hybrid between multithreading and multiprocessing. If you delegate the work to external processes (for example, using the run() function from the subprocess module), you are in fact doing work in multiple processes, so this has symptoms of multiprocessing. But in our scenario, we are waiting for the processing results in separate threads, so it is still mostly multithreading from the view of the Python code.

The other common use case for threads is performing multiple HTTP requests to external services. For instance, if you want to fetch multiple results from a distant web API, it could take a lot of time to do that synchronously. If you wait for every previous response before making new requests, you will spend a lot of time just waiting for the external service to respond and additional roundtrip time delays will be added to every such request. If you are communicating with an efficient service (Google Maps API, for instance), it is highly probable that it can serve most of your requests concurrently without affecting response times of separate requests. It is then reasonable to perform multiple queries in separate threads. Remember that when doing an HTTP request, most of the time is spent on reading from the TCP socket. This is a blocking I/O operation, so CPython will release the GIL when performing the recv() C function. This allows for great improvements in your application's performance.

Multiuser applications

Threading is also used as a concurrency base for multiuser applications. For instance, a web server will push a user request into a new thread and then will become idle, waiting for new requests. Having a thread dedicated to each request simplifies a lot of work, but requires the developer to take care of locking the resources. But this is not a problem when all the shared data is pushed into a relational database that takes care of concurrency matters. So threads in a multi-user application act almost like separate independent processes. They are under the same process only to simplify their management at the application level.

For instance, a web server will be able to put all requests in a queue and wait for a thread to be available to send the work to it. Furthermore, it allows memory sharing that can boost some work and reduce the memory load. Two very popular WSGI-compliant web servers, Gunicorn (refer to http://gunicorn.org/) and uWSGI (refer to https://uwsgi-docs.readthedocs.org), allow you to serve HTTP requests with threaded workers in a way that generally follows this principle.

Using multithreading to enable concurrency in multiuser applications is less expensive than using multiprocessing. Separate processes cost more resources since a new interpreter needs to be loaded for each one of them. On the other hand, having too many threads is expensive too. We know that the GIL isn't such a problem for I/O-intensive applications, but there is always a time when you will need to execute Python code. Since you cannot parallelize all of the application parts with bare threads, you will never be able to utilize all resources on machines with multicore CPUs and a single Python process. This is why the optimal solution is often a hybrid of multiprocessing and multithreading: multiple workers (processes) running with multiple threads. Fortunately, many of the WSGI-compliant web servers allow for such a setup.

But before you marry multithreading with multiprocessing, consider if such an approach is really worth all the cost. Such an approach uses multiprocessing for better resource utilization and additionally multithreading for more concurrency, which should be lighter than running multiple processes. But this does not need to be true. Maybe getting rid of threads and increasing the number of processes is not as expensive as you think? When choosing the best setup, you always need to do load testing of your application (see the Load and performance testing section in Chapter 10, Test-Driven Development). Also, as a side effect of using multiple threads, you get a less safe environment where shared memory creates a risk of data corruption or dreaded deadlocks. Maybe a better alternative would be using some asynchronous approach with event loops, green threads, or coroutines. We will cover such solutions later in the Asynchronous programming section. Again, without sensible load testing and experimentation, you cannot really tell what approach will work best in your context.

An example of a threaded application

To see how Python threading works in practice, let's construct an example application that can take some benefit from implementing multithreading. We will discuss a simple problem that you may encounter from time to time in your professional practice—making multiple parallel HTTP queries. This problem was already mentioned as a common use case for multithreading.

Let's say we need to fetch data from some web service using multiple queries that cannot be batched into a single big HTTP request. As a realistic example, we will use geocoding endpoints from Google Maps API. The reasons for that choice are as follows:

  • It is a very popular and well-documented service
  • There is a free tier of this API that does not require any authentication keys
  • There is a python-gmaps package available on PyPI that allows you to interact with various Google Maps API endpoints and is extremely easy to use

Geocoding simply means the transformation of an address or a place into coordinates. We will try to geocode a predefined list of various cities into latitude/longitude tuples and display results on the standard output with python-gmaps. It is as simple as shown in the following code:

>>> from gmaps import Geocoding
>>> api = Geocoding()
>>> geocoded = api.geocode('Warsaw')[0]
>>> print("{:>25s}, {:6.2f}, {:6.2f}".format(
...         geocoded['formatted_address'],
...         geocoded['geometry']['location']['lat'],
...         geocoded['geometry']['location']['lng'],
...     ))
Warsaw, Poland,  52.23,  21.01

Since our goal is to show how a multithreaded solution to concurrent problems compares to a standard synchronous solution, we will start with an implementation that does not use threads at all. Here is the code of a program that loops over the list of cities, queries the Google Maps API, and displays information about their addresses and coordinates in a text-formatted table:

import time

from gmaps import Geocoding

api = Geocoding()


PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice',
    'Wrocław', 'Bolognia', 'Berlin', 'Słubice',
    'New York', 'Dehli',
)


def fetch_place(place):
    geocoded = api.geocode(place)[0]

    print("{:>25s}, {:6.2f}, {:6.2f}".format(
        geocoded['formatted_address'],
        geocoded['geometry']['location']['lat'],
        geocoded['geometry']['location']['lng'],
    ))


def main():
    for place in PLACES:
        fetch_place(place)

if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started

    print()
    print("time elapsed: {:.2f}s".format(elapsed))

Around the execution of the main() function, we added a few statements that are intended to measure how much time it took to finish the job. On my computer, this program usually takes around 2 to 3 seconds to complete its task:

$ python3 synchronous.py
       Reykjavík, Iceland,  64.13, -21.82
          Vienna, Austria,  48.21,  16.37
           Zadar, Croatia,  44.12,  15.23
            Venice, Italy,  45.44,  12.32
          Wrocław, Poland,  51.11,  17.04
           Bologna, Italy,  44.49,  11.34
          Berlin, Germany,  52.52,  13.40
          Slubice, Poland,  52.35,  14.56
        New York, NY, USA,  40.71, -74.01
    Dehli, Gujarat, India,  21.57,  73.22

time elapsed: 2.79s

Note

Every run of our script will take a different amount of time because it mostly depends on a remote service accessible through a network connection. So there are a lot of nondeterministic factors affecting the final result. The best approach would be to make longer tests, repeat them multiple times, and also calculate some average from the measurements. But for the sake of simplicity, we won't do that. You will see later that this simplified approach is just enough for illustrative purposes.
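
For readers who do want steadier numbers, the following sketch repeats a measurement and summarizes it. The measure() helper is a hypothetical name, and it uses time.perf_counter() rather than the time.time() calls found in the listings:

```python
import statistics
import time


def measure(func, repeats=5):
    """Call func() `repeats` times (at least 2) and return the mean and
    the sample standard deviation of the wall-clock times in seconds."""
    samples = []
    for _ in range(repeats):
        started = time.perf_counter()
        func()
        samples.append(time.perf_counter() - started)
    return statistics.mean(samples), statistics.stdev(samples)
```

Calling measure(main) would then report an average over several runs instead of a single elapsed time. Keep in mind that repeated runs also send repeated requests to the remote service.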

Using one thread per item

Now it is time for improvement. We don't do a lot of processing in Python and the long execution time is caused by communication with the external service. We send an HTTP request to the server, it calculates the answer, and then we wait until the response is transferred back. There is a lot of I/O involved, so multithreading seems like a viable option. We can start all the requests at once in separate threads and then just wait until they receive data. If the service that we are communicating with is able to process our requests concurrently, we should definitely see a performance improvement.

So let's start with the easiest approach. Python provides a clean and easy-to-use abstraction over system threads with the threading module. The core of this standard library module is the Thread class that represents a single thread instance. Here is a modified version of the main() function, which creates and starts a new thread for every place to geocode and then waits until all the threads finish:

from threading import Thread


def main():
    threads = []
    for place in PLACES:
        thread = Thread(target=fetch_place, args=[place])
        thread.start()
        threads.append(thread)

    while threads:
        threads.pop().join()

It is a quick-and-dirty change that has some serious issues that we will try to address later. It approaches the problem in a bit of a frivolous way, and it is not a way to write reliable software that will serve thousands or millions of users. But hey, it works:

$ python3 threaded.py
          Wrocław, Poland,  51.11,  17.04
          Vienna, Austria,  48.21,  16.37
    Dehli, Gujarat, India,  21.57,  73.22
        New York, NY, USA,  40.71, -74.01
           Bologna, Italy,  44.49,  11.34
       Reykjavík, Iceland,  64.13, -21.82
           Zadar, Croatia,  44.12,  15.23
          Berlin, Germany,  52.52,  13.40
          Slubice, Poland,  52.35,  14.56
            Venice, Italy,  45.44,  12.32

time elapsed: 1.05s

Now that we know that threads have a beneficial effect on our application, it is time to use them in a slightly saner way. First we need to identify the issues in the preceding code:

  • We start a new thread for every parameter. Thread initialization also takes some time but this minor overhead is not the only problem. Threads also consume other resources such as memory and file descriptors. Our example input has a strictly defined number of items, but what if it did not? You definitely don't want to run an unbounded number of threads that depends on the arbitrary size of data input.
  • The fetch_place() function executed in threads calls the built-in print() function, and in practice it is very unlikely that you would want to do that outside of the main application thread. First, this is because of how the standard output is buffered in Python: you can get malformed output when multiple calls to this function interleave between threads. Also, the print() function is considered slow. If used recklessly in multiple threads, it can lead to serialization, which will undo all the benefits of multithreading.
  • Last but not least, by delegating every function call to a separate thread, we make it extremely hard to control the rate at which our input is processed. Yes, we want to do the job as fast as possible, but very often external services enforce hard limits on the rate of requests from a single client that they can process. Sometimes it is reasonable to design the program in a way that enables you to throttle the rate of processing, so your application won't be blacklisted by external APIs for abusing their usage limits.

Using a thread pool

The first issue we will try to solve is the unbounded number of threads that are run by our program. A good solution would be to build a pool of threaded workers with a strictly defined size that will handle all the parallel work, and to communicate with the workers through some thread-safe data structure. By using this thread pool approach, we will also make it easier to solve the two other problems that we just mentioned.

So the general idea is to start some predefined number of threads that will consume the work items from a queue until it is empty. When there is no other work to do, the threads will return and we will be able to exit from the program. A good candidate for the structure used to communicate with the workers is the Queue class from the queue module of the standard library. It is a FIFO (First In First Out) queue implementation that is very similar to the deque collection from the collections module and was specifically designed to handle interthread communication. Here is a modified version of the main() function that starts only a limited number of worker threads with a new worker() function as a target, and communicates with them using a thread-safe queue:

from queue import Queue, Empty
from threading import Thread

THREAD_POOL_SIZE = 4


def worker(work_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            fetch_place(item)
            work_queue.task_done()


def main():
    work_queue = Queue()

    for place in PLACES:
        work_queue.put(place)

    threads = [
        Thread(target=worker, args=(work_queue,))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

The result of running a modified version of our program is similar to the previous one:

$ python3 threadpool.py
       Reykjavík, Iceland,  64.13, -21.82
            Venice, Italy,  45.44,  12.32
          Vienna, Austria,  48.21,  16.37
           Zadar, Croatia,  44.12,  15.23
          Wrocław, Poland,  51.11,  17.04
           Bologna, Italy,  44.49,  11.34
          Slubice, Poland,  52.35,  14.56
          Berlin, Germany,  52.52,  13.40
        New York, NY, USA,  40.71, -74.01
    Dehli, Gujarat, India,  21.57,  73.22

time elapsed: 1.20s

The run time will be longer than in a situation with one thread per argument, but at least now it is not possible to exhaust all the computing resources with an arbitrarily long input. Also, we can tweak the THREAD_POOL_SIZE parameter for a better resource/time balance.
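
As a point of comparison, the standard library also ships a ready-made pool in the concurrent.futures module. This is a different technique from the hand-written queue and worker functions used in this section, shown here only as a sketch. The fetch_place_stub() function is a hypothetical stand-in for fetch_place() so that the example runs without network access:

```python
from concurrent.futures import ThreadPoolExecutor

THREAD_POOL_SIZE = 4


def fetch_place_stub(place):
    """Hypothetical stand-in for fetch_place(); a real version
    would query the web service."""
    return place.upper()


def main(places):
    # The executor runs at most THREAD_POOL_SIZE worker threads,
    # and map() yields the results in the order of the inputs.
    with ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE) as executor:
        return list(executor.map(fetch_place_stub, places))
```

Because map() preserves input order and the results are returned to the caller, the printing can stay in the main thread. The hand-written pool is still worth studying, since it exposes the queue mechanics that the executor hides.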

Using two-way queues

The other issue that we are now able to solve is the potentially problematic printing of the output in threads. It would be much better to leave such a responsibility to the main thread that started the other threads. We can handle that by providing another queue that will be responsible for collecting results from our workers. Here is the complete code that puts everything together with the main changes highlighted:

import time
from queue import Queue, Empty
from threading import Thread

from gmaps import Geocoding

api = Geocoding()


PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice',
    'Wrocław', 'Bolognia', 'Berlin', 'Słubice',
    'New York', 'Dehli',
)

THREAD_POOL_SIZE = 4


def fetch_place(place):
    return api.geocode(place)[0]


def present_result(geocoded):
    print("{:>25s}, {:6.2f}, {:6.2f}".format(
        geocoded['formatted_address'],
        geocoded['geometry']['location']['lat'],
        geocoded['geometry']['location']['lng'],
    ))


def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            results_queue.put(
                fetch_place(item)
            )
            work_queue.task_done()


def main():
    work_queue = Queue()
    results_queue = Queue()

    for place in PLACES:
        work_queue.put(place)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

    while not results_queue.empty():
        present_result(results_queue.get())


if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started

    print()
    print("time elapsed: {:.2f}s".format(elapsed))

This eliminates the risk of malformed output, which we could experience if the present_result() function made more print() calls or performed some additional computation. We don't expect any performance improvement from this approach with small inputs, but we do also reduce the risk of thread serialization due to slow print() execution. Here is our final output:

$ python3 threadpool_with_results.py
          Vienna, Austria,  48.21,  16.37
       Reykjavík, Iceland,  64.13, -21.82
           Zadar, Croatia,  44.12,  15.23
            Venice, Italy,  45.44,  12.32
          Wrocław, Poland,  51.11,  17.04
           Bologna, Italy,  44.49,  11.34
          Slubice, Poland,  52.35,  14.56
          Berlin, Germany,  52.52,  13.40
        New York, NY, USA,  40.71, -74.01
    Dehli, Gujarat, India,  21.57,  73.22

time elapsed: 1.30s

Dealing with errors and rate limiting

The last of the issues mentioned earlier that you may experience when dealing with such problems is the rate limits imposed by external service providers. In the case of the Google Maps API, at the time of writing this book, the official rate limit for free and non-authenticated requests is 10 requests per second and 2,500 requests per day. When using multiple threads, it is very easy to exhaust such a limit. The problem is even more serious due to the fact that we have not covered any failure scenarios yet, and dealing with exceptions in multithreaded Python code is a bit more complicated than usual.

The api.geocode() function will raise an exception when the client exceeds Google's rate limit, and this is good news. But this exception is raised only in the worker thread that made the call and will not crash the entire program. The worker thread will of course exit immediately, but the main thread will wait for all tasks stored on work_queue to be finished (with the work_queue.join() call). This means that our worker threads should gracefully handle possible exceptions and make sure that all items from the queue are processed. Without further improvement, we may end up in a situation where some of the worker threads have crashed and the program never exits.

Let's make some minor changes to our code in order to be prepared for any issues that may occur. In the case of exceptions in the worker thread, we may put an error instance in the results_queue queue and mark the current task as done, the same as we would do if there was no error. That way we make sure that the main thread won't lock indefinitely while waiting in work_queue.join(). The main thread might then inspect the results and re-raise any of the exceptions found on the results queue. Here are the improved versions of the worker() and main() functions that can deal with exceptions in a safer way:

def worker(work_queue, results_queue):
    while True:
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            try:
                result = fetch_place(item)
            except Exception as err:
                results_queue.put(err)
            else:
                results_queue.put(result)
            finally:
                work_queue.task_done()


def main():
    work_queue = Queue()
    results_queue = Queue()

    for place in PLACES:
        work_queue.put(place)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

    while not results_queue.empty():
        result = results_queue.get()

        if isinstance(result, Exception):
            raise result

        present_result(result)
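
The same pattern can be tried without network access. The sketch below is a variation, not the listing above: flaky_fetch() is a hypothetical stand-in for fetch_place() that fails for one input, and collect() separates the errors from the results instead of re-raising the first error, so that successful results are not lost:

```python
from queue import Queue, Empty
from threading import Thread


def flaky_fetch(item):
    """Hypothetical stand-in for fetch_place() that fails for one input."""
    if item == "bad":
        raise ValueError("cannot process {!r}".format(item))
    return item.title()


def worker(work_queue, results_queue):
    while True:
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        try:
            results_queue.put(flaky_fetch(item))
        except Exception as err:
            # Pass the error to the main thread instead of dying silently.
            results_queue.put(err)
        finally:
            # Always mark the task as done so work_queue.join() returns.
            work_queue.task_done()


def collect(items, pool_size=2):
    work_queue, results_queue = Queue(), Queue()
    for item in items:
        work_queue.put(item)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(pool_size)
    ]
    for thread in threads:
        thread.start()
    work_queue.join()
    for thread in threads:
        thread.join()

    results, errors = [], []
    while not results_queue.empty():
        result = results_queue.get()
        if isinstance(result, Exception):
            errors.append(result)
        else:
            results.append(result)
    return results, errors
```

Whether to re-raise immediately or to gather all errors first is a design choice that depends on how the caller wants to treat partial results.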

Now that we are ready to handle exceptions, it is time to break our code and exceed the rate limit. We can do that easily by modifying some initial conditions. Let's increase the number of places to geocode and the size of our thread pool:

PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice',
    'Wrocław', 'Bolognia', 'Berlin', 'Słubice',
    'New York', 'Dehli',
) * 10

THREAD_POOL_SIZE = 10

If your execution environment is fast enough, you should get a similar error soon:

$ python3 threadpool_with_errors.py
        New York, NY, USA,  40.71, -74.01
          Berlin, Germany,  52.52,  13.40
          Wrocław, Poland,  51.11,  17.04
           Zadar, Croatia,  44.12,  15.23
          Vienna, Austria,  48.21,  16.37
           Bologna, Italy,  44.49,  11.34
       Reykjavík, Iceland,  64.13, -21.82
            Venice, Italy,  45.44,  12.32
    Dehli, Gujarat, India,  21.57,  73.22
          Slubice, Poland,  52.35,  14.56
          Vienna, Austria,  48.21,  16.37
           Zadar, Croatia,  44.12,  15.23
            Venice, Italy,  45.44,  12.32
       Reykjavík, Iceland,  64.13, -21.82
Traceback (most recent call last):
  File "threadpool_with_errors.py", line 83, in <module>
    main()
  File "threadpool_with_errors.py", line 76, in main
    raise result
  File "threadpool_with_errors.py", line 43, in worker
    result = fetch_place(item)
  File "threadpool_with_errors.py", line 23, in fetch_place
    return api.geocode(place)[0]
  File "...\site-packages\gmaps\geocoding.py", line 37, in geocode
    return self._make_request(self.GEOCODE_URL, parameters, "results")
  File "...\site-packages\gmaps\client.py", line 89, in _make_request
    )(response)
gmaps.errors.RateLimitExceeded: {'status': 'OVER_QUERY_LIMIT', 'results': [], 'error_message': 'You have exceeded your rate-limit for this API.', 'url': 'https://maps.googleapis.com/maps/api/geocode/json?address=Wroc%C5%82aw&sensor=false'}

The preceding exception is of course not the result of faulty code. This program simply is a bit too fast for this free service. It makes too many concurrent requests, and in order to work correctly, we need to have a way to limit their rate.

Limiting the pace of work is often called throttling. There are a few packages on PyPI that allow you to limit the rate of any kind of work and are really easy to use. But we won't use any external code here. Throttling is a good opportunity to introduce some locking primitives for threading, so we will try to build a solution from scratch.

The algorithm we will use is sometimes called token bucket and is very simple:

  1. There is a bucket with a predefined amount of tokens.
  2. Each token corresponds to a single permission to process one item of work.
  3. Each time the worker asks for a single or multiple tokens (permission):
    • We measure how much time was spent from the last time we refilled the bucket
    • If the time difference allows for it, we refill the bucket with the amount of tokens that corresponds to this time difference
    • If the amount of stored tokens is greater than or equal to the amount requested, we decrease the number of stored tokens and return the requested amount
    • If the amount of stored tokens is less than requested, we return zero

The two important things are to always initialize the token bucket with zero tokens and to never allow it to hold more tokens than its rate allows per standard quantum of time. If we don't follow these precautions, we can release the tokens in bursts that exceed the rate limit. Because in our situation the rate limit is expressed in requests per second, we don't need to deal with arbitrary quanta of time. We assume that the base for our measurement is one second, so we will never store more tokens than the number of requests allowed for that quantum of time. Here is an example implementation of the class that allows for throttling with a token bucket algorithm:

import time
from threading import Lock

class Throttle:
    def __init__(self, rate):
        self._consume_lock = Lock()
        self.rate = rate
        self.tokens = 0
        self.last = 0

    def consume(self, amount=1):
        with self._consume_lock:
            now = time.time()
            
            # time measurement is initialized on first
            # token request to avoid initial bursts
            if self.last == 0:
                self.last = now

            elapsed = now - self.last

            # make sure that quant of passed time is big
            # enough to add new tokens
            if int(elapsed * self.rate):
                self.tokens += int(elapsed * self.rate)
                self.last = now

            # never over-fill the bucket
            self.tokens = (
                self.rate
                if self.tokens > self.rate
                else self.tokens
            )

            # finally dispatch tokens if available
            if self.tokens >= amount:
                self.tokens -= amount
            else:
                amount = 0

            return amount
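
To check the refill arithmetic without waiting in real time, here is a compact variant of the same algorithm. ClockedThrottle and its clock parameter are assumptions introduced for this sketch only. The parameter lets a test supply its own source of time:

```python
import time
from threading import Lock


class ClockedThrottle:
    """Token bucket sketch with an injectable clock (hypothetical variant)."""

    def __init__(self, rate, clock=time.monotonic):
        self._lock = Lock()
        self._clock = clock
        self.rate = rate
        self.tokens = 0
        self.last = None

    def consume(self, amount=1):
        with self._lock:
            now = self._clock()
            if self.last is None:
                # Start measuring on the first request to avoid a burst.
                self.last = now

            # Whole tokens earned since the last refill.
            new_tokens = int((now - self.last) * self.rate)
            if new_tokens:
                # Refill, but never beyond the per-second rate.
                self.tokens = min(self.rate, self.tokens + new_tokens)
                self.last = now

            if self.tokens >= amount:
                self.tokens -= amount
                return amount
            return 0
```

With a rate of 10 and a fake clock, the first call returns 0 because the bucket starts empty. Advancing the clock by 0.25 seconds earns int(0.25 * 10) = 2 tokens, so the next two calls succeed and the third returns 0 again.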

The usage of this class is very simple. Assume that we created only one instance of Throttle (with Throttle(10), for instance) in the main thread and passed it to every worker thread as a positional argument. Using the same data structure in different threads is safe because we guarded the manipulation of its internal state with an instance of the Lock class from the threading module. We can now update the worker() function implementation to wait with every item until the throttle releases a new token:

def worker(work_queue, results_queue, throttle):
    while True:
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            while not throttle.consume():
                pass

            try:
                result = fetch_place(item)
            except Exception as err:
                results_queue.put(err)
            else:
                results_queue.put(result)
            finally:
                work_queue.task_done()