The class TupleMRBuilder is responsible for building Pangool jobs. Here is an example extracted from the URL resolution example:
TupleMRBuilder mr = new TupleMRBuilder(conf, "Pangool Url Resolution");
mr.addIntermediateSchema(getURLMapSchema());
mr.addIntermediateSchema(getURLRegisterSchema());
mr.addInput(new Path(input1), new TupleTextInputFormat(getURLMapSchema(), false, false, '\t',
    NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER, null, null), new IdentityTupleMapper());
mr.addInput(new Path(input2), new TupleTextInputFormat(getURLRegisterSchema(), false, false, '\t',
    NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER, null, null), new IdentityTupleMapper());
mr.setFieldAliases("urlMap", new Aliases().add("url", "nonCanonicalUrl"));
mr.setGroupByFields("url");
mr.setOrderBy(new OrderBy().add("url", Order.ASC).addSchemaOrder(Order.ASC));
mr.setSpecificOrderBy("urlRegister", new OrderBy().add("timestamp", Order.ASC));
mr.setTupleReducer(new Handler());
mr.setOutput(new Path(output), new TupleTextOutputFormat(getURLRegisterSchema(), false, '\t',
    NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER), ITuple.class, NullWritable.class);
Important: The method cleanUpInstanceFiles() needs to be called to properly remove all serialized objects that Pangool uses for each Job.
Let's analyze the code line by line. The following line creates the TupleMRBuilder. The parameters are a Hadoop Configuration and the name of the generated job:
TupleMRBuilder mr = new TupleMRBuilder(conf, "Pangool Url Resolution");
We need to define which schemas (there can be more than one) will be allowed as TupleMapper output, that is, the intermediate schemas:
mr.addIntermediateSchema(getURLMapSchema());
mr.addIntermediateSchema(getURLRegisterSchema());
Important: The order in which intermediate schemas are provided to the TupleMRBuilder matters. It may affect the default order in which tuples of different schemas are received in the TupleReducer. See Reduce-side joins for more information.
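The schema factory methods used above are not shown in the snippet. As a rough sketch, they could be built with Field.create() and the Schema constructor; the field lists below are assumptions, since only the schema names and the fields nonCanonicalUrl, url and timestamp are implied by the builder configuration:

static Schema getURLMapSchema() {
  // urlMap: maps a non-canonical URL to its canonical form (field names assumed)
  List<Field> fields = new ArrayList<Field>();
  fields.add(Field.create("nonCanonicalUrl", Type.STRING));
  fields.add(Field.create("canonicalUrl", Type.STRING));
  return new Schema("urlMap", fields);
}

static Schema getURLRegisterSchema() {
  // urlRegister: one visit record per line (field names assumed, except url and timestamp)
  List<Field> fields = new ArrayList<Field>();
  fields.add(Field.create("user", Type.STRING));
  fields.add(Field.create("timestamp", Type.LONG));
  fields.add(Field.create("url", Type.STRING));
  return new Schema("urlRegister", fields);
}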
An important configuration parameter is how tuples must be grouped and sorted before reaching the TupleReducer. For this particular case, tuples are grouped by url; the custom sort order seen in the full example is set with setOrderBy() and setSpecificOrderBy().
mr.setGroupByFields("url");
See Grouping and Sorting for more possibilities.
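As a hedged illustration of those possibilities (with hypothetical field names, not taken from the URL resolution example), a job could group by two fields and add a secondary sort inside each group. The OrderBy must start with the group-by fields; any extra fields act as a secondary sort within each group:

// group by (url, country) and, inside each group, sort by timestamp descending
mr.setGroupByFields("url", "country");
mr.setOrderBy(new OrderBy().add("url", Order.ASC).add("country", Order.ASC)
    .add("timestamp", Order.DESC));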
You can configure a particular TupleMapper for each input path. That is, you can have several inputs, and each of them can be processed differently. For this particular example, we configure two input paths (input1 and input2), and both are processed by Pangool's default mapper, IdentityTupleMapper, because we don't need to perform any specific mapping logic. For parsing the input text files into Tuples we use TupleTextInputFormat (see Text Input/Output for more information).
mr.addInput(new Path(input1), new TupleTextInputFormat(getURLMapSchema(), false, false, '\t',
    NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER, null, null), new IdentityTupleMapper());
mr.addInput(new Path(input2), new TupleTextInputFormat(getURLRegisterSchema(), false, false, '\t',
    NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER, null, null), new IdentityTupleMapper());
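If some per-input logic were needed, a custom TupleMapper could be used instead of IdentityTupleMapper. Since TupleTextInputFormat already parses each line into a Tuple, such a mapper receives ITuple/NullWritable pairs. A minimal sketch (the class and its transformation are hypothetical, following the signature used in common Pangool examples):

public static class NormalizingUrlMapper extends TupleMapper<ITuple, NullWritable> {

  @Override
  public void map(ITuple tuple, NullWritable ignored, TupleMRContext context, Collector collector)
      throws IOException, InterruptedException {
    // hypothetical transformation: lower-case the url field before emitting
    tuple.set("url", tuple.get("url").toString().toLowerCase());
    collector.write(tuple);
  }
}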
Pangool can also use Hadoop-native InputFormats such as TextInputFormat. For that, the wrapper class HadoopInputFormat exists, which receives the InputFormat class as an argument (unlike in Hadoop, in Pangool everything is configured with instances: Mapper, Reducer, Input/Output Format, ...).
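For instance, a plain text input could be added like this, where rawInput is a hypothetical path and MyLineParsingMapper is a hypothetical TupleMapper<LongWritable, Text> that turns each line into a Tuple:

mr.addInput(new Path(rawInput), new HadoopInputFormat(TextInputFormat.class),
    new MyLineParsingMapper());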
Pangool can serialize Tuples in an efficient binary format with the method setTupleOutput(). You can then configure your job to read binary tuples by using the method addTupleInput() instead of addInput().
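A minimal sketch of that pattern (the paths and the second builder are hypothetical, and the parameters passed to setTupleOutput() and addTupleInput() are assumptions based on the method names above):

// first job: write binary Tuples instead of text
mr.setTupleOutput(new Path(tupleOutput), getURLRegisterSchema());

// a later job: read the binary Tuples back, no parsing InputFormat needed
otherBuilder.addTupleInput(new Path(tupleOutput), new IdentityTupleMapper());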
You can also use addTupleInput(Path path, Schema targetSchema, ...) to read binary tuple files whose schema doesn't necessarily match the given target schema. Fields that are no longer used will be skipped, and new fields not present in the input file will be received as null values.
In order to take full advantage of backwards compatibility, you can also use default values for fields in your Schema. In this way, instead of receiving null values for new fields, you will receive default values. Use the appropriate Field creator for this. The following code creates a string Field with default value "foo":
Field.create("c", Type.STRING, true, "foo")
The reducer is configured with the method setTupleReducer(). It should extend the class TupleReducer:
mr.setTupleReducer(new Handler());
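A rough sketch of what Handler could look like, following the reduce() signature used in common Pangool examples. The logic shown is hypothetical; the actual URL resolution handler joins the tuples of both schemas (see Reduce-side joins):

public static class Handler extends TupleReducer<ITuple, NullWritable> {

  @Override
  public void reduce(ITuple group, Iterable<ITuple> tuples, TupleMRContext context,
      Collector collector) throws IOException, InterruptedException, TupleMRException {
    // all tuples sharing the same "url" arrive together in one call
    for (ITuple tuple : tuples) {
      // hypothetical logic: emit each tuple of the group as it comes
      collector.write(tuple, NullWritable.get());
    }
  }
}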
A combiner class can be provided by using the method setTupleCombiner(). You can see an example of its use in the topical word count example.
The next step is to set the job output. We want to store the output into
the folder output
using Pangool's Text I/O:
mr.setOutput(new Path(output), new TupleTextOutputFormat(getURLRegisterSchema(), false, '\t',
    NO_QUOTE_CHARACTER, NO_ESCAPE_CHARACTER), ITuple.class, NullWritable.class);
Note that TupleMapper
and TupleReducer
can also have more than one output by using
named outputs.
The following code builds a Hadoop job, executes it, and blocks until completion. That's all, folks!
mr.createJob().waitForCompletion(true);
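As noted at the beginning of this section, cleanUpInstanceFiles() should be called once the job has finished. A minimal sketch of the complete pattern:

Job job = mr.createJob();
try {
  job.waitForCompletion(true);
} finally {
  // remove the serialized objects Pangool created for this Job (see the note above)
  mr.cleanUpInstanceFiles();
}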