MapReduce is a simple programming paradigm that is particularly useful when dealing with large amounts of data. That's not to say it can't be used on a small data set.

The fundamentals are simple enough that you could read the Wikipedia page and understand how to use it, but I will run through a complete example.

Counting Words

Let's say you have to write a program that counts unique words. So for example:

"We were going really, really fast!"

Would produce the output:

fast: 1
going: 1
really: 2
we: 1
were: 1

Simple enough. There are many ways to accomplish this, depending on how and where you store your data. Of course, since this is an article on MapReduce, that's how we're going to do it.
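As a baseline, here is a hypothetical non-MapReduce version in a few lines of plain Python using collections.Counter. This is just for comparison; the rest of the article uses MapReduce:

```python
import re
from collections import Counter

def count_words(text):
    # Lowercase, split on non-word characters, and tally each word.
    words = re.split(r'\W+', text.lower())
    return Counter(word for word in words if word)

counts = count_words("We were going really, really fast!")
# counts['really'] == 2, counts['fast'] == 1
```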

The name "MapReduce" combines its two phases: the map phase and the reduce phase. In practice there's a lot more to it than that, but those two are all I'm going to talk about here.

Map

Usually you will be processing a set of data, like records from a database. No matter how complicated the data you're trying to process is, the map phase emits events, each made up of a key and a value. The key is very important in the reduce stage, which comes next.

Here are the two records we wish to process:

records = [
    "The buffalo from Buffalo who are buffaloed by buffalo",
    "Buffalo, buffalo (verb) other buffalo from Buffalo",
]

For each of these records the map function will take the text, split it, and emit each of the words as an event with the value of 1. The 1 represents how many times the word has occurred: not across all records, just in this single event.

import re

# Split text into words, ignoring punctuation and capitalisation.
def mapper(emit, text):
    words = re.split(r'[^\w]', text.lower())

    for word in words:
        if word:
            emit(word, 1)

The program will later show us all the events emitted, but here's a spoiler:

emit(the: 1)
emit(buffalo: 1)
emit(from: 1)
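If you want to check the mapper in isolation, a plain list can stand in for the emit callback. Here's a quick sketch using a mapper equivalent to the one above:

```python
import re

def mapper(emit, text):
    # Equivalent to the mapper above: split into words, emit (word, 1).
    for word in re.split(r'\W+', text.lower()):
        if word:
            emit(word, 1)

events = []
mapper(lambda key, value: events.append((key, value)),
       "The buffalo from Buffalo who are buffaloed by buffalo")
# events begins [('the', 1), ('buffalo', 1), ('from', 1), ...]
```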

The key also determines the queue that the value will be added to. After processing all the events, each word will have a queue of values:

{
    "from": [1, 1],
    "who": [1],
    "buffaloed": [1],
    "verb": [1],
    "are": [1],
    "the": [1],
    "other": [1],
    "buffalo": [1, 1, 1, 1, 1, 1, 1],
    "by": [1]
}

The order of the keys/queues has no bearing on how the data is processed. In fact, this is important, because MapReduce jobs usually run in highly distributed environments.

Reduce

The reducer takes a list of values for a given key/queue and performs some processing on them to return fewer (or the same number of) values. The number of values that the reducer receives varies a lot and is often based on runtime decisions made by the MapReduce implementation. For demonstration purposes I'll pick a small, fixed number.

In reality the mapper does not run entirely before the reducer. The algorithm adapts at runtime to the amount of data and chooses when to map and when to reduce so as to minimise memory consumption.

Since we are counting words, the reducer simply has to sum all the values:

def reducer(key, values):
    total = 0
    for value in values:
        total += value

    return [total]
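Calling the reducer by hand makes its behaviour clear. Here is the same reducer written with Python's built-in sum, plus a couple of example calls:

```python
def reducer(key, values):
    # Sum the partial counts for one key/queue.
    return [sum(values)]

print(reducer("buffalo", [1, 1, 1]))  # → [3]
print(reducer("buffalo", [3, 3, 1]))  # → [7]
```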

It's important that this is a sum and not a count operation, because the reducer is run as many times as needed to reduce the set of values. Let's say we chose 3 as the maximum number of values that can be reduced at once. When the reducer is run on the queue "buffalo", the values are repeatedly reduced in groups of three until we are left with just one value:

[1, 1, 1, 1, 1, 1, 1]
[3, 3, 1]
[7]
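That repeated pass can be written as a small loop that reduces chunks of at most a given size until one value remains. This is a standalone sketch of the same idea, separate from the full runner below:

```python
def reduce_queue(reducer, key, values, size=3):
    # Keep reducing chunks of `size` values until a single value remains.
    while len(values) > 1:
        new_values = []
        for i in range(0, len(values), size):
            new_values.extend(reducer(key, values[i:i + size]))
        values = new_values
    return values

def reducer(key, values):
    return [sum(values)]

print(reduce_queue(reducer, "buffalo", [1] * 7))  # → [7]
```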

MapReduce Runner

If the fundamentals above make sense, it doesn't take much code to tie the logic together into a very simple MapReduce runner:

class MapReducer:
    def __init__(self, records, mapper, reducer):
        self.queues = {}
        self.records = records
        self.mapper = mapper
        self.reducer = reducer
        self.total_emits = 0

    # Create the queue if it does not exist yet, or append to an existing queue.
    def emit(self, key, value):
        if key not in self.queues:
            self.queues[key] = [value]
        else:
            self.queues[key].append(value)

        self.total_emits += 1
        print("emit(%s: %s)" % (key, value))

    # Start the MapReducer and print the final result.
    def run(self):
        # Map each of the records.
        for record in self.records:
            self.mapper(self.emit, record)

        print("Finished mapping %d records into %d queues with %d values.\n" % (
            len(self.records), len(self.queues), self.total_emits
        ))

        # Reduce until we have just one value for each queue.
        size = 3
        for queue in self.queues:
            while len(self.queues[queue]) > 1:
                queue_size = len(self.queues[queue])
                new_queue = []
                for i in range(0, len(self.queues[queue]), size):
                    values = self.queues[queue][i:i + size]
                    new_queue.extend(self.reducer(queue, values))

                self.queues[queue] = new_queue

                print("Reduced %d values for '%s' into %d values." % (
                    queue_size, queue, len(self.queues[queue])))
        print()

        # Print the result.
        for queue in self.queues:
            print("%s: %s" % (queue, self.queues[queue][0]))

To run it:

mr = MapReducer(records, mapper, reducer)
mr.run()


Discussion

MapReduce really shines when dealing with a lot of data because the work can be so easily distributed across CPUs and across servers.

There's so much more to it, but I'll leave that for the next article. Stay tuned!