Introduction to MapReduce with Hadoop on Linux

When your data and work grow, and you still want to produce results in a timely manner, you start to think big. Your one beefy server reaches its limits. You need a way to spread your work across many computers. You truly need to scale out.

In pioneer days they used oxen for heavy pulling, and when one ox couldn't budge a log, they didn't try to grow a larger ox. We shouldn't be trying for bigger computers, but for more systems of computers.
—Grace Hopper

Clearly, cluster computing is old news. What's changed? Today:

  • We collect more data than ever before.
  • Even small-to-medium-size businesses can benefit from tools like Hadoop and MapReduce.
  • You don't have to have a PhD to create and use your own cluster.
  • Many decent free/libre open-source tools can help you easily cluster commodity hardware.

Let me start with some simple examples that will run on one machine and scale to meet larger demands. You can try them on your laptop and then transition to a larger cluster—like one you've built with commodity Linux machines, your company or university's Hadoop cluster or Amazon Elastic MapReduce.

Parallel Problems

Let's start with problems that can be divided into smaller independent units of work. These problems are roughly classified as "embarrassingly parallel" and are—as the term suggests—suitable for parallel processing. Examples:

  • Classify e-mail messages as spam.
  • Transcode video.
  • Render an Earth's worth of map tile images.
  • Count logged lines matching a pattern (a quick sketch of this one follows the list).
  • Figure out errors per day of week for a particular application.
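
To make the pattern-counting item concrete, here's a rough shell sketch, assuming GNU grep and xargs and a pile of hypothetical *.log files. Each file is counted by its own grep process, and no process needs to know about any other:


# one grep per file, up to four running at a time; -H prefixes each
# count with its file name so the results can be combined afterward
ls *.log | xargs -n 1 -P 4 grep -Hc 'invalid user'

That independence is exactly what makes the problem embarrassingly parallel, and it's the same property MapReduce exploits at much larger scale.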

Now the hard work begins. Parallel computing is complex. Race conditions, partial failure and synchronization impede our progress. Here's where MapReduce saves our proverbial bacon.

MapReduce by Example

MapReduce is a coding pattern that abstracts away many of the tricky bits of scalable computation. We're free to focus on the problem at hand, but it takes practice. So let's practice!

Say you have 100 10GB log files from some custom application, roughly a terabyte of data in all. You do a quick test and estimate it will take your desktop days to grep every line (assuming you even could fit the data on your desktop). And, that's before you add in logic to group by host and calculate totals. Your tried-and-true shell utilities won't help, but MapReduce can handle this without breaking a sweat.

First let's look at the raw data. Log lines from the custom application look like this:


localhost: restarting
dsl5.example.com: invalid user 'bart'
dsl5.example.com: invalid user 'charlie'
dsl5.example.com: invalid user 'david'
dsl8.example.net: invalid password for user 'admin'
dsl8.example.net: user 'admin' logged in

The log format is hostname, colon, message. Your boss suspects someone evil is trying to brute-force attack the application. The same host trying many different user names may indicate an attack. He wants totals of "invalid user" messages grouped by hostname. Filtering and totaling the above log lines should yield:


dsl5.example.com        3

With gigabytes of log files, your trusty shell tools do just fine. For a terabyte, more power is needed. This is a job for Hadoop and MapReduce.
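
If those sample lines were sitting in a file called log.txt (we'll create exactly that file in a moment), a pipeline along these lines would do the job: filter the interesting lines, extract the hostname before the colon, and let sort and uniq group and count:


grep 'invalid user' log.txt | cut -d: -f1 | sort | uniq -c

uniq -c prints the count before the hostname rather than after it, but the shape of the job is the same: filter, pull out a key, group, count. Keep that shape in mind, because map.py and reduce.py below split exactly that work into two cooperating programs.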

Before getting to Hadoop, let's summon some Python and test locally on a small dataset. I'm assuming you have a recent Python installed. I tested with Python 2.7.3 on Ubuntu 12.10.

The first program to write consumes log lines from our custom application. Let's call it map.py:


#!/usr/bin/python
import sys
for line in sys.stdin:
  if 'invalid user' in line:
    host = line.split(':')[0]
    print '%s\t%s' % (host, 1)

map.py prints the hostname, a tab character and the number 1 any time it sees a line containing the string "invalid user". Write the example log lines to log.txt, then test map.py:


chmod 755 map.py
./map.py < log.txt

The output is:


dsl5.example.com        1
dsl5.example.com        1
dsl5.example.com        1

Output of map.py will be piped into our next program, reduce.py:


#!/usr/bin/python
import sys
last_host = None
last_count = 0
for line in sys.stdin:
  # each input line is "hostname<TAB>count"
  host, count = line.split('\t')
  count = int(count)
  if last_host == host:
    # same host as the previous line: keep totaling
    last_count += count
  else:
    # new host: print the previous host's total, then start over
    if last_host:
      print '%s\t%s' % (last_host, last_count)
    last_host = host
    last_count = count
# flush the total for the final host, if we saw any input at all
if last_host:
  print '%s\t%s' % (last_host, last_count)

reduce.py totals up consecutive lines for a particular host. If we see the same hostname, we increment the total. If we encounter a different hostname, we print the total so far and reset both the total and the hostname. When we exhaust standard input, we print the final total if necessary. This works only if lines with the same hostname always appear consecutively. They will, and I'll address why later. Test by piping it together with map.py like so:


chmod 755 reduce.py
./map.py < log.txt | sort | ./reduce.py

Later, I'll explain why I added sort to the pipeline. This prints:


dsl5.example.com        3

Exactly what we want. A successful test! Our test log lines contain three "invalid user" messages for the host dsl5.example.com. Later we'll get this local test running on a Hadoop cluster.

Let's dive a little deeper. What exactly does map.py do? It transforms unstructured log data into tab-separated key-value pairs. It emits a hostname for a key, a tab and the number 1 for a value (again, only for lines with "invalid user" messages). Note that any number of log lines could be fed to any number of instances of the map.py program—each line can be examined independently. Similarly, each output line of map.py can be examined independently.

Output from map.py becomes input for reduce.py. The output of reduce.py (hostname, tab, number) looks very similar to its input. This is by design. Key-value pairs may be reduced multiple times, so reduce.py must handle this gracefully. If we were to re-reduce our final answer, we would get the exact same result. This repeatable, predictable behavior of reduce.py is known as idempotence.
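
You can check that idempotence right from the shell by sending the reduced output through reduce.py a second time:


./map.py < log.txt | sort | ./reduce.py | ./reduce.py

The answer is still dsl5.example.com and 3, exactly as before.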

We just tested with one instance of reduce.py, but you could imagine many instances of reduce.py handling many lines of output from map.py. Note that this works only if lines with the same hostname appear consecutively. In our test, we enforce this constraint by adding sort to the pipeline. This simulates how our code behaves within Hadoop MapReduce. Hadoop will group and sort input to reduce.py similarly.
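
To see why the grouping matters, feed reduce.py a few made-up, unsorted key-value lines (these hostnames are invented just for illustration):


printf 'b.example.com\t1\na.example.com\t1\nb.example.com\t1\n' | ./reduce.py

Without sorting, b.example.com comes out twice with partial counts. Put sort back in front of reduce.py and each host appears once with the correct total.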

We don't have to bother with how execution will proceed or how many instances of map.py and reduce.py will run. We just follow the MapReduce pattern, and Hadoop does the rest.
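
To give you a taste of what that looks like in practice, Hadoop Streaming can run ordinary programs like ours as mappers and reducers. The jar path and HDFS directories below are placeholders that depend on your installation; we'll do a real cluster run later:


hadoop jar /path/to/hadoop-streaming.jar \
    -input /logs -output /invalid-user-counts \
    -mapper map.py -reducer reduce.py \
    -file map.py -file reduce.py

Hadoop splits the input, starts as many mappers and reducers as it needs, and sorts and groups the map output in between, just as our local sort did.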

______________________

Adam Monsen is a seasoned software engineer and FLOSS zealot. He lives with his wonderful wife and children in Seattle, Washington. He blogs semi-regularly at adammonsen.com.
