Using MapReduce to find Twitter trends

A few weeks back, while preparing our presentation for agileNCR 2013, Sanchit and I started working on an interesting problem to solve with MapReduce.

We thought of applying the MapReduce model to find trending topics on Twitter.

A tweet on Twitter can contain hashtags (e.g. #helloTwitter), and the hashtag used the most across tweets globally is said to be the top trend. More details can be found here.

This data is huge and keeps growing, so processing it in a traditional, single-machine manner would not be feasible.

Hence we would need Hadoop to help us solve this problem.

Twitter uses Cassandra to store its data in key-value format. For simplicity, let's assume the key-value pair for tweet data looks something like this: <twitterHandle, Tweet>.

So, in order to find the top n trends in a given snapshot, we would need to:
1. Process all tweets and parse out the tokens that are hashtags.
2. Count the occurrences of each hashtag.
3. Sort the hashtags by count and pick the top n.
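Before distributing the work with Hadoop, the three steps above can be sketched in plain, single-machine Java to make the logic concrete (class and method names here are illustrative, not part of the original code):

```java
import java.util.*;
import java.util.stream.*;

public class TrendSketch {
    // Steps 1 and 2: tokenize tweets, keep only #-prefixed tokens, count occurrences.
    static Map<String, Integer> countHashTags(List<String> tweets) {
        Map<String, Integer> counts = new HashMap<>();
        for (String tweet : tweets) {
            for (String token : tweet.split("\\s+")) {
                if (token.startsWith("#")) {
                    counts.merge(token.toLowerCase(), 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    // Step 3: sort hashtags by count, descending, and keep the top n.
    static List<String> topN(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> tweets = Arrays.asList(
                "I love #agileNCR",
                "Attending a session on #hadoop and #agileNCR");
        System.out.println(topN(countHashTags(tweets), 1)); // prints [#agilencr]
    }
}
```

This works fine for a handful of tweets; the rest of the post is about doing the same three steps when the data no longer fits on one machine.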

So, the input data for our Mapper could be a flat file generated from the values of the <twitterHandle, Tweet> key-value pairs.
It would look something like this:

I love #agileNCR.
Attending session on #hadoop.
.....
....

Click here to see the sample input file.

Assumption: since a tweet can be at most 140 characters, we can store the data in a format where each line is a new tweet.

Step 1: Mapper
While tokenising each line, the mapper collects only the tokens that start with #.

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

@Override
public void map(LongWritable key, Text value, Context context) throws IOException,
        InterruptedException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
        String token = tokenizer.nextToken();
        if (token.startsWith("#")) {
            word.set(token.toLowerCase());
            // Emit (hashtag, 1); the framework groups these pairs by key for the reducer.
            context.write(word, one);
        }
    }
}

Step 2: Combiner
The combiner aggregates identical hashtags locally on each mapper node before the shuffle, cutting down the data sent over the network. Since our reduce operation is a simple sum, the reducer class itself can serve as the combiner.
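Reusing the reducer as the combiner is safe here because addition is associative: pre-summing any subset of the emitted 1s does not change the final total. A minimal plain-Java sketch of that property (names illustrative):

```java
import java.util.*;

public class CombinerSketch {
    // The same summing logic the reducer uses.
    static int reduceSum(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        // The reducer may see the raw 1s emitted by the mappers...
        int direct = reduceSum(Arrays.asList(1, 1, 1, 1, 1));
        // ...or combiner output, e.g. two mappers pre-summed 3 and 2 locally.
        int combined = reduceSum(Arrays.asList(3, 2));
        System.out.println(direct == combined); // prints true: both are 5
    }
}
```

This is exactly why Hadoop only allows a combiner whose operation can be applied to partial results without changing the outcome.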

Step 3: Reducer

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    // Emit (hashtag, total count).
    context.write(key, new IntWritable(sum));
}

The reducer will generate output something like this:

#agileNCR 21
#hello 4
#xomato 88
#zo 36

The problem with this output is that it is sorted by key: during the shuffle-and-sort step between map and reduce, records are sorted alphabetically by key.

To get the desired output, sorted by the number of occurrences of each hashtag, we would need the records sorted by value.
So we decided to pass this output to a second MapReduce job, which swaps the key and the value and then sorts.

Hence:
Step 4: Mapper 2

Tokenise the input and emit the 2nd token (the count) as the key and the 1st token (the hashtag) as the value.
During the shuffle phase, the framework will sort on this key.
However, keys are sorted in ascending order by default, which would not give us our desired list.
So, we would need to use a comparator: LongWritable.DecreasingComparator, set on the job via setSortComparatorClass.
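The ascending-versus-descending issue is easy to see in plain Java: sorting is ascending by default, and a reversed comparator flips the order, which is what a decreasing comparator on the job's sort phase achieves for us (a local sketch; names illustrative):

```java
import java.util.*;

public class ReverseSortSketch {
    // Copy the counts and sort them highest-first with a reversed comparator.
    static List<Long> sortDescending(List<Long> counts) {
        List<Long> sorted = new ArrayList<>(counts);
        sorted.sort(Comparator.reverseOrder());
        return sorted;
    }

    public static void main(String[] args) {
        List<Long> counts = Arrays.asList(21L, 4L, 88L, 36L);
        // Default sort order is ascending: [4, 21, 36, 88].
        // The reversed comparator gives us the trend order instead:
        System.out.println(sortDescending(counts)); // prints [88, 36, 21, 4]
    }
}
```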

@Override
public void map(LongWritable key, Text value, Context context) throws IOException,
        InterruptedException {
    String line = value.toString(); // e.g. "#agilencr 21"
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
        String hashTag = tokenizer.nextToken();
        long count = Long.parseLong(tokenizer.nextToken());
        // Swap: emit (count, hashtag) so the shuffle sorts on the count.
        context.write(new LongWritable(count), new Text(hashTag));
    }
}

Step 5: Reducer 2
In the reducer we swap the pair back, emitting (hashtag, count) in the order the sorted keys arrive.

@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    // Several hashtags may share the same count; emit each of them.
    for (Text hashTag : values) {
        context.write(hashTag, key);
    }
}

So we get the desired output like this:

#xomato 88
#zo 36
#agileNCR 21
#hello 4
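Putting it all together, the two jobs might be wired up roughly like this (a sketch assuming Hadoop's newer org.apache.hadoop.mapreduce API; the class names TrendDriver, TrendMapper, TrendReducer, SwapMapper and SwapReducer, and the intermediate path "counts", are illustrative):

```java
// Job 1: count hashtags (mapper + combiner + reducer from Steps 1-3).
Configuration conf = new Configuration();
Job countJob = Job.getInstance(conf, "hashtag-count");
countJob.setJarByClass(TrendDriver.class);
countJob.setMapperClass(TrendMapper.class);
countJob.setCombinerClass(TrendReducer.class); // reducer doubles as combiner
countJob.setReducerClass(TrendReducer.class);
countJob.setOutputKeyClass(Text.class);
countJob.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(countJob, new Path(args[0]));
FileOutputFormat.setOutputPath(countJob, new Path("counts"));
countJob.waitForCompletion(true);

// Job 2: swap and sort (mapper + reducer from Steps 4-5).
Job sortJob = Job.getInstance(conf, "trend-sort");
sortJob.setJarByClass(TrendDriver.class);
sortJob.setMapperClass(SwapMapper.class);
sortJob.setReducerClass(SwapReducer.class);
sortJob.setMapOutputKeyClass(LongWritable.class);
sortJob.setMapOutputValueClass(Text.class);
// Sort the counts highest-first instead of the default ascending order.
sortJob.setSortComparatorClass(LongWritable.DecreasingComparator.class);
FileInputFormat.addInputPath(sortJob, new Path("counts"));
FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
sortJob.waitForCompletion(true);
```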

It might not be the best way, but it is quite useful for understanding MapReduce and comparators.
The complete source code can be downloaded from here.

This post was originally posted at http://xebee.xebia.in/2013/02/19/using-map-reduce-to-find-the-twitter-trends/
