An interesting, advanced feature of Pangool is its rollup capability. In some cases it is useful to perform aggregate sub-groupings within a Reducer.
Let’s pose an example: imagine we want to process a set of Tweets and calculate the top N hashtags for each location and date.
We could write a job that groups all tweets by (location, date, hashtag) and counts each hashtag. Then, we could write a second job that performs the top N selection, grouping by (location, date). However, using rollup, we can easily merge both steps into a single job.
Let’s visualize the intermediate schema that we will have in such a job - and some data, in no particular order:

    [location, date, hashtag, count]

    "L1"  "d1"  "h1"  5
    "L1"  "d1"  "h2"  5
    "L1"  "d2"  "h1"  1
    "L1"  "d2"  "h1"  1
    "L1"  "d1"  "h1"  3
    "L1"  "d1"  "h2"  7
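As a point of reference, such an intermediate schema could be declared with Pangool’s Fields.parse() helper. This is a minimal sketch; the schema name "tweet_counts" and the concrete field types are assumptions of this example:

    import com.datasalt.pangool.io.Fields;
    import com.datasalt.pangool.io.Schema;

    // Hypothetical declaration of the intermediate schema shown above.
    Schema schema = new Schema("tweet_counts",
        Fields.parse("location:string, date:string, hashtag:string, count:int"));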
We want to group by (location, date, hashtag) to be able to aggregate all the counts for each (location, date, hashtag) group. Therefore, tuples will also be sorted by (location, date, hashtag). At the end of each reduce() method we’ll output the total count:
    [location, date, hashtag, count]

    "L1"  "d1"  "h1"  5
    "L1"  "d1"  "h1"  3
          output("L1", "d1", "h1", 8)
    "L1"  "d1"  "h2"  5
    "L1"  "d1"  "h2"  7
          output("L1", "d1", "h2", 12)
    "L1"  "d2"  "h1"  1
    "L1"  "d2"  "h1"  1
          output("L1", "d2", "h1", 2)
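A minimal sketch of such an aggregating reducer, assuming Pangool’s TupleReducer API and the schema above (the class name CountReducer and the tuple-plus-NullWritable output are our choices for illustration, not prescribed by Pangool):

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;

    import com.datasalt.pangool.io.ITuple;
    import com.datasalt.pangool.tuplemr.TupleMRException;
    import com.datasalt.pangool.tuplemr.TupleReducer;

    public class CountReducer extends TupleReducer<ITuple, NullWritable> {

      public void reduce(ITuple group, Iterable<ITuple> tuples,
          TupleMRContext context, Collector collector)
          throws IOException, InterruptedException, TupleMRException {

        // Sum the partial counts of this (location, date, hashtag) group.
        int totalCount = 0;
        ITuple lastTuple = null;
        for (ITuple tuple : tuples) {
          totalCount += (Integer) tuple.get("count");
          lastTuple = tuple;
        }
        // Reuse the last tuple of the group to emit the aggregated count.
        lastTuple.set("count", totalCount);
        collector.write(lastTuple, NullWritable.get());
      }
    }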
However, we can leverage the fact that we are sorting by all the fields of the (location, date, hashtag) group to add business logic every time one of these fields changes its value - in other words, to add subgroups. This way, apart from having one reduce() method, we can also have onOpen()/onClose()-like methods for the other fields of the group.
So we could modify our job to save each (location, date, hashtag) total count in an n-sized heap in memory. Then, in the onClose() of the date field, we could simply flush the top n hashtags - the ones that remain in the heap - to the output.
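The heap itself can be a simple bounded priority queue. Here is a minimal sketch of such a helper; the TopNHeap class is hypothetical and not part of Pangool:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.PriorityQueue;

    // Hypothetical helper: keeps only the n entries with the highest counts.
    public class TopNHeap {

      public static class Entry {
        public final String hashtag;
        public final int count;

        public Entry(String hashtag, int count) {
          this.hashtag = hashtag;
          this.count = count;
        }
      }

      private final int n;
      // Min-heap on count: the root is always the weakest surviving entry.
      private final PriorityQueue<Entry> heap =
          new PriorityQueue<Entry>((a, b) -> Integer.compare(a.count, b.count));

      public TopNHeap(int n) {
        this.n = n;
      }

      public void add(String hashtag, int count) {
        heap.add(new Entry(hashtag, count));
        if (heap.size() > n) {
          heap.poll(); // evict the entry with the lowest count
        }
      }

      // Drain the survivors, e.g. when a (location, date) subgroup closes.
      public List<Entry> drain() {
        List<Entry> result = new ArrayList<Entry>(heap);
        heap.clear();
        return result;
      }
    }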
Let’s imagine we want to get the “top 1” hashtag for each (location, date), and let’s assume we roll up from date:
    [location, date, hashtag, count]

    "L1"  "d1"  "h1"  5
    "L1"  "d1"  "h1"  3
          reduce("L1", "d1", "h1") : "Add to a 1-sized heap: (h1, 8)"
    "L1"  "d1"  "h2"  5
    "L1"  "d1"  "h2"  7
          reduce("L1", "d1", "h2") : "Add to a 1-sized heap: (h2, 12)"
          onClose("d1") : output("h2", 12)
    "L1"  "d2"  "h1"  1
    "L1"  "d2"  "h1"  1
          reduce("L1", "d2", "h1") : "Add to a 1-sized heap: (h1, 2)"
          onClose("d2") : output("h1", 2)
Got the idea? By leveraging the way we are sorting the Tuples, we can inject business logic every time a wider group opens or closes.
For this example, we would configure the job as:

    grouper.setGroupByFields("location", "date", "hashtag");
    grouper.setRollupFrom("date");
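For context, here is a hedged sketch of how the whole job could be wired up with Pangool’s TupleMRBuilder; the job name, the schema variable from the earlier sketch, and the TopNHashTagsReducer shown further below are assumptions of this example:

    import org.apache.hadoop.conf.Configuration;
    import com.datasalt.pangool.tuplemr.TupleMRBuilder;

    Configuration conf = new Configuration();
    TupleMRBuilder builder = new TupleMRBuilder(conf, "top-n-hashtags");
    builder.addIntermediateSchema(schema); // the schema sketched earlier
    builder.setGroupByFields("location", "date", "hashtag");
    builder.setRollupFrom("date"); // subgroups will open/close on date and hashtag
    builder.setTupleReducer(new TopNHashTagsReducer(1)); // hypothetical, sketched below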
We would then need to use a special reducer that extends TupleRollupReducer. This Reducer has extra methods, onOpen() and onClose(), where we’ll be able to add arbitrary business logic whenever any of the fields covered by rollupFrom changes. In this example, we’ll be notified of changes to the date and hashtag fields. We’ll still be able to use the reduce() method for the (location, date, hashtag) groups.
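Putting the pieces together, here is a sketch of such a rollup reducer. It uses the onOpenGroup()/onCloseGroup() callback names found in some Pangool versions; exact method names and signatures may differ in your version, so treat this as an outline and refer to TopNHashTags.java for the canonical code. The plain Text/IntWritable output is also just an assumption of this sketch:

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    import com.datasalt.pangool.io.ITuple;
    import com.datasalt.pangool.tuplemr.TupleMRException;
    import com.datasalt.pangool.tuplemr.TupleRollupReducer;

    public class TopNHashTagsReducer extends TupleRollupReducer<Text, IntWritable> {

      private final int n;
      private transient TopNHeap heap; // the helper sketched earlier

      public TopNHashTagsReducer(int n) {
        this.n = n;
      }

      public void onOpenGroup(int depth, String field, ITuple firstElement,
          TupleMRContext context, Collector collector) {
        if ("date".equals(field)) {
          heap = new TopNHeap(n); // a fresh heap per (location, date) subgroup
        }
      }

      public void reduce(ITuple group, Iterable<ITuple> tuples,
          TupleMRContext context, Collector collector)
          throws IOException, InterruptedException, TupleMRException {
        // Aggregate the counts of one (location, date, hashtag) group ...
        int total = 0;
        for (ITuple tuple : tuples) {
          total += (Integer) tuple.get("count");
        }
        // ... and push the total into the current subgroup's heap.
        heap.add(group.get("hashtag").toString(), total);
      }

      public void onCloseGroup(int depth, String field, ITuple lastElement,
          TupleMRContext context, Collector collector)
          throws IOException, InterruptedException, TupleMRException {
        if ("date".equals(field)) {
          // The (location, date) subgroup is complete: flush the top n hashtags.
          for (TopNHeap.Entry entry : heap.drain()) {
            collector.write(new Text(entry.hashtag), new IntWritable(entry.count));
          }
        }
      }
    }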
You can see this example in TopNHashTags.java.
Note that when using rollup, the default partitioning strategy is changed. To keep things coherent, Pangool needs to partition by the fields that fall to the left of the rollupFrom field, including it. In this example, Pangool will partition by (location, date). If we rolled up from location instead, Pangool would just partition by location. The wider the subgroups we create with rollup, the fewer fields are left for partitioning and the worse the partition function will spread the load. So rollup has to be used with care, in order not to create too-wide groups that could make reducer completion times deviate too much from one another.