Lesson 5: Hadoop and MapReduce
Now that we’ve gone through how to do some basic tweet processing in Python, we need to scale this up so we are able to process many tweets together. While individual documents can be interesting, usually we want to gain some information about trends across many documents to make a more general statement.
Enter Hadoop and MapReduce. Hadoop is a framework for distributed storage and distributed processing of large data sets across clusters of computers. MapReduce is its programming model for processing those data sets in parallel. The Hadoop project is maintained by the Apache Software Foundation and is distributed under a F/OSS license.
Why should we use something like Hadoop and MapReduce for large data processing?
- Scalable – Hadoop lets us run analyses at scale; jobs that would take hours or days to complete on a single commodity machine can finish in minutes.
- Extendable – The more servers we add, the more data we can process.
- Fast – Work is split into many pieces that are processed quickly and in parallel.
- Cheap – Adding commodity servers to the setup lets us add more processing power at low cost.
The figure below shows the typical MapReduce processing flow. There are effectively three processes that go on here:
- Map process – take chunks of the data and emit them as distinct key–value pairs
- Intermediate process – shuffle the keys into some logical order (this usually means sorting them)
- Reduce process – group the data by key and apply some function to each key’s values
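To make the three phases concrete, here is a minimal single-machine sketch in plain Python 3 (not Hadoop, and not part of the workshop files; all function names are ours). It maps lines to (key, value) pairs, “shuffles” by sorting, and reduces by summing per key:

```python
from itertools import groupby
from operator import itemgetter

def map_phase(lines):
    # map: emit a (word, 1) pair for every word on every line
    return [(word, 1) for line in lines for word in line.lower().split()]

def shuffle_phase(pairs):
    # intermediate: bring identical keys next to each other by sorting
    return sorted(pairs, key=itemgetter(0))

def reduce_phase(pairs):
    # reduce: group consecutive pairs by key and sum their values
    return [(key, sum(v for _, v in group))
            for key, group in groupby(pairs, key=itemgetter(0))]

print(reduce_phase(shuffle_phase(map_phase(["the cat sat", "the cat ran"]))))
# → [('cat', 2), ('ran', 1), ('sat', 1), ('the', 2)]
```

In real Hadoop the map and reduce steps run on many machines at once and the shuffle happens over the network, but the logic is the same.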
Exercise: Word count
One of the easiest MapReduce tasks is counting the words in a set of documents. The workflow for doing this in MapReduce can be described with the following pseudocode:
Mapper:

```
for each line of input:
    remove punctuation
    convert to lowercase
    for each word in the line:
        emit (word, 1)
```

Reducer:

```
for each key:
    n = sum of all the values
    emit (key, n)
```
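As a sanity check, the whole pseudocode collapses to a few lines of ordinary single-machine Python 3 using `collections.Counter`. This is only to verify the logic; it is not a distributed implementation, and `word_count` is our own name, not part of the workshop files:

```python
import string
from collections import Counter

def word_count(lines):
    # replace punctuation with spaces, lowercase, split on whitespace, tally
    table = str.maketrans(string.punctuation, ' ' * len(string.punctuation))
    counts = Counter()
    for line in lines:
        counts.update(line.translate(table).lower().split())
    return counts

print(word_count(["Four score and seven years ago..."])["seven"])  # → 1
```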
We’re going to try that on a small set of test data. First, you should grab the bundle of goodies that we’re going to use for the next three modules. Make sure that you are in your
sandbox directory. Then, type the following commands into your terminal:
me@blogclub:~/sandbox$ wget http://www.alex-hanna.com/tworkshops/wp-content/uploads/2012/11/november-tworkshop.tar
me@blogclub:~/sandbox$ tar xvf november-tworkshop.tar
The first command downloads the tar file, which is an archive like a zip file. The second command “untars” it into your sandbox directory.
Next, you want to change directories so you are in november-tworkshop/bin.
Now we get to the fun part. The data in the tar file includes every inaugural speech by a US president. These are in the directory data/inaugural.
Just for kicks, try the following command.
me@blogclub:~/sandbox/november-tworkshop/bin$ cat ../data/inaugural/*.txt
That will spit out all the text from those speeches into your terminal. Let’s move towards making some more sense of it.
One of the files included in the tar archive is a Python script called wordcount.py, which is inside of sandbox/november-tworkshop/bin/. Type cat wordcount.py to see the contents of the file.
```python
#!/usr/bin/env python
import string, sys

for line in sys.stdin:
    ## removes the whitespace
    line = line.strip()
    ## removes punctuation
    line = line.translate(
        string.maketrans(string.punctuation, ' ' * len(string.punctuation))
    )
    ## makes sure that the text is a valid Unicode string;
    ## ignores the characters that are not valid
    line = unicode(line, errors='ignore')
    ## converts the text to lowercase
    line = line.lower()
    ## this splits words at all whitespace
    words = line.split(None)
    ## prints out all the words with a count of 1
    for w in words:
        print "\t".join([w, "1"])
```
This does what is outlined above: it takes input from stdin (whatever is passed in via the pipe), removes punctuation, converts all letters to lowercase, and tokenizes them. Then it prints out each word along with the number one. It prints only the number one because we are collecting counts across many different computing nodes, and those counts eventually come together in the reduce process. The key is the word and the value is one.
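Note that the script above is written in Python 2 (`print` statements, `string.maketrans`, `unicode`). If you are on Python 3, a rough sketch of the same mapper logic looks like this; it is our own untested equivalent, written as a function so you can try it directly (wire it to sys.stdin yourself to use it as a streaming script):

```python
import string

# translation table: every punctuation character becomes a space
TABLE = str.maketrans(string.punctuation, ' ' * len(string.punctuation))

def mapper(line):
    # strip whitespace, blank out punctuation, lowercase, split on whitespace
    words = line.strip().translate(TABLE).lower().split()
    # emit each word with a count of 1, tab-separated
    return ['\t'.join([w, '1']) for w in words]

print(mapper("Hello, World!"))  # → ['hello\t1', 'world\t1']
```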
Try running the following command to simulate what the mapper task does:
me@blogclub:~/sandbox/november-tworkshop/bin$ cat ../data/inaugural/*.txt | python wordcount.py
You’ll get a lot of output: every word ever said at a presidential inaugural address, one word per line.
Now to the intermediate task. All we are doing for the intermediate task is sorting by the key. For this we don’t have to write another Python program, because UNIX-based systems have a built-in sort program, aptly called sort.
Do the following:
me@blogclub:~/sandbox/november-tworkshop/bin$ cat ../data/inaugural/*.txt | python wordcount.py | sort
Now you should see all the lines you ran before, except sorted by word.
The last part is the reducer. For the reducer we will use the script nReduce.py. This is the code (which is adapted from another Hadoop MapReduce tutorial):
```python
#!/usr/bin/env python
import sys

def main():
    if len(sys.argv) < 2:
        print "Usage: nReduce.py <number of key fields>"
        sys.exit(0)

    c_key = None
    c_count = 0
    nkey = int(sys.argv[1])

    # input comes from STDIN
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()

        # parse the input we got from the mapper
        row = line.split('\t')
        key = "\t".join(row[0:nkey])
        count = row[nkey]

        # convert count (currently a string) to int
        try:
            count = int(count)
        except ValueError:
            # count was not a number, so silently
            # ignore/discard this line
            continue

        # this IF-switch only works because Hadoop sorts map output
        # by key (here: word) before it is passed to the reducer
        if c_key == key:
            c_count += count
        else:
            if c_key:
                # write result to STDOUT
                print '%s\t%s' % (c_key, c_count)
            c_count = count
            c_key = key

    # do not forget to output the last word if needed!
    if c_key == key:
        print '%s\t%s' % (c_key, c_count)

if __name__ == '__main__':
    main()
```
This is a lot of code, but the basic idea is that it sums up all the values for a given key. For instance, if “youth 1” appears in the sorted output 4 times, the reducer adds those 1’s together and spits out “youth 4”.
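Because the reducer’s input is already sorted by key, the same summing logic can be written more compactly in Python 3 with `itertools.groupby`. This is our own sketch of the idea, not the workshop’s nReduce.py, and `reduce_counts` is a hypothetical name:

```python
from itertools import groupby

def reduce_counts(lines, nkey=1):
    # lines are sorted, tab-separated records: nkey key fields, then a count
    rows = (line.rstrip('\n').split('\t') for line in lines)
    out = []
    for key, group in groupby(rows, key=lambda row: tuple(row[:nkey])):
        # sum the counts for each run of identical keys
        total = sum(int(row[nkey]) for row in group)
        out.append('\t'.join(key + (str(total),)))
    return out

print(reduce_counts(["hello\t1", "hello\t1", "world\t1"]))
# → ['hello\t2', 'world\t1']
```

As in nReduce.py, the grouping only works because the input arrives sorted by key, which is exactly what the shuffle phase guarantees.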
Let’s complete the pattern now and run all these commands together. Be sure to type the number “1” after
python nReduce.py or you will get an error. This tells the program how many fields are in the key. You’ll see how this works differently in the next module.
me@blogclub:~/sandbox/november-tworkshop/bin$ cat ../data/inaugural/*.txt | python wordcount.py | sort | python nReduce.py 1
And voilà! You’ve counted the words in all US inaugural addresses. You can load this output into a statpack like SPSS or R to do something more interesting with the data. Here’s a plot of the words. Unsurprisingly, words like “the” and “a” rank near the top, but “government” comes in later on.