Proposal for hash-based aggregation in map

...

Introduction

Pig does (sort-based) partial aggregation on the map side through the use of the combiner. MR serializes the map output to a buffer, sorts it on the keys, deserializes it, and passes the values grouped on the keys to the combiner phase. The same work can be done in the map phase itself by using a hash map on the keys. This hash-based (partial) aggregation can be done with or without a combiner phase.
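To make the contrast concrete, here is a minimal Java sketch that computes per-key counts in two ways: sorting the keys and combining each run of equal keys (roughly what the sort-then-combine path does), and updating a hash map in a single pass. The class and method names are hypothetical and this is not Pig's implementation; it only illustrates that both approaches yield the same partial results, while the hash-based one needs no sort (and, inside MR, no serialize/deserialize round trip before combining).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch contrasting sort-based and hash-based partial COUNT per key. */
class PartialAggregationSketch {
    /** Sort-based: sort on the key, then combine each run of equal keys. */
    static Map<String, Long> sortBased(List<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);
        Map<String, Long> out = new LinkedHashMap<>();
        int i = 0;
        while (i < sorted.size()) {
            String key = sorted.get(i);
            long run = 0;
            while (i < sorted.size() && sorted.get(i).equals(key)) {
                run++;
                i++;
            }
            out.put(key, run);
        }
        return out;
    }

    /** Hash-based: one pass, no sort; each record updates its key's running count. */
    static Map<String, Long> hashBased(List<String> keys) {
        Map<String, Long> out = new HashMap<>();
        for (String key : keys) {
            out.merge(key, 1L, Long::sum);
        }
        return out;
    }
}
```

For the keys b, a, c, a, b, a both methods return a=3, b=2, c=1, so six input records shrink to three partial records either way.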

...

  • Saves the cost of serializing and deserializing records for the combiner.
  • Saves the cost of lock calls on the combiner input buffer. (I have found this to be a significant cost for a query that was doing multiple group-bys in a single MR job. -Thejas)
  • For some types of queries it is useful to turn off the combiner on the reduce side, which is not possible with the Hadoop combiner. Doing so avoids running out of memory on the reduce side for queries like COUNT(distinct col). The OOM happens because very large records, which do not fit into the MR framework's buffers, get created when the combiner runs on the merged reduce input. With the Hadoop combiner there is no way to tell MR not to combine records on the reduce side; the workaround is to disable the combiner completely, and the opportunity to reduce map output size is lost. Partial aggregation on the map side alone usually does not make records large enough to cause a problem.
  • When the foreach after the group-by has both algebraic and non-algebraic functions, or if a bag is being projected, the Hadoop combiner is not used, because the data size reduction in those cases is usually not significant enough to justify the additional (de)serialization costs. Hash-based aggregation can be used in such cases as well, because it has no (de)serialization overhead.
  • The in-map combiner can be turned off automatically if not enough 'combination' is taking place to justify its overhead. (Idea borrowed from a Hive JIRA.)
  • If the input data is sorted, efficient map-side (partial) aggregation is possible with the in-map combiner.
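The automatic turn-off idea can be sketched as a sampling check: after a fixed number of records, compare the number of records seen with the number of distinct keys, and stop aggregating if that ratio is too low. This is a hypothetical Java sketch; the sample size, the minimum reduction ratio, and all names are assumptions, not settings taken from Pig or Hive.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: an in-map combiner that disables itself when a sample
 * of the input shows too little key repetition to be worth the overhead.
 */
class AdaptiveInMapCombiner {
    private final int sampleSize;       // assumption: records to observe before deciding
    private final double minReduction;  // assumption: required records-per-distinct-key ratio
    private final Map<String, Long> counts = new HashMap<>();
    private long recordsSeen = 0;
    private boolean enabled = true;

    AdaptiveInMapCombiner(int sampleSize, double minReduction) {
        this.sampleSize = sampleSize;
        this.minReduction = minReduction;
    }

    /**
     * Returns true if the record was aggregated into the hash map, false if the
     * combiner is off and the caller should emit the record unaggregated.
     */
    boolean offer(String key) {
        if (!enabled) {
            return false;
        }
        counts.merge(key, 1L, Long::sum);
        recordsSeen++;
        if (recordsSeen == sampleSize) {
            double reduction = (double) recordsSeen / counts.size();
            // Partial counts gathered so far stay in the map and can still be flushed.
            if (reduction < minReduction) {
                enabled = false;
            }
        }
        return true;
    }

    boolean isEnabled() {
        return enabled;
    }

    /** Partial counts accumulated so far; flushed to the map output by the caller. */
    Map<String, Long> partialCounts() {
        return counts;
    }
}
```

A ratio close to 1 means almost every record carries a new key, so the hash map would only add memory and CPU overhead without shrinking the map output.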

...

In the initial implementation, the in-map combiner will be supported only when all projections are either expressions on the group column or expressions on algebraic UDFs. This is because column pruning does not currently discard unused columns within a grouped bag; in such cases, using the in-map combiner would not reduce the data size.
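The eligibility rule amounts to an all-match check over the projections of the foreach that follows the group-by. A hypothetical Java sketch, assuming each projection has already been classified; the ProjectionKind enum and the method name are invented for illustration and are not Pig classes.

```java
import java.util.List;

/** Hypothetical sketch of the initial eligibility rule for the in-map combiner. */
class InMapCombinerEligibility {
    /** Assumed classification of each expression in the foreach after the group-by. */
    enum ProjectionKind {
        GROUP_COLUMN_EXPR,      // expression on the group column
        ALGEBRAIC_UDF_EXPR,     // expression on an algebraic UDF, e.g. COUNT
        NON_ALGEBRAIC_UDF_EXPR, // expression on a non-algebraic UDF
        BAG_PROJECTION          // the grouped bag (or columns in it) projected directly
    }

    /** True only if every projection is on the group column or an algebraic UDF. */
    static boolean canUseInMapCombiner(List<ProjectionKind> projections) {
        return projections.stream().allMatch(p ->
                p == ProjectionKind.GROUP_COLUMN_EXPR || p == ProjectionKind.ALGEBRAIC_UDF_EXPR);
    }
}
```

In the example query that follows, generate group, COUNT(l.b) would classify as GROUP_COLUMN_EXPR and ALGEBRAIC_UDF_EXPR, so it qualifies.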

For the query -


l = load 'x' as (a, b, c);
g = group l by a;
f = foreach g generate group, COUNT(l.b);

The existing plan -


Map Plan
g: Local Rearrange[tuple]{bytearray}(false) - scope-73
|   |
|   Project[bytearray][0] - scope-74
|
|---f: New For Each(false,false)[bag] - scope-61
    |   |
    |   Project[bytearray][0] - scope-62
    |   |
    |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-63
    |   |
    |   |---Project[bag][1] - scope-64
    |       |
    |       |---Project[bag][1] - scope-65
    |
    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-75
        |
        |---l: New For Each(false,false,false)[bag] - scope-47

Will change to -


Map Plan
g: Local Rearrange[tuple]{bytearray}(false) - scope-73
|   |
|   Project[bytearray][0] - scope-74
|
|---f: HashAgg 
    |   |
    |   Project[bytearray][0] - scope-62
    |   |
    |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-63
    |   |
    |   |---Project[bag][1] - scope-64
    |
    |---f: New For Each(false,false)[bag] - scope-61
        |   |
        |   Project[bytearray][0] - scope-62
        |   |
        |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-63
        |   |
        |   |---Project[bag][1] - scope-64
        |       |
        |       |---Project[bag][1] - scope-65
        |
        |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-75
            |
            |---l: New For Each(false,false,false)[bag] - scope-47
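The new plan relies on COUNT being algebraic: COUNT$Initial runs per record, and the HashAgg operator folds those results with COUNT$Intermediate into a hash map keyed on the group key, leaving COUNT$Final for the reduce side. The following Java sketch mimics that flow for the example query. The static methods are hypothetical stand-ins for Pig's COUNT stages, not calls into Pig, and the (a, b) rows are assumed input; like Pig's COUNT, the stand-in skips null values.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the HashAgg map plan for: group l by a; generate group, COUNT(l.b). */
class HashAggCountSketch {
    /** Stand-in for COUNT$Initial: counts the non-null values in the (single-tuple) bag. */
    static long initial(List<Object> bag) {
        long n = 0;
        for (Object v : bag) {
            if (v != null) {
                n++;
            }
        }
        return n;
    }

    /** Stand-in for COUNT$Intermediate: folds a partial count into the running count for its key. */
    static void intermediate(Map<Object, Long> table, Object groupKey, long partial) {
        table.merge(groupKey, partial, Long::sum);
    }

    /** Stand-in for COUNT$Final on the reduce side: sums all partial counts for one key. */
    static long fin(List<Long> partials) {
        long total = 0;
        for (long p : partials) {
            total += p;
        }
        return total;
    }

    /** Map side: each row is (a, b); returns the partial COUNT(b) per value of a. */
    static Map<Object, Long> mapSide(List<Object[]> rows) {
        Map<Object, Long> table = new HashMap<>();
        for (Object[] row : rows) {
            // Each record reaches COUNT$Initial as a one-tuple bag holding column b.
            long partial = initial(Collections.singletonList(row[1]));
            intermediate(table, row[0], partial);
        }
        return table;
    }
}
```

Only one record per distinct value of a leaves the map, which is what the Local Rearrange then ships to the reducers.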

The MR combiner will also be supported, and by default the in-map combiner will not be used; a property will need to be set to enable it. Another property will control whether the MR combiner is used along with the in-map combiner. After sufficient testing, we can change the default execution mode and properties.

...

2. Memory Management
Similar to the current strategy of InternalCachedBag, the in-map combiner will find the average size of the entries and estimate the memory held by the current number of entries. If this estimate exceeds the InternalCachedBag size threshold, a portion of the hash map entries will be written to the output.
In the case of multi-query, there will be multiple such hash maps, and the memory limit will be shared equally among them.
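The memory-management scheme can be sketched as follows. This hypothetical Java sketch assumes the caller supplies an estimated size for each new entry (Pig would compute its own estimate), that half of the entries are written out whenever the estimate exceeds the limit, and that an in-memory list stands in for the map output collector; the class name, the flush fraction, and the helper methods are all assumptions.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a memory-bounded in-map COUNT aggregator. */
class MemoryBoundedHashAgg {
    private final long memoryLimitBytes;  // this hash map's share of the threshold
    private final double flushFraction;   // assumption: portion of entries written out when over the limit
    private final Map<String, Long> table = new HashMap<>();
    final List<Map.Entry<String, Long>> emitted = new ArrayList<>(); // stands in for the map output
    private long sizeSum = 0;             // total estimated bytes of all entries created so far
    private long sizeCount = 0;           // number of entries whose size was recorded

    MemoryBoundedHashAgg(long memoryLimitBytes, double flushFraction) {
        this.memoryLimitBytes = memoryLimitBytes;
        this.flushFraction = flushFraction;
    }

    /** Multi-query: the memory limit is shared equally among the hash maps. */
    static long sharedLimit(long totalLimitBytes, int numHashMaps) {
        return totalLimitBytes / numHashMaps;
    }

    void add(String key, long entryBytes) {
        if (!table.containsKey(key)) {    // only new entries grow the table
            sizeSum += entryBytes;
            sizeCount++;
        }
        table.merge(key, 1L, Long::sum);
        long avgEntryBytes = sizeSum / sizeCount;
        // Estimated memory held = average entry size * current number of entries.
        if (avgEntryBytes * table.size() > memoryLimitBytes) {
            flush((int) Math.ceil(table.size() * flushFraction));
        }
    }

    /** Writes up to n entries to the output and removes them from the hash map. */
    void flush(int n) {
        Iterator<Map.Entry<String, Long>> it = table.entrySet().iterator();
        while (n-- > 0 && it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            emitted.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
            it.remove();
        }
    }

    int size() {
        return table.size();
    }
}
```

At the end of the map task the remaining entries would be written out too, for example with flush(size()). A key flushed early may appear again later; that is harmless because the partial counts are combined again downstream.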