Introduction to Pangool: Topical Word Count (1/3)

This introduction guides you through the basics of Pangool with a Word-Count-like example.

Through this introduction you'll quickly see what Pangool is about and how it will make your life easier.

First of all, you'll quickly see how to use Tuples and Schemas, and how to build a simple Tuple MapReduce Job that groups by some fields.

Topical Word Count

We’re sure you’ve seen the "typical word count" several times. We have extended the problem to show the advantages of using Pangool. We like extending things - Pangool is an implementation of an "extended model of MapReduce" we like to call Tuple MapReduce.

This example is the "topical word count". (Crowd applauds!).

In the input we will have one JSON record per line, containing the fields "text" and "topic". The topic is an integer that labels the textual content as belonging to a certain topic. The following example record has the text "foo bar" labelled as topic = 1:

 { text: "foo bar", topic: 1 }

We want to count the appearances of each word within a topic. That is, if we have this input:

 { text: "foo bar", topic: 1 }
 { text: "foo", topic: 1 }
 { text: "bar bar", topic: 2 }
 { text: "foo", topic: 2 }

And considering an output schema like (topic, word, count), we want to have as output:

 (1, "foo", 2)
 (1, "bar", 1)
 (2, "bar", 2)
 (2, "foo", 1)

You can check the full code of this example in the Pangool examples repository on GitHub.

We’ll start by writing a Mapper. For Pangool Jobs, we need to extend TupleMapper. Note that it only has two generic types: the input key and value produced by the InputFormat (here LongWritable and Text); the output is always Tuples.

 public static class TokenizeMapper extends TupleMapper<LongWritable, Text> {

   protected Tuple tuple;
   protected ObjectMapper mapper; // This is Jackson's JSON Parser that we'll use for parsing the input

   public void setup(TupleMRContext context, Collector collector) throws IOException, InterruptedException {
     this.mapper = new ObjectMapper();
     tuple = new Tuple(context.getTupleMRConfig().getIntermediateSchema(0));
   }

   public void map(LongWritable key, Text value, TupleMRContext context, Collector collector) throws IOException, InterruptedException {
     Map document = mapper.readValue(value.toString(), Map.class); // Parse JSON
     tuple.set("topic", (Integer) document.get("topicId"));
     StringTokenizer itr = new StringTokenizer((String) document.get("text"));
     tuple.set("count", 1);
     while(itr.hasMoreTokens()) {
       tuple.set("word", itr.nextToken());
       emitTuple(collector); // We are creating a method for this to allow overriding it from subclasses
     }
   }

   // Emitting is factored out into its own method so that subclasses can override it
   protected void emitTuple(Collector collector) throws IOException, InterruptedException {
     collector.write(tuple);
   }
 }

TupleMappers have three methods available: setup(), map() and cleanup().
As in Hadoop, setup() is called at the beginning of each Map task, cleanup() at the end of each task, and map() once for each input record. Note how we cache a Tuple instance in setup() to avoid excessive object creation:

 tuple = new Tuple(context.getTupleMRConfig().getIntermediateSchema(0));

Tuples need to be created with a Schema. We can grab the Schema we defined by index or by name. (We’ll show how to define this Schema later.)
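
For instance, here is a minimal sketch of both lookups. The by-index form is what the mapper above uses; the by-name form assumes TupleMRConfig also exposes a getIntermediateSchema(String) overload, and that the Schema was named "schema" (as we do below):

 // By index: the first intermediate schema added to the TupleMRBuilder
 Schema byIndex = context.getTupleMRConfig().getIntermediateSchema(0);
 // By name: assumes a getIntermediateSchema(String) overload and a Schema named "schema"
 Schema byName = context.getTupleMRConfig().getIntermediateSchema("schema");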

When working with tuples, we can set fields using the name of the field defined in the Schema:

 tuple.set("word", itr.nextToken());

Finally, we see how we use the Pangool collector for emitting Tuples:

 collector.write(tuple);

(Apart from the Pangool Collector, we also have the TupleMRContext. This class wraps the information that we set up in the Tuple MapReduce Job together with the standard Hadoop Context. You can use it to access the Hadoop Counters, for instance.)
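
For instance, a minimal sketch of incrementing a Hadoop Counter from within the Mapper (this assumes TupleMRContext exposes the wrapped Hadoop context via a getHadoopContext() method; the counter group and name are made up for illustration):

 // Increment a custom Hadoop Counter through the wrapped Hadoop context
 context.getHadoopContext().getCounter("topical-word-count", "parsed-records").increment(1);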

For more information, check the "Creating Pangool Jobs" user guide page.

Now, we need to write a Reducer. The Reducer will extend TupleReducer. (Again, this Reducer only has two generic types: it always receives Tuples from the Mapper, so we only need to specify the two output types. In this case we’ll output (ITuple, NullWritable).)

 public static class CountReducer extends TupleReducer<ITuple, NullWritable> {

   public void reduce(ITuple group, Iterable<ITuple> tuples, TupleMRContext context, Collector collector) throws IOException, InterruptedException, TupleMRException {
     int count = 0;
     ITuple outputTuple = null;
     for(ITuple tuple : tuples) {
       count += (Integer) tuple.get("count");
       outputTuple = tuple;
     }
     outputTuple.set("count", count);
     collector.write(outputTuple, NullWritable.get());
   }
 }

The reducer counts and emits a Tuple with the total count. (Note how we reuse a Tuple instance from the Iterator in order to avoid object creation and boilerplate code).

And now for the most important part of it. Let’s create a Tuple MapReduce!

 TupleMRBuilder mr = new TupleMRBuilder(conf, "Pangool Topical Word Count");
 mr.addIntermediateSchema(getSchema());
 // We will count each (topic, word) pair
 // Note that the order in which we defined the fields of the Schema is not relevant here
 mr.setGroupByFields("topic", "word");
 mr.addInput(new Path(args[0]), new HadoopInputFormat(TextInputFormat.class), new TokenizeMapper());
 // We'll use a TupleOutputFormat with the same schema as the intermediate schema
 mr.setTupleOutput(new Path(args[1]), getSchema());
 mr.setTupleReducer(new CountReducer());
 mr.setTupleCombiner(new CountReducer());
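
The builder only configures the Job; we still need to create and launch it. A minimal sketch, assuming the builder exposes a createJob() method that returns a plain Hadoop Job:

 Job job = mr.createJob();
 job.waitForCompletion(true);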

There are several things to note here:

 mr.addIntermediateSchema(getSchema());
  • When using Tuple MapReduce, we need to define intermediate schemas. These are the Schemas that Pangool will use for serializing the intermediate output. In this case our schema is formed by "word" (Utf8), "topic" (Int) and "count" (Int). Tuples that we emit in the Mapper need to conform to this Schema.
 static Schema getSchema() {
   // The schema has 3 fields: word, topic and count
   List<Field> fields = new ArrayList<Field>();
   fields.add(Field.create("word", Type.STRING));
   fields.add(Field.create("topic", Type.INT));
   fields.add(Field.create("count", Type.INT));
   return new Schema("schema", fields);
 }
  • Note that the order in which we define the fields is not important.
For more information, you can go to the "Tuples & Schemas & Types" user guide page.

This is one of Pangool’s key features. With this setting, we are saying that we want to group by these two fields (which need to be present in the intermediate schema):

 cg.setGroupByFields("topic", "word");

Because the input is JSON, we use a plain Hadoop input format so that the Mapper receives the whole line of text. (If we wanted to parse the input text directly into a Tuple, we could use Pangool's native Text Input/Output.)

 mr.addInput(new Path(args[0]), new HadoopInputFormat(TextInputFormat.class), new TokenizeMapper());

Note how Mappers and InputFormats are instances. This will allow you to manage trivial state information easily. You’ll see this in more depth in the next part of this tutorial.

And that’s about it! Note how we’ve created a MapReduce Job that works with 3 fields, grouping by two of them, and how painless it has been! - And by the way, it will perform very efficiently!

You can run this example by doing:

 hadoop jar $PANGOOL_EXAMPLES_JAR topical_word_count [input] [output]

You can also use an input data generator for generating random input for this example:

 hadoop jar $PANGOOL_EXAMPLES_JAR topical_word_count_gen_data [out-file] [nRegisters] [nTopics]

What's next? Leveraging instances!