The Python language provides several functions and modules that allow you to create, start, and control multiple threads. This chapter is designed to help you quickly implement threads in your programs to provide faster and easier processing of data.
Working with multiple threads that share the same data can be problematic. For example, two or more threads could try to modify the same data at the same time, causing race conditions that leave the data corrupted or inconsistent. For that reason, this chapter covers using thread locks and queues to manage data so that access to shared data can be synchronized across multiple threads.
Timer-interrupted threads can be extremely valuable for providing status notifications and for performing cleanup operations after a specified delay. The final phrase of this chapter discusses how to create and start a timer-interrupted thread.
You should be careful when using multiple threads that invoke functions in some of the extension modules. Not all extension modules are thread-friendly; for example, some might block execution of all other threads for extended periods until they complete. However, most functions included in the Python standard library are written to work well in a multithreaded environment.
thread.start_new_thread(print_time, ("Thread01", 2,))
thread.start_new_thread(print_time, ("Thread02", 4,))
The start_new_thread(function, args [, kwargs]) function in the Python thread module provides a fast and efficient way to create new threads on both Linux and Windows. It accepts a function as the first parameter and a tuple of positional arguments as the second. The optional third parameter allows you to pass a dictionary of keyword arguments.
The start_new_thread function creates a new thread and starts executing the function in it. Control returns immediately to the calling thread, and when the function returns, the new thread exits silently.
If the code being executed by a new thread encounters an exception, a stack trace will be printed and the thread will exit. However, other threads will continue to run.
Although it is very effective for low-level threading, the thread module is very limited compared to the newer threading module.
import thread
import time

def print_time(threadName, delay):
    while 1:
        time.sleep(delay)
        print "%s: %s" % (threadName, time.ctime(time.time()))

#Start threads to print time at different intervals
thread.start_new_thread(print_time, ("Thread01", 2,))
thread.start_new_thread(print_time, ("Thread02", 4,))

#Keep the main thread alive; sleeping avoids spinning the CPU
while 1:
    time.sleep(1)
Thread01: Wed Jun 14 12:46:21 2006
Thread01: Wed Jun 14 12:46:23 2006
Thread02: Wed Jun 14 12:46:23 2006
Thread01: Wed Jun 14 12:46:25 2006
Thread01: Wed Jun 14 12:46:27 2006
Thread02: Wed Jun 14 12:46:27 2006
Thread01: Wed Jun 14 12:46:29 2006
Thread01: Wed Jun 14 12:46:31 2006
. . . . .
Output from create_thread.py code
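For comparison, here is a minimal sketch of the same low-level API in Python 3, where the thread module was renamed _thread. It assumes a hypothetical worker() function and shows both the argument tuple and the optional keyword-argument dictionary. Because start_new_thread() offers no way to join a thread, the sketch uses a lock from _thread.allocate_lock() as a simple completion signal; the names worker, results, and done are illustrative only.

```python
import _thread

results = []
done = _thread.allocate_lock()
done.acquire()  # hold the lock until the worker signals completion

def worker(name, count, prefix="item"):
    # Hypothetical worker: record a few labelled items, then signal the main thread.
    for i in range(count):
        results.append("%s-%s-%d" % (name, prefix, i))
    done.release()  # a plain lock may be released by a different thread

# args is a tuple; the optional third argument is a dict of keyword arguments
_thread.start_new_thread(worker, ("T1", 3), {"prefix": "job"})

done.acquire()  # blocks until worker() releases the lock
print(results)  # ['T1-job-0', 'T1-job-1', 'T1-job-2']
```

The lock plays the role that join() plays in the threading module, which is one reason the higher-level module is usually more convenient.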
class newThread (threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
. . . . .
        if doExit:
            thread.exit()
The newer threading module included with Python 2.4 provides much more powerful, high-level support for threads than the thread module discussed in the previous phrase. It is a little more complicated to implement; however, it provides the ability to better control and synchronize threads.
The threading module introduces a Thread class that represents a separate thread of execution. To implement a new thread using the threading module, first define a new subclass of the Thread class. Override the __init__(self [,args]) method to add additional arguments, calling threading.Thread.__init__(self) before setting any attributes of your own. Then override the run(self) method to implement what the thread should do when started.

Once you have created the new Thread subclass, you can create an instance of it and start a new thread by invoking its start() method, which calls run() in the new thread. Calling run() directly does not create a new thread; it simply executes run() in the calling thread like any other method.
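The difference between start() and run() is easy to check. The following Python 3 sketch (the class name NameRecorder and the thread name "Worker" are hypothetical) records which thread executes run(): calling run() directly executes it in the current thread, while start() executes it in a new one.

```python
import threading

class NameRecorder(threading.Thread):
    def __init__(self, results):
        # Initialize the base class first; it sets up the thread's name and state.
        threading.Thread.__init__(self, name="Worker")
        self.results = results

    def run(self):
        # Record the name of whichever thread is executing this method.
        self.results.append(threading.current_thread().name)

results = []
recorder = NameRecorder(results)
recorder.run()    # ordinary method call: runs in the current thread
recorder.start()  # creates a new thread, which then calls run()
recorder.join()   # wait for the new thread to finish
print(results)    # e.g. ['MainThread', 'Worker'] when run from the main thread
```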
import threading
import thread
import time

doExit = 0

class newThread (threading.Thread):
    def __init__(self, threadID, name, counter):
        #Initialize the base class before setting attributes
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):
        print "Starting " + self.name
        print_time(self.name, self.counter, 5)
        print "Exiting " + self.name

def print_time(threadName, delay, counter):
    while counter:
        #Exit this thread early if the main thread requests it
        if doExit:
            thread.exit()
        time.sleep(delay)
        print "%s: %s" % (threadName, time.ctime(time.time()))
        counter -= 1

#Create new threads
thread1 = newThread(1, "Thread01", 1)
thread2 = newThread(2, "Thread02", 2)

#Start new Threads (start() calls run() in a new thread)
thread1.start()
thread2.start()

#Tell Thread02 to exit once Thread01 has finished
while thread2.isAlive():
    if not thread1.isAlive():
        doExit = 1
    time.sleep(0.1)

print "Exiting Main Thread"
exit_thread.py
Starting Thread01
Starting Thread02
Thread01: Wed Jun 14 13:06:10 2006
Thread01: Wed Jun 14 13:06:11 2006
Thread02: Wed Jun 14 13:06:11 2006
Thread01: Wed Jun 14 13:06:12 2006
Thread01: Wed Jun 14 13:06:13 2006
Thread02: Wed Jun 14 13:06:13 2006
Thread01: Wed Jun 14 13:06:14 2006
Exiting Thread01
Thread02: Wed Jun 14 13:06:15 2006
Exiting Main Thread
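The global doExit flag combined with thread.exit() works, but in current Python a threading.Event is a common alternative for asking a thread to stop. The sketch below is written for Python 3 and swaps in an Event in place of the flag; the ticker() function and the timing values are illustrative assumptions, not part of the original example. Because Event.wait() returns as soon as the event is set, the worker also stops promptly instead of finishing its current sleep.

```python
import threading
import time

stop = threading.Event()   # plays the role of the global doExit flag
ticks = []

def ticker(name, delay):
    # Keep working until the main thread sets the stop event.
    while not stop.is_set():
        ticks.append(name)
        stop.wait(delay)   # sleep up to `delay` seconds, waking early if stopped

worker = threading.Thread(target=ticker, args=("Thread01", 0.05))
worker.start()
time.sleep(0.2)            # let the worker run for a moment
stop.set()                 # ask the worker to finish
worker.join()              # returns once ticker() has left its loop
print(len(ticks) > 0, worker.is_alive())  # True False
```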
threadLock = threading.Lock()
. . .
threadLock.acquire()
print_time(self.name, self.counter, 3)
threadLock.release()
The threading module provided with Python includes a simple-to-implement locking mechanism that allows you to synchronize threads. A new lock is created by calling the Lock() function, which returns the new lock object.
Once the new lock object has been created, you can force threads to take turns using a shared resource by calling the lock's acquire([blocking]) method. The optional blocking parameter controls whether the thread waits to acquire the lock. If blocking is set to 0, acquire() returns immediately: it returns a false value if the lock cannot be acquired and a true value if it was acquired. If blocking is set to 1 (the default), the thread blocks until the lock is released and then acquires it.
When you are finished with the lock, release it by calling the release() method of the lock object so that a waiting thread can acquire it.
import threading
import time

class newThread (threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):
        print "Starting " + self.name
        #Get lock to synchronize threads
        threadLock.acquire()
        try:
            print_time(self.name, self.counter, 3)
        finally:
            #Free lock to release next thread, even on error
            threadLock.release()

def print_time(threadName, delay, counter):
    while counter:
        time.sleep(delay)
        print "%s: %s" % (threadName, time.ctime(time.time()))
        counter -= 1

threadLock = threading.Lock()
threads = []

#Create new threads
thread1 = newThread(1, "Thread01", 1)
thread2 = newThread(2, "Thread02", 2)

#Start new Threads
thread1.start()
thread2.start()

#Add threads to thread list
threads.append(thread1)
threads.append(thread2)

#Wait for all threads to complete
for t in threads:
    t.join()
print "Exiting Main Thread"
sync_thread.py
Starting Thread01
Starting Thread02
Thread01: Tue Jun 20 10:06:24 2006
Thread01: Tue Jun 20 10:06:25 2006
Thread01: Tue Jun 20 10:06:26 2006
Thread02: Tue Jun 20 10:06:28 2006
Thread02: Tue Jun 20 10:06:30 2006
Thread02: Tue Jun 20 10:06:32 2006
Exiting Main Thread
Output from sync_thread.py code
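The blocking parameter and the acquire/release protocol can be seen in a short Python 3 sketch. With blocking set to False, a second acquire() on a lock that is already held returns False immediately instead of waiting. The second half guards a shared counter using the with statement, which Lock objects support: it calls acquire() on entry and release() on exit. The add() function and the counts are illustrative.

```python
import threading

lock = threading.Lock()

# Non-blocking attempts: the first succeeds, the second fails without waiting.
first = lock.acquire(False)
second = lock.acquire(False)
lock.release()
print(first, second)  # True False

counter = 0

def add(n):
    global counter
    for _ in range(n):
        with lock:        # acquire() on entry, release() on exit, even on error
            counter += 1  # the read-modify-write happens in one thread at a time

threads = [threading.Thread(target=add, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 40000
```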
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
queueLock.acquire()
for word in wordList:
    workQueue.put(word)
queueLock.release()
while not workQueue.empty():
    pass
. . .
queueLock.acquire()
if not workQueue.empty():
    data = q.get()
queueLock.release()
The Queue module provides an invaluable way to manage the processing of large amounts of data on multiple threads. It allows you to create a new queue object that can hold a specific number of items. Items are added to the queue with the put() method and removed from it with the get() method.
The queue object also includes the empty(), full(), and qsize() methods, which report whether the queue is empty, whether it is full, and its approximate size, respectively. Because other threads may add or remove items at any moment, these results are only approximate; the value returned by qsize() in particular may already be out of date by the time you use it.
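With a single thread these methods give exact answers, which makes them easy to see in action. This Python 3 sketch (the module is named queue in Python 3 rather than Queue) creates a queue with room for two items and uses put_nowait(), which raises queue.Full instead of blocking when the queue has no space.

```python
import queue

q = queue.Queue(2)          # maxsize 2: put() blocks once two items are queued
q.put("a")
q.put("b")
print(q.full(), q.qsize())  # True 2

try:
    q.put_nowait("c")       # non-blocking put on a full queue
except queue.Full:
    print("queue is full")

first = q.get()             # items come out in FIFO order
second = q.get()
print(first, second, q.empty())  # a b True
```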
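The queue object performs its own internal locking, so individual put() and get() calls are already thread-safe. If you need a sequence of calls to happen as a unit, such as checking empty() and then calling get(), you can use the thread locking discussed in the previous phrase to control access to the queue. This makes queue management safer and gives you more control over the data processing.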
import Queue
import threading
import time

doExit = 0

class newThread (threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q
    def run(self):
        print "Starting " + self.name
        process_data(self.name, self.q)
        print "Exiting " + self.name

def process_data(tName, q):
    while not doExit:
        #Check for and remove an item as a single locked step
        queueLock.acquire()
        if not workQueue.empty():
            data = q.get()
            queueLock.release()
            print "%s processing %s" % (tName, data)
        else:
            queueLock.release()
        time.sleep(1)

threadList = ["Thread1", "Thread2", "Thread3"]
wordList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
tID = 1

#Create new threads
for tName in threadList:
    worker = newThread(tID, tName, workQueue)
    worker.start()
    threads.append(worker)
    tID += 1

#Fill the queue
queueLock.acquire()
for word in wordList:
    workQueue.put(word)
queueLock.release()

#Wait for queue to empty
while not workQueue.empty():
    pass

#Notify threads it's time to exit
doExit = 1

#Wait for all threads to complete
for t in threads:
    t.join()
print "Exiting Main Thread"
Starting Thread1
Starting Thread2
Starting Thread3
Thread1 processing One
Thread2 processing Two
Thread3 processing Three
Thread1 processing Four
Thread2 processing Five
Exiting Thread1
Exiting Thread2
Exiting Thread3
Exiting Main Thread
Output from queue_thread.py code
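The example above has each thread poll the queue once per second. A common alternative, sketched here in Python 3, is to let get() block until work arrives and to stop each worker with a sentinel value, while task_done() and join() let the main thread wait until every item has been handled. This sentinel approach is a swapped-in technique rather than the one used in the original example, and the names worker, SENTINEL, and processed are hypothetical.

```python
import queue
import threading

work = queue.Queue(10)
processed = []
processed_lock = threading.Lock()
SENTINEL = None  # hypothetical "no more work" marker

def worker(name):
    while True:
        item = work.get()          # blocks until an item is available
        if item is SENTINEL:
            work.task_done()
            break                  # each worker consumes exactly one sentinel
        with processed_lock:
            processed.append((name, item))
        work.task_done()           # mark this item as fully handled

threads = [threading.Thread(target=worker, args=("Thread%d" % i,)) for i in range(1, 4)]
for t in threads:
    t.start()
for word in ["One", "Two", "Three", "Four", "Five"]:
    work.put(word)                 # blocks only if the queue is full
for _ in threads:
    work.put(SENTINEL)             # one stop marker per worker
work.join()                        # waits until every put() has a matching task_done()
for t in threads:
    t.join()
print(sorted(word for _, word in processed))  # ['Five', 'Four', 'One', 'Three', 'Two']
```

Because get() blocks, idle workers use no CPU, and no separate lock is needed around the queue itself.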
Timer threads are commonly used on Linux servers to clean up resources, provide notifications, check status, and perform many other functions. The threading module included with Python provides an easy way to create a simple timer-interrupted thread.
Calling Timer(interval, func [,args [, kwargs]]) from the threading module creates a new timer-interrupted thread object. The interval argument specifies the number of seconds to wait before executing the function specified in the func argument.
Once the new timer-interrupted thread object is created, it can be started at any time using its start() method. When start() is invoked, the thread waits for the specified interval and then calls the function once.
A timer thread can be cancelled after it is started by calling its cancel() method, provided that the function has not yet begun executing.
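Both start() and cancel() are easy to exercise in a Python 3 sketch. The note() callback and the intervals below are illustrative assumptions. Because a Timer is a kind of thread, join() can be used to wait for it; a cancelled timer exits without calling its function.

```python
import threading

fired = []

def note(label):
    # Hypothetical callback: record which timer ran.
    fired.append(label)

quick = threading.Timer(0.05, note, args=("quick",))
slow = threading.Timer(5.0, note, args=("slow",))
quick.start()
slow.start()
slow.cancel()   # the 5-second interval has not elapsed, so note() never runs
quick.join()    # wait for the quick timer to fire and finish
slow.join()     # returns promptly because the timer was cancelled
print(fired)    # ['quick']
```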
import threading
import os

def clean_queue (qPath):
    #Delete every file in the queue directory
    jobList = os.listdir(qPath)
    for j in jobList:
        delPath = os.path.join(qPath, j)
        os.remove(delPath)
        print "Removing " + delPath

qPath = "/print/queue01"
waitTime = 600 #10 minutes

#Create timer thread
wakeCall = threading.Timer(waitTime, clean_queue, (qPath ,))

#Start timer thread
wakeCall.start()
timer_thread.py
Removing /print/queue01/102.txt
Removing /print/queue01/103.txt
Removing /print/queue01/104.txt
Removing /print/queue01/105.txt
Removing /print/queue01/106.txt
Removing /print/queue01/107.txt
Output from timer_thread.py code
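A Timer calls its function only once. For cleanup or status checks that must recur at regular intervals, one approach, sketched here in Python 3, is to have the callback schedule a fresh Timer each time it runs. The helper run_repeatedly() is hypothetical, and the limit on the number of runs exists only to keep the example finite.

```python
import threading

def run_repeatedly(interval, func, times):
    """Hypothetical helper: call func() every `interval` seconds, `times` times.

    Returns an Event that is set after the last call.
    """
    remaining = times
    finished = threading.Event()

    def tick():
        nonlocal remaining
        func()
        remaining -= 1
        if remaining > 0:
            threading.Timer(interval, tick).start()  # schedule the next run
        else:
            finished.set()

    threading.Timer(interval, tick).start()
    return finished

calls = []
done = run_repeatedly(0.02, lambda: calls.append("tick"), 3)
done.wait(5)       # wait (up to 5 seconds) for the third call
print(len(calls))  # 3
```

Each run creates a new thread, so for very short intervals a single long-lived thread looping over Event.wait() may be a lighter-weight choice.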