Building queues and supplying data

Objects are transferred among processes through queues or pipes. Building queues involves creating instances of multiprocessing.Queue or one of its simpler variants, such as multiprocessing.SimpleQueue. For this example, we can use the following:

setup_q = multiprocessing.SimpleQueue() 
results_q = multiprocessing.SimpleQueue() 

We created two queues that define the processing pipeline. When we put a simulation request into setup_q, we expect that a Simulation process will get the request pair and run the simulation. This should generate a results triple, which is put in results_q. The results triple should, in turn, drive the work done by the Summarize process. Here's how we can start a single Summarize process:

result = Summarize(results_q) 
result.start() 
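
The Summarize class isn't shown in full here. The following is a minimal sketch, assuming it's a multiprocessing.Process subclass that consumes triples from results_q until it sees the (None, None, None) sentinel; the loop body is a placeholder for the real statistics-gathering:

import multiprocessing

class Summarize(multiprocessing.Process):
    def __init__(self, results_q):
        super().__init__()
        self.results_q = results_q

    def run(self):
        count = 0
        while True:
            triple = self.results_q.get()
            if triple == (None, None, None):
                break  # Sentinel: no more results will arrive.
            count += 1  # Placeholder for real summarization work.
        print(f"Processed {count} results")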

Here's how we can create four concurrent simulation processes:

simulators = []
for i in range(4):
    sim = Simulation(setup_q, results_q)
    sim.start()
    simulators.append(sim)

The four concurrent simulators will compete for work. Each one will attempt to grab the next request from the queue of pending requests. Once all four simulators are busy, the queue will start to fill with unprocessed requests. The ideal size of the pool of workers is difficult to predict. It depends on the number of cores the processor has, and it also depends on the workload. A pool of processes doing a great deal of input and output (I/O) work will also spend a lot of time waiting for I/O to complete, so the pool can be very large. On a smaller machine with only four cores and a compute-intensive workload, the pool will be smaller.
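
As a starting point for sizing the pool, we can ask the operating system how many cores are available. This is only a sketch; the right number must be found empirically, as discussed above:

import os

# One worker per core is a common default for compute-bound work;
# an I/O-bound workload can often justify a much larger pool.
n_workers = os.cpu_count() or 4
simulators = []
for i in range(n_workers):
    sim = Simulation(setup_q, results_q)
    sim.start()
    simulators.append(sim)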

Once the queues are built and the processes are waiting for work, the driver function can start putting requests into the setup_q queue. Here's a loop that will generate a flood of requests:

table = Table(
    decks=6, limit=50, dealer=Hit17(), split=ReSplit(),
    payout=(3, 2)
)
for bet in Flat, Martingale, OneThreeTwoSix:
    player = Player(SomeStrategy(), bet(), 100, 25)
    for sample in range(5):
        setup_q.put((table, player))

We have created a Table object. For each of the three betting strategies, we have created a Player object and queued up five simulation requests. Each pickled two-tuple will be fetched from the queue by a Simulation process and then processed. In order to have an orderly termination, we'll need to queue a sentinel object for each simulator:

for sim in simulators:
    setup_q.put((None, None))

# Wait for the simulations to all finish.
for sim in simulators:
    sim.join()

We put a sentinel object into the queue for each simulator to consume. Once all the simulators have consumed the sentinels, we can wait for the processes to finish execution and join back into the parent process.
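
Here's a minimal sketch of how a Simulation worker might implement this protocol. The simulate() function is hypothetical, standing in for the actual game-play logic:

import multiprocessing

class Simulation(multiprocessing.Process):
    def __init__(self, setup_q, results_q):
        super().__init__()
        self.setup_q = setup_q
        self.results_q = results_q

    def run(self):
        while True:
            table, player = self.setup_q.get()
            if table is None:
                break  # The (None, None) sentinel: stop this worker.
            # simulate() is a stand-in for the real game-play logic.
            outcome = simulate(table, player)
            self.results_q.put((table, player, outcome))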

Once the Process.join() operation is finished, no more simulation data will be created. We can enqueue a sentinel object into the simulation results queue, as well:

results_q.put((None, None, None))
result.join()

Once the results sentinel object is processed, the Summarize process will stop accepting input and we can join() it as well.

We used multiprocessing to transmit objects from one process to another. This gives us a relatively simple way to create high-performance, multi-processing data pipelines. The multiprocessing module uses pickle to serialize objects, so any picklable object can be pushed through a pipeline; objects that can't be pickled, such as open files, sockets, or lambda functions, cannot be transmitted this way.

It's informative to adjust the pool size to see the impact of more workers and fewer workers on the elapsed runtime. The interaction among processing cores, memory, and the nature of the workload is difficult to predict, and an empirical study is helpful to find the optimum number of workers in the pool.
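
A simple empirical study can be built around time.perf_counter(). Here, run_pipeline() is a hypothetical driver that wraps the queue-building, worker-starting, request-queueing, and join steps shown above:

import time

for n_workers in (1, 2, 4, 8, 16):
    start = time.perf_counter()
    run_pipeline(n_workers)  # Hypothetical: runs the whole pipeline.
    elapsed = time.perf_counter() - start
    print(f"{n_workers:2d} workers: {elapsed:6.2f} seconds")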
