Thread communication

For this example, we will be using a normal Queue, from the queue module:

# comm_queue.py
import threading
from queue import Queue

SENTINEL = object()

def producer(q, n):
    a, b = 0, 1
    while a <= n:
        q.put(a)
        a, b = b, a + b
    q.put(SENTINEL)

def consumer(q):
    while True:
        num = q.get()
        q.task_done()
        if num is SENTINEL:
            break
        print(f'Got number {num}')

q = Queue()
cns = threading.Thread(target=consumer, args=(q, ))
prd = threading.Thread(target=producer, args=(q, 35))
cns.start()
prd.start()
q.join()

The logic is very basic. We have a producer function that generates Fibonacci numbers and puts them in a queue. When the next number is greater than a given n, the producer exits the while loop, and puts one last thing in the queue: a SENTINEL. A SENTINEL is any object that is used to signal something, and in our case, it signals to the consumer that the producer is done.

The interesting bit of logic is in the consumer function. It loops indefinitely, reading values out of the queue and printing them out. There are a couple of things to notice here. First, see how we are calling q.task_done()? That is to acknowledge that the element in the queue has been processed. The purpose of this is to allow the final instruction in the code, q.join(), to unblock when all elements have been acknowledged, so that the execution can end.
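To make the pairing between put(), task_done(), and join() more concrete, here is a minimal sketch that is separate from comm_queue.py. The names worker and run, and the use of None as the stop signal, are assumptions made for this illustration only. Unlike the consumer above, this worker acknowledges each item only after it has finished working on it, which is one common way to arrange things.

```python
import threading
from queue import Queue

def worker(q, results):
    # Hypothetical worker: doubles each item it receives.
    while True:
        item = q.get()
        if item is None:  # None is the stop signal in this sketch
            q.task_done()  # the stop signal must be acknowledged too
            break
        results.append(item * 2)
        q.task_done()  # acknowledge only after the work is finished

def run(items):
    q = Queue()
    results = []
    t = threading.Thread(target=worker, args=(q, results))
    t.start()
    for item in items:
        q.put(item)
    q.put(None)
    q.join()  # unblocks once every put() has a matching task_done()
    t.join()  # wait for the worker thread itself to exit
    return results

print(run([1, 2, 3]))  # prints [2, 4, 6]
```

If the worker skipped task_done() for even one item, q.join() would block forever, because the queue would still count that item as unfinished.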

Second, notice how we use the is operator to compare against the items in order to find the sentinel. We'll see shortly that when using a multiprocessing.Queue this won't be possible any more. Before we get there, would you be able to guess why?
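As a hint toward that question, the sketch below assumes that objects placed on a multiprocessing.Queue are pickled on one side and unpickled on the other, as the multiprocessing documentation describes. The helper round_trip is a hypothetical name used only here. It imitates that journey inside a single process and checks what happens to object identity.

```python
import pickle

SENTINEL = object()

def round_trip(obj):
    # Serialize and deserialize, roughly what happens to an item
    # that travels between processes.
    return pickle.loads(pickle.dumps(obj))

copy = round_trip(SENTINEL)
print(copy is SENTINEL)  # prints False: the copy is a brand new object

# A value compared with == survives the trip.
print(round_trip('STOP') == 'STOP')  # prints True
```

The unpickled sentinel is a different object from the original, so an identity check with is can no longer recognize it, while an equality check against a plain value still works.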

Running this example prints a series of lines, Got number 0, Got number 1, and so on, up to Got number 34. The limit we passed is 35, and the next Fibonacci number, 55, exceeds it.
