Chapter 5. Managing Threads

The Python language provides several functions and modules that will allow you to create, start, and control multiple threads. This chapter is designed to help you understand how to quickly implement threads into your programs to provide faster and easier processing of data.

Working with multiple threads that share the same data can be problematic. For example, two or more threads could try to modify the same data at the same time, causing race conditions that corrupt the data or produce inconsistent results. For that reason, this chapter covers thread locks and queues, which let you synchronize access to shared data across multiple threads.

Timer-interrupted threads can be extremely valuable for providing status notifications, as well as for running cleanup operations at specific intervals. The final phrase of this chapter discusses how to create and start a timer-interrupted thread.

Caution

You should be careful when using multiple threads that invoke methods in some of the extension modules. Not all extension modules are thread friendly. For example, a call into one of them might block execution of all other threads for an extended amount of time until it completes. However, most functions included in the Python standard library are written to work well in a multithreaded environment.

Starting a New Thread

Example . 

thread.start_new_thread(print_time, ("Thread01",
2,))
thread.start_new_thread(print_time, ("Thread02",
4,))

The start_new_thread(function, args [, kwargs]) function in the Python thread module provides a fast and efficient way to create new threads on both Linux and Windows. It accepts a function object as the first parameter and a tuple of arguments as the second. The optional third parameter allows you to pass a dictionary containing keyword arguments.

The start_new_thread method creates a new thread and then starts code execution of the function. Control is immediately returned to the calling thread, and the new thread executes the specified function and returns silently.
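The optional keyword dictionary is easy to overlook, so here is a minimal sketch of it. The sketch assumes Python 3, where the thread module used in this chapter is named _thread. The record function, the results list, and the done event are hypothetical names chosen for illustration.

```python
import _thread    # Python 3 name for the module this chapter calls "thread"
import threading

results = []                 # filled in by the worker thread
done = threading.Event()     # lets the main thread wait without spinning

def record(label, count, sep="-"):
    # Runs in the new thread: build a string, then signal completion
    results.append(sep.join([label] * count))
    done.set()

# Positional arguments go in a tuple, keyword arguments in a dictionary
_thread.start_new_thread(record, ("tick", 3), {"sep": "+"})

done.wait(5)                 # block until the worker finishes (5 second cap)
print(results)               # prints ['tick+tick+tick']
```

Waiting on an event is one way to keep the main thread alive until the worker is finished. It is used here only so that the example ends on its own.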

Note

If the code being executed by a new thread encounters an exception, a stack trace will be printed and the thread will exit. However, other threads will continue to run.

Although it is very effective for low-level threading, the thread module is very limited compared to the newer threading module.

import thread
import time

def print_time(threadName, delay):
    while 1:
        time.sleep(delay)
        print "%s: %s" % (threadName, 
            time.ctime(time.time()))

#Start threads to print time at different intervals
thread.start_new_thread(print_time, ("Thread01",
2,))
thread.start_new_thread(print_time, ("Thread02",
4,))

#Keep the main thread alive without using all of the CPU
while 1:
    time.sleep(1)

create_thread.py

Thread01: Wed Jun 14 12:46:21 2006
Thread01: Wed Jun 14 12:46:23 2006
Thread02: Wed Jun 14 12:46:23 2006
Thread01: Wed Jun 14 12:46:25 2006
Thread01: Wed Jun 14 12:46:27 2006
Thread02: Wed Jun 14 12:46:27 2006
Thread01: Wed Jun 14 12:46:29 2006
Thread01: Wed Jun 14 12:46:31 2006
. . . . .

Output from create_thread.py code

Creating and Exiting Threads

Example . 

class newThread (threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
. . . . .
if doExit:
    thread.exit()

The newer threading module included with Python 2.4 provides much more powerful, high-level support for threads than the thread module discussed in the previous phrase. It is a little more complicated to implement; however, it provides the ability to better control and synchronize threads.

The threading module introduces a Thread class that represents a separate thread of execution. To implement a new thread using the threading module, first define a new subclass of the Thread class. Override the __init__(self [,args]) method to add additional arguments, making sure that it also calls Thread.__init__. Then override the run(self) method to implement what the thread should do when started.

Once you have created the new Thread subclass, you can create an instance of it and then start a new thread by invoking the start() method, which in turn calls run() in the new thread. Calling run() directly does not create a thread; it simply executes the method in the calling thread.
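The following sketch records which thread executes run() when it is reached through start() and when it is called directly. It assumes Python 3, and the Reporter class and its ran_in attribute are hypothetical names used only for this illustration.

```python
import threading

class Reporter(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.ran_in = None

    def run(self):
        # Record the identifier of whichever thread executes this method
        self.ran_in = threading.get_ident()

main_id = threading.get_ident()

direct = Reporter()
direct.run()          # plain method call: no new thread is created

started = Reporter()
started.start()       # creates a new thread, which then calls run()
started.join()        # wait for that thread to finish

print(direct.ran_in == main_id)    # prints True
print(started.ran_in == main_id)   # prints False
```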

import threading
import thread
import time

doExit = 0

class newThread (threading.Thread):
    def __init__(self, threadID, name, counter):
        #Initialize the base class before setting attributes
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):
        print "Starting " + self.name
        print_time(self.name, self.counter, 5)
        print "Exiting " + self.name

def print_time(threadName, delay, counter):
    while counter:
        if doExit:
            thread.exit()
        time.sleep(delay)
        print "%s: %s" % (threadName, 
            time.ctime(time.time()))
        counter -= 1

#Create new threads
thread1 = newThread(1, "Thread01", 1)
thread2 = newThread(2, "Thread02", 2)

#Start new Threads
thread1.start()
thread2.start()


while thread2.isAlive():
    if not thread1.isAlive():
        doExit = 1

    pass

print "Exiting Main Thread"

exit_thread.py

Starting Thread01
Starting Thread02
Thread01: Wed Jun 14 13:06:10 2006
Thread01: Wed Jun 14 13:06:11 2006
Thread02: Wed Jun 14 13:06:11 2006
Thread01: Wed Jun 14 13:06:12 2006
Thread01: Wed Jun 14 13:06:13 2006
Thread02: Wed Jun 14 13:06:13 2006
Thread01: Wed Jun 14 13:06:14 2006
Exiting Thread01
Thread02: Wed Jun 14 13:06:15 2006
Exiting Main Thread

Output from exit_thread.py code

Synchronizing Threads

Example . 

threadLock = threading.Lock()
. . .
threadLock.acquire()
print_time(self.name, self.counter, 3)
threadLock.release()

The threading module provided with Python includes a simple-to-implement locking mechanism that will allow you to synchronize threads. A new lock is created by calling threading.Lock(), which returns the new lock object.

Once the new lock object has been created, you can force threads to take turns by having each one call the acquire(blocking) method before it touches the shared resource. The optional blocking parameter enables you to control whether the thread will wait to acquire the lock. If blocking is set to 0, the call returns immediately, with a 0 value if the lock cannot be acquired and with a 1 if the lock was acquired. If blocking is set to 1, which is the default, the thread will block and wait for the lock to be released.

When you are finished with the lock, the lock is released by calling the release() method of the new lock object.
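As a small illustration of the blocking parameter, the sketch below tries to take a lock that is already held. It assumes Python 3, where acquire() returns True and False rather than 1 and 0. The variable names are hypothetical. The with block at the end is an alternative to paired acquire() and release() calls, not something the listing in this phrase uses.

```python
import threading

lock = threading.Lock()

first = lock.acquire()        # lock is free, so this succeeds at once
second = lock.acquire(False)  # non-blocking attempt while the lock is held
lock.release()

# A with block acquires the lock and releases it even if the body raises
with lock:
    held_inside = lock.locked()
held_after = lock.locked()

print(first, second, held_inside, held_after)   # prints True False True False
```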

import threading
import time

class newThread (threading.Thread):
    def __init__(self, threadID, name, counter):
        #Initialize the base class before setting attributes
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):
        print "Starting " + self.name
        #Get lock to synchronize threads
        threadLock.acquire()
        print_time(self.name, self.counter, 3)
        #Free lock to release next thread
        threadLock.release()

def print_time(threadName, delay, counter):
    while counter:
        time.sleep(delay)
        print "%s: %s" % (threadName, 
            time.ctime(time.time()))
        counter -= 1

threadLock = threading.Lock()
threads = []

#Create new threads
thread1 = newThread(1, "Thread01", 1)
thread2 = newThread(2, "Thread02", 2)

#Start new Threads
thread1.start()
thread2.start()

#Add threads to thread list
threads.append(thread1)
threads.append(thread2)

#Wait for all threads to complete
for t in threads:
    t.join()

print "Exiting Main Thread"

sync_thread.py

Starting Thread01
Starting Thread02
Thread01: Tue Jun 20 10:06:24 2006
Thread01: Tue Jun 20 10:06:25 2006
Thread01: Tue Jun 20 10:06:26 2006
Thread02: Tue Jun 20 10:06:28 2006
Thread02: Tue Jun 20 10:06:30 2006
Thread02: Tue Jun 20 10:06:32 2006
Exiting Main Thread

Output from sync_thread.py code

Implementing a Multithreaded Priority Queue

Example . 

queueLock = threading.Lock()
workQueue = Queue.Queue(10)
queueLock.acquire()
for word in wordList:
    workQueue.put(word)
queueLock.release()
while not workQueue.empty():
    pass
. . .
queueLock.acquire()
if not q.empty():
    data = q.get()
    queueLock.release()

The Queue module provides an invaluable way to manage processing large amounts of data on multiple threads. The Queue module allows you to create a new queue object that can hold a specific number of items. Items are added to the queue with the put() method and removed with the get() method, in first-in, first-out order.

The queue object also includes the empty(), full(), and qsize() methods to determine whether the queue is empty, whether it is full, and its approximate size, respectively. None of these values is fully reliable while other threads are adding or removing items, because the state of the queue can change immediately after the call returns.

If necessary, you can use the thread locking discussed in the previous phrase to control access to the queue. The put() and get() methods are already thread safe on their own, but holding a lock lets a thread check empty() and then call get() as a single step, so that another thread cannot take the last item in between.
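To make the three status methods concrete, here is a single-threaded sketch, so the values it reports are exact. It assumes Python 3, where the Queue module is named queue. The work variable is a hypothetical name, and put_nowait() is used only to show what happens when a bounded queue is full.

```python
import queue   # Python 3 name for the Queue module

work = queue.Queue(2)          # room for at most two items

print(work.empty(), work.full(), work.qsize())   # prints True False 0
work.put("One")
work.put("Two")
print(work.empty(), work.full(), work.qsize())   # prints False True 2

try:
    work.put_nowait("Three")   # no room left, and we refuse to wait
    overflowed = False
except queue.Full:
    overflowed = True

first_out = work.get()         # items leave in the order they arrived
print(overflowed, first_out)   # prints True One
```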
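As an alternative to polling empty() under a lock, worker threads can simply block in get() and stop when they receive a marker value. The sketch below shows that pattern under stated assumptions: it is written for Python 3, it is not the approach used in the listing for this phrase, and the STOP sentinel, the worker function, and the processed list are hypothetical names.

```python
import queue
import threading

work = queue.Queue(10)
processed = []                     # (thread name, item) pairs
processed_lock = threading.Lock()  # protects the shared list
STOP = None                        # hypothetical sentinel meaning "no more work"

def worker(name):
    while True:
        item = work.get()          # blocks until an item is available
        if item is STOP:
            work.task_done()
            break
        with processed_lock:
            processed.append((name, item))
        work.task_done()

threads = [threading.Thread(target=worker, args=("Thread%d" % n,))
           for n in (1, 2, 3)]
for t in threads:
    t.start()

for word in ["One", "Two", "Three", "Four", "Five"]:
    work.put(word)
for _ in threads:
    work.put(STOP)                 # one sentinel per worker

work.join()                        # returns once every item is marked done
for t in threads:
    t.join()

print(sorted(item for _, item in processed))
# prints ['Five', 'Four', 'One', 'Three', 'Two']
```

Which thread handles which word varies from run to run, so the example prints only the sorted items.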

import Queue
import threading
import time

doExit = 0

class newThread (threading.Thread):
    def __init__(self, threadID, name, q):
        #Initialize the base class before setting attributes
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q
    def run(self):
        print "Starting " + self.name
        process_data(self.name, self.q)
        print "Exiting " + self.name

def process_data(tName, q):
    while not doExit:
        queueLock.acquire()
        if not q.empty():
            data = q.get()
            queueLock.release()
            print "%s processing %s" % (tName, data)
        else:
            queueLock.release()
        time.sleep(1)

threadList = ["Thread1", "Thread2", "Thread3"]
wordList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
tID = 1

#Create new threads
for tName in threadList:
    thread = newThread(tID, tName, workQueue)
    thread.start()
    threads.append(thread)
    tID += 1

#Fill the queue
queueLock.acquire()
for word in wordList:
    workQueue.put(word)
queueLock.release()

#Wait for queue to empty
while not workQueue.empty():
    pass

#Notify threads it's time to exit
doExit = 1

#Wait for all threads to complete
for t in threads:
    t.join()

print "Exiting Main Thread"

queue_thread.py

Starting Thread1
Starting Thread2
Starting Thread3
Thread1 processing One
Thread2 processing Two
Thread3 processing Three
Thread1 processing Four
Thread2 processing Five
Exiting Thread1
Exiting Thread2
Exiting Thread3
Exiting Main Thread

Output from queue_thread.py code

Initiating a Timer-Interrupted Thread

Example . 

wakeCall = threading.Timer(waitTime, 
                clean_queue, (qPath ,))
wakeCall.start()

Timer threads are commonly used on Linux servers to clean up resources, provide notification, and check status, among many other tasks. The threading module included with Python provides an easy way of creating a simple timer-interrupted thread.

The Timer(interval, func [,args [, kwargs]]) class of the threading module creates a new timer-interrupted thread object. The interval specifies the number of seconds to wait before executing the function specified in the func argument. The function is executed once; to repeat a task, create and start a new timer each time.

Once the new timer-interrupted thread object is created, it can be started at any time using the start method of the object. Once the start method is invoked, the thread will wait the specified timer interval and then begin execution.
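The short sketch below shows that start() returns immediately and that the function runs only after the interval has passed. It assumes Python 3 and uses a half-second interval so that it finishes quickly. The report function and the fired list are hypothetical names.

```python
import threading

fired = []

def report(label):
    # Runs in the timer thread once the interval has elapsed
    fired.append(label)

wake_call = threading.Timer(0.5, report, ("cleanup",))
wake_call.start()        # returns at once; the countdown runs in its own thread
before = list(fired)     # still empty: the function has not run yet

wake_call.join()         # Timer is a Thread subclass, so join() waits for it
after = list(fired)

print(before, after)     # prints [] ['cleanup']
```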

Note

A timer thread can be cancelled after it is started, using the cancel() method of the object, provided that the function has not yet been executed.
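Here is a minimal sketch of cancelling a timer before it fires, assuming Python 3. The fired list is a hypothetical name, and the five-second interval is chosen only so that the cancel() call clearly arrives first.

```python
import threading

fired = []

# The function would append to the list after five seconds
wake_call = threading.Timer(5.0, fired.append, ("too late",))
wake_call.start()
wake_call.cancel()    # stop the countdown before the function runs
wake_call.join()      # returns promptly because the wait was cancelled

print(fired, wake_call.is_alive())   # prints [] False
```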

import threading
import os

def clean_queue (qPath):
    jobList = os.listdir(qPath)
    for j in jobList:
        delPath = "%s/%s" % (qPath, j)
        os.remove(delPath)
        print "Removing " + delPath

qPath = "/print/queue01"
waitTime = 600 #10 minutes

#Create timer thread
wakeCall = threading.Timer(waitTime, 
                clean_queue, (qPath ,))

#Start timer thread
wakeCall.start()

timer_thread.py

Removing /print/queue01/102.txt
Removing /print/queue01/103.txt
Removing /print/queue01/104.txt
Removing /print/queue01/105.txt
Removing /print/queue01/106.txt
Removing /print/queue01/107.txt

Output from timer_thread.py code
