Pangool User Guide

Reduce-side Joins

One of the most interesting features of Pangool is its inherent reduce-side join capabilities.

In this section we’ll walk through one of the examples in Pangool's examples sub-project (the URL resolution example) to illustrate how easy it is to perform arbitrary reduce-side joins with Pangool.

We have one file with URL registers: [url, timestamp, ip] and another file with canonical URL mappings: [nonCanonicalUrl, canonicalUrl]. We want to output the URL registers file with each url replaced by its canonical URL according to the mapping file. In other words, we want the output to look like: [canonicalUrl, timestamp, ip]
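To pin down the intended semantics, here is a minimal in-memory sketch in plain Java (no Pangool involved) that computes the expected output with a hash lookup. The class name `UrlResolutionSpec`, the `String[]` record layout, and the choice to keep the original url when no mapping exists are assumptions for illustration. Note that an in-memory map only works while the mapping file fits in memory, which is exactly the limitation a reduce-side join avoids.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reference implementation of the desired output, not Pangool code.
// Records are modeled as String arrays:
//   URL register: {url, timestamp, ip}
//   URL mapping:  {nonCanonicalUrl, canonicalUrl}
final class UrlResolutionSpec {

  static List<String[]> resolve(List<String[]> registers, List<String[]> mappings) {
    // Build a lookup table nonCanonicalUrl -> canonicalUrl.
    Map<String, String> canonical = new HashMap<>();
    for (String[] m : mappings) {
      canonical.put(m[0], m[1]);
    }
    List<String[]> out = new ArrayList<>();
    for (String[] r : registers) {
      // Assumption: keep the original url when it has no mapping.
      String url = canonical.getOrDefault(r[0], r[0]);
      out.add(new String[] {url, r[1], r[2]});
    }
    return out;
  }
}
```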

To produce this output, we need to join the URL registers file with the URL mappings file, grouping URL registers by url and URL mappings by nonCanonicalUrl. With a reduce-side join, each reduce group then holds the mapping and all the registers for one url, so we can store the “canonicalUrl” for the group and substitute it into each of the URL register records associated with that url.
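Conceptually, the shuffle phase of a reduce-side join routes records from both inputs that share a join key to the same reduce group. The following plain-Java sketch simulates that grouping in memory; it is not Pangool's implementation, and the `Tagged` class and the schema names `urlMap` and `urlRegister` are assumptions modeled on this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory simulation of how a reduce-side join groups records.
final class ShuffleSketch {

  // A record tagged with the name of the schema (source) it came from.
  static final class Tagged {
    final String schema;
    final String[] fields;

    Tagged(String schema, String... fields) {
      this.schema = schema;
      this.fields = fields;
    }

    // Join key: nonCanonicalUrl for "urlMap", url for "urlRegister".
    // Both happen to be the first field in this record layout.
    String joinKey() {
      return fields[0];
    }
  }

  // Groups records from all sources by join key, as the shuffle would.
  // Within a group, records keep their arrival order (no source ordering yet).
  static Map<String, List<Tagged>> group(List<Tagged> records) {
    Map<String, List<Tagged>> groups = new TreeMap<>();
    for (Tagged t : records) {
      groups.computeIfAbsent(t.joinKey(), k -> new ArrayList<>()).add(t);
    }
    return groups;
  }
}
```

Each resulting group contains the mapping (if any) and every register for one url, but nothing yet guarantees which one arrives first.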

To make the join fully scalable, we need to receive the canonicalUrl first in each reduce group, so that we only have to stream through the URL registers afterwards instead of buffering them in memory.
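This requirement amounts to a secondary sort: records are ordered by the join key and, within the same key, by source. The plain-Java comparator below sketches that idea under assumed names (`SourceOrderSketch`, a `{schemaName, joinKey, ...}` record layout); it is not how Pangool implements its sorting internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of a secondary sort: order by join key, then by source.
final class SourceOrderSketch {

  // Sources in definition order; a lower index sorts first within a group.
  static final List<String> SOURCES = Arrays.asList("urlMap", "urlRegister");

  // Each record is {schemaName, joinKey, ...other fields}.
  static final Comparator<String[]> BY_KEY_THEN_SOURCE =
      Comparator.<String[], String>comparing(r -> r[1])
          .thenComparingInt(r -> SOURCES.indexOf(r[0]));

  // Returns a sorted copy, leaving the input list untouched.
  static List<String[]> sorted(List<String[]> records) {
    List<String[]> copy = new ArrayList<>(records);
    copy.sort(BY_KEY_THEN_SOURCE);
    return copy;
  }
}
```

With this order a reducer only needs to remember the latest canonicalUrl, so its memory per group stays constant no matter how many URL registers share a url.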

We’ll configure a Pangool job with two inputs and, therefore, two intermediate schemas. Since Pangool's Text I/O can parse the tab-separated input files directly into Tuples, we don't need any specific mapping logic: an IdentityTupleMapper simply forwards each parsed Tuple.

 TupleMRBuilder mr = new TupleMRBuilder(conf, "Pangool Url Resolution");
 // One intermediate schema per input; urlMap is defined first,
 // so by default its Tuples come first within each group
 mr.addIntermediateSchema(getURLMapSchema());
 mr.addIntermediateSchema(getURLRegisterSchema());
 // Parse each tab-separated file straight into Tuples of its schema and forward them unchanged
 mr.addInput(new Path(input1), new TupleTextInputFormat(getURLMapSchema(), false, false, '\t',
   NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER, null, null), new IdentityTupleMapper());
 mr.addInput(new Path(input2), new TupleTextInputFormat(getURLRegisterSchema(), false, false, '\t',
   NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER, null, null), new IdentityTupleMapper());
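The TupleTextInputFormat instances above split each line on the tab separator and map the resulting columns, in order, onto the fields of the given schema. As a rough illustration only, the hypothetical helper below does the same for a plain list of field names; it is not Pangool's parser and ignores quoting, escaping, headers and type conversion.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: positional parsing of a tab-separated line into named fields.
final class TsvParseSketch {

  static Map<String, String> parse(String line, List<String> fieldNames) {
    // A limit of -1 keeps trailing empty columns instead of dropping them.
    String[] parts = line.split("\t", -1);
    if (parts.length != fieldNames.size()) {
      throw new IllegalArgumentException(
          "Expected " + fieldNames.size() + " fields but got " + parts.length + ": " + line);
    }
    // LinkedHashMap preserves the schema's field order.
    Map<String, String> tuple = new LinkedHashMap<>();
    for (int i = 0; i < parts.length; i++) {
      tuple.put(fieldNames.get(i), parts[i]);
    }
    return tuple;
  }
}
```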

Let’s now look at the reducer that performs the join:

 public static class Handler extends TupleReducer<ITuple, NullWritable> {

   private Tuple result;

   @Override
   public void reduce(ITuple group, Iterable<ITuple> tuples, TupleMRContext context, Collector collector)
     throws IOException, InterruptedException, TupleMRException {
     if (result == null) {
       // Reused output Tuple with the same layout as the URL registers
       result = new Tuple(getURLRegisterSchema());
     }
     // Set by the urlMap Tuple, which is sorted first within each url group
     String canonicalUrl = null;
     for (ITuple tuple : tuples) {
       if ("urlMap".equals(tuple.getSchema().getName())) {
         canonicalUrl = tuple.get("canonicalUrl").toString();
       } else {
         // URL register: substitute the canonical url (keep the original one if there is no mapping)
         result.set("url", canonicalUrl != null ? canonicalUrl : tuple.get("url"));
         result.set("timestamp", tuple.get("timestamp"));
         result.set("ip", tuple.get("ip"));
         collector.write(result, NullWritable.get());
       }
     }
   }
 }
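To see the streaming logic in isolation, here is a plain-Java model of the loop inside reduce() for a single group. It is a hypothetical sketch, not Pangool code: records are modeled as `{schemaName, ...fields}` arrays, the register schema name `urlRegister` is assumed, and falling back to the original url when no mapping was seen is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the Handler's reduce() loop for one group.
// Each record is {schemaName, field1, field2, ...}:
//   urlMap:      {"urlMap", nonCanonicalUrl, canonicalUrl}
//   urlRegister: {"urlRegister", url, timestamp, ip}
final class JoinReduceSketch {

  static List<String[]> reduceGroup(Iterable<String[]> sortedGroup) {
    List<String[]> out = new ArrayList<>();
    String canonicalUrl = null; // the only state kept while streaming through the group
    for (String[] rec : sortedGroup) {
      if ("urlMap".equals(rec[0])) {
        canonicalUrl = rec[2];
      } else {
        // Relies on the mapping arriving first; otherwise the original url is kept.
        String url = canonicalUrl != null ? canonicalUrl : rec[1];
        out.add(new String[] {url, rec[2], rec[3]});
      }
    }
    return out;
  }
}
```

Feeding it a group in which a register precedes the mapping leaves that register unresolved, which is why the sort order configured for the job matters.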

Let’s comment on this specific part of the code:

 for (ITuple tuple : tuples) {
   if ("urlMap".equals(tuple.getSchema().getName())) {
     canonicalUrl = tuple.get("canonicalUrl").toString();
   } else {
     ...
   }
 }

Here we iterate over all the Tuples in the same url group while keeping one piece of state: whenever a Tuple belongs to the urlMap schema (the one holding the URL mapping records), we store its canonicalUrl. We then use this state variable when emitting each of the URL registers. Note that we are assuming the URL mapping always comes before the URL registers; in other words, the Tuples within each group are sorted by source. We’ll see how to configure the job for that purpose below.

Let’s now see how to configure the reduce-side join:

 // In the urlMap schema, "url" is an alias for the nonCanonicalUrl field
 mr.setFieldAliases("urlMap", new Aliases().add("url", "nonCanonicalUrl"));
 mr.setGroupByFields("url");
 ...

Note: The fields specified in setGroupByFields() must be present in every intermediate schema. If the field we want to group by is named differently in each schema, as in this case, we must declare an alias for it with TupleMRBuilder.setFieldAliases() and group by that alias, as shown above.
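Conceptually, an alias lets each schema expose one of its own fields under the common group-by name. The following plain-Java sketch resolves the group key of a record by first checking a per-schema alias table; `AliasSketch` and its methods are hypothetical names and do not model Pangool's `Aliases` class internals.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-schema field aliases used to build a common group key.
final class AliasSketch {

  // schemaName -> (alias -> real field name)
  private final Map<String, Map<String, String>> aliases = new HashMap<>();

  AliasSketch alias(String schema, String alias, String realField) {
    aliases.computeIfAbsent(schema, s -> new HashMap<>()).put(alias, realField);
    return this;
  }

  // Returns the value of groupField for a record of the given schema,
  // following an alias when the schema declares one.
  String groupKey(String schema, Map<String, String> record, String groupField) {
    String field = aliases.getOrDefault(schema, Collections.emptyMap())
        .getOrDefault(groupField, groupField);
    String value = record.get(field);
    if (value == null) {
      throw new IllegalArgumentException("Schema " + schema + " has no field " + field);
    }
    return value;
  }
}
```

With the alias in place, a urlMap record and a URL register for the same url produce the same group key.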

And that’s all! We have just defined two schemas and the group key. Note that even though the url field is not present in the urlMap schema, the alias url refers to nonCanonicalUrl, so urlMap Tuples are actually grouped by nonCanonicalUrl. By default, Pangool sorts by the group-by fields and then performs inter-source sorting based on the order in which the intermediate schemas were defined.
In this case, since the URL mapping schema was added first, the URL mapping comes first in each group, and that’s why the code above works.

To configure a custom sort order for the join, use addSchemaOrder() to place the inter-source ordering at the position you want within your custom sort criteria.

More information can be found in the Group-By & Sort-By section.
