Pangool User Guide

Creating Pangool Jobs

The class TupleMRBuilder is responsible for building Pangool jobs. Here is a snippet extracted from the URL resolution example:

 TupleMRBuilder mr = new TupleMRBuilder(conf, "Pangool Url Resolution");
 mr.addIntermediateSchema(getURLMapSchema());
 mr.addIntermediateSchema(getURLRegisterSchema());
 mr.addInput(new Path(input1), new TupleTextInputFormat(getURLMapSchema(), false, false, '\t', 
   NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER, null, null), new IdentityTupleMapper());
 mr.addInput(new Path(input2), new TupleTextInputFormat(getURLRegisterSchema(), false, false, '\t', 
   NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER, null, null), new IdentityTupleMapper());
 mr.setFieldAliases("urlMap", new Aliases().add("url", "nonCanonicalUrl"));
 mr.setGroupByFields("url");
 mr.setOrderBy(new OrderBy().add("url", Order.ASC).addSchemaOrder(Order.ASC));
 mr.setSpecificOrderBy("urlRegister", new OrderBy().add("timestamp", Order.ASC));
 mr.setTupleReducer(new Handler());
 mr.setOutput(new Path(output), new TupleTextOutputFormat(getURLRegisterSchema(), false, '\t', 
   NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER), ITuple.class, NullWritable.class);

Important: The method cleanUpInstanceFiles() needs to be called once the Job has finished in order to remove all the serialized objects that Pangool creates for each Job.

Let's analyze the code line by line. The following line creates the TupleMRBuilder. The parameters are a Hadoop Configuration and the name of the generated job:

 TupleMRBuilder mr = new TupleMRBuilder(conf, "Pangool Url Resolution");

Intermediate Schemas

We need to declare the schemas (there can be more than one) that the TupleMapper output is allowed to use: the intermediate schemas.

 mr.addIntermediateSchema(getURLMapSchema());
 mr.addIntermediateSchema(getURLRegisterSchema());

Important: The order in which intermediate schemas are provided to the TupleMRBuilder matters. It may affect the default order in which tuples of different schemas are received in the TupleReducer. See Reduce-side joins for more information.
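
The two methods above return regular Pangool Schema instances built from Fields. The following is a minimal sketch of what they might look like; the schema names ("urlMap", "urlRegister") and the field names "nonCanonicalUrl", "url" and "timestamp" are taken from the builder calls in the snippet, but the remaining fields and types are assumptions, not the actual code of the URL resolution example:

 static Schema getURLMapSchema() {
   // Maps a non-canonical URL to its canonical form
   List<Field> fields = new ArrayList<Field>();
   fields.add(Field.create("nonCanonicalUrl", Type.STRING));
   fields.add(Field.create("canonicalUrl", Type.STRING));
   return new Schema("urlMap", fields);
 }

 static Schema getURLRegisterSchema() {
   // A record of a URL visit: the URL, when it happened and from which IP
   List<Field> fields = new ArrayList<Field>();
   fields.add(Field.create("url", Type.STRING));
   fields.add(Field.create("timestamp", Type.LONG));
   fields.add(Field.create("ip", Type.STRING));
   return new Schema("urlRegister", fields);
 }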

Grouping and Sorting

An important configuration parameter is how tuples must be grouped and sorted before reaching the TupleReducer. In this particular case, tuples are grouped by url; the full snippet above also sets sort criteria with setOrderBy() and setSpecificOrderBy(), which are discussed below.

 mr.setGroupByFields("url");

See Grouping and Sorting for more possibilities.
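
A rough reading of the two sorting lines from the full snippet (see Grouping and Sorting for the exact semantics):

 // Sort all tuples by url; among tuples with the same url, sort by the schema
 // they belong to, in the order the intermediate schemas were registered
 mr.setOrderBy(new OrderBy().add("url", Order.ASC).addSchemaOrder(Order.ASC));
 // Among tuples of the urlRegister schema, additionally sort by timestamp
 mr.setSpecificOrderBy("urlRegister", new OrderBy().add("timestamp", Order.ASC));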

Setting the job input and associated TupleMapper

You can configure a particular TupleMapper for each input path. That is, you can have several inputs, and each of them can be processed differently. In this particular example, we are configuring two input paths (input1 and input2), and both are processed by Pangool's default mapper IdentityTupleMapper, because we don't need to perform any specific mapping logic.

For parsing input text files into Tuples we use TupleTextInputFormat (see Text Input/Output for more information).

 mr.addInput(new Path(input1), new TupleTextInputFormat(getURLMapSchema(), false, false, '\t', 
  NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER, null, null), new IdentityTupleMapper());
 mr.addInput(new Path(input2), new TupleTextInputFormat(getURLRegisterSchema(), false, false, '\t', 
  NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER, null, null), new IdentityTupleMapper());

Pangool can also use old Hadoop-native InputFormats like TextInputFormat. For that purpose, a wrapper class HadoopInputFormat exists which receives the format class as an argument (unlike in Hadoop, in Pangool everything is an instance: Mapper, Reducer, Input/Output Format, ...).
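
As a hedged sketch (the path input3 and the mapper MyTupleMapper are hypothetical, only for illustration), plugging a native Hadoop TextInputFormat in would look roughly like this:

 // The wrapper receives the Hadoop InputFormat class; the wrapper itself is an instance
 mr.addInput(new Path(input3), new HadoopInputFormat(TextInputFormat.class),
   new MyTupleMapper());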

Using (binary) Tuples as input/output

Pangool can serialize Tuples in an efficient binary format with the method setTupleOutput().

You can then configure your job to read binary tuples by using the method addTupleInput() instead of addInput().
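
A minimal sketch of chaining two jobs through binary tuple files (the path intermediatePath and the second builder otherJob are assumptions for illustration):

 // First job: write binary tuple files using the urlRegister schema
 mr.setTupleOutput(new Path(intermediatePath), getURLRegisterSchema());

 // Second job: read those binary tuple files back, no text parsing involved
 otherJob.addTupleInput(new Path(intermediatePath), new IdentityTupleMapper());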

You can also use addTupleInput(Path path, Schema targetSchema, ...) to read binary tuple files whose schema doesn't necessarily match the target schema. Fields that are no longer used will be skipped, and new fields not present in the input file will be received as null values.

In order to take full advantage of backwards compatibility, you can also use default values for fields in your Schema. In this way, instead of receiving null values for new fields, you will receive default values. Use the appropriate Field creator for this. The following code creates a string Field with default value "foo":

 Field.create("c", Type.STRING, true, "foo")

Setting the reducer

The reducer is configured with the method setTupleReducer(). The provided instance must extend the class TupleReducer:

 mr.setTupleReducer(new Handler());
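
The Handler in this example implements the actual URL resolution logic. Purely as an illustration of the structure (a minimal sketch, not the real Handler code), a TupleReducer that simply emits every tuple of each group could look like this:

 public class Handler extends TupleReducer<ITuple, NullWritable> {

   @Override
   public void reduce(ITuple group, Iterable<ITuple> tuples, TupleMRContext context,
       Collector collector) throws IOException, InterruptedException, TupleMRException {

     // "group" carries the group-by fields (here, the url);
     // "tuples" iterates over all tuples sharing that url, in the configured sort order
     for(ITuple tuple : tuples) {
       collector.write(tuple, NullWritable.get());
     }
   }
 }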

A combiner class can be provided by using the method setTupleCombiner(). You can see an example of its use in the topical word count example.

Setting the job output

The next step is to set the job output. We want to store the output under the path output using Pangool's Text I/O:

 mr.setOutput(new Path(output), new TupleTextOutputFormat(getURLRegisterSchema(), false, '\t', 
   NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER), ITuple.class, NullWritable.class);

Note that TupleMapper and TupleReducer can also have more than one output by using named outputs.
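
A named output is registered on the builder and can then be written to from the TupleMapper or TupleReducer by its name. As a hedged sketch (the name "invalidUrls" is an assumption):

 // Registers an extra, named output besides the main one
 mr.addNamedOutput("invalidUrls", new TupleTextOutputFormat(getURLRegisterSchema(), false, '\t',
   NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER), ITuple.class, NullWritable.class);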

And finally... build and launch your job!

The following code builds a Hadoop job, executes it, and blocks until completion. That's all, folks!

 mr.createJob().waitForCompletion(true);
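
Tying this together with the note about cleanUpInstanceFiles() above, a common pattern is to clean up once the job has finished (a sketch; error handling omitted):

 try {
   Job job = mr.createJob();
   job.waitForCompletion(true);
 } finally {
   // Removes the serialized objects (instance files) that Pangool created for this Job
   mr.cleanUpInstanceFiles();
 }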

Next: Group by / Sort by »