Lesson 5: Hadoop and MapReduce

Now that we’ve gone through some basic tweet processing in Python, we need to scale up so that we can process many tweets together. While individual documents can be interesting, usually we want to find trends across many documents so that we can make more general statements.

Enter Hadoop and MapReduce. Hadoop is a framework for distributed storage and distributed processing of large data sets across clusters of computers, and MapReduce is the programming model it uses to process those data sets in parallel. The Hadoop project is maintained by the Apache Software Foundation and is distributed under an F/OSS license.

Why should we use something like Hadoop and MapReduce for large data processing?

  • Scalable – Hadoop lets us run analyses at scale. Analyses that might take hours or days to complete on a single commodity machine can finish in minutes.
  • Extendable – The more servers we add, the more data we can process in the same amount of time.
  • Fast – It processes many items quickly and in parallel.
  • Cheap – Adding commodity servers to the setup gives us more processing power at relatively low cost.

The figure below shows the typical MapReduce processing flow. There are effectively three processes that go on here:

  1. Map process – take pieces of the data and split them into distinct keys and values
  2. Intermediate process – shuffle the keys into some logical order (this usually means sorting them)
  3. Reduce process – group the data by key and apply some function to their values (a small Python sketch of these three phases follows below)


[Figure: the typical MapReduce processing flow. Source: http://blog.jteam.nl/2009/08/04/introduction-to-hadoop/]
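
Before we touch Hadoop itself, here is a minimal sketch of those three phases in plain Python, using word counting as the example. This is not Hadoop code and the function names are ours; like the scripts later in this lesson, it is written for Python 2.

## A toy, in-memory simulation of the MapReduce flow. This is NOT Hadoop
## code; it just mimics the three phases on a couple of hard-coded lines.

def map_phase(lines):
    ## emit one (word, 1) pair for every word on every line
    pairs = []
    for line in lines:
        for word in line.lower().split():
            pairs.append((word, 1))
    return pairs

def shuffle_phase(pairs):
    ## bring identical keys next to each other by sorting on the key
    return sorted(pairs)

def reduce_phase(sorted_pairs):
    ## walk the sorted pairs once, summing each run of identical keys
    results = []
    c_key, c_count = None, 0
    for key, value in sorted_pairs:
        if key == c_key:
            c_count += value
        else:
            if c_key is not None:
                results.append((c_key, c_count))
            c_key, c_count = key, value
    if c_key is not None:
        results.append((c_key, c_count))
    return results

lines = ["the people rule", "the people vote"]
for key, count in reduce_phase(shuffle_phase(map_phase(lines))):
    print "%s\t%s" % (key, count)

Note that the reduce step only works because the pairs arrive sorted, which is exactly why the real pipeline below puts a sort between the map and reduce steps.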

Exercise: Word count

One of the easiest MapReduce tasks is counting the words in a set of documents. The workflow for doing this in MapReduce can be described with the following pseudocode:

  1. Mapper:
    for each line of input:
        remove punctuation
        convert to lowercase
        for each word in the line:
            emit word, 1
  2. Intermediate:
    sort keys
  3. Reducer:
    for each key:
        n = sum of all the values
        emit key, n
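
To make the pseudocode concrete, here is a hand trace on a single made-up input line (the real exercise below uses actual speeches):

    input line: the people rule the people

    after the Mapper:
        the     1
        people  1
        rule    1
        the     1
        people  1

    after the Intermediate step (sorted by key):
        people  1
        people  1
        rule    1
        the     1
        the     1

    after the Reducer:
        people  2
        rule    1
        the     2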
    

We’re going to try that on a small set of test data. First, you should grab the bundle of goodies that we’re going to use for the next three modules. Make sure that you are in your sandbox directory. Then, type the following commands into your terminal:

me@blogclub:~/sandbox$ wget http://www.alex-hanna.com/tworkshops/wp-content/uploads/2012/11/november-tworkshop.tar
me@blogclub:~/sandbox$ tar xvf november-tworkshop.tar

The first command downloads a tar file, which is an archive similar to a zip file. The second command “untars” it into your sandbox directory.

Next, you want to change directories so you are in sandbox/november-tworkshop/bin.

Now we get to the fun part. The data in the tar file includes every inaugural speech by a US president. These are in the directory sandbox/november-tworkshop/data/inaugural.

Just for kicks, try the following command.

me@blogclub:~/sandbox/november-tworkshop/bin$ cat ../data/inaugural/*.txt

That will spit out all the text from those speeches into your terminal. Let’s move towards making some more sense of it.
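
Just to get a rough sense of how much text that is before we bring MapReduce into the picture, you can also pipe the same output through the standard UNIX wc utility to count the words (the exact total will depend on the files in your bundle):

me@blogclub:~/sandbox/november-tworkshop/bin$ cat ../data/inaugural/*.txt | wc -w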

One of the files included in the tar archive is a Python script called wordcount.py, which is inside of sandbox/november-tworkshop/bin/. You can type cat wordcount.py to see the contents of the file.

#!/usr/bin/env python

import string, sys

for line in sys.stdin:
    ## removes the whitespace
    line = line.strip()

    ## removes punctuation
    line = line.translate( string.maketrans(string.punctuation, ' ' * len(string.punctuation)) )

    ## makes sure that the text is a valid Unicode string
    ## ignores the characters that are not valid
    line = unicode(line, errors='ignore')

    ## converts the text to lowercase 
    line = line.lower()

    ## this splits words at all whitespace
    words = line.split(None)

    ## prints out all the words with a count of 1
    for w in words:
        print "\t".join([w, "1"])

What this does is what was outlined above: it takes input from stdin (meaning whatever is passed in via the pipe), removes punctuation, turns all letters to lowercase, and tokenizes them. Then it prints out each word and the number one. The reason it prints just the number one is that we are trying to collect counts across many different computing nodes, and those counts will eventually come together in the Reducer process. The key is the word and the value is one.

Try running the following command to simulate what the mapper task does:

me@blogclub:~/sandbox/november-tworkshop/bin$ cat ../data/inaugural/*.txt | python wordcount.py

You’ll get a bunch of output, with every word from every presidential inaugural address on its own line, each followed by a tab and the number 1.
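
Each of those lines is a word, a tab, and the digit 1. The exact words you see first will depend on which file the shell lists first, but the output will look something like this:

fellow	1
citizens	1
of	1
the	1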

Now to the intermediate task. All we are doing for the intermediate task is sorting by the key. Sorting brings every copy of the same word onto consecutive lines, which is what will let the reducer total them up in a single pass. For this we don’t have to write another Python program, because UNIX-based systems have a built-in sort program, aptly called sort.

Do the following:

me@blogclub:~/sandbox/november-tworkshop/bin$ cat ../data/inaugural/*.txt | python wordcount.py | sort

Now you should see the same lines as before, except sorted by word.

The last part is the reducer. For the reducer we will use the script nReduce.py. This is the code (which is adapted from another Hadoop MapReduce tutorial):

import sys

def main():
    if len(sys.argv) < 2:
        print "Usage: nReduce.py <number of key fields>"
        sys.exit(0)

    c_key   = None
    c_count = 0
    nkey    = int(sys.argv[1])

    # input comes from STDIN
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()

        # parse the input we got from mapper
        row   = line.split('\t')
        key   = "\t".join( row[0:nkey] )
        count = row[nkey]

        # convert count (currently a string) to int
        try:
            count = int(count)
        except ValueError:
            # count was not a number, so silently
            # ignore/discard this line
            continue

        # this IF-switch only works because Hadoop sorts map output
        # by key (here: word) before it is passed to the reducer
        if c_key == key:
            c_count += count
        else:
            if c_key:
                # write result to STDOUT
                print '%s\t%s' % (c_key, c_count)
            c_count = count
            c_key   = key

    # do not forget to output the last word if needed!
    if c_key is not None:
        print '%s\t%s' % (c_key, c_count)

if __name__ == '__main__':
    main()

This is a lot of code, but the basic idea is that it sums up all the values for a given key. For instance, since “youth 1” appears in the sorted output 4 times, the reducer adds all of those 1’s together and spits out “youth 4”.
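
In other words, the reducer collapses each consecutive run of identical keys in the sorted input into a single output line. With the “youth” example, that looks like:

    input (sorted):
        youth	1
        youth	1
        youth	1
        youth	1

    output:
        youth	4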

Let’s complete the pattern now and run all these commands together. You have to be sure to type the number “1” after python nReduce.py or you will get an error. This tells the program how many fields are in the key. You’ll see how this works differently in the next module.

me@blogclub:~/sandbox/november-tworkshop/bin$ cat ../data/inaugural/*.txt | python wordcount.py | sort | python nReduce.py 1

And voilà! You’ve counted the words in all US inaugural addresses. You can throw this output into a statpack like SPSS or R to do something more interesting with the data. Here’s a plot of the words: unsurprisingly, stuff like “the” and “a” shows up near the top of the counts, while “government” comes in further down.
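
If you want to poke at the counts before leaving the terminal, you can sort them by frequency (the second, tab-separated field) to see the most common words, or redirect the whole pipeline into a file to load into R or SPSS. The file name here is just a suggestion:

me@blogclub:~/sandbox/november-tworkshop/bin$ cat ../data/inaugural/*.txt | python wordcount.py | sort | python nReduce.py 1 | sort -k2 -rn | head
me@blogclub:~/sandbox/november-tworkshop/bin$ cat ../data/inaugural/*.txt | python wordcount.py | sort | python nReduce.py 1 > inaugural-counts.tsv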