Hadoop Designing With Patterns

Filtering Patterns: Don't change the records; only keep a part of the data. Examples: Simple Filter, Bloom Filter (more efficient), Sampling, Random Sampling, Top K
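To make the filtering variants a bit more concrete, here is a minimal sketch of three of them in plain Python, outside of Hadoop. The class and function names (BloomFilter, simple_filter, random_sample) and the bit and hash counts are made up for illustration; they do not come from the course material or from any library.

```python
import hashlib
import random


class BloomFilter(object):
    """Tiny Bloom filter: never a false negative, occasionally a false positive."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = [False] * num_bits

    def _positions(self, item):
        # Derive num_hashes bit positions by salting an MD5 digest of the item.
        for salt in range(self.num_hashes):
            data = ("%d:%s" % (salt, item)).encode("utf-8")
            yield int(hashlib.md5(data).hexdigest(), 16) % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos] = True

    def might_contain(self, item):
        # True means "possibly in the set", False means "definitely not".
        return all(self.bits[pos] for pos in self._positions(item))


def simple_filter(records, predicate):
    """Simple filter: keep the records, unchanged, for which predicate is true."""
    return [r for r in records if predicate(r)]


def random_sample(records, fraction, seed=0):
    """Random sampling: keep each record with probability `fraction`."""
    rng = random.Random(seed)
    return [r for r in records if rng.random() < fraction]
```

A typical use would be to load a list of interesting author ids into the Bloom filter once and then call simple_filter(posts, lambda p: bloom.might_contain(p[3])), so that each mapper only needs the small bit array instead of the full id list.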

Top K

In an RDBMS you would normally sort the data first and then take the top K records. In MapReduce this approach does not work as is, because the data is not sorted and is processed on several machines. Instead, each mapper first finds its own local top K list, without a global sort of the data, and sends that list to the reducer, which then finds the global top K list.
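The two-stage idea can be sketched without Hadoop. This is only an illustration: the function names and the two tiny input splits are invented, and heapq.nlargest stands in for whatever selection logic a real mapper would use.

```python
import heapq


def local_top_k(records, k, key):
    """What each mapper does: keep only its k largest records (no full sort)."""
    return heapq.nlargest(k, records, key=key)


def global_top_k(local_lists, k, key):
    """What the reducer does: merge the local lists and keep the k largest overall."""
    merged = [rec for lst in local_lists for rec in lst]
    return heapq.nlargest(k, merged, key=key)


def body_length(record):
    return len(record[1])


# Two hypothetical input splits of (post_id, body) pairs, one per mapper
split_1 = [("1", "aaa"), ("2", "a"), ("3", "aaaaa")]
split_2 = [("4", "aa"), ("5", "aaaaaaa"), ("6", "aaaa")]

local_lists = [local_top_k(s, 2, body_length) for s in (split_1, split_2)]
top = global_top_k(local_lists, 2, body_length)
print([post_id for post_id, _ in top])  # ['5', '3']
```

Each mapper sends at most K records over the network, so the reducer only has to look at K times the number of mappers records instead of the whole data set.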

Let's find the 10 longest posts in our forum!

Your mapper function should print out 10 lines containing the longest posts, sorted in ascending order from shortest to longest.
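import sys
import csv

def mapper():
    reader = csv.reader(sys.stdin, delimiter='\t')
    writer = csv.writer(sys.stdout, delimiter='\t', quotechar='"', quoting=csv.QUOTE_ALL)
    res = []
    for line in reader:
        # field 4 is the post body; keep its length together with the record
        res.append([len(line[4]), line])
    # keep the 10 longest posts (fewer if the input is shorter), longest first
    res = sorted(res, reverse=True)[:10]
    # print them in ascending order, from shortest to longest
    for length, line in reversed(res):
        writer.writerow(line)

test_text = """\"\"\t\"\"\t\"\"\t\"\"\t\"333\"\t\"\"
"""

# This function allows you to test the mapper with the provided test string
def main():
    import StringIO
    sys.stdin = StringIO.StringIO(test_text)
    mapper()
    sys.stdin = sys.__stdin__

main()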


Download the data file and extract it on Linux:

curl -O 'http://content.uc-data.com/course/hadoop/forum_data.tar.gz'
tar zxvf forum_data.tar.gz

Summarization Patterns: Give a top-level view of the data. Examples: Numerical Summarizations (counting, min, max, mean, median, first, last, standard deviation) and Inverted Index.
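As a rough sketch of what a numerical summarization computes for the values of one key, the function below returns the statistics listed above. The name summarize and the sample numbers are made up, and the standard deviation here is the population version, which is an assumption on my side.

```python
import math


def summarize(values):
    """Return the usual numerical summaries for a non-empty list of numbers."""
    ordered = sorted(values)
    n = len(ordered)
    mean = sum(ordered) / float(n)
    mid = n // 2
    # Median: middle value, or the average of the two middle values.
    median = ordered[mid] if n % 2 else (ordered[mid - 1] + ordered[mid]) / 2.0
    variance = sum((v - mean) ** 2 for v in ordered) / float(n)
    return {
        "count": n,
        "min": ordered[0],
        "max": ordered[-1],
        "mean": mean,
        "median": median,
        "first": values[0],   # first and last follow the input order
        "last": values[-1],
        "stddev": math.sqrt(variance),
    }


stats = summarize([2, 4, 4, 4, 5, 5, 7, 9])
print(stats["mean"], stats["median"], stats["stddev"])
```

Count, min, max and sum can be merged from partial results, while the median needs all values of a key in one place.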
Inverted Index: build a reverse index from a data set to enable faster searching. A web search engine is the typical example: you need a mapping from keywords to web links so that relevant information can be found quickly. Think of it as the index of a book: for each word, or term, it lists all the pages on which you can find that term.
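Before looking at the MapReduce version, it may help to see the same idea on a single machine. This is a minimal sketch: the function name, the word pattern and the three sample documents are invented for illustration.

```python
from collections import defaultdict
import re


def build_inverted_index(documents):
    """Map every word to the sorted list of document ids that contain it."""
    index = defaultdict(set)
    for doc_id, text in documents.items():
        for word in re.findall(r"[a-z0-9']+", text.lower()):
            index[word].add(doc_id)
    return dict((word, sorted(ids)) for word, ids in index.items())


# Hypothetical documents keyed by id
docs = {
    101: "A fantastic course",
    102: "Fantastic opportunity to learn Python",
    103: "Office hours for the course",
}
index = build_inverted_index(docs)
print(index["fantastic"])  # [101, 102]
print(index["course"])     # [101, 103]
```

In MapReduce the inner loop becomes the mapper (emit word and document id) and the grouping by word is done by the shuffle, so the reducer only has to collect the ids for each word.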

import sys
import csv
import re

# Mapper: print "node_id <TAB> fragment" for every fragment of the title and body.
# The active delimiters split the text on line breaks only; the longer
# tuple below splits it into single words instead.
#delimiters = ('.','$',';','!','?',':','(',')','[',']','<','>','#','=','-','/',' ')
delimiters = ('\n', '\r')
regexPattern = '|'.join(map(re.escape, delimiters))

reader = csv.reader(sys.stdin, delimiter='\t')
for line in reader:
    node_id = line[0]
    title = line[1].lower()
    forum = line[4].lower()
    for t in re.split(regexPattern, title):
        if len(t) > 1:
            print node_id + "\t" + t
    for f in re.split(regexPattern, forum):
        if len(f) > 1:
            print node_id + "\t" + f
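# Reducer: reads the mapper output and prints a counter and a list of node ids
import sys
import csv

def reducer():
    reader = csv.reader(sys.stdin, delimiter='\t')
    i = 0        # assumed: number of fragments that contain "fantastic"
    nodeid = []  # assumed: ids of the nodes that contain "fantastically"
    for line in reader:
        nid = line[0]
        forum = line[1]
        if forum.find("fantastic") != -1:
            i += 1
        if forum.find("fantastically") != -1:
            nodeid.append(int(nid))
    print i, sorted(nodeid)

# test code on the local machine
def main():
    # "rb" reads special characters
    # forum_node.tsv is the mapper input, forum_node_map.txt the saved mapper output
    #sys.stdin = open('forum_node.tsv', 'rb')
    sys.stdin = open('forum_node_map.txt', 'rb')
    reducer()
    sys.stdin = sys.__stdin__

main()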

345 [17583, 1007765, 1025821, 7004477, 9006895] 

Structured-to-Hierarchical Patterns: Example: combining two data sets.
When migrating data from an RDBMS to a Hadoop system, one of the first things you should consider doing is reformatting your data. Since Hadoop doesn’t care what format your data is in, you can take advantage of hierarchical data to avoid doing live joins of several datasets at the time of analysis. If you know what kind of information you will want to get later, you might save significant time by reformatting the data.

You can use this pattern if you have data sources that are linked by some set of foreign keys and your data is structured and row-based.
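What "hierarchical" means here can be sketched in a few lines: the rows of one table are nested under the matching row of the other, so that no join is needed at analysis time. The function name and the choice of fields are only for illustration; the key name author_id is the foreign key used in the forum data.

```python
import json


def nest_posts_under_users(users, posts):
    """Build one record per user with that user's posts nested inside it."""
    by_author = {}
    for user in users:
        record = dict(user)
        record["posts"] = []
        by_author[user["author_id"]] = record
    for post in posts:
        parent = by_author.get(post["author_id"])
        if parent is not None:  # skip posts whose author is unknown
            parent["posts"].append({"id": post["id"], "title": post["title"]})
    return by_author


# Sample rows, reduced to a few fields
users = [{"author_id": "100001302", "reputation": "79"}]
posts = [{"id": "34052", "title": "Office hours 3 europe", "author_id": "100001302"}]

nested = nest_posts_under_users(users, posts)
print(json.dumps(nested["100001302"], sort_keys=True))
```

Each nested record can then be stored as one line of JSON, and a later job can read a user together with all of his or her posts in a single pass.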

The post table (forum_node.tsv) contains information like this:

"id"    "title"    "tagnames"    "author_id"    "body"    "node_type"    "parent_id"    "abs_parent_id"    "added_at"    "score"    "state_string"    "last_edited_id"    "last_activity_by_id"    "last_activity_at"    "active_revision_id"    "extra"    "extra_ref_id"    "extra_count"    "marked"
"34052"    "Office hours 3 europe"    "cs101 officehours"    "100001302"    "<p>I live in Innsbruck, Tirol, Austria  </p>"    "question"    "\N"    "\N"    "2012-03-13 11:53:39.439269+00"    "1"    ""    "\N"    "100001302"    "2012-03-13 11:53:39.439269+00"    "44424"    "\N"    "\N"    "133"    "f"

The user table (forum_users.tsv) contains this:

"author_id"    "reputation"    "gold"    "silver"    "bronze"
"100001302"    "79"    "0"    "2"    "7"

The Hadoop input folder foruminput contains two files: forum_node.tsv and forum_users.tsv.

Run:

hs mapcombine.py redcombine.py foruminput cobmineoutput
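What this job does is a reduce-side join. The sketch below imitates it locally under a few assumptions: the helper names tag and join are invented, the post row is cut down to five fields to keep it short, and the sort is done explicitly by key and tag. Whether a real cluster delivers the "A" row before the "B" rows of the same key depends on how its sort is configured.

```python
def tag(record, source):
    """Mapper step: put the join key (author id) first, then a source tag."""
    if source == "user":
        return [record[0], "A"] + record[1:]
    # post rows carry the author id in field 3
    return [record[3], "B"] + record[:3] + record[4:]


def join(tagged):
    """Shuffle and reduce: sort by (key, tag) so a user row precedes its posts."""
    joined, user = [], None
    for row in sorted(tagged, key=lambda r: (r[0], r[1])):
        if row[1] == "A":
            user = row
        elif user is not None and user[0] == row[0]:
            post = row[2:]
            # put the author id back in fourth place, then append the user fields
            joined.append(post[:3] + [row[0]] + post[3:] + user[2:])
    return joined


users = [["100001302", "79", "0", "2", "7"]]
posts = [["34052", "Office hours 3 europe", "cs101 officehours", "100001302", "question"]]

tagged = [tag(p, "post") for p in posts] + [tag(u, "user") for u in users]
for row in join(tagged):
    print("\t".join(row))
```

The tag is what lets the reducer tell the two sources apart once the rows of both files arrive mixed together under the same key.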

The mapper will print the following output:

"id"    "title"    "tagnames"    "author_id"    "body"    "node_type"    "parent_id"    "abs_parent_id"    "added_at"    "score"    "state_string"    "last_edited_id"    "last_activity_by_id"    "last_activity_at"    "active_revision_id"    "extra"    "extra_ref_id"    "extra_count"    "marked"
"author_id"    "reputation"    "gold"    "silver"    "bronze"
"34052"    "Office hours 3 europe"    "cs101 officehours"    "100001302"    "<p>I live in Innsbruck, Tirol, Austria and live office hours were too early for me(4a.m.), maybe next time you can make them at a different time?<br>
But anyhow guys , thanks for this great course, it is really a fantastic opportunity for me to learn<br>
some good python basics. </p>"    "question"    "\N"    "\N"    "2012-03-13 11:53:39.439269+00"    "1"    ""    "\N"    "100001302"    "2012-03-13 11:53:39.439269+00"    "44424"    "\N"    "\N"    "133"    "f"
"100001302"    "79"    "0"    "2"    "7"

The reducer will print the following output:

"id"    "title"    "tagnames"    "author_id"    "node_type"    "parent_id"    "abs_parent_id"    "added_at"    "score"    "reputation"    "gold"    "silver"    "bronze"
"34052"    "Office hours 3 europe"    "cs101 officehours"    "100001302"    "question"    "\N"    "\N"    "2012-03-13 11:53:39.439269+00"    "1"    "79"    "0"    "2"    "7"


import sys
import csv

def mapper():
    reader = csv.reader(sys.stdin, delimiter='\t')
    writer = csv.writer(sys.stdout, delimiter='\t', quotechar='"', quoting=csv.QUOTE_ALL)

    for line in reader:
        postuser = []
        if len(line) == 5: # line comes from forum_users, just pick the fields needed
            # user_ptr_id, reputation, gold, silver, bronze ||||| Total fields 5
            postuser = [line[0], "A", line[1], line[2], line[3], line[4]]

        if len(line) == 19: # line comes from forum_node, just pick the fields needed
            # id, title, tagnames, author_id, body, node_type,
            # parent_id, abs_parent_id, added_at, score, state_string,
            # last_edited_id, last_activity_by_id, last_activity_at,
            # active_revision_id, extra, extra_ref_id, extra_count,
            # marked ||||| Total fields 19
            postuser = [line[3], "B", line[0], line[1], line[2], line[5], line[6], line[7], line[8], line[9]]

        if postuser: # emit the tagged record with the author id as key
            writer.writerow(postuser)

mapper()



# Here you will be able to combine the values that come from 2 sources
# Value that starts with A will be the user data
# Values that start with B will be forum node data

import sys

def reducer():
    posts = []     # post ("B") records of the current author
    user = None    # user ("A") record of the current author
    oldUser = None

    for line in sys.stdin:
        data = line.strip().split("\t")
        if len(data) < 2:
            continue
        if oldUser is not None and oldUser != data[0]:
            # the key changed: write out everything collected for the previous author
            flush(user, posts)
            user = None
            posts = []

        oldUser = data[0]
        # the fields are still quoted because the mapper writes with QUOTE_ALL
        if data[1] == "\"A\"" and len(data) == 6:
            user = list(data)
        if data[1] == "\"B\"" and len(data) == 10:
            posts.append(list(data))
    # write out the last author
    flush(user, posts)

def flush(user, posts):
    # posts without a matching user record are dropped
    if user is not None:
        for post in posts:
            mergePostUser(user, post)

def mergePostUser(user, post):
    # user = [author_id, "A", reputation, gold, silver, bronze]
    # post = [author_id, "B", id, title, tagNames, nodeType, parentId, absParentId, addedAt, score]
    # put author_id back at the 4th position and drop the "A"/"B" tags:
    # merged = [id, title, tagNames, author_id, nodeType, parentId, absParentId, addedAt, score,
    #           reputation, gold, silver, bronze]
    merged = post[2:5] + [post[0]] + post[5:] + user[2:]

    # print everything tab separated
    print "\t".join(merged)

reducer()

