Chapter 7. Async IO – Multithreading without Threads

The previous chapter showed us the basic implementation of synchronous coroutines. Whenever you are dealing with external resources, however, synchronous coroutines are a bad idea. Just a single stalling remote connection can cause your entire process to hang, unless you are using multiprocessing (explained in Chapter 13, Multiprocessing – When a Single CPU Core Is Not Enough) or asynchronous functions that is.

Asynchronous IO makes it possible to access external resources without having to worry about slowing down or stalling your application. Instead of actively waiting for results, the Python interpreter can simply continue with other tasks until it is needed again. This is very similar to the functioning of Node.js and AJAX calls in JavaScript. Within Python, we have seen libraries such as asyncore, gevent, and eventlet that have made this possible for years. With the introduction of the asyncio module, however, it has become significantly easier to use.

This chapter will explain how asynchronous functions can be used in Python (particularly 3.5 and above) and how code can be restructured in such a way that it still functions even though it doesn't follow the standard procedural coding pattern of returning values.

The following topics will be covered in this chapter:

  • Functions using:
    • async def
    • async for
    • async with
    • await
  • Parallel execution
  • Servers
  • Clients
  • Eventual results using Future

Introducing the asyncio library

The asyncio library was created to make asynchronous processing much easier and results more predictable. It was introduced with the purpose of replacing the asyncore module, which has been available for a very long time (since Python 1.5 in fact). The asyncore module was never very usable, which prompted the creation of the gevent and eventlet third-party libraries. Both gevent and eventlet make asynchronous programming much easier than asyncore ever did, but I feel that both have been made largely obsolete with the introduction of asyncio. Even though I have to admit that asyncio still has quite a few rough edges, it is in very active development, which makes me think that all the rough edges will soon be fixed by either the core Python library or third-party wrappers.

The asyncio library was officially introduced for Python 3.4, but a back port for Python 3.3 is available through the Python Package Index. With that in mind, while some portions of this chapter will be able to run on Python 3.3, most of it has been written with Python 3.5 and the newly introduced async and await keywords in mind.

The async and await statements

Before we continue with any example, it is important to know how the Python 3.4 and Python 3.5 code syntaxes relate. Even though the asyncio library was introduced only in Python 3.4, a large portion of the generic syntax has already been replaced in Python 3.5. Not forcefully, but the easier and therefore recommended syntax using async and await has been introduced.

Python 3.4

For the traditional Python 3.4 usage, a few things need to be considered:

  • Functions should be declared using the asyncio.coroutine decorator
  • Asynchronous results should be fetched using yield from coroutine()
  • Asynchronous loops are not directly supported but can be emulated using while True: yield from coroutine()

Here is an example:

import asyncio


@asyncio.coroutine
def sleeper():
    yield from asyncio.sleep(1)

Python 3.5

In Python 3.5, a new syntax was introduced to mark a function as asynchronous. Instead of the asyncio.coroutine decorator, the async keyword can be used. Also, instead of the confusing yield from syntax, Python now supports the await statement. The yield from statement was slightly confusing because it might give someone the idea that a value is being exchanged, which is not always the case.

The following is the async statement:

async def some_coroutine():
    pass

It can be used instead of the decorator:

import asyncio


@asyncio.coroutine
def some_coroutine():
    pass

Within Python 3.5, and most likely in future versions as well, the coroutine decorator will still be supported, but if backwards compatibility is not an issue, I strongly recommend the new syntax.

Additionally, instead of the yield from statement, we can use the much more logical await statement. So, the example from the previous paragraph becomes as simple as the following:

import asyncio


async def sleeper():
    await asyncio.sleep(1)

The yield from statement originated from the original coroutines implementation in Python and was a logical extension from the yield statement used within synchronous coroutines. Actually, the yield from statement still works and the await statement is just a wrapper for it, with some added checks. While using await, the interpreter checks whether the object is an awaitable object, meaning it needs to be one of the following:

  • A native coroutine created with the async def statement
  • A coroutine created with the asyncio.coroutine decorator
  • An object that implements the __await__ method

This check alone makes the await statement preferable over the yield from statement, but I personally think that await conveys the meaning of the statement much better as well.

To summarize, to convert to the new syntax, make the following changes:

  • Functions should be declared using async def instead of def
  • Asynchronous results should be fetched using await coroutine()
  • Asynchronous loops can be created using async for ... in ...
  • Asynchronous with statements can be created using async with ...

Choosing between the 3.4 and 3.5 syntax

Unless you really need Python 3.3 or 3.4 support, I would strongly recommend the Python 3.5 syntax. The new syntax is clearer and supports more features, such as asynchronous for loops and with statements. Unfortunately, they are not fully compatible, so you need to make a choice. Within an async def (3.5), we cannot use yield from, but all we need to do to fix that is replace yield from with await.

A simple example of single-threaded parallel processing

Parallel processing has many uses: a server taking care of multiple requests at the same time, speeding up heavy tasks, waiting for external resources, and much more. Generic coroutines can help with handling multiple requests and external resources in some cases, but they are still synchronous and therefore limited. With asyncio, we can transcend the limitations of generic coroutines and easily handle stalling resources without having to worry about blocking the main thread. Let's see a quick example of how the code does not stall with multiple parallel functions:

>>> import asyncio


>>> async def sleeper(delay):
...     await asyncio.sleep(delay)
...     print('Finished sleeper with delay: %d' % delay)


>>> loop = asyncio.get_event_loop()
>>> results = loop.run_until_complete(asyncio.wait((
...     sleeper(1),
...     sleeper(3),
...     sleeper(2),
... )))
Finished sleeper with delay: 1
Finished sleeper with delay: 2
Finished sleeper with delay: 3

Even though we started the sleepers with the order of 1, 3, 2, which sleeps for that amount of time, asyncio.sleep combined with the await statement actually tells Python that it should just continue with a task that needs actual processing at this time. A regular time.sleep would actually stall the Python task, meaning they would execute sequentially. This makes it somewhat more obviously transparent what these can be used for, as it handles any type of wait, which we can hand off to asyncio instead of keeping the entire Python thread busy. So, instead of while True: fh.read(), we can just respond whenever there is new data.

Let's analyze the components used in this example:

  • asyncio.coroutine: This decorator enables yielding from async def coroutines. Unless you are using this syntax, there is no real need for the decorator, but it's a good default if only used as documentation.
  • asyncio.sleep: This is the asynchronous version of time.sleep. The big difference between these two is that time.sleep will keep the Python process busy while it is sleeping, whereas asyncio.sleep will allow switching to a different task within the event loop. This process is very similar to the workings of task switching in most operating systems.
  • asyncio.get_event_loop: The default event loop is effectively the asyncio task switcher; we'll explain more about these in the next paragraph.
  • asyncio.wait: This is the coroutine for wrapping a sequence of coroutines or futures and waiting for the results. The wait time is configurable, as is the manner of waiting (first done, all done, or the first exception).

That should explain the basic workings of the example: the sleeper function is the asynchronous coroutine, which exits after the given delay. The wait function waits for all coroutines to finish before exiting, and the event loop is used for switching between the three coroutines.

Concepts of asyncio

The asyncio library has several basic concepts, which have to be explained before we venture further into examples and uses. The example shown in the previous paragraph actually used most of them, but a little explanation about the how and the why might still be useful.

The main concepts of asyncio are coroutines and event loops. Within them, there are several helper classes available, such as Streams, Futures, and Processes. The next few paragraphs will explain the basics so that you can understand the implementations in the examples in the later paragraphs.

Futures and tasks

The asyncio.Future class is essentially a promise of a result; it returns the results if they are available, and once it receives results, it will pass them along to all the registered callbacks. It maintains a state variable internally, which allows an outside party to mark a future as canceled. The API is very similar to the concurrent.futures.Future class, but since they are not fully compatible, make sure you do not confuse the two.

The Future class by itself is not that convenient to use though, so that is where asyncio.Task comes in. The Task class wraps a coroutine and automatically handles the execution, results, and state for you. The coroutine will be executed through the given event loop, or the default event loop if none was given.

The creation of these classes is not something you need to worry about directly. This is because instead of creating the class yourself, the recommended way is through either asyncio.ensure_future or loop.create_task. The former actually executes loop.create_task internally but it is more convenient if you simply want to execute it on the main/default event loop without having to specify it first. The usage is simple enough. To create your own future manually, you simply tell the event loop to execute create_task for you. The following example is a bit complicated because of all the setup code but the usage of C should be clear enough. The most important aspect to note is that the event loop should be linked so that the task knows how/where to run:

>>> import asyncio


>>> async def sleeper(delay):
...     await asyncio.sleep(delay)
...     print('Finished sleeper with delay: %d' % delay)

# Create an event loop
>>> loop = asyncio.get_event_loop()

# Create the task
>>> result = loop.call_soon(loop.create_task, sleeper(1))

# Make sure the loop stops after 2 seconds
>>> result = loop.call_later(2, loop.stop)

# Start the loop and make it run forever. Or at least until the loop.stop gets
# called in 2 seconds.
>>> loop.run_forever()
Finished sleeper with delay: 1

Now, a little bit about debugging asynchronous functions. Debugging asynchronous functions used to be very difficult if not impossible, as there was no good way to see where and how the functions were stalling. Luckily, that has changed. In the case of the Task class, it is as simple as calling task.get_stack or task.print_stack to see where it is currently. The usage can be as simple as the following:

>>> import asyncio


>>> async def stack_printer():
...     for task in asyncio.Task.all_tasks():
...         task.print_stack()

# Create an event loop
>>> loop = asyncio.get_event_loop()

# Create the task
>>> result = loop.run_until_complete(stack_printer())

Event loops

The concept of event loops is actually the most important one within asyncio. You might have suspected that the coroutines themselves are what everything is about, but without the event loop, they are useless. Event loops function as task switchers, just the way operating systems switch between active tasks on the CPU. Even with multicore processors, there is still a need for a main process to tell the CPU which tasks have to run and which need to wait/sleep for a while. This is exactly what the event loop does: it decides which task to run.

Event loop implementations

So far, we have only seen asyncio.get_event_loop, which returns the default event loop with the default event loop policy. Currently, there are two bundled event loop implementations: the async.SelectorEventLoop and async.ProactorEventLoop implementations. Which of the two is available depends on your operating system. The latter event loop is available only on Windows machines and uses I/O Completion Ports, which is a system that is supposedly faster and more efficient than the Select implementation of asyncio.SelectorEventLoop. This is something to consider if performance is an issue. The usage is simple enough, luckily:

import asyncio


loop = asyncio.ProActorEventLoop()
asyncio.set_event_loop(loop)

The alternative event loop is based on selectors, which, since Python 3.4, are available through the selectors module in the core Python installation. The selectors module was introduced in Python 3.4 to enable easy access to low-level asynchronous I/O operations. Basically, it allows you to open and read from many files by using I/O multiplexing. Since asyncio handles all complexities for you, there is generally no need to use the module directly, but the usage is simple enough if you need it. Here's an example of binding a function to the read event (EVENT_READ) on the standard input. The code will simply wait until one of the registered files provides new data:

import sys
import selectors


def read(fh):
    print('Got input from stdin: %r' % fh.readline())

if __name__ == '__main__':
    # Create the default selector
    selector = selectors.DefaultSelector()

    # Register the read function for the READ event on stdin
    selector.register(sys.stdin, selectors.EVENT_READ, read)

    while True:
        for key, mask in selector.select():
            # The data attribute contains the read function here
            callback = key.data
            # Call it with the fileobj (stdin here)
            callback(key.fileobj)

There are several selectors available, such as the traditional selectors.SelectSelector (which uses select.select internally), but there are also more modern solutions such as selectors.KqueueSelector, selectors.EpollSelector, and selectors.DevpollSelector. Even though it should select the most efficient selector by default, there are cases where the most efficient one is not suitable in some way or another. In those cases, the selector event loop allows you to specify a different selector:

import asyncio
import selectors


selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)

It should be noted that the differences between these selectors are generally too small to notice in most real-world applications. The only situation I have come across where such an optimization makes a difference is when building a server that has to handle a lot of simultaneous connections. With "a lot," I am referring to over 100,000 concurrent connections on a single server, which is a problem only a few people on this planet have had to deal with.

Event loop policies

Event loop policies are objects that create and store the actual event loops for you. They have been written with maximum flexibility in mind but are not objects that you often need to modify. The only reason I can think of modifying the event loop policy is if you want to make specific event loops run on specific processors and/or systems, or if you wish to change the default event loop type. Beyond that, it offers more flexibility than most people will ever need. Making your own event loop (ProActorEventLoop in this case) the default is simply possible through this code:

import asyncio


class ProActorEventLoopPolicy(
        asyncio.events.BaseDefaultEventLoopPolicy):
    _loop_factory = asyncio.SelectorEventLoop


policy = ProActorEventLoopPolicy()
asyncio.set_event_loop_policy(policy)

Event loop usage

So far, we have only seen the loop.run_until_complete method. Naturally, there are a few others as well. The one you will most likely use most often is loop.run_forever. This method, as you might expect, keeps running forever, or at least until loop.stop has been run.

So, assuming we have an event loop running forever now, we need to add tasks to it. This is where things get interesting. There are quite a few choices available within the default event loops:

  • call_soon: Add an item to the end of the (FIFO) queue so that the functions will be executed in the order in which they were inserted.
  • call_soon_threadsafe: This is the same as call_soon except for being thread safe. The call_soon method is not thread safe because thread safety requires the usage of the global interpreter lock (GIL), which effectively makes your program single threaded at the moment of thread safety. The performance chapter will explain this more thoroughly.
  • call_later: Call the function after the given number of seconds. If two jobs would run at the same time, they will run in an undefined order. Note that the delay is a minimum. If the event loop is locked/busy, it can run later.
  • call_at: Call a function at a specific time related to the output of loop.time. Every integer after loop.time adds a second.

All of these functions return asyncio.Handle objects. These objects allow the cancellation of the task through the handle.cancel function as long as it has not been executed yet. Be careful with canceling from other threads, however, as cancellation is not thread safe either. To execute it in a thread-safe way, we have to execute the cancellation function as a task as well: loop.call_soon_threadsafe(handle.cancel). The following is an example usage:

>>> import time
>>> import asyncio


>>> t = time.time()

>>> def printer(name):
...     print('Started %s at %.1f' % (name, time.time() - t))
...     time.sleep(0.2)
...     print('Finished %s at %.1f' % (name, time.time() - t))


>>> loop = asyncio.get_event_loop()
>>> result = loop.call_at(loop.time() + .2, printer, 'call_at')
>>> result = loop.call_later(.1, printer, 'call_later')
>>> result = loop.call_soon(printer, 'call_soon')
>>> result = loop.call_soon_threadsafe(printer, 'call_soon_threadsafe')

>>> # Make sure we stop after a second
>>> result = loop.call_later(1, loop.stop)

>>> loop.run_forever()
Started call_soon at 0.0
Finished call_soon at 0.2
Started call_soon_threadsafe at 0.2
Finished call_soon_threadsafe at 0.4
Started call_later at 0.4
Finished call_later at 0.6
Started call_at at 0.6
Finished call_at at 0.8

You might be wondering why we are not using the coroutine decorator here. The reason is that the loop won't allow running of coroutines directly. To run a coroutine through these call functions, we need to make sure that it is wrapped in an asyncio.Task. As we have seen in the previous paragraph, this is easy enough—luckily:

>>> import time
>>> import asyncio


>>> t = time.time()

>>> async def printer(name):
...     print('Started %s at %.1f' % (name, time.time() - t))
...     await asyncio.sleep(0.2)
...     print('Finished %s at %.1f' % (name, time.time() - t))


>>> loop = asyncio.get_event_loop()

>>> result = loop.call_at(
...     loop.time() + .2, loop.create_task, printer('call_at'))
>>> result = loop.call_later(.1, loop.create_task,
...     printer('call_later'))
>>> result = loop.call_soon(loop.create_task,
...     printer('call_soon'))

>>> result = loop.call_soon_threadsafe(
...     loop.create_task, printer('call_soon_threadsafe'))

>>> # Make sure we stop after a second
>>> result = loop.call_later(1, loop.stop)

>>> loop.run_forever()
Started call_soon at 0.0
Started call_soon_threadsafe at 0.0
Started call_later at 0.1
Started call_at at 0.2
Finished call_soon at 0.2
Finished call_soon_threadsafe at 0.2
Finished call_later at 0.3
Finished call_at at 0.4

These call methods might appear slightly different but the internals actually boil down to two queues that are implemented through heapq. The loop._scheduled is used for scheduled operations and loop._ready is for immediate execution. When the _run_once method is called (the run_forever method wraps this method in a while True loop), the loop will first try to process all items in the loop._ready heap with the specific loop implementation (for example, SelectorEventLoop). Once everything in loop._ready is processed, the loop will continue to move items from the loop._scheduled heap to the loop._ready heap if they are due.

Both call_soon and call_soon_threadsafe write to the loop._ready heap. And the call_later method is simply a wrapper for call_at with the current value of asyncio.time added to the scheduled time, which writes to the loop._scheduled heap.

The result of this method of processing is that everything added through the call_soon* methods will always execute after everything that is added through the call_at/call_later methods.

As for the ensure_futures function, it will call loop.create_task internally to wrap the coroutine in a Task object, which is, of course, a subclass of a Future object. If you need to extend the Task class for some reason, that is easily possible through the loop.set_task_factory method.

Depending on the type of event loop, there are actually many other methods for creating connections, file handlers, and more. Those will be explained by example in later paragraphs, since they have less to do with the event loop and are more about programming with coroutines.

Processes

So far, we have simply executed specifically asynchronous Python functions, but some things are a tad more difficult to run asynchronously within Python. For example, let's assume we have a long-running external application that we wish to run. The subprocess module would be the standard approach for running external applications, and it works quite well. With a bit of care, one could even make sure that these do not block the main thread by polling the output. That still requires polling, however. Yet, won't events be better so that we can do other things while we are waiting for the results? Luckily, this is easily arranged through asyncio.Process. Similar to the Future and Task classes, this class is meant to be created through the event loop. In terms of usage, the class is very similar to the subprocess.Popen class, except that the functions have been made asynchronous. This results in the removal of the polling function, of course.

First, let's look at the traditional sequential version:

>>> import time
>>> import subprocess
>>>
>>>
>>> t = time.time()
>>>
>>>
>>> def process_sleeper():
...     print('Started sleep at %.1f' % (time.time() - t))
...     process = subprocess.Popen(['sleep', '0.1'])
...     process.wait()
...     print('Finished sleep at %.1f' % (time.time() - t))
...
>>>
>>> for i in range(3):
...     process_sleeper()
Started sleep at 0.0
Finished sleep at 0.1
Started sleep at 0.1
Finished sleep at 0.2
Started sleep at 0.2
Finished sleep at 0.3

Since everything is executed sequentially, it takes three times the 0.1 seconds that the sleep command is sleeping. So, instead of waiting for all of them at the same time, let's run them in parallel this time:

>>> import time
>>> import subprocess


>>> t = time.time()


>>> def process_sleeper():
...     print('Started sleep at %.1f' % (time.time() - t))
...     return subprocess.Popen(['sleep', '0.1'])
...
>>>
>>> processes = []
>>> for i in range(5):
...     processes.append(process_sleeper())
Started sleep at 0.0
Started sleep at 0.0
Started sleep at 0.0
Started sleep at 0.0
Started sleep at 0.0

>>> for process in processes:
...     returncode = process.wait()
...     print('Finished sleep at %.1f' % (time.time() - t))
Finished sleep at 0.1
Finished sleep at 0.1
Finished sleep at 0.1
Finished sleep at 0.1
Finished sleep at 0.1

While this looks a lot better in terms of runtime, our program structure is a bit messy now. We needed two loops, one to start the processes and one to measure the finish time. Moreover, we had to move the print statement outside of the function, which is generally not desirable either. This time, we will try the asyncio version:

>>> import time
>>> import asyncio


>>> t = time.time()


>>> async def async_process_sleeper():
...     print('Started sleep at %.1f' % (time.time() - t))
...     process = await asyncio.create_subprocess_exec('sleep', '0.1')
...     await process.wait()
...     print('Finished sleep at %.1f' % (time.time() - t))


>>> loop = asyncio.get_event_loop()
>>> for i in range(5):
...     task = loop.create_task(async_process_sleeper())

>>> future = loop.call_later(.5, loop.stop)

>>> loop.run_forever()
Started sleep at 0.0
Started sleep at 0.0
Started sleep at 0.0
Started sleep at 0.0
Started sleep at 0.0
Finished sleep at 0.1
Finished sleep at 0.1
Finished sleep at 0.1
Finished sleep at 0.1
Finished sleep at 0.1

As you can see, it is easy to run multiple applications at the same time this way. But that is the easy part; the difficult part with processes is interactive input and output. The asyncio module has several measures to make it easier, but it can still be difficult when actually working with the results. Here's an example of calling the Python interpreter, executing some code, and exiting again:

import asyncio


async def run_script():
    process = await asyncio.create_subprocess_shell(
        'python3',
        stdout=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # Write a simple Python script to the interpreter
    process.stdin.write(b'
'.join((
        b'import math',
        b'x = 2 ** 8',
        b'y = math.sqrt(x)',
        b'z = math.sqrt(y)',
        b'print("x: %d" % x)',
        b'print("y: %d" % y)',
        b'print("z: %d" % z)',
        b'for i in range(int(z)):',
        b'    print("i: %d" % i)',
    )))
    # Make sure the stdin is flushed asynchronously
    await process.stdin.drain()
    # And send the end of file so the Python interpreter will
    # start processing the input. Without this the process will
    # stall forever.
    process.stdin.write_eof()

    # Fetch the lines from the stdout asynchronously
    async for out in process.stdout:
        # Decode the output from bytes and strip the whitespace
        # (newline) at the right
        print(out.decode('utf-8').rstrip())

    # Wait for the process to exit
    await process.wait()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run_script())
    loop.close()

The code is simple enough, but there are a few parts of this code that are not obvious to us and yet required to function. While the creation of the subprocess and the writing code is quite obvious, you might be wondering about the process.stdin.write_eof() line. The problem here is buffering. To improve performance, most programs will buffer input and output by default. In the case of the Python program, the result is that unless we send the end of file (eof), the program will keep waiting for more input. An alternative solution would be to close the stdin stream or somehow communicate with the Python program that we will not send any more input. However, it is certainly something to take into consideration. Another option is to use yield from process.stdin.drain(), but that only takes care of the sending side of the code; the receiving side might still be waiting for more input. Let's see the output though:

# python3 processes.py
x: 256
y: 16
z: 4
i: 0
i: 1
i: 2
i: 3

With this implementation, we still need a loop to get all the results from the stdout stream. Unfortunately, the asyncio.StreamReader (which process.stdout is) class does not support the async for syntax yet. If it did, a simple async for out in process.stdout would have worked. A simple yield from process.stdout.read() would have worked as well, but reading per line is generally more convenient to use.

If possible, I recommend that you abstain from using stdin to send data to subprocesses and instead use some network, pipe, or file communication. As we will see in the next paragraphs, these are much more convenient to handle.

Asynchronous servers and clients

One of the most common reason for stalling scripts and applications is the usage of remote resources. With asyncio, at least a large portion of that is easily fixable. Fetching multiple remote resources and serving to multiple clients is quite a bit easier and more lightweight than it used to be. While both multithreading and multiprocessing can be used for these cases as well, asyncio is a much lighter alternative and it is actually easier to manage. There are two main methods of creating clients and servers. The coroutine way is to use asyncio.open_connection and asyncio.start_server. The class-based approach requires you to inherit the asyncio.Protocol class. While these are essentially the same thing, the workings are slightly different.

Basic echo server

The basic client and server versions are simple enough to write. The asyncio module takes care of all the low-level connection handling, leaving us with only the requirement of connecting the correct methods. For the server, we need a method to handle the incoming connections, and for the client, we need a function to create connections. And to illustrate what is happening and at which point in time, we will add a dedicated print function that prints both the time since the server process was started and the given arguments:

import time
import sys
import asyncio


HOST = '127.0.0.1'
PORT = 1234


start_time = time.time()


def printer(start_time, *args, **kwargs):
    '''Simple function to print a message prefixed with the
    time relative to the given start_time'''
    print('%.1f' % (time.time() - start_time), *args, **kwargs)


async def handle_connection(reader, writer):
    client_address = writer.get_extra_info('peername')
    printer(start_time, 'Client connected', client_address)

    # Send over the server start time to get consistent
    # timestamps
    writer.write(b'%.2f
' % start_time)
    await writer.drain()

    repetitions = int((await reader.readline()))
    printer(start_time, 'Started sending to', client_address)

    for i in range(repetitions):
        message = 'client: %r, %d
' % (client_address, i)
        printer(start_time, message, end='')
        writer.write(message.encode())
        await writer.drain()

    printer(start_time, 'Finished sending to', client_address)
    writer.close()


async def create_connection(repetitions):
    reader, writer = await asyncio.open_connection(
        host=HOST, port=PORT)

    start_time = float((await reader.readline()))

    writer.write(repetitions.encode() + b'
')
    await writer.drain()

    async for line in reader:
        # Sleeping a little to emulate processing time and make
        # it easier to add more simultaneous clients
        await asyncio.sleep(1)

        printer(start_time, 'Got line: ', line.decode(),
                end='')

    writer.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    if sys.argv[1] == 'server':
        server = asyncio.start_server(
            handle_connection,
            host=HOST,
            port=PORT,
        )
        running_server = loop.run_until_complete(server)

        try:
            result = loop.call_later(5, loop.stop)
            loop.run_forever()
        except KeyboardInterrupt:
            pass

        running_server.close()
        loop.run_until_complete(running_server.wait_closed())
    elif sys.argv[1] == 'client':
        loop.run_until_complete(create_connection(sys.argv[2]))

    loop.close()

Now we will run the server and two simultaneous clients. Since these run in parallel, the server output is a bit strange, of course. Because of that, we synchronize the start time from the server to the clients and prefix all print statements with the number of seconds since the server was started.

The server:

# python3 simple_connections.py server
0.4 Client connected ('127.0.0.1', 59990)
0.4 Started sending to ('127.0.0.1', 59990)
0.4 client: ('127.0.0.1', 59990), 0
0.4 client: ('127.0.0.1', 59990), 1
0.4 client: ('127.0.0.1', 59990), 2
0.4 Finished sending to ('127.0.0.1', 59990)
2.0 Client connected ('127.0.0.1', 59991)
2.0 Started sending to ('127.0.0.1', 59991)
2.0 client: ('127.0.0.1', 59991), 0
2.0 client: ('127.0.0.1', 59991), 1
2.0 Finished sending to ('127.0.0.1', 59991)

The first client:

# python3 simple_connections.py client 3
1.4 Got line:  client: ('127.0.0.1', 59990), 0
2.4 Got line:  client: ('127.0.0.1', 59990), 1
3.4 Got line:  client: ('127.0.0.1', 59990), 2

The second client:

# python3 simple_connections.py client 2
3.0 Got line:  client: ('127.0.0.1', 59991), 0
4.0 Got line:  client: ('127.0.0.1', 59991), 1

Since both the input and output have buffers, we need to manually drain the input after writing and use yield from when reading the output from the other party. That is exactly the reason that communication with regular external processes is more difficult than network interaction. The standard input for processes is more focused towards user input than computer input, which makes it less convenient to use.

Note

If you wish to use reader.read(BUFFER) instead of reader.readline(), that's also possible. Just note that you need to specifically separate the data because it might accidently get appended otherwise. All write operations write to the same buffer, resulting in one long return stream. On the other hand, trying to write without a new line ( ) for reader.readline() to recognize will cause the client to wait forever.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset