Multiprocessing

Let's be honest, multithreading is challenging, as we saw in the previous section. The simplest approach to the problem required only minimal effort, but dealing with threads in a sane and safe manner required a tremendous amount of code.

We had to set up a thread pool and communication queues, gracefully handle exceptions from threads, and also care about thread safety when trying to provide rate limiting capability. Dozens of lines of code only to execute one function from an external library in parallel! And we only assume that this is production-ready because there is a promise from the external package creator that the library is thread-safe. That sounds like a high price for a solution that is practically applicable only to I/O-bound tasks.

An alternative approach that allows you to achieve parallelism is multiprocessing. Separate Python processes do not constrain each other with the GIL, which allows for better resource utilization. This is especially important for applications running on multicore processors that perform really CPU-intensive tasks. Right now, this is the only built-in concurrency solution available for Python developers (using the CPython interpreter) that allows you to take advantage of multiple processor cores.

The other advantage of using multiple processes is the fact that they do not share a memory context. So it is harder to corrupt data and introduce deadlocks into your application. Not sharing the memory context means that you need some additional effort to pass data between separate processes, but fortunately there are many good ways to implement reliable interprocess communication. In fact, Python provides some primitives that make communication between processes almost as easy as it is between threads.

The most basic way to start new processes in any programming language is usually by forking the program at some point. On POSIX systems (Unix, macOS, and Linux), fork is a system call exposed in Python through the os.fork() function, which creates a new child process. The two processes then continue the program on their own right after forking. Here is an example script that forks itself exactly once:

import os

pid_list = []


def main():
    pid_list.append(os.getpid())
    child_pid = os.fork()

    if child_pid == 0:
        pid_list.append(os.getpid())
        print()
        print("CHLD: hey, I am the child process")
        print("CHLD: all the pids i know %s" % pid_list)

    else:
        pid_list.append(os.getpid())
        print()
        print("PRNT: hey, I am the parent")
        print("PRNT: the child is pid %d" % child_pid)
        print("PRNT: all the pids i know %s" % pid_list)


if __name__ == "__main__":
    main()

And here is an example of running it in a terminal:

$ python3 forks.py

PRNT: hey, I am the parent
PRNT: the child is pid 21916
PRNT: all the pids i know [21915, 21915]

CHLD: hey, I am the child process
CHLD: all the pids i know [21915, 21916]

Notice how both processes have exactly the same initial state of their data before the os.fork() call. They both have the same PID number (process identifier) as a first value of the pid_list collection. Later, both states diverge and we can see that the child process added the 21916 value while the parent duplicated its 21915 PID. This is because the memory contexts of these two processes are not shared. They have the same initial conditions but cannot affect each other after the os.fork() call.

After the fork, the memory context is copied to the child and each process deals with its own address space. To communicate, processes need to work with system-wide resources or use low-level tools such as signals.
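As a rough illustration of communicating through a system-wide resource, the following sketch passes a reply from a forked child back to its parent over an operating system pipe created with os.pipe(). The ask_child() function name and the message format are made up for this example, and the code assumes a POSIX system where os.fork() is available:

```python
import os


def ask_child(message):
    """Fork once and return (child_pid, text the child sent back)."""
    # The pipe is created before forking, so both processes inherit
    # the same pair of file descriptors.
    read_fd, write_fd = os.pipe()
    child_pid = os.fork()

    if child_pid == 0:
        # Child: close the unused read end, send the reply, and exit
        # immediately so it never continues running the parent's code.
        os.close(read_fd)
        reply = "child {} got: {}".format(os.getpid(), message)
        os.write(write_fd, reply.encode())
        os.close(write_fd)
        os._exit(0)

    # Parent: close the unused write end so that read() can reach EOF
    # once the child has closed its own copy.
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as reader:
        data = reader.read()
    os.waitpid(child_pid, 0)  # reap the child to avoid a zombie process
    return child_pid, data.decode()


if __name__ == "__main__":
    pid, text = ask_child("hello")
    print("PRNT: child {} replied: {!r}".format(pid, text))
```

Note that the pipe carries only raw bytes, so anything more structured has to be encoded and decoded by hand.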

Unfortunately, os.fork is not available under Windows, where a new interpreter needs to be spawned in order to mimic the fork feature. So the code needs to be different depending on the platform. The os module also exposes functions that allow you to spawn new processes under Windows, but in practice you will rarely use them. This is also true for os.fork(). Python provides a great multiprocessing module that creates a high-level interface for multiprocessing. The great advantage of this module is that it provides some of the abstractions that we had to code from scratch in the An example of a threaded application section. It allows you to limit the amount of boilerplate code, so it improves application maintainability and reduces its complexity. Surprisingly, despite its name, the multiprocessing module also exposes a similar interface for threads, so you will probably want to use the same interface for both approaches.

The built-in multiprocessing module

multiprocessing provides a portable way to work with processes as if they were threads.

This module contains a Process class that is very similar to the Thread class, and can be used on any platform:

from multiprocessing import Process
import os


def work(identifier):
    print(
        'hey, i am a process {}, pid: {}'
        ''.format(identifier, os.getpid())
    )


def main():
    processes = [
        Process(target=work, args=(number,))
        for number in range(5)
    ]
    for process in processes:
        process.start()
    
    while processes:
        processes.pop().join()


if __name__ == "__main__":
    main()

The preceding script, when executed, gives the following result:

$ python3 processing.py
hey, i am a process 1, pid: 9196
hey, i am a process 0, pid: 8356
hey, i am a process 3, pid: 9524
hey, i am a process 2, pid: 3456
hey, i am a process 4, pid: 6576

When the processes are created, the memory is forked (on POSIX systems). The most efficient usage of processes is to let them work on their own after they have been created to avoid overhead, and check on their states from the main thread. Besides the memory state that is copied, the Process class also provides an extra args argument in its constructor so that data can be passed along.
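Checking on process states from the parent could look roughly like the sketch below, which relies on the is_alive() method and the exitcode attribute of Process. The work() and run_and_watch() functions are hypothetical, as is the start_method parameter, which is only there so that the caller can pick a specific start method through multiprocessing.get_context(); leaving it as None keeps the platform default:

```python
import multiprocessing
import sys
import time


def work(exit_code):
    """Pretend to do some work, then finish with the given exit code."""
    time.sleep(0.1)
    sys.exit(exit_code)


def run_and_watch(exit_codes, start_method=None):
    """Start one process per exit code and return the observed exit codes."""
    # start_method=None selects the default for the current platform
    ctx = multiprocessing.get_context(start_method)
    processes = [
        ctx.Process(target=work, args=(code,))
        for code in exit_codes
    ]
    for process in processes:
        process.start()

    # Poll the children from the parent instead of blocking on join()
    while any(process.is_alive() for process in processes):
        time.sleep(0.05)

    for process in processes:
        process.join()  # returns immediately, the children are finished
    return [process.exitcode for process in processes]


if __name__ == "__main__":
    print(run_and_watch([0, 3]))
```

A nonzero exitcode is the simplest signal that a child did not finish cleanly, and the parent can react to it without any additional communication channel.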

Communication between processes requires some additional work because their local memory is not shared by default. To simplify this, the multiprocessing module provides a few ways of communicating between processes:

  • Using the multiprocessing.Queue class, which is a near clone of queue.Queue, which was used earlier for communication between threads
  • Using multiprocessing.Pipe, which is a socket-like two-way communication channel
  • Using the multiprocessing.sharedctypes module, which allows you to create arbitrary C types (from the ctypes module) in a dedicated pool of memory that is shared between processes

The multiprocessing.Queue and queue.Queue classes have the same interface. The only difference is that the first is designed for use in multiple process environments, rather than with multiple threads, so it uses different internal transports and locking primitives. We already saw how to use Queue with multithreading in the An example of a threaded application section, so we won't do the same for multiprocessing. The usage stays exactly the same, so such an example would not bring anything new.

A more interesting pattern right now is provided by the Pipe class. It is a duplex (two-way) communication channel that is very similar in concept to Unix pipes. The interface of Pipe is also very similar to a simple socket from the built-in socket module. The difference from raw system pipes and sockets is that it allows you to send any picklable object (using the pickle module) instead of just raw bytes. This makes communication between processes a lot easier because you can send almost any basic Python type:

from multiprocessing import Process, Pipe


class CustomClass:
    pass


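def work(connection):
    while True:
        instance = connection.recv()

        # None is the sentinel that tells the child to stop; testing
        # "is None" keeps falsy payloads such as 0 or '' from ending the loop
        if instance is None:
            return

        print("CHLD: recv: {}".format(instance))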


def main():
    parent_conn, child_conn = Pipe()

    child = Process(target=work, args=(child_conn,))

    for item in (
        42,
        'some string',
        {'one': 1},
        CustomClass(),
        None,
    ):
        print("PRNT: send: {}".format(item))
        parent_conn.send(item)
        
    child.start()
    child.join()


if __name__ == "__main__":
    main()

When looking at an example output of the preceding script, you will see that you can easily pass custom class instances and that they have different addresses depending on the process:

PRNT: send: 42
PRNT: send: some string
PRNT: send: {'one': 1}
PRNT: send: <__main__.CustomClass object at 0x101cb5b00>
PRNT: send: None
CHLD: recv: 42
CHLD: recv: some string
CHLD: recv: {'one': 1}
CHLD: recv: <__main__.CustomClass object at 0x101cba400>

The other way to share state between processes is to use raw types in a shared memory pool with the classes provided in multiprocessing.sharedctypes. The most basic ones are Value and Array. Here is some example code from the official documentation of the multiprocessing module:

from multiprocessing import Process, Value, Array


def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]


if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

And this example will print the following output:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

When working with multiprocessing.sharedctypes, you need to remember that you are dealing with shared memory, so to avoid the risk of data corruption you need to use locking primitives. The multiprocessing module provides some of the classes available in threading, such as Lock, RLock, and Semaphore, to do that. The downside of classes from sharedctypes is that they only allow you to share the basic C types from the ctypes module. If you need to pass more complex structures or class instances, you need to use Queue, Pipe, or other interprocess communication channels instead. In most cases, it is reasonable to avoid types from sharedctypes because they increase code complexity and bring all the dangers known from multithreading.
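To make the locking advice a bit more concrete, here is a minimal sketch of several processes incrementing one shared counter. It relies on the get_lock() method of the synchronized wrapper returned by Value. The add_many() and run_counter() functions, as well as the start_method parameter passed to multiprocessing.get_context(), are assumptions made for this illustration:

```python
import multiprocessing


def add_many(counter, times):
    """Increment the shared counter `times` times."""
    for _ in range(times):
        # "counter.value += 1" is a read-modify-write sequence, so it is
        # not atomic; the lock prevents lost updates between processes.
        with counter.get_lock():
            counter.value += 1


def run_counter(workers=4, times=1000, start_method=None):
    """Return the final counter value after all workers have finished."""
    # start_method=None selects the default for the current platform
    ctx = multiprocessing.get_context(start_method)
    counter = ctx.Value('i', 0)  # a C int living in shared memory
    processes = [
        ctx.Process(target=add_many, args=(counter, times))
        for _ in range(workers)
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    return counter.value


if __name__ == "__main__":
    print(run_counter())  # 4 workers * 1000 increments = 4000
```

Without the with counter.get_lock() block, the final value could end up lower than expected, which is exactly the kind of issue known from multithreaded code.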

Using process pools

Using multiple processes instead of threads adds some substantial overhead. Mostly, it increases the memory footprint because each process has its own independent memory context. This means that allowing an unbounded number of child processes is even more problematic than in multithreaded applications.

The best pattern to control resource usage in applications that rely on multiprocessing for better resource utilization is to build a process pool in a similar way as described for threads in the Using a thread pool section.

And the best thing about the multiprocessing module is that it provides a ready-to-use Pool class that handles all the complexity of managing multiple process workers for you. This pool implementation greatly reduces the amount of boilerplate required and the number of issues related to two-way communication. You are also not required to call the join() method manually, as Pool can be used as a context manager (using the with statement). Here is one of our previous threading examples rewritten to use the Pool class from the multiprocessing module:

from multiprocessing import Pool

from gmaps import Geocoding

api = Geocoding()


PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice',
    'Wrocław', 'Bolognia', 'Berlin', 'Słubice',
    'New York', 'Dehli',
)

POOL_SIZE = 4


def fetch_place(place):
    return api.geocode(place)[0]


def present_result(geocoded):
    print("{:>25s}, {:6.2f}, {:6.2f}".format(
        geocoded['formatted_address'],
        geocoded['geometry']['location']['lat'],
        geocoded['geometry']['location']['lng'],
    ))


def main():
    with Pool(POOL_SIZE) as pool:
        results = pool.map(fetch_place, PLACES)

    for result in results:
        present_result(result)


if __name__ == "__main__":
    main()

As you can see, the code is now a lot shorter. It means that it is now easier to maintain and debug in case of issues. Actually, there are now only two lines of code that explicitly deal with multiprocessing. This is a great improvement over the situation where we had to build the processing pool from scratch. Now we don't even need to care about communication channels because they are created implicitly inside of the Pool implementation.
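The same pattern applies to CPU-bound work, which is where processes are expected to pay off the most. The following sketch does not depend on any external package: count_primes() is a deliberately naive, made-up workload, and count_all() with its start_method parameter (passed to multiprocessing.get_context()) is likewise an assumption of this example rather than part of the earlier code:

```python
import multiprocessing


def count_primes(limit):
    """Count primes below `limit` using naive trial division."""
    count = 0
    for candidate in range(2, limit):
        divisors = range(2, int(candidate ** 0.5) + 1)
        if all(candidate % divisor for divisor in divisors):
            count += 1
    return count


def count_all(limits, pool_size=4, start_method=None):
    """Run count_primes() for every limit using a pool of processes."""
    # start_method=None selects the default for the current platform
    ctx = multiprocessing.get_context(start_method)
    with ctx.Pool(pool_size) as pool:
        # map() blocks until all results are ready and preserves order
        return pool.map(count_primes, limits)


if __name__ == "__main__":
    print(count_all([10, 100, 1000]))  # [4, 25, 168]
```

Because each call to count_primes() runs in a separate process, the calls do not compete for the GIL and can occupy separate processor cores.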

Using multiprocessing.dummy as a multithreading interface

The high-level abstractions from the multiprocessing module, such as the Pool class, are a great advantage over the simple tools provided in the threading module. However, this does not mean that multiprocessing is always a better approach than multithreading. There are a lot of use cases where threads may be a better solution than processes. This is especially true for situations where low latency and/or high resource efficiency is required.

But it does not mean that you need to sacrifice all the useful abstractions from the multiprocessing module whenever you want to use threads instead of processes. There is the multiprocessing.dummy module, which replicates the multiprocessing API but uses multiple threads instead of forking/spawning new processes.

This allows you to reduce the amount of boilerplate in your code and also provide a more pluggable interface. For instance, let's take yet another look at our main() function from the previous examples. If we wanted to give users control over which processing backend they want to use (processes or threads), we could do that simply by replacing the Pool class:

from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool


def main(use_threads=False):
    if use_threads:
        pool_cls = ThreadPool
    else:
        pool_cls = ProcessPool

    with pool_cls(POOL_SIZE) as pool:
        results = pool.map(fetch_place, PLACES)

    for result in results:
        present_result(result)
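
One way to observe the difference between the two backends without calling any external service is to ask every worker for its process identifier. The worker_pid() and collect_pids() names below are hypothetical and used only for this sketch:

```python
import os
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool


def worker_pid(_):
    """Return the PID of whatever process executes this task."""
    return os.getpid()


def collect_pids(use_threads=False, tasks=8, pool_size=4):
    """Run a few trivial tasks and return the set of PIDs that ran them."""
    pool_cls = ThreadPool if use_threads else ProcessPool
    with pool_cls(pool_size) as pool:
        return set(pool.map(worker_pid, range(tasks)))


if __name__ == "__main__":
    print("parent:   ", os.getpid())
    # Threads live inside the parent, so only the parent PID shows up
    print("threads:  ", collect_pids(use_threads=True))
    # Process workers are separate processes with their own PIDs
    print("processes:", collect_pids(use_threads=False))
```

With the thread-based pool, all tasks report the PID of the parent process, while the process-based pool reports the PIDs of its worker processes.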