Chapter 30

Map Reduce


30.1 Constraints

  • Input data is divided into blocks.
  • A map function applies a given worker function to each block of data, potentially in parallel.
  • A reduce function takes the results of the many worker functions and recombines them into a coherent output.

30.2 A Program in this Style

  1 #!/usr/bin/env python
  2 import sys, re, operator, string
  3
  4 #
  5 # Functions for map reduce
  6 #
  7 def partition(data_str, nlines):
  8     """
  9     Partitions the input data_str (a big string)
 10     into chunks of nlines.
 11     """
 12     lines = data_str.split('\n')
 13     for i in xrange(0, len(lines), nlines):
 14         yield '\n'.join(lines[i:i+nlines])
 15
 16 def split_words(data_str):
 17     """
 18     Takes a string, returns a list of pairs (word, 1),
 19     one for each word in the input, so
 20     [(w1, 1), (w2, 1), ..., (wn, 1)]
 21     """
 22     def _scan(str_data):
 23         pattern = re.compile('[\W_]+')
 24         return pattern.sub(' ', str_data).lower().split()
 25
 26     def _remove_stop_words(word_list):
 27         with open('../stop_words.txt') as f:
 28             stop_words = f.read().split(',')
 29         stop_words.extend(list(string.ascii_lowercase))
 30         return [w for w in word_list if not w in stop_words]
 31
 32     # The actual work of splitting the input into words
 33     result = []
 34     words = _remove_stop_words(_scan(data_str))
 35     for w in words:
 36         result.append((w, 1))
 37     return result
 38
 39 def count_words(pairs_list_1, pairs_list_2):
 40     """
 41     Takes two lists of pairs of the form
 42     [(w1, 1), ...]
 43     and returns a list of pairs [(w1, frequency), ...],
 44     where frequency is the sum of all the reported occurrences
 45     """
 46     mapping = dict((k, v) for k, v in pairs_list_1)
 47     for p in pairs_list_2:
 48         if p[0] in mapping:
 49             mapping[p[0]] += p[1]
 50         else:
 51             mapping[p[0]] = 1
 52     return mapping.items()
 53
 54 #
 55 # Auxiliary functions
 56 #
 57 def read_file(path_to_file):
 58     with open(path_to_file) as f:
 59         data = f.read()
 60     return data
 61
 62 def sort(word_freq):
 63     return sorted(word_freq, key=operator.itemgetter(1), reverse=True)
 64
 65 #
 66 # The main function
 67 #
 68 splits = map(split_words, partition(read_file(sys.argv[1]), 200))
 69 splits.insert(0, []) # Normalize input to reduce
 70 word_freqs = sort(reduce(count_words, splits))
 71
 72 for (w, c) in word_freqs[0:25]:
 73     print w, ' - ', c

30.3 Commentary

IN THIS STYLE, the problem's input data is divided into chunks, each chunk is processed independently of the others, possibly in parallel, and the results are combined at the end. The Map Reduce style, commonly known as MapReduce, comprises two key abstractions: (1) a map function takes a collection of data chunks and a function as arguments, and applies that function to each chunk independently, producing a collection of results; (2) a reduce function takes a collection of results and a function as arguments, and applies that function to the collection of results in order to extract some global knowledge from it.
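For instance, Python's built-in map and reduce can be exercised on a toy problem (a minimal illustration, separate from the chapter's program, written in the same Python 2 dialect, where reduce is a built-in):

    import operator

    chunks = [1, 2, 3, 4]
    # map: apply the worker function to each chunk independently
    squared = map(lambda x: x * x, chunks)    # [1, 4, 9, 16]
    # reduce: recombine the collection of results into one value
    total = reduce(operator.add, squared)     # 30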

The key observation for the term frequency task is that counting words can be done in a divide-and-conquer manner: we can count words in smaller portions of the input file (e.g. each page of the book) and then combine those counts. Not all problems can be decomposed this way, but term frequency can. When such a decomposition is feasible, the MapReduce solution can be very effective for very large input data, with several processing units working in parallel.

Let's look at the example program, starting at the bottom, lines #68–73. In line #68, the main block starts by reading the input file and partitioning it into blocks of 200 lines; those blocks are given as the second argument to Python's map function, whose first argument is the worker function split_words. The result of that map is a list of partial word counts, one from each application of the worker function, which we call splits. We then prepare those splits for reduction (line #69) – more about this later. Once ready, the splits are given as the second argument to Python's reduce function, whose first argument is the worker function count_words (line #70). The result of that application is a list of pairs, each pairing a word with its frequency. Let's now look at the three main functions – partition, split_words and count_words – in detail.
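Before doing so, here is the whole pipeline run on a tiny in-memory string, with blocks of one line instead of 200 (a sketch that assumes the program's functions are already defined and that ../stop_words.txt is available; the order of tied counts may vary, since they come out of a dictionary):

    text = "wedding bells\nwedding cake"
    splits = map(split_words, partition(text, 1))
    # splits is [[('wedding', 1), ('bells', 1)], [('wedding', 1), ('cake', 1)]]
    splits.insert(0, [])
    print sort(reduce(count_words, splits))
    # => [('wedding', 2), ('bells', 1), ('cake', 1)]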

The partition function (lines #7–14) is a generator that takes a multiline string and a number of lines as inputs, and generates strings with the requested number of lines. For example, Pride and Prejudice has 13,426 lines, so we are dividing it into 68 blocks of 200 lines (see line #68), with the last block having fewer than 200 lines. Note that the function yields, rather than returns, blocks. As seen before, this is a lazy way of processing input data, but it's functionally equivalent to returning the complete list of 68 blocks.
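A quick check of partition on a five-line string shows both the laziness and the shorter final block (a sketch; the values follow directly from the definition):

    blocks = partition('a\nb\nc\nd\ne', 2)   # a generator; nothing happens yet
    print list(blocks)                       # forces all the yields
    # => ['a\nb', 'c\nd', 'e']               (last block has fewer lines)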

The split_words function (lines #16–37) takes a multiline string – one block of 200 lines, as used in line #68 – and processes that block. The processing is similar to what we have seen before. However, this function returns its data in a very different format from that seen in other chapters for equivalent functions. After producing the list of non-stop words (lines #22–34), it iterates through that list constructing a list of pairs; the first element of each pair is a word occurrence and the second is the number 1, meaning "this word, one occurrence." For Pride and Prejudice's first block, the first few entries in the resulting list look like this:

[('project',1),('gutenberg',1),('ebook',1),
 ('pride',1),('prejudice',1), ('jane',1),
 ('austen',1),('ebook',1),...]

This seems a rather strange data structure, but it is common for MapReduce applications to make worker functions do as little computation as possible. In this case, we aren't even counting the number of occurrences of words in each block; we are simply transforming the block of words into a data structure that supports a very simple counting procedure later on.

To recapitulate, line #68 results in a list of these data structures, one for each of the 68 blocks.

The count_words function (lines #39–52) is the reducing worker used as the first argument to reduce in line #70. In Python, reducer functions have two arguments, which are meant to be merged in some way, and return one single value at the end. Our function takes two of the data structures described above: the first one is the result of the previous reduction, if any, starting with the empty list (line #69); the second is the new split to be merged. count_words starts by producing a dictionary out of the first list of pairs (line #46); it then iterates through the second list of pairs, incrementing the corresponding word counts in the dictionary (lines #47–51). At the end, it returns the dictionary as a list of key-value pairs. This return value is then fed into the next reduction, and the process continues until there are no more splits.
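A single reduction step can be traced by hand (values invented for illustration; the order of the returned pairs may vary, since they come out of a dictionary):

    acc = [('pride', 2), ('prejudice', 1)]   # result of previous reductions
    new = [('pride', 1), ('jane', 1)]        # the next split; all counts are 1
    print count_words(acc, new)
    # => [('pride', 3), ('prejudice', 1), ('jane', 1)]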

30.4 This Style in Systems Design

MapReduce is a natural fit for data-intensive applications where the data can be partitioned and processed independently, and the partial results recombined at the end. These applications benefit from the use of many computing units – cores, servers – that perform the mapping and reducing functions in parallel, potentially reducing the processing time by several orders of magnitude relative to a single processor. The next chapter looks into such variations of MapReduce in more detail.

Our example program, however, does not employ threads or concurrency. The example is more in line with the original LISP map and reduce functions. A language processor is free to perform the several applications of the mapped function in parallel, but that is not what Python does.¹ Nevertheless, in this book this style is grouped with the styles for concurrent programming, because those are the applications that gain the most from it.
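For the curious, here is a hedged sketch of what a parallel map could look like in this program, using the standard multiprocessing module (this is not part of the chapter's program, and it uses processes rather than the threads of Exercise 30.3; it assumes the program's functions and imports are in scope):

    from multiprocessing import Pool

    if __name__ == '__main__':
        pool = Pool(4)   # four worker processes; tune to the number of cores
        blocks = list(partition(read_file(sys.argv[1]), 200))
        splits = pool.map(split_words, blocks)   # parallel map over the blocks
        pool.close()
        pool.join()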

30.5 Historical Notes

The concept of mapping and reducing sequences, as currently used, was included in Common LISP in the late 1970s. However, those concepts predate Common LISP by at least a decade. A version of map was present in McCarthy's LISP system in 1960, under the name maplist; it took a function as argument that was then applied to each successive tail of a list argument, rather than to each element. By the mid-1960s many dialects of LISP had mapcar, which maps the function onto each element of a list. Reduce was known to LISP programmers by the early 1970s. Both map and reduce were also present in APL for its built-in scalar operations.

Several decades later, in the early 2000s, a variation of this model was made popular by Google, who applied it at the data center scale. The model was then adopted more widely with the emergence of open source MapReduce frameworks such as Hadoop.

30.6 Further Reading

MAC LISP (1967). MIT A.I. Memo No.116A. Available at: http://www.softwarepreservation.org/projects/LISP/MIT/AIM-116A-White-Interim_User_Guide.pdf
Synopsis: This is the manual for one of the flavors of LISP, the MAC LISP, listing the functions available in that programming system. The map functions are featured prominently.

Steele, G. (1984). Common LISP the Language. Chapter 14.2: Concatenating, Mapping and Reducing Sequences. Digital Press. Available at: http://www.cs.cmu.edu/Groups/AI/html/cltl/clm/clm.html
Synopsis: Common LISP had both map and reduce operations.

30.7 Glossary

Map: A function that takes blocks of data and a function as arguments, and applies the latter to each block independently, producing a collection of results.

Reduce: A function that takes a collection of results and a function as arguments, and applies the latter to the current merged result and the next result in the collection, in order to extract some global knowledge from that collection.

30.8 Exercises

30.1 Another language. Implement the example program in another language, but preserve the style.

30.2 Partial counts. Change the example program so that split_words (lines #16–37) produces a list of partial word counts. Are there any advantages in doing this vs. doing what the original example program does?

30.3 Concurrency. Python's map and reduce functions are not multi-threaded. Write a concurrent_map function that takes a function and a list of blocks and launches a thread for each function application. Use your function instead of map in line #68. It's ok to make a few changes to the program, but try to minimize those changes.

30.4 A different task. Write one of the tasks proposed in the Prologue using this style.

¹Python 3.x includes a new module called concurrent.futures that provides a concurrent implementation of map.
