Coroutines are subroutines that offer non-preemptive multitasking through multiple entry points. The basic premise is that coroutines allow two functions to communicate with each other while running. Normally, this type of communication is reserved for multitasking solutions, but coroutines offer a relatively simple way of achieving it at almost no added performance cost.
Since generators are lazy by default, the way coroutines work follows naturally: until a result is consumed, the generator sleeps, and while a result is being consumed, the generator becomes active. The difference between regular generators and coroutines is that coroutines don't simply return values to the calling function; they can receive values as well.
In the previous paragraphs, we saw how regular generators can yield values. But that's not all generators can do: they can actually receive values as well. The basic usage is fairly simple:
>>> def generator():
...     value = yield 'spam'
...     print('Generator received: %s' % value)
...     yield 'Previous value: %r' % value

>>> g = generator()
>>> print('Result from generator: %s' % next(g))
Result from generator: spam
>>> print(g.send('eggs'))
Generator received: eggs
Previous value: 'eggs'
And that's all there is to it. The function is frozen until the send method is called, at which point it will process up to the next yield statement.
Since generators are lazy, you can't just send a value to a brand-new generator. Before a value can be sent, either a result must be fetched using next() or a send(None) has to be issued so that the code actually reaches the first yield. The need for this is understandable but a bit tedious at times. Let's create a simple decorator to remove the need for this:
>>> import functools

>>> def coroutine(function):
...     @functools.wraps(function)
...     def _coroutine(*args, **kwargs):
...         active_coroutine = function(*args, **kwargs)
...         next(active_coroutine)
...         return active_coroutine
...     return _coroutine

>>> @coroutine
... def spam():
...     while True:
...         print('Waiting for yield...')
...         value = yield
...         print('spam received: %s' % value)

>>> generator = spam()
Waiting for yield...

>>> generator.send('a')
spam received: a
Waiting for yield...

>>> generator.send('b')
spam received: b
Waiting for yield...
As you've probably noticed, even though the generator is still lazy, it now automatically executes all of the code until it reaches the yield statement again. At that point, it stays dormant until new values are sent.
Unlike regular generators, which simply exit as soon as the input sequence is exhausted, coroutines generally employ infinite while loops, which means they won't be torn down the normal way. That's why coroutines also support both the close and throw methods, which will exit the function. The important thing here is not the closing itself but the possibility of adding a teardown method. Essentially, it is very comparable to how context managers work with an __enter__ and an __exit__ method, but with coroutines in this case:
@coroutine
def simple_coroutine():
    print('Setting up the coroutine')
    try:
        while True:
            item = yield
            print('Got item: %r' % item)
    except GeneratorExit:
        print('Normal exit')
    except Exception as e:
        print('Exception exit: %r' % e)
        raise
    finally:
        print('Any exit')

print('Creating simple coroutine')
active_coroutine = simple_coroutine()
print()

print('Sending spam')
active_coroutine.send('spam')
print()

print('Close the coroutine')
active_coroutine.close()
print()

print('Creating simple coroutine')
active_coroutine = simple_coroutine()
print()

print('Sending eggs')
active_coroutine.send('eggs')
print()

print('Throwing runtime error')
active_coroutine.throw(RuntimeError, 'Oops...')
print()
This generates the following output, which should be as expected: no strange behavior, simply two methods of exiting a coroutine:
# python3 H06.py
Creating simple coroutine
Setting up the coroutine

Sending spam
Got item: 'spam'

Close the coroutine
Normal exit
Any exit

Creating simple coroutine
Setting up the coroutine

Sending eggs
Got item: 'eggs'

Throwing runtime error
Exception exit: RuntimeError('Oops...',)
Any exit
Traceback (most recent call last):
  ...
  File ... in <module>
    active_coroutine.throw(RuntimeError, 'Oops...')
  File ... in simple_coroutine
    item = yield
RuntimeError: Oops...
In the previous paragraphs, we saw pipelines that process the output sequentially and in one direction only. However, there are cases where this is simply not enough, times where you need a pipe that not only sends values to the next pipe but also receives information back from the sub-pipe. Instead of always having a single list that is processed, we can maintain the state of the generator between executions this way. So, let's start by converting the earlier pipelines to coroutines. First, the lines.txt file again:
spam
eggs
spam spam
eggs eggs
spam spam spam
eggs eggs eggs
Now, the coroutine pipeline. The functions are the same as before but using coroutines instead:
>>> @coroutine
... def replace(search, replace):
...     while True:
...         item = yield
...         print(item.replace(search, replace))

>>> spam_replace = replace('spam', 'bacon')
>>> for line in open('lines.txt'):
...     spam_replace.send(line.rstrip())
bacon
eggs
bacon bacon
eggs eggs
bacon bacon bacon
eggs eggs eggs
Given this example, you might be wondering why we are now printing the value instead of yielding it. Well, we can yield it, but remember that generators freeze until a value is yielded. Let's see what would happen if we simply yield the value instead of calling print. You might be tempted to do this:
>>> @coroutine
... def replace(search, replace):
...     while True:
...         item = yield
...         yield item.replace(search, replace)

>>> spam_replace = replace('spam', 'bacon')
>>> spam_replace.send('spam')
'bacon'
>>> spam_replace.send('spam spam')
>>> spam_replace.send('spam spam spam')
'bacon bacon bacon'
Half of the values have disappeared now, so the question is: where did they go? Notice that the second yield isn't storing the value it receives. That's where the values are disappearing. We need to store those as well:
>>> @coroutine
... def replace(search, replace):
...     item = yield
...     while True:
...         item = yield item.replace(search, replace)

>>> spam_replace = replace('spam', 'bacon')
>>> spam_replace.send('spam')
'bacon'
>>> spam_replace.send('spam spam')
'bacon bacon'
>>> spam_replace.send('spam spam spam')
'bacon bacon bacon'
But even this is far from optimal. We are essentially using coroutines to mimic the behavior of generators. Although it works, it's just a tad silly and not all that clear. Let's make a real pipeline this time, where the coroutines send the data to the next coroutine (or coroutines), and actually show the power of coroutines by sending the results to multiple coroutines:
# Grep sends all matching items to the target
>>> @coroutine
... def grep(target, pattern):
...     while True:
...         item = yield
...         if pattern in item:
...             target.send(item)

# Replace does a search and replace on the items and sends it to
# the target once it's done
>>> @coroutine
... def replace(target, search, replace):
...     while True:
...         target.send((yield).replace(search, replace))

# Print will print the items using the provided formatstring
>>> @coroutine
... def print_(formatstring):
...     while True:
...         print(formatstring % (yield))

# Tee multiplexes the items to multiple targets
>>> @coroutine
... def tee(*targets):
...     while True:
...         item = yield
...         for target in targets:
...             target.send(item)

# Because we wrap the results we need to work backwards from the
# inner layer to the outer layer.
# First, create a printer for the items:
>>> printer = print_('%s')

# Create replacers that send the output to the printer
>>> replacer_spam = replace(printer, 'spam', 'bacon')
>>> replacer_eggs = replace(printer, 'spam spam', 'sausage')

# Create a tee to send the input to both the spam and the eggs
# replacers
>>> branch = tee(replacer_spam, replacer_eggs)

# Send all items containing spam to the tee command
>>> grepper = grep(branch, 'spam')

# Send the data to the grepper for all the processing
>>> for line in open('lines.txt'):
...     grepper.send(line.rstrip())
bacon
spam
bacon bacon
sausage
bacon bacon bacon
sausage spam
This makes the code much simpler and more readable, but more importantly, it shows how a single source can be split into multiple destinations. While this might not look too exciting, it most certainly is. If you look closely, you will see that the tee coroutine splits the input into two different outputs, but both of those outputs write back to the same print_ instance. This means that it's possible to route your data along whichever path is convenient for you while still having it end up at the same endpoint with no effort whatsoever.
Regardless, the example is still not that useful, as these functions still don't use all of the coroutine's power. The most important feature, a consistent state, is not really used in this case.
The most important lesson to learn from these examples is that mixing generators and coroutines is not a good idea in most cases, since it can have very strange side effects if used incorrectly. Even though both use the yield statement, they are significantly different creatures with different behavior. The next paragraph will show one of the few cases where mixing coroutines and generators can be useful.
Now that we know how to write basic coroutines and which pitfalls to watch out for, how about writing a function where remembering the state is required? That is, a function that always gives you the average of all values sent so far. This is one of the few cases where it is still relatively safe and useful to combine the coroutine and generator syntax:
>>> @coroutine
... def average():
...     count = 1
...     total = yield
...     while True:
...         total += yield total / count
...         count += 1

>>> averager = average()
>>> averager.send(20)
20.0
>>> averager.send(10)
15.0
>>> averager.send(15)
15.0
>>> averager.send(-25)
5.0
It still requires some extra logic to work properly, though. To make sure we don't divide by zero, we initialize count to 1. After that, we fetch our first item using yield, but we don't yield any data at that point because the first yield is the primer and is executed before we receive a value. Once that's all set up, we can easily yield the average value while summing. Not all that bad, but the pure coroutine version is slightly simpler to understand since we don't have to worry about priming:
>>> @coroutine
... def print_(formatstring):
...     while True:
...         print(formatstring % (yield))

>>> @coroutine
... def average(target):
...     count = 0
...     total = 0
...     while True:
...         count += 1
...         total += yield
...         target.send(total / count)

>>> printer = print_('%.1f')
>>> averager = average(printer)
>>> averager.send(20)
20.0
>>> averager.send(10)
15.0
>>> averager.send(15)
15.0
>>> averager.send(-25)
5.0
As simple as it should be: we just keep the count and the running total, and send the new average for every new value.
Another nice example is itertools.groupby, which is also quite simple to implement with coroutines. For comparison, we will once again show both the generator-coroutine version and the pure coroutine version:
>>> @coroutine
... def groupby():
...     # Fetch the first key and value and initialize the state
...     # variables
...     key, value = yield
...     old_key, values = key, []
...     while True:
...         # Store the previous value so we can store it in the
...         # list
...         old_value = value
...         if key == old_key:
...             key, value = yield
...         else:
...             key, value = yield old_key, values
...             old_key, values = key, []
...         values.append(old_value)

>>> grouper = groupby()
>>> grouper.send(('a', 1))
>>> grouper.send(('a', 2))
>>> grouper.send(('a', 3))
>>> grouper.send(('b', 1))
('a', [1, 2, 3])
>>> grouper.send(('b', 2))
>>> grouper.send(('a', 1))
('b', [1, 2])
>>> grouper.send(('a', 2))
>>> grouper.send((None, None))
('a', [1, 2])
As you can see, this function uses a few tricks. We store the previous key and value so that we can detect when the group (key) changes. And that is the second issue: we obviously cannot recognize a group until the group has changed, so the results are returned only after the group has changed. This means that the last group will be sent only if a different group is sent after it, hence the (None, None). And now, here is the pure coroutine version:
>>> @coroutine
... def print_(formatstring):
...     while True:
...         print(formatstring % (yield))

>>> @coroutine
... def groupby(target):
...     old_key = None
...     while True:
...         key, value = yield
...         if old_key != key:
...             # A different key means a new group so send the
...             # previous group and restart the cycle.
...             if old_key and values:
...                 target.send((old_key, values))
...             values = []
...             old_key = key
...         values.append(value)

>>> grouper = groupby(print_('group: %s, values: %s'))
>>> grouper.send(('a', 1))
>>> grouper.send(('a', 2))
>>> grouper.send(('a', 3))
>>> grouper.send(('b', 1))
group: a, values: [1, 2, 3]
>>> grouper.send(('b', 2))
>>> grouper.send(('a', 1))
group: b, values: [1, 2]
>>> grouper.send(('a', 2))
>>> grouper.send((None, None))
group: a, values: [1, 2]
While the functions are fairly similar, the pure coroutine version is, once again, quite a bit simpler. This is because we don't have to think about priming and values that might get lost.
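As a closing sketch (my own variation, not from the text), the (None, None) sentinel can be avoided by combining this with the close method we saw earlier: close raises GeneratorExit inside the coroutine, which gives us a natural place to flush the final group. The coroutine decorator is the same priming decorator defined earlier in this section:

```python
import functools

def coroutine(function):
    # The same priming decorator as earlier: advances the generator
    # to its first yield so send() works immediately.
    @functools.wraps(function)
    def _coroutine(*args, **kwargs):
        active_coroutine = function(*args, **kwargs)
        next(active_coroutine)
        return active_coroutine
    return _coroutine

@coroutine
def groupby(target):
    old_key, values = None, []
    try:
        while True:
            key, value = yield
            if old_key is not None and old_key != key:
                # A new key: send the finished group downstream.
                target.send((old_key, values))
                values = []
            old_key = key
            values.append(value)
    except GeneratorExit:
        # close() raises GeneratorExit here, so we can flush the
        # final group instead of requiring a sentinel value.
        if old_key is not None and values:
            target.send((old_key, values))

@coroutine
def print_(formatstring):
    while True:
        print(formatstring % (yield))

grouper = groupby(print_('group: %s, values: %s'))
for pair in [('a', 1), ('a', 2), ('b', 1)]:
    grouper.send(pair)
grouper.close()  # flushes the last group as well
```

Note that we only send to the target inside the GeneratorExit handler, without yielding; yielding while handling GeneratorExit would raise a RuntimeError.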