Chapter 28

Actors


Similar to the Letterbox style (Chapter 11), but where the things have independent threads of execution.

28.1 Constraints

  • The larger problem is decomposed into things that make sense for the problem domain.
  • Each thing has a queue meant for other things to place messages in it.
  • Each thing is a capsule of data that exposes only its ability to receive messages via the queue.
  • Each thing has its own thread of execution independent of the others.

28.2 A Program in this Style

  1 #!/usr/bin/env python
  2
  3 import sys, re, operator, string
  4 from threading import Thread
  5 from Queue import Queue
  6
  7 class ActiveWFObject(Thread):
  8     def __init__(self):
  9         Thread.__init__(self)
 10         self.name = str(type(self))
 11         self.queue = Queue()
 12         self._stop = False
 13         self.start()
 14
 15     def run(self):
 16         while not self._stop:
 17             message = self.queue.get()
 18             self._dispatch(message)
 19             if message[0] == 'die':
 20                 self._stop = True
 21
 22 def send(receiver, message):
 23     receiver.queue.put(message)
 24
 25 class DataStorageManager(ActiveWFObject):
 26     """ Models the contents of the file """
 27     _data = ''
 28
 29     def _dispatch(self, message):
 30         if message[0] == 'init':
 31             self._init(message[1:])
 32         elif message[0] == 'send_word_freqs':
 33             self._process_words(message[1:])
 34         else:
 35             # forward
 36             send(self._stop_word_manager, message)
 37
 38     def _init(self, message):
 39         path_to_file = message[0]
 40         self._stop_word_manager = message[1]
 41         with open(path_to_file) as f:
 42             self._data = f.read()
 43         pattern = re.compile('[\W_]+')
 44         self._data = pattern.sub(' ', self._data).lower()
 45
 46     def _process_words(self, message):
 47         recipient = message[0]
 48         data_str = ''.join(self._data)
 49         words = data_str.split()
 50         for w in words:
 51             send(self._stop_word_manager, ['filter', w])
 52         send(self._stop_word_manager, ['top25', recipient])
 53
 54 class StopWordManager(ActiveWFObject):
 55     """ Models the stop word filter """
 56     _stop_words = []
 57
 58     def _dispatch(self, message):
 59         if message[0] == 'init':
 60             self._init(message[1:])
 61         elif message[0] == 'filter':
 62             return self._filter(message[1:])
 63         else:
 64             # forward
 65             send(self._word_freqs_manager, message)
 66
 67     def _init(self, message):
 68         with open('../stop_words.txt') as f:
 69             self._stop_words = f.read().split(',')
 70         self._stop_words.extend(list(string.ascii_lowercase))
 71         self._word_freqs_manager = message[0]
 72
 73     def _filter(self, message):
 74         word = message[0]
 75         if word not in self._stop_words:
 76             send(self._word_freqs_manager, ['word', word])
 77
 78 class WordFrequencyManager(ActiveWFObject):
 79     """ Keeps the word frequency data """
 80     _word_freqs = {}
 81
 82     def _dispatch(self, message):
 83         if message[0] == 'word':
 84             self._increment_count(message[1:])
 85         elif message[0] == 'top25':
 86             self._top25(message[1:])
 87
 88     def _increment_count(self, message):
 89         word = message[0]
 90         if word in self._word_freqs:
 91             self._word_freqs[word] += 1
 92         else:
 93             self._word_freqs[word] = 1
 94
 95     def _top25(self, message):
 96         recipient = message[0]
 97         freqs_sorted = sorted(self._word_freqs.iteritems(),
                key=operator.itemgetter(1), reverse=True)
 98         send(recipient, ['top25', freqs_sorted])
 99
 100 class WordFrequencyController(ActiveWFObject):
 101
 102     def _dispatch(self, message):
 103         if message[0] == 'run':
 104             self._run(message[1:])
 105         elif message[0] == 'top25':
 106             self._display(message[1:])
 107         else:
 108             raise Exception("Message not understood " + message[0])
 109
 110     def _run(self, message):
 111         self._storage_manager = message[0]
 112         send(self._storage_manager, ['send_word_freqs', self])
 113
 114     def _display(self, message):
 115         word_freqs = message[0]
 116         for (w, f) in word_freqs[0:25]:
 117             print w, ' - ', f
 118         send(self._storage_manager, ['die'])
 119         self._stop = True
 120
 121 #
 122 # The main function
 123 #
 124 word_freq_manager = WordFrequencyManager()
 125
 126 stop_word_manager = StopWordManager()
 127 send(stop_word_manager, ['init', word_freq_manager])
 128
 129 storage_manager = DataStorageManager()
 130 send(storage_manager, ['init', sys.argv[1], stop_word_manager])
 131
 132 wfcontroller = WordFrequencyController()
 133 send(wfcontroller, ['run', storage_manager])
 134
 135 # Wait for the active objects to finish
 136 [t.join() for t in [word_freq_manager, stop_word_manager,
  storage_manager, wfcontroller]]

28.3 Commentary

This style is a direct extension of the Letterbox style, but where the objects have their own threads. These objects are also known as active objects or actors. Objects interact with each other by sending messages that are placed in queues. Each active object performs a continuous loop over its queue, processing one message at a time, and blocking if the queue is empty.

The example program starts by defining a class, ActiveWFObject (lines #7–20), that implements the generic behavior of active objects. Active objects inherit from Thread (line #7), a Python class that supports concurrent threads of execution. This means that their run method (lines #15–20) starts executing concurrently when the thread's start method is called in line #13. Each active object has a name (line #10) and a queue (line #11). The Queue object in Python implements a queue data type where threads that call the get operation block while the queue is empty. The run method (lines #15–20) loops, taking one message at a time from the queue, possibly blocking if the queue is empty, and dispatching that message. One special message, 'die', breaks the loop and makes the thread stop (lines #19–20). All active application objects inherit this behavior from ActiveWFObject.
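As a rough illustration of the loop just described, the following is a minimal sketch written for Python 3, whereas the listing above targets Python 2. The names ActiveObject, Echo and _stop_me are assumptions made for this sketch and are not part of the example program. The stop flag is renamed because some Python 3 releases use Thread._stop internally, so reusing that name can interfere with join.

```python
import queue
import threading


class ActiveObject(threading.Thread):
    """Hypothetical Python 3 counterpart of ActiveWFObject."""

    def __init__(self):
        super().__init__()
        self.queue = queue.Queue()
        # Assumption: renamed from _stop to avoid clashing with Thread internals.
        self._stop_me = False
        self.start()

    def run(self):
        while not self._stop_me:
            message = self.queue.get()  # blocks while the queue is empty
            self._dispatch(message)
            if message[0] == 'die':
                self._stop_me = True

    def _dispatch(self, message):
        raise NotImplementedError


class Echo(ActiveObject):
    """Illustrative actor that records the tag of every message it receives."""

    def __init__(self):
        self.seen = []          # set before the thread is started
        super().__init__()

    def _dispatch(self, message):
        self.seen.append(message[0])


def send(receiver, message):
    # Same idea as lines #22-23: sending means enqueueing.
    receiver.queue.put(message)


echo = Echo()
send(echo, ['hello'])
send(echo, ['die'])
echo.join()  # returns once the 'die' message has been processed
```

Note that, as in the listing, the 'die' message is dispatched like any other message before the loop ends, so subclasses still get a chance to react to it.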

Lines #22–23 define a function for sending a message to a receiver. In this case, sending a message means placing it in the queue of the receiver (line #23).

Next, we have the four active application objects. In this program, we have followed the same design as that used in the Letterbox style (Chapter 11), so the classes and their roles are exactly the same: there is a data storage entity (lines #25–52), a stop word entity (lines #54–76), an entity for keeping word frequency counts (lines #78–98) and the application controller (lines #100–119). All of these inherit from ActiveWFObject, meaning that any instantiation of these classes spawns new threads independently running the run method (lines #15–20).

In the main section (lines #124–136), we instantiate one object of each class, so when the application runs, it results in four threads plus the main thread. The main thread simply blocks until the active objects' threads all stop (line #136).

A message in our program is simply a list with any number of elements that has the message tag in its first position. Object references can be sent via messages. For example, the 'init' message that the main thread sends to the StopWordManager object is ['init', word_freq_manager] (line #127), where word_freq_manager is the reference to another active object, an instance of WordFrequencyManager; the 'init' message that the main thread sends to the DataStorageManager object is ['init', sys.argv[1], stop_word_manager].
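To make the message layout concrete, here is a small Python 3 sketch that leaves threads out entirely. Mailbox is a hypothetical stand-in for an active object, used only because it has a queue; the variable names mirror those of the example program. It shows a tagged list being enqueued and then split into tag and payload, with the payload carrying a reference to another object rather than a copy.

```python
import queue


class Mailbox:
    """Hypothetical stand-in for an active object: only its queue matters here."""

    def __init__(self, label):
        self.label = label
        self.queue = queue.Queue()


def send(receiver, message):
    receiver.queue.put(message)


word_freq_manager = Mailbox('word_freq_manager')
stop_word_manager = Mailbox('stop_word_manager')

# The tag comes first; everything after it is the payload.
send(stop_word_manager, ['init', word_freq_manager])

message = stop_word_manager.queue.get()
tag, payload = message[0], message[1:]

# The payload holds the very same object that the sender referred to.
same_object = payload[0] is word_freq_manager
```

Slicing with message[1:] is the same convention the dispatch methods in the listing use before handing the payload to a handler.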

Let's look into each active application object in more detail, and the messages that are exchanged among them. The application starts by sending the 'init' message to both the stop word manager (line #127) and the data storage manager (line #130). These messages are dispatched by the corresponding active objects' threads (line #18), which results in the execution of the corresponding dispatch methods – lines #58–65 and lines #29–36, respectively. In both cases, the 'init' messages result in files being read, and their data processed in some form. Next, the main thread sends the 'run' message to the application controller (line #133), and this triggers the execution of the term frequency task over the input data. Let's see how.

Upon reception of the 'run' message, the word frequency controller stores the reference for the data storage object (line #111) and sends it the message 'send_word_freqs' with a reference to itself (line #112). In turn, when the data storage object receives 'send_word_freqs' (line #32), it starts processing the words (lines #46–52), which results in sending each word to the stop word manager object, along with the message 'filter' (lines #50–51). Upon receiving 'filter' messages, the stop word manager object filters the word (lines #73–76), which results in sending the non-stop words along with the message 'word' to the word frequency manager (lines #75–76). In turn, the word frequency manager object increments the counts for each word received via the message 'word' (lines #88–93).

When the data storage manager runs out of words, it sends the message 'top25' to the stop word manager, along with the reference to the recipient (line #52) – remember the recipient is the application controller (see line #112). The stop word manager, however, does not understand that message, as it is not one of the expected messages in its dispatch method (lines #58–65). Its dispatch method is implemented so that any message that is not explicitly expected is simply forwarded to the word frequency manager object, so the 'top25' message is forwarded. In turn, the word frequency manager understands the 'top25' message (line #85); upon its reception, it sends the sorted list of word frequencies to the recipient (lines #95–98) along with the message 'top25'. The recipient, the application controller, upon receiving the 'top25' message (line #105) prints the information on the screen (lines #115–117), and sends the 'die' message down the chain of objects, which makes them all stop (lines #18–19). At that point, all threads are finished, the main thread is unblocked, and the application ends.
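The forwarding behavior described above can be sketched in isolation. The following Python 3 code is an illustration under assumptions, not the example program: Stage, understands, next_stage and handled are hypothetical names, and the tags 'a' and 'c' are placeholders. Each stage handles the tags it understands and forwards everything else to the next stage, so both an ordinary message and 'die' travel down the chain.

```python
import queue
import threading


def send(receiver, message):
    receiver.queue.put(message)


class Stage(threading.Thread):
    """Hypothetical actor that forwards any message it does not understand."""

    def __init__(self, understands, next_stage=None):
        super().__init__()
        self.understands = understands
        self.next_stage = next_stage
        self.queue = queue.Queue()
        self.handled = []
        self.start()

    def run(self):
        while True:
            message = self.queue.get()
            self._dispatch(message)
            if message[0] == 'die':
                break

    def _dispatch(self, message):
        if message[0] in self.understands:
            self.handled.append(message[0])
        elif self.next_stage is not None:
            send(self.next_stage, message)  # not understood here: forward


last = Stage({'c'})
middle = Stage({'b'}, last)
first = Stage({'a'}, middle)

send(first, ['a'])    # handled by the first stage
send(first, ['c'])    # forwarded twice, handled by the last stage
send(first, ['die'])  # forwarded down the chain; every stage stops

for stage in (first, middle, last):
    stage.join()
```

Because each queue is first-in, first-out and each stage forwards 'die' before leaving its loop, every stage sees all earlier messages before it stops.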

Unlike the Letterbox style, the Actors style is inherently asynchronous, with blocking queues serving as the interfaces between the agents. The calling objects place messages in queues of the callees and continue without waiting for the dispatch of those messages.
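The asynchrony can be observed directly. In this Python 3 sketch, SlowReceiver and its gate event are hypothetical devices: the gate holds the receiver back so that the example is deterministic. The sender enqueues three messages and carries on while none of them has been processed; the receiver only works through them once the gate is opened.

```python
import queue
import threading


class SlowReceiver(threading.Thread):
    """Hypothetical actor that waits for a gate before reading its queue."""

    def __init__(self):
        super().__init__()
        self.queue = queue.Queue()
        self.gate = threading.Event()
        self.processed = []
        self.start()

    def run(self):
        self.gate.wait()  # held back until the sender opens the gate
        while True:
            message = self.queue.get()
            if message[0] == 'die':
                break
            self.processed.append(message[1])


def send(receiver, message):
    receiver.queue.put(message)


receiver = SlowReceiver()
for word in ['white', 'tigers', 'live']:
    send(receiver, ['word', word])  # returns immediately, no reply awaited

# The sender has moved on although nothing has been dispatched yet.
processed_before = list(receiver.processed)
pending_at_return = receiver.queue.qsize()

receiver.gate.set()
send(receiver, ['die'])
receiver.join()
```

In the example program there is no such gate; the point is only that send returns as soon as the message is in the queue, regardless of when the receiver gets to it.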

28.4 This Style in Systems Design

This style is a natural match for large distributed systems: without distributed shared memory, components in different nodes of a network interact by sending messages to each other. There are a few ways of designing message-based systems; one of them, known as point-to-point messaging, where the message has a single, well-known receiver, maps directly to this style. The Java Message Service (JMS) is a popular framework that supports this style, along with the publish-subscribe style described in the previous chapter. In the mobile arena, Google Cloud Messaging for Android is another example of this style in action at planetary scale.

But this style is not just for large distributed systems. Components that consist of a single multi-threaded process also benefit from the application of this style – threaded objects with queues – as a way to limit the amount of internal concurrency.

28.5 Historical Notes

This style targets programming for concurrent and distributed applications. The general idea is as old as the first operating systems that supported concurrency, and it emerged in several forms during the 1970s. Message-passing processes had been known to be a flexible way to structure operating systems; from the beginning, this model has co-existed with the alternative shared memory model. In the mid 1980s, Gul Agha formalized the model, giving these processes with queues the general name of Actors.

28.6 Further Reading

Agha, G. (1985). Actors: A model of concurrent computation in distributed systems. Doctoral dissertation, MIT Press.
Synopsis: This is the original work proposing the Actor model for concurrent programming.

Lauer, H. and Needham, R. (1978). On the duality of operating system structures. Second International Symposium on Operating Systems.
Synopsis: Long before concurrent programming was its own topic, researchers and developers were well aware of the design tradeoffs concerning communication between different units of execution. This paper presents a nice overview of message-passing vs. shared memory models.

28.7 Glossary

Actor: An object with its own thread of execution, or a process node on a network. Actors have a queue to receive messages, and interact with each other only by sending messages.

Asynchronous request: A request where the requester doesn't wait for the reply, and where the reply, if any, arrives at some later point in time.

Message: A data structure carrying information from a sender to a known receiver, possibly transported via a network.

28.8 Exercises

28.1 Another language. Implement the example program in another language, but preserve the style.

28.2 3+1 Threads. Write another version of the example program also in the Actors style, but with only three active objects plus the main thread.

28.3 Lazy Rivers, take 2. Languages like Java don't have the yield statement explained in the Lazy Rivers style (Chapter 27). Implement the data-centric program in that chapter without using yield, and using the Actors style.

28.4 A different task. Write one of the tasks proposed in the Prologue using this style.
