Pangool was conceived to solve in a simple way how records emitted in the Map stage are grouped and ordered when they reach the Reduce phase.
Pangool allows tuples be grouped by a subset of their fields. For instance:
TupleMRBuilder b = new TupleMRBuilder(); b.addIntermediateSchema(new Schema("my_schema", Fields.parse("url:string, timestamp:long"))); b.setGroupByFields(“url”);
In the example above tuples containing a url
and timestamp
arrive to the
Reduce stage grouped by url
.
This is a trivial case which can be easily achieved by using plain MapReduce,
but let's complicate things a bit more..
Let's say that you want tuples to be ordered in descending "url" order. In Pangool this is as easy as adding:
b.setOrderBy(OrderBy.parse("url:desc"))
As shown above, just adding a new line allows us to define the criteria (ASC / DESC) used to sort tuples. This is apparently straight-forward, however plain MapReduce usually forces you to specify custom comparators to perform this simple task.
Let's complicate things further. Imagine that you still want to group tuples
by their url
but you also need them be ordered by descending timestamp
for each group.
In this case the ordering criteria contains the fields url
and timestamp
which
exceed those that we defined in "group by" (url
).
This way of ordering is called Secondary Order and it's achieved easily in Pangool by adding:
b.setGroupByFields("url"); b.setOrderBy(OrderBy.parse("url:desc, timestamp:desc"))
As shown in the snippet above, the fields defined in "order by" include those in "group by". This is a restriction in Pangool, where Group-By must be a prefix of Order-By.
Contrary to what it looks, this is tough to achieve in plain Map-Reduce, especially when several fields are involved in grouping and ordering. Programming in plain Map-Reduce usually forces you to define custom logic for binary comparison, grouping and partitioning. Pangool excels in this task, removing all this burden and offering it to you with no pain.Note: The partitioning strategy that Pangool chooses is coherent with the group-by / sort-by criterias. In this case, Pangool will partition by the Group-by fields unless we modify the default partitioning strategy. See the Custom Partitioners section for more about this.
When multiple intermediate schemas are defined and a Reduce-side Join is performed (see joins section) then Sort-By shows us more options and advanced features. Let's see the next example:
TupleMRBuilder b = new TupleMRBuilder(); b.addIntermediateSchema(new Schema("urls_register", Fields.parse("url:string,timestamp:long"))); b.addIntermediateSchema(new Schema("urls_map", Fields.parse("url:string,canonical_url:string"))); b.setGroupByFields(“url”);
Usually when performing Joins the desired behavior is to receive tuples from one data source before than tuples from the other. This eases programming and avoids having to keep data in memory before performing the actual join business logic. (For instance, in a Cross-Product). In Pangool this is achieved adding this:
b.setGroupByFields(“url”); b.setOrderBy(new OrderBy().add("url", Order.ASC).addSchemaOrder(Order.DESC)));
The special method addSchemaOrder
in OrderBy
indicates that
tuples, after being ordered by url
, they will be ordered according to the schema they belong.
That is, given the dataset below:
URLS_REGISTERS: { "url1.com", 10000 } { "url2.com", 20000 } { "url1.com", 30000 } { "url2.com", 40000 } { "url1.com", 20000 } URLS_MAPS: { "url1.com", "http://canonical_url1.com" } { "url2.com", "http://canonical_url2.com" }
According to the definition above :
TupleMRBuilder b = new TupleMRBuilder(); b.addIntermediateSchema(new Schema("urls_register", Fields.parse("url:string,timestamp:long"))); b.addIntermediateSchema(new Schema("urls_map", Fields.parse("url:string,canonical_url:string"))); b.setGroupByFields(“url”); b.setOrderBy(new OrderBy().add("url",Order.ASC).addSchemaOrder(Order.DESC)));Tuples will be received in the Reduce phase in the following way:
REDUCE_GROUP_1 ("url1.com"): { "url1.com", "http://canonical_url1.com" } { "url1.com", 10000 } { "url1.com", 30000 } { "url1.com", 20000 } REDUCE_GROUP_2 ("url2.com"): { "url2.com", "http://canonical_url2.com" } { "url2.com", 20000 } { "url2.com", 40000 }
Receiving the canonical_url
first allows us to process
the remaining urls without needing to keep them in memory (or spilling them to disk).
Note:
Any field used in setOrderBy
must be present in every intermediate schema defined with the same name and type.
In some cases, when performing a Reduce-side join, we could be interested in ordering tuples within a particular schema in
a specific criteria. This is achieved by using the method setSpecificOrderBy
.
We can reuse the previous example to specify a particular ordering for the tuples from urls_register
:
TupleMRBuilder b = new TupleMRBuilder(); b.addIntermediateSchema(new Schema("urls_register", Fields.parse("url:string,timestamp:long"))); b.addIntermediateSchema(new Schema("urls_map", Fields.parse("url:string,canonical_url:string"))); b.setGroupByFields(“url”); b.setOrderBy(new OrderBy().add("url", Order.ASC).addSchemaOrder(Order.DESC))); b.setSpecificOrderBy("urls_register", new OrderBy().add("timestamp", Order.DESC));
Given the previous input dataset, the output would be :
REDUCE_GROUP_1 ("url1.com"): { "url1.com", "http://canonical_url1.com" } { "url1.com", 30000 } { "url1.com", 20000 } { "url1.com", 10000 } REDUCE_GROUP_2 ("url2.com"): { "url2.com", "http://canonical_url2.com" } { "url2.com", 40000 } { "url2.com", 20000 }
Note how tuples from "urls_register" now come sorted by timestamp.
Since version 0.60, some tuples could carry fields with nulls values. It is important to be able
to manage how null values are sorted when compared with a non null value. That can be controlled
by setting the proper NullOrder
in the OrderBy
definition. The possible
values are:
NullOrder value | Meaning |
---|---|
NullableOrder.NULL_SMALLEST | Nulls are the smallest possible value. Nulls will appear the first if Order.ASC is defined or last if Order.DESC is defined. (Default behaviour) |
NullableOrder.NULL_BIGGEST | Nulls are the biggest possible value. Nulls will appear the last if Order.ASC is defined or first if Order.DESC is defined |
The following examples shows how to declare the NullOrder
in OrderBy
definitions:
new OrderBy().add("url", Order.DESC, NullOrder.NULL_BIGGEST);