Easy Map-Reduce With Hadoop Streaming

If you're considering doing large scale analysis of structured data (access logs, for example), there are dozens of enterprise-level solutions ranging from specialized streaming databases to the more mundane data warehousing solutions with star schemas and column-store semantics. Google, facing the same problem, developed a system called Sawzall, which leverages their existing Map-Reduce clusters for large-scale parallel data analysis by adding a DSL for easy manipulation of data.

Undoubtedly inspired by Google's work, the guys at NY Times recently released MRToolkit, a Ruby framework for setting up and running Apache Hadoop jobs with a heavy Sawzall flavor. Aside from being a great community contribution and a nice source tree to learn from, perhaps the most interesting part of the project is its showcase of Hadoop Streaming at work - an interface worth getting familiar with.

Hadoop Streaming: Simple Map-Reduce

Writing a Map/Reduce job in Hadoop usually entails writing two Java functions: a map, which processes independent chunks of the dataset, and a reduce, which combines the intermediate results to perform some useful analysis. The framework takes care of the sorting, coordination and scheduling, and thus provides the underlying abstraction for distributed computing. However, although the framework itself is implemented in Java, Hadoop's Streaming interface is an important and often overlooked feature which allows us to write Map/Reduce applications in any language capable of working with STDIN and STDOUT!

In fact, we can create a simple Map/Reduce word count application with nothing but the standard *nix tools available on any machine: cat and wc. Assuming your data is already loaded into an HDFS cluster, we can kick off our job:

$ $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc

As the example shows, our map and reduce scripts can be any executables that read input from STDIN (line by line) and emit output to STDOUT. Have a Ruby/Python/Bash script that is capable of both? Congratulations, you can write a Map/Reduce job on Hadoop!
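For instance, a word count job equivalent to the cat/wc pipeline above could be expressed as two small Ruby scripts. This is a minimal sketch, assuming hypothetical file names wc_mapper.rb and wc_reducer.rb that are not part of any framework:

#!/usr/bin/env ruby
# wc_mapper.rb: emit a "word <TAB> 1" pair for every word read from STDIN
STDIN.each_line do |line|
  line.split.each { |word| puts "#{word}\t1" }
end

#!/usr/bin/env ruby
# wc_reducer.rb: Hadoop delivers the mapper output sorted by key, so we can
# simply sum the counts of consecutive identical words
current, count = nil, 0
STDIN.each_line do |line|
  word, value = line.chomp.split("\t")
  if word == current
    count += value.to_i
  else
    puts "#{current}\t#{count}" if current
    current, count = word, value.to_i
  end
end
puts "#{current}\t#{count}" if current

The scripts would then be passed to the streaming jar via the -mapper and -reducer flags, and shipped out to the cluster nodes with the -file option.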

MRToolkit: Hadoop + Ruby = Sawzall

The Map/Reduce Toolkit by the NY Times engineers is a great example of a Ruby DSL on top of the Hadoop Streaming interface. Specifically aimed at simplifying their internal log processing jobs, it exposes just the necessary bits for handling access log inputs and provides a number of predefined reduce steps: unique, counter, etc. For example, to get a list of unique visitor IP and user agent pairs, the entire program consists of:

require 'mrtoolkit'

class MainMap < MapBase
  def declare
    # declare log fields
    field :ip
    field :request
    field :status
    field :ua       # user-agent column, referenced in process below

    # declare emitted output fields
    emit :ip_ua
    emit :ip
    emit :ua
  end

  def process(input, output)
    # keep only the first token of the user-agent string
    ua = input.ua.split(/\s/)[0]
    output.ip_ua = "#{input.ip}|#{ua}"
    output.ip = input.ip
    output.ua = ua
    output
  end
end

class MainJob < JobBase
  def job
    mapper MainMap
    reducer UniqueCountReduce, 2
    indir "logs"
    outdir "ip-ua"
  end
end

Take a closer look at the execution logic, as that's where all the magic happens for spooling up Hadoop jobs and invoking the Ruby interpreter. This pattern can be easily adapted for any language and offers a very powerful way to leverage Hadoop instead of trying to build your own distributed processing layer.
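To make the pattern concrete, here is a minimal sketch of the kind of command such a wrapper has to assemble and execute. This is not MRToolkit's actual code; the script names and paths are assumptions for illustration:

#!/usr/bin/env ruby
# run_job.rb: hypothetical sketch of a streaming job launcher, assuming the
# mapper and reducer live in mapper.rb and reducer.rb next to this script
hadoop    = "#{ENV['HADOOP_HOME']}/bin/hadoop"
streaming = "#{ENV['HADOOP_HOME']}/hadoop-streaming.jar"

cmd = [
  hadoop, "jar", streaming,
  "-input",   "logs",
  "-output",  "ip-ua",
  "-mapper",  "'ruby mapper.rb'",
  "-reducer", "'ruby reducer.rb'",
  "-file",    "mapper.rb",      # ship the scripts out to the cluster nodes
  "-file",    "reducer.rb"
].join(" ")

system(cmd) or abort "hadoop streaming job failed"

Everything else, from splitting the input to sorting the intermediate keys and scheduling the tasks, is handled by Hadoop itself, which is exactly why wrapping the streaming interface is so much cheaper than building your own distributed processing layer.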

In fact, we should think of other higher-order frameworks we can build on top of this pattern: data mining & machine learning, video & audio processing, and so on - it's all fair game!

Ilya Grigorik is a web ecosystem engineer, author of High Performance Browser Networking (O'Reilly), and Principal Engineer at Shopify.