Hadoop Combiners

Recall the exercise where you computed the mean sale value per day of the week. Your Mapper/Reducer looked something like this:
1.    Your mapper went through the records and output a key-value pair that looked like: day of week, value.
2.    For each day of the week, your reducer kept a running total of the value as well as a count of the number of records.
3.    You divided the total value by the number of records to get the mean.
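For illustration (made-up values, not the actual dataset), a few lines of that mapper output might look like this, with the weekday index (0 = Monday) as the key:

0	12.99
2	4.50
2	29.99
6	7.25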

But there's a problem here. That second step requires shuffling a lot of data across your network, since every record's value has to reach the reducers. What if we could do some of the reduction locally before sending the data to the reducers?

Yes, we can use combiners! A combiner will, in essence, do as much reduction as possible locally before sending the data to the reducers.
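For example (illustrative numbers), suppose one map task locally emitted:

2	4.50
2	29.99
2	1.00

With a summing combiner, that task ships a single combined record to the reducer instead:

2	35.49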

  • This can save significant network traffic if you have a lot of records but far fewer unique keys. You will need to add an extra option to the command-line script to use this functionality, or use the full hadoop command; see the Configuration Commands section below for details.
  • When you run a job, you will see some output on the screen, which includes a tracking URL. Open it in a browser and you will see a job page containing information about the job as it is being run.
  • Here are the comparison screenshots from 2 jobs, one run without a combiner and one with a combiner:

[screenshots: job counter pages, without and with a combiner]

As you can see, when using a combiner (second screenshot), the reducers receive significantly fewer records and have to shuffle fewer bytes than without one. Without the combiner – 4,138,472 records; with the combiner – 28 records (with only 7 weekday keys, each map task needs to ship only a few combined records). While this does not lead to time savings on a single-node pseudo-distributed cluster running in a VM, like yours, in the real world it could save a lot of network traffic.

Generally, we want our combiners to do the same thing as our reducers. Also, keep in mind that this intermediate step may introduce some complications when we try to do our computation. You’ll encounter that in the next exercise.
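To see why (a worked example, not from the original exercise): the reducer must be able to safely re-reduce the combiner's output. Summation survives this, since adding partial sums gives the overall sum. A mean does not: mean(1, 2) = 1.5 and mean(3) = 3, but mean(1.5, 3) = 2.25, whereas mean(1, 2, 3) = 2. A mean-computing job therefore needs its combiner to emit partial sums and counts, leaving the division to the reducer.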

Configuration Commands
To use a combiner, you will have to add a new shortcut command to your VM. In the terminal window, type
gedit ~/.bashrc

In the editor that opens, find the function definition “run_mapreduce”. Copy its contents and create a new function (within the same file), say “run_mapreduce_with_combiner”. In the copy, add “-combiner $2” right after “-reducer $2”.

And at the end of the file, add a line for the alias:
alias hsc=run_mapreduce_with_combiner

Now save the file and exit the gedit program. Run the following in the terminal:
source ~/.bashrc
This will reload the configuration file you just edited, and your new alias should be ready to use.

The new alias will take the second parameter (which is the reducer script) and also use it as the combiner. If you want, you can make another alias that lets you use a different script for the combiner. You would then need to upload that script too, with an extra -file option, the same as you did for the mapper and reducer scripts.
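For example, assuming the input and output directory names from the earlier exercises (substitute your own), a run with the combiner looks like:

hsc mapper.py reducer.py myinput joboutput

For reference, here is the complete ~/.bashrc after the edit: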

# .bashrc

# User specific aliases and functions
alias rm='rm -i'
alias cp='cp -i'
alias mv='mv -i'

run_mapreduce() {
    hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.1.jar -mapper $1 -reducer $2 -file $1 -file $2 -input $3 -output $4
}
alias hs=run_mapreduce

run_mapreduce_with_combiner() {
    hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.1.jar -mapper $1 -reducer $2 -combiner $2 -file $1 -file $2 -input $3 -output $4
}
alias hsc=run_mapreduce_with_combiner

# Source global definitions
if [ -f /etc/bashrc ]; then
    . /etc/bashrc
fi
mapper.py
#!/usr/bin/python
import sys
import csv
from datetime import datetime

# Read tab-delimited purchase records directly from standard input.
reader = csv.reader(sys.stdin, delimiter='\t')
for line in reader:
    date, time, place, item, price, card = line
    # Convert the date to a weekday index (0 = Monday, 6 = Sunday).
    weekday = datetime.strptime(date, "%Y-%m-%d").weekday()
    # Emit a tab-delimited key-value pair: weekday, price.
    print "%d\t%s" % (weekday, price)



reducer.py
#!/usr/bin/python

import sys
import collections

# Collect every sale value for each weekday key.
sales = collections.defaultdict(list)

for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    # Skip malformed lines that don't contain exactly a key and a value.
    if len(data_mapped) != 2:
        continue

    thisKey, thisSale = data_mapped
    sales[int(thisKey)].append(float(thisSale))

# Emit the total sales per weekday. Since sums of partial sums give the
# overall sum, this same script can safely serve as the combiner.
for k in sorted(sales):
    print "%d\t%s" % (k, sum(sales[k]))
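You can sanity-check the pair locally before submitting a job. This simulates the map-sort-reduce pipeline in the shell, assuming your data file is named purchases.txt (substitute your actual file):

cat purchases.txt | python mapper.py | sort | python reducer.py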
