Secondary Sorting in Hadoop

September 25, 2013 Garima Dosi

We continue our technical series with a post on Secondary Sorting in Hadoop.

Let’s start by defining a use case that will be used throughout this post to illustrate the concept.

Data Description

Clickstream data with following fields: click_time, ip_addr, page_url, page_name, session_id, country

Use case

Use-Case-1: On clickstream data, to analyze the click trends for each website country-wise

Hadoop Sorting, Grouping and Partitioning

Sorting, grouping and partitioning are done in the shuffle-sort phase of a MapReduce job. But how are they done? We know that the mapper passes {key, value} or {K, V} pairs to the reduce phase. The basic requirement is that the key class implement the WritableComparable interface and the value class be a Writable type. All Hadoop data types like IntWritable, Text etc. implement WritableComparable and hence are eligible to be used as key classes. The method in WritableComparable that aids sorting is:

public int compareTo(WritableComparable o){}

It compares two values as per the comparison logic written in it and returns an integer indicating whether the first value is less than (<), greater than (>) or equal to (=) the second. The sorting mechanism calls this method to determine the order of all {K, V} pairs emitted by the mappers. In other words, default sorting is driven by the compareTo() method of the key class, and this works to our advantage for secondary sorting (described later).

Moving on to partitioning and grouping! Other than sorting, shuffle-sort phase is also used to distribute the mapper values amongst different reducers. This is done by partitioning the key values. Partitioner decides which {K, V} pair should be assigned to which reducer for processing. It is implemented by using a subclass of the Partitioner class. This class has a method:

public int getPartition(K key, V value, int numReduceTasks) {}

which has to be overridden to return the partition/reduce task number for a particular {K, V} pair passed as input. (One could force a single reduce task by overriding this method to always return the same value, but the proper way is of course to set the number of reduce tasks to 1 in the job configuration.) By default, HashPartitioner is used to partition the data among the reducers; its getPartition() method uses the hashCode() of the key class to determine the partition. This, again, will help keep secondary sort simple, as we will see later.
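The arithmetic behind hash partitioning can be sketched in plain Java (an illustrative standalone snippet, not the actual HashPartitioner source):

```java
// Sketch of how a hash-based partitioner maps a key to a reduce task.
// The bitmask keeps the result non-negative even when hashCode() is negative.
public class HashPartitionDemo {

    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // All records with the same key hash land on the same reducer.
        System.out.println(getPartition("example.com|IN", 4));
    }
}
```

Because the partition number is derived only from the key's hashCode(), two keys that should be processed by the same reducer must produce the same hash.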

While partitioning decides the reduce task, grouping decides which {K, V} pairs should be clubbed together in one reduce call. Each reduce task’s data is divided into groups and the reduce() method is called once per group, so multiple reduce() calls may be made within one reduce task (number of groups = number of reduce calls). But how are groups decided? Again, by a comparator: the group comparator class should extend the WritableComparator class and override its compare() method. While grouping, consecutive {K, V} pairs are considered to be in the same group as long as compare() returns 0; a new group starts as soon as a non-zero value is returned. This also implies that the keys must be sorted before grouping (which the sort phase has already taken care of); otherwise grouping may produce incorrect results. By default, the key class’ compareTo() that was used for sorting is also used for group comparison. Therefore, if the map output key class is Text, sorting and grouping both use the comparator written for the Text class.
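The "number of groups = number of reduce calls" point can be made concrete with a small standalone Java sketch (the key layout here is illustrative): walk a sorted key stream and start a new group whenever the grouping portion of the key changes, just as a group comparator returning non-zero would.

```java
import java.util.Arrays;
import java.util.List;

public class GroupingDemo {

    // Counts how many reduce() calls a sorted key stream would produce,
    // grouping on the part of each key before '|' (analogous to grouping
    // on a prefix of a composite key).
    static int countGroups(List<String> sortedKeys) {
        int groups = 0;
        String prev = null;
        for (String key : sortedKeys) {
            String groupPart = key.substring(0, key.indexOf('|'));
            if (prev == null || !groupPart.equals(prev)) {
                groups++;  // non-zero comparison => new group => new reduce call
            }
            prev = groupPart;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
            "siteA,IN|t1", "siteA,IN|t2", "siteA,US|t1", "siteB,IN|t1");
        System.out.println(countGroups(keys));  // prints 3
    }
}
```

Note that this only works because the input is sorted: if equal group prefixes were scattered through the stream, they would be counted as separate groups, which is exactly why grouping depends on the sort phase.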

Secondary Sorting

A normal sort generally involves sorting on one field, or on multiple fields that can be represented by a single data type. For instance, in the clickstream dataset, if we had to order the data by page_url, we would choose Text as the map output key type. We would also choose Text if we were asked to order the data by page_url and page_name (by concatenating the two). Such scenarios are easily handled by normal sorting.

But in many cases we end up sorting on more than one field of different types. For instance, in a geographical dataset, a use case may require sorting on two numerical values: latitude in ascending order and longitude in descending order. Secondary sorting is required in such cases. Secondary sorting means sorting on two or more field values of the same or different data types. Additionally, we may also have to deal with grouping and partitioning. The best and most efficient way to do secondary sorting in Hadoop is to write our own key class. I inferred this from Tip 5 mentioned in the link http://blog.cloudera.com/blog/2009/12/7-tips-for-improving-mapreduce-per.... Before reading that post, I had implemented secondary sorting using a Text key and grouping comparators, which required inefficient string-to-numeric conversions and vice versa.
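The latitude/longitude example can be sketched with a plain Java comparator (class and field names here are illustrative; in a real job this logic would live in the key class's compareTo()):

```java
import java.util.Arrays;
import java.util.Comparator;

public class GeoSortDemo {

    static class GeoPoint {
        final double lat, lon;
        GeoPoint(double lat, double lon) { this.lat = lat; this.lon = lon; }
    }

    // Latitude ascending, then longitude descending: a two-field secondary sort.
    static final Comparator<GeoPoint> SECONDARY =
            Comparator.<GeoPoint>comparingDouble(p -> p.lat)
                      .thenComparing(Comparator.<GeoPoint>comparingDouble(p -> p.lon).reversed());

    public static void main(String[] args) {
        GeoPoint[] pts = {
            new GeoPoint(10.0, 70.0), new GeoPoint(10.0, 80.0), new GeoPoint(5.0, 75.0)
        };
        Arrays.sort(pts, SECONDARY);
        for (GeoPoint p : pts) {
            System.out.println(p.lat + "," + p.lon);
        }
    }
}
```

The comparator chain mirrors the nested if-checks we will write in the composite key's compareTo() below: compare the first field, and fall through to the next field only on a tie.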

So, let’s try to implement secondary sort with the clickstream use-case mentioned right at the beginning of the blog.

This use case is an ideal scenario for secondary sorting. To perform the mentioned analysis, the reducer would need data per website per country and the data should also be ordered by click_time. Thus, we have to sort the data on three fields: website, country and click_time whereas data has to be grouped on website and country fields only so that one country’s data is sent in one reduce call for the click-related analysis. Additionally, we should also ensure that the data for a group should all be correctly assigned to one reduce task. So, our partitioner should partition the data on website and country fields only.

The mapper should emit {K, V} where K is a composite of website (which can be extracted from the page_url), country and click_time, and V is the full record: click_time, ip_addr, page_url, page_name, session_id, country. One important point to note here: although country and click_time are included in the key, they should also be present in the value V. This is because when grouping is done, only one key is retained per group. So if we did not send these fields as part of the value, we would lose the required data for all but one record in each group.
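Extracting the website from page_url, as the mapper must do to build the key, could look like the following standalone sketch. The helper name and the assumption that page_url is a full URL are mine, not from the original post:

```java
import java.net.URI;
import java.net.URISyntaxException;

public class WebsiteExtractor {

    // Pulls the host ("website") out of a page_url; a hypothetical helper
    // the mapper could call while building the composite key.
    static String extractWebsite(String pageUrl) {
        try {
            String host = new URI(pageUrl).getHost();
            return host != null ? host : pageUrl;  // fall back to the raw value
        } catch (URISyntaxException e) {
            return pageUrl;  // malformed URL: keep the raw value rather than fail
        }
    }

    public static void main(String[] args) {
        System.out.println(extractWebsite("http://www.example.com/products/item1"));
        // prints www.example.com
    }
}
```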

A step-by-step discussion on how to achieve secondary sorting by designing a custom key class is discussed next.

1. Identify the fields to be included in the key class required for sorting, grouping & partitioning

- Sorting: website, country and click_time
- Grouping: website, country
- Partitioning: website, country

The data types of each field are also outlined below.

- website: string (basically character type)
- country: string (basically character type)
- click_time: time in milliseconds since the epoch (a numerical long value)

2. Writing the key class

Let’s call our class ClickKey. The code comments explain the implementation.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

/* The composite key class, which implements WritableComparable and
   hence can be used as a key class. Getters and setters are omitted
   from this listing. */
public class ClickKey implements WritableComparable<ClickKey> {

    /* Key fields are defined here as members of the class. The write and
       readFields methods below serialize and deserialize these fields. */
    private String website;
    private String country;
    private Long clickTime;

    /* write method for serialization; should be in sync with readFields() */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(website);
        out.writeUTF(country);
        out.writeLong(clickTime);
    }

    /* readFields method for deserialization; should be in sync with write() */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.website = in.readUTF();
        this.country = in.readUTF();
        this.clickTime = in.readLong();
    }

    /* The most important method here. We have to sort by website, then
       country, then click time, so the logic is: 1) compare websites,
       2) if equal, compare countries, 3) if still equal, compare click
       times. Each field's own Java-provided compareTo does the work. */
    @Override
    public int compareTo(ClickKey other) {
        int res = this.website.compareTo(other.website);
        if (res == 0) {
            res = this.country.compareTo(other.country);
            if (res == 0) {
                res = this.clickTime.compareTo(other.clickTime);
            }
        }
        return res;
    }

    /* The default HashPartitioner uses this method, so it is important to
       override hashCode() as required. */
    @Override
    public int hashCode() {
        int hash = 7;
        hash = 13 * hash + (this.website != null ? this.website.hashCode() : 0);
        hash = 13 * hash + (this.country != null ? this.country.hashCode() : 0);
        hash = 13 * hash + (this.clickTime != null ? this.clickTime.hashCode() : 0);
        return hash;
    }

    /* equals() should be overridden consistently with hashCode(). */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof ClickKey)) {
            return false;
        }
        final ClickKey other = (ClickKey) obj;
        return this.website.equals(other.website)
                && this.country.equals(other.country)
                && this.clickTime.equals(other.clickTime);
    }
}

3. Modifying key class for grouping & partitioning

The class definition in the previous step would be appropriate if we had to sort and group the data on the same fields. However, grouping should be done on two fields only: website and country (in that order). Hence we have to implement a different group comparator class and set it in the job configuration to override the default.


3.1.    Modifying the ClickKey class to include group comparison

We add one method and one inner class to our key class.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ClickKey implements WritableComparable<ClickKey> {

    // .... methods already defined above; new classes and methods added here.

    /* compareGroup method, used by the group comparator class to do
       comparisons on website and country only */
    public int compareGroup(ClickKey other) {
        int res = this.website.compareTo(other.website);
        if (res == 0) {
            res = this.country.compareTo(other.country);
        }
        return res;
    }

    /* hashCode and equals modified to match the grouping requirements;
       otherwise values of the same group may be distributed to different
       reduce tasks. Note that clickTime has been removed from both methods. */
    @Override
    public int hashCode() {
        int hash = 7;
        hash = 13 * hash + (this.website != null ? this.website.hashCode() : 0);
        hash = 13 * hash + (this.country != null ? this.country.hashCode() : 0);
        return hash;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof ClickKey)) {
            return false;
        }
        final ClickKey other = (ClickKey) obj;
        return this.website.equals(other.website)
                && this.country.equals(other.country);
    }

    /* Group comparator class, a subclass of WritableComparator. The
       important things to note here are: 1) the default constructor, which
       calls the superclass constructor with the key class type, and 2) the
       compare method, which delegates to the compareGroup method of
       ClickKey. */
    public static class GroupComparator extends WritableComparator {

        public GroupComparator() {
            super(ClickKey.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            ClickKey w1 = (ClickKey) a;
            ClickKey w2 = (ClickKey) b;
            return w1.compareGroup(w2);
        }
    }
}

 3.2.    Setting job configuration

We override the default group comparator by adding the following setting to the job configuration:

job.setGroupingComparatorClass(ClickKey.GroupComparator.class);
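Putting the pieces together, a driver configuration might look like the following sketch. The class names ClickDriver, ClickMapper and ClickReducer are placeholders of mine, not from the original post:

```java
// Driver sketch for the secondary-sort job; mapper/reducer classes are hypothetical.
Job job = Job.getInstance(new Configuration(), "clickstream-secondary-sort");
job.setJarByClass(ClickDriver.class);
job.setMapperClass(ClickMapper.class);
job.setReducerClass(ClickReducer.class);
job.setMapOutputKeyClass(ClickKey.class);           // our composite key
job.setMapOutputValueClass(Text.class);             // the full record as the value
job.setGroupingComparatorClass(ClickKey.GroupComparator.class);  // group on website + country
// Sorting uses ClickKey.compareTo() by default, and partitioning uses the
// default HashPartitioner, whose hashCode() we aligned with the grouping fields.
```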

 3.3.    Modifying the partitioner

As discussed in the previous section, we have overridden hashCode() (and, for consistency, equals()) so that the default HashPartitioner partitions on website and country only. This way we avoid writing a separate partitioner class. However, if we do have to use a different class for partitioning, we should also set it in the job configuration like so:

job.setPartitionerClass(ClickKeyPartitioner.class);

A sample of a custom partitioner class is also shown below.

public static class ClickKeyPartitioner extends Partitioner<ClickKey, Text> {

    @Override
    public int getPartition(ClickKey key, Text value, int numReduceTasks) {
        // Getters are assumed here; they were omitted from the ClickKey listing.
        int hash = 7;
        hash = 13 * hash + (key.getWebsite() != null ? key.getWebsite().hashCode() : 0);
        hash = 13 * hash + (key.getCountry() != null ? key.getCountry().hashCode() : 0);
        // Mask off the sign bit so the partition number is never negative.
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }
}

These classes can now be used for secondary sorting in our use case.

An optimization hint

We should try to use raw comparators wherever applicable. Raw comparators perform comparisons at the byte level, without deserializing the objects, which can speed up the shuffle considerably. A good example is the Text class, whose comparator works directly on the serialized bytes.
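The idea behind a raw comparator can be sketched in plain Java: compare the serialized bytes directly, lexicographically, without building objects. This standalone demo mirrors the spirit of Hadoop's WritableComparator.compareBytes(), though it is not that method's actual source:

```java
import java.nio.charset.StandardCharsets;

public class RawCompareDemo {

    // Lexicographic, unsigned byte-by-byte comparison of two byte arrays,
    // in the spirit of a raw comparator: no deserialization required.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff, y = b[i] & 0xff;  // treat bytes as unsigned
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;  // shorter array sorts first on a shared prefix
    }

    public static void main(String[] args) {
        byte[] a = "apple".getBytes(StandardCharsets.UTF_8);
        byte[] b = "banana".getBytes(StandardCharsets.UTF_8);
        System.out.println(compareBytes(a, b) < 0);  // prints true
    }
}
```

For a composite key like ClickKey, a full raw comparator would also have to walk the serialized field layout (two length-prefixed UTF strings, then a long), which is why writing one takes more care than the object-level comparator shown earlier.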

The code in this post is all based on the new mapreduce API. A very good explanation of writing custom keys and data types using the older mapred API is available at http://developer.yahoo.com/hadoop/tutorial/module5.html; that link was my starting point for implementing keys for secondary sorting.

Secondary sorting can also be done in Hadoop Streaming.


 

About the Author

Garima Dosi

Solution Architect Engineering - Guwahati, India Services
