Defining processes

We must design each processing step as a simple loop that gets a request from a queue, processes that request, and places the results into another queue. This decomposes the larger problem into a number of stages that form a pipeline. Because each of these stages runs concurrently, the system can keep more of its resources busy. Furthermore, because each stage only gets from and puts into independent queues, there's no need for complex locking or shared data structures. A process can be a simple function or a callable object. We'll focus on defining processes as subclasses of multiprocessing.Process, since this gives us the most flexibility.
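To make the stage-loop idea concrete, here is a minimal sketch of the function-based form. The doubling stage and the None sentinel are placeholders for illustration, not part of the simulation design described below.

```python
import multiprocessing

def double_stage(q_in, q_out) -> None:
    # A pipeline stage as a plain function: loop until a sentinel
    # (None here), process each request, and put the result onward.
    for item in iter(q_in.get, None):
        q_out.put(item * 2)

# Such a function could be handed to a worker process:
#     multiprocessing.Process(target=double_stage, args=(q_in, q_out))
q_in = multiprocessing.SimpleQueue()
q_out = multiprocessing.SimpleQueue()
for n in (1, 2, 3):
    q_in.put(n)
q_in.put(None)  # the sentinel ends the stage's loop
double_stage(q_in, q_out)  # run inline here just to demonstrate
```

The subclass form developed in this section provides the same get-process-put loop, with more room for per-process state and setup.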

For the simulation of a stateful process such as a game, we can break the simulation down into a three-step pipeline:

  1. An overall driver puts simulation requests into a processing queue.
  2. A pool of simulators will get a request from the processing queue, perform the simulation, and put the statistics into a results queue.
  3. A summarizer will get the results from the result queue and create a final tabulation of the results.

Using a process pool allows us to have as many simulations running concurrently as our CPU can handle. The pool of simulators can be configured to ensure that simulations run as quickly as possible.
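One reasonable way to size the pool is to tie it to the CPU count. The "leave one core for the driver" heuristic shown here is an assumption for this sketch, not a requirement of the library:

```python
import multiprocessing

# One simulator per CPU core is a sensible starting point;
# reserving one core for the driver and summarizer is a common
# heuristic when those processes also do real work.
pool_size = max(1, multiprocessing.cpu_count() - 1)
```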

Here's a definition of the simulator process:

import multiprocessing

class Simulation(multiprocessing.Process):

    def __init__(
        self,
        setup_queue: multiprocessing.SimpleQueue,
        result_queue: multiprocessing.SimpleQueue,
    ) -> None:
        self.setup_queue = setup_queue
        self.result_queue = result_queue
        super().__init__()

    def run(self) -> None:
        """Waits for a termination"""
        print(f"{self.__class__.__name__} start")
        item = self.setup_queue.get()
        while item != (None, None):
            table, player = item
            self.sim = Simulate(table, player, samples=1)
            results = list(self.sim)
            self.result_queue.put((table, player, results[0]))
            item = self.setup_queue.get()
        print(f"{self.__class__.__name__} finish")

We've extended multiprocessing.Process. This means that we must do two things to work properly with multiprocessing: we must ensure that super().__init__() is executed, and we must override run().

Within the body of run(), we're using two queues. setup_queue will contain two-tuples of the Table and Player objects. The process will use these two objects to run a simulation. It will put the resulting three-tuple into result_queue. The API for the Simulate class is this:

class Simulate:

    def __init__(
        self,
        table: Table,
        player: Player,
        samples: int
    ) -> None: ...

    def __iter__(self) -> Iterator[Tuple]: ...

The iterator will yield the requested number, samples, of statistical summaries. We've included a provision for a sentinel object to arrive via setup_queue. This object will be used to gracefully close down the processing. If we don't use a sentinel object, we'll be forced to terminate the processes, which can disrupt locks and other system resources. Here's the summarization process:

class Summarize(multiprocessing.Process):

    def __init__(self, queue: multiprocessing.SimpleQueue) -> None:
        self.queue = queue
        super().__init__()

    def run(self) -> None:
        """Waits for a termination"""
        print(f"{self.__class__.__name__} start")
        count = 0
        item = self.queue.get()
        while item != (None, None, None):
            print(item)
            count += 1
            item = self.queue.get()
        print(f"{self.__class__.__name__} finish {count}")

This also extends multiprocessing.Process. In this case, we're fetching items from a queue and simply counting them. A more useful process might use several collections.Counter objects to accumulate more interesting statistics.
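As a sketch of that idea, here is a Counter-based accumulation. The assumption that the third element of each result item is a single integer outcome is hypothetical; the real statistical summary produced by Simulate may be a richer structure.

```python
from collections import Counter

# Hypothetical result items: (table, player, outcome), where the
# outcome is assumed to be a single integer for this sketch.
items = [("t1", "p1", 100), ("t1", "p1", 98), ("t1", "p2", 100)]

outcomes = Counter()
by_player = Counter()
for table, player, outcome in items:
    outcomes[outcome] += 1   # distribution of outcomes
    by_player[player] += 1   # how many results per player
```

A run() body would do the same accumulation inside the sentinel-terminated while loop, then print or persist the counters at the end.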

As with the Simulation class, we're also going to detect a sentinel and gracefully close down the processing. The use of a sentinel object allows us to close down processing as soon as the work is completed by the process. In some applications, the child process can be left running indefinitely.
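Since each Simulation worker consumes exactly one (None, None) sentinel before exiting, the driver must enqueue one sentinel per worker, plus a (None, None, None) sentinel for the summarizer. A minimal sketch, with a hypothetical pool size of four:

```python
import multiprocessing

NUM_SIMULATORS = 4  # hypothetical pool size for this sketch

setup_q = multiprocessing.SimpleQueue()
result_q = multiprocessing.SimpleQueue()

# After all real requests have been queued, add one sentinel per
# simulator: each worker consumes exactly one before exiting.
for _ in range(NUM_SIMULATORS):
    setup_q.put((None, None))

# The summarizer reads result_q, so it gets its own sentinel
# (normally put only after all simulators have been joined).
result_q.put((None, None, None))
```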

Let's see how to build queues and supply data in the next section.
