This introduction guides you through the basics of Pangool using a Word-Count-like example.
In this final part, you'll learn how to use Pangool's easy secondary sort and how to write to different outputs from a Tuple MapReduce Job!
Now we'll expand the "topical word count" example to compute a "fingerprint" for each topic.
The fingerprint will be formed by the "top n" words for each topic. Each (word, topic) pair already has an associated count, calculated by the first Job. We will do two things: emit the "top n" words per topic as the Job's main output, and write the total count per topic to an extra, named output.
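As a reminder before diving in, the reducer below consumes the tuples produced by the first Job, which carry a topic, a word and its count. The following is a minimal sketch of how such an intermediate Schema could be declared; the helper name, the schema name and the field order are assumptions for illustration, and what matters is that it contains the "topic", "word" and "count" fields used below:

// Hypothetical helper: the intermediate Schema produced by the first Job.
// Field names match the ones the reducer reads ("topic", "count"); the rest is illustrative.
public static Schema getWordCountSchema() {
    List<Field> fields = new ArrayList<Field>();
    fields.add(Field.create("topic", Type.INT));
    fields.add(Field.create("word", Type.STRING));
    fields.add(Field.create("count", Type.INT));
    return new Schema("wordcount", fields);
}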
You can check the full code of this example on GitHub by clicking here.
For that, we will create a new TupleReducer that looks like this:
public static class TopNWords extends TupleReducer<ITuple, NullWritable> {

    int n; // We will emit as many words as this parameter (at maximum) for each group
    Tuple outputCountTuple;

    public TopNWords(int n) {
        this.n = n;
    }

    public void setup(TupleMRContext context, Collector collector)
        throws IOException, InterruptedException, TupleMRException {
        outputCountTuple = new Tuple(getOutputCountSchema());
    }

    public void reduce(ITuple group, Iterable<ITuple> tuples, TupleMRContext context, Collector collector)
        throws IOException, InterruptedException, TupleMRException {

        int totalCount = 0;
        Iterator<ITuple> iterator = tuples.iterator();
        for(int i = 0; i < n && iterator.hasNext(); i++) {
            ITuple tuple = iterator.next();
            collector.write(tuple, NullWritable.get());
            totalCount += (Integer) tuple.get("count");
        }
        outputCountTuple.set("topic", group.get("topic"));
        outputCountTuple.set("totalcount", totalCount);
        collector.getNamedOutput(OUTPUT_TOTALCOUNT).write(outputCountTuple, NullWritable.get());
    }
}
(Note how we take advantage of Pangool's support for configured instances to store the "n" parameter, passing it in through the constructor.)
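For example, if we wanted the top 5 words per topic, we could simply instantiate the reducer with that value and register the instance in the Job. This is a minimal sketch: "cg" is the same Job builder configured later in this section, and the setTupleReducer call is an assumption that may vary between Pangool versions:

// Sketch: emit at most 5 words per topic by configuring the reducer instance.
// setTupleReducer is assumed to be the call that registers the reducer in your Pangool version.
int n = 5;
cg.setTupleReducer(new TopNWords(n));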
Let’s analyze the lines above:
for(int i = 0; i < n && iterator.hasNext(); i++) {
    ITuple tuple = iterator.next();
    collector.write(tuple, NullWritable.get());
    ...
}
That's quite simple: we are just emitting at most "n" tuples per group. Note that this only works because we assume tuples arrive sorted by their count within each group. We'll see later how to accomplish this.
We also wanted an extra output for storing the total count for each topic, right? This output will be a Tuple too, but with a different Schema. We define that schema like this:
public static Schema getOutputCountSchema() {
    List<Field> fields = new ArrayList<Field>();
    fields.add(Field.create("topic", Type.INT));
    fields.add(Field.create("totalcount", Type.INT));
    return new Schema("outputcount", fields);
}
We use a Tuple with this schema and emit it to a "named output":
outputCountTuple.set("topic", group.get("topic"));
outputCountTuple.set("totalcount", totalCount);
collector.getNamedOutput(OUTPUT_TOTALCOUNT).write(outputCountTuple, NullWritable.get());
The same collector that we use for writing to the Job's main output is the one we use for writing to extra, named outputs. That is why we say Pangool has built-in, first-class multiple outputs (and inputs!): they are part of its standard API.
Finally, let's see what the Job configuration looks like:
cg.setGroupByFields("topic");
cg.setOrderBy(new SortBy().add("topic", Order.ASC).add("count", Order.DESC));
This is quite an important part, so let's elaborate on it. We want to emit the "top n" words for each topic, therefore we need to group by topic. But we also want to receive the words already sorted by count so that we can simply emit the first "n". For that, we configure the Job to sort by topic AND count (count in descending order), which is exactly the easy secondary sort mentioned at the beginning.
And, last but not least, let's see how we can declare a “named output”:
cg.addNamedTupleOutput(OUTPUT_TOTALCOUNT, getOutputCountSchema());
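Putting the pieces together, here is a sketch of how the whole Job could be wired. The groupBy, orderBy and named-output lines are the ones discussed above; the builder construction, the reducer registration and the job-creation calls are assumptions based on typical Pangool usage and may differ slightly between versions, and the input/output path configuration is omitted:

// Sketch only: wiring for the fingerprint Job.
TupleMRBuilder cg = new TupleMRBuilder(conf, "Topic Fingerprint");            // assumed builder; the tutorial's "cg"
cg.addIntermediateSchema(getWordCountSchema());                               // intermediate Schema (hypothetical helper above)
cg.setGroupByFields("topic");                                                 // one reducer group per topic
cg.setOrderBy(new SortBy().add("topic", Order.ASC).add("count", Order.DESC)); // secondary sort by count
cg.setTupleReducer(new TopNWords(n));                                         // assumed call to register the reducer instance
cg.addNamedTupleOutput(OUTPUT_TOTALCOUNT, getOutputCountSchema());            // the extra, named output
// ... input/output paths configured here (omitted) ...
cg.createJob().waitForCompletion(true);                                       // assumed job-creation call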
In this introduction you have learned some of the key features of Pangool through the "topical word count" example. We have shown how to use Tuples, intermediate Schemas, "Group by" and "Sort by", how convenient it is to use instances instead of static classes, and how easy it is to define and use multiple (named) outputs.