It is a basic fact of life that I/O (for example, file or database access) is slow. I/O is not only slow, but also unpredictable. In a common scenario, we wait for data (from a web service or sensors) and write the data to the filesystem or a database. In such a situation, we can find ourselves I/O bound, spending more time waiting for the data than actually processing it. We can poll for data periodically or act on event triggers (either check your watch or set an alarm). GUIs usually have special threads that wait for user input in an infinite loop.
The Python asyncio module for asynchronous I/O uses the concept of coroutines with a related function decorator. A brief example of this module was also given in the Scraping the web recipe of Chapter 5, Web Mining, Databases, and Big Data. Subroutines can be thought of as a special case of coroutines: a subroutine has a single entry point and exits either early through a return statement or by reaching the end of its definition. In contrast, a coroutine can use the yield from statement to delegate to another coroutine and later resume execution from that exact point. The coroutine lets another coroutine take over, as it were, and goes back to sleep until it is activated again.
Subroutines can be placed on a single stack. However, coroutines require multiple stacks, which makes understanding the code and potential exceptions more complex.
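To make the yield from mechanics concrete, here is a minimal sketch using plain generators, with no event loop involved (the function names and strings are illustrative, not part of the recipe's code):

```python
def inner():
    # A generator-based "coroutine": it pauses at every yield and
    # resumes where it left off when the next value is requested
    yield 'inner step 1'
    yield 'inner step 2'

def outer():
    yield 'outer start'
    # yield from delegates to inner(); outer() is suspended until
    # inner() is exhausted, and then resumes right here
    yield from inner()
    yield 'outer end'

steps = list(outer())
print(steps)
```

Iterating over outer() produces its own values interleaved with everything inner() yields, which is the delegation behavior asyncio builds on.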
The code is in the accessing_asyncio.ipynb file in this book's code bundle:
```python
import dautil as dl
import ch12util
from functools import partial
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import skew
import asyncio
import time
from IPython.display import HTML

STATS = []
```
```python
def resample(arr):
    sample = ch12util.bootstrap(arr)
    STATS.append((sample.mean(), sample.std(), skew(sample)))
```
```python
class Bootstrapper():
    def __init__(self, data, queue):
        self.data = data
        self.log = dl.log_api.conf_logger(__name__)
        self.queue = queue

    @asyncio.coroutine
    def run(self):
        while not self.queue.empty():
            index = yield from self.queue.get()

            if index % 10 == 0:
                self.log.debug('Bootstrap {}'.format(index))

            resample(self.data)

            # simulates slow IO
            yield from asyncio.sleep(0.01)
```
```python
def serial(arr, n):
    for i in range(n):
        resample(arr)

        # simulates slow IO
        time.sleep(0.01)
```
```python
def parallel(arr, n):
    q = asyncio.Queue()

    for i in range(n):
        q.put_nowait(i)

    bootstrapper = Bootstrapper(arr, q)
    policy = asyncio.get_event_loop_policy()
    policy.set_event_loop(policy.new_event_loop())
    loop = asyncio.get_event_loop()

    tasks = [asyncio.async(bootstrapper.run())
             for i in range(n)]

    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
```
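As a side note, asyncio.async() and generator-based coroutines were deprecated in later Python releases in favor of the async/await syntax. A rough modern sketch of the same queue-and-workers pattern (Python 3.7 and newer) might look like the following; the names and the squaring stand-in for resample() are illustrative assumptions, not the recipe's actual code:

```python
import asyncio

async def worker(queue, results):
    # Drain the shared queue cooperatively with the other workers
    while not queue.empty():
        index = await queue.get()
        results.append(index * index)  # stand-in for resample()
        await asyncio.sleep(0.001)     # simulates slow I/O

async def main(n):
    queue = asyncio.Queue()
    for i in range(n):
        queue.put_nowait(i)

    results = []
    # Run four workers concurrently on a single event loop
    await asyncio.gather(*(worker(queue, results) for _ in range(4)))
    return results

results = asyncio.run(main(10))
```

asyncio.run() replaces the manual event loop policy and loop management shown above, and asyncio.gather() plays the role of asyncio.wait() on the scheduled tasks.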
```python
pressure = dl.data.Weather.load()['PRESSURE'].dropna().values
np.random.seed(33)
parallel_times = ch12util.time_many(partial(parallel, pressure))
serial_times = ch12util.time_many(partial(serial, pressure))

dl.options.mimic_seaborn()
ch12util.plot_times(plt.gca(), serial_times, parallel_times)

sp = dl.plotting.Subplotter(2, 2, context)
ch12util.plot_times(sp.ax, serial_times, parallel_times)

STATS = np.array(STATS)
ch12util.plot_distro(sp.next_ax(), STATS.T[0], pressure.mean())
sp.label()

ch12util.plot_distro(sp.next_ax(), STATS.T[1], pressure.std())
sp.label()

ch12util.plot_distro(sp.next_ax(), STATS.T[2], skew(pressure))
sp.label()
HTML(sp.exit())
```
The asyncio module at https://docs.python.org/3/library/asyncio.html (retrieved January 2016)