You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

Proposal for hash based aggregation in map (in-map combiner)

Introduction

Pig does (sort based) partial aggregation in map side through the use of combiner. MR serializes the output of map to a buffer, sorts it on the keys, deserializes and passes the values grouped on the keys to combiner phase. The same work of combiner can be done in the map phase itself by using a hash-map on the keys. This hash based (partial) aggregation can be done with or without a combiner phase.

Benefits

It will send fewer records to combiner and thereby -

  • Save on cost of serializing and de-serializing
  • Save on cost of lock calls on the combiner input buffer. (I have found this to be a significant cost for a query that was doing multiple group-by's in a single MR job. -Thejas)
  • The problem of running out of memory in reduce side, for queries like COUNT(distinct col) can be avoided. The OOM issue happens because very large records get created after the combiner run on merged reduce input. In case of combiner, you have no way of telling MR not to combine records in reduce side. The workaround is to disable combiner completely, and the opportunity to reduce map output size is lost.
  • When the foreach after group-by has both algebraic and non-algebraic functions, or if a bag is being projected, the combiner is not used. This is because the data size reduction in typical cases are not significant enough to justify the additional (de)serialization costs. But hash based aggregation can be used in such cases as well.
  • It is possible to turn off the in-map combine automatically if there is not enough 'combination' that is taking place to justify the overhead of the in-map combiner. (Idea borrowed from Hive.)
  • If input data is sorted, it is possible to efficient map side (partial) aggregation with in-map combiner.

Concerns

Memory used by the in-map combiner will have to be managed carefully, to avoid any out of memory errors.

Open questions

It is not clear, if both MR-combiner and in-map combiner should be enabled in a pig MR job. If in-map combiner is used, the data reduction that would happen because of MR-combiner might not be sufficient to justify the costs. But MR-combiner might help in further reducing the data that gets written to disk, if multiple spill files get merged or if multiple waves of sort-merge happen on reduce side.

Changes

Design Option 1 - Simulate combiner in map task

In this option, the work done by MR to group values by key before invoking the combiner plan is simulated within the map using a hash-map. The hash-map will group rows on the group-by keys. When the hash-map is big enough, run the accumulated groups through the combiner plan. The results of the in-map combine plan execution get written as the map output. This option is easier to implement. But it is not going to be efficient in its memory footprint as input tuples will be kept around until the configured memory limit forces it to combine them. This will result in a smaller number of key-values that will be in memory at a time, and result in fewer values being aggregated, and a larger map output size.

query plan

MapReduceOper class that represents MR job in pig MR plan will now have a new member inMapCombinePlan, which is a PhysicalPlan. In the initial implementation, combiner physical plan (the member called combinePlan) can be cloned here.

But for supporting in-map combine for cases where combiner does not get used (eg. when there is a bag/non-algebraic udf), the MR plan optimizer rules need to change. But for such cases, the output type of map and combine plan would be different, that could be a problem.

plan execution

A new class that extends PigMapBase will have a collect call that accumulates the key-values into a hash-map. The hash-map will spill into the combine plan, when its estimated size exceeds a configurable threshold . This would be similar to the InternalCacheBag implementation.

Design Option 2

In this option, there will be a new physical operator, POHashAgg, that will do the hash based aggregation. This will be the last node before the LORearrange in the map plan of MR job for the group operation. For every new input record, it will generate the key, and run the plan on values (which includes the UDF.getInitial.exec()) to get input to intermediate aggregation. It will store the key and input values in a hashmap.
When there are two, or a small number of values for a group key (set), it will compute the new partial aggregate value and store back in the hash-map as intermediate result. The memory management of the hash-map will be similar to that of InternalCacheBag, it will estimate its memory footprint. It will flush some % (5%?) of entries when it exceeds configurable memory limit. The least recently used keys can be chosen to be flushed (keeping track of few most recently used might be enough). These flushed entries will be written as map output.

It might be useful to have a new udf interface that accepts a tuple at a time to compute a partial aggregate, so that new bags don't have to be created for each new tuple that needs to be aggregated. But the bag creation overhead and overhead of calling the udf multiple times could be reduced by calling the udf only after few values have been accumulated in the hash-map.

This design option will have a smaller memory footprint because input tuples can be aggregated sooner. The local-rearrange transformations done for combiner input will not have to be done in this case.

This will result also in smaller output size because more records can be held in the hash-map. But the work involved with this option is more because as new physical operator is needed and the MR plan optimization rules need to change to create internal plans for this new operator.

Design 2 initial implementation

In the initial implementation, combiner will be supported only when all projections are either expressions on the group column or expressions on algebraic UDFs. This is because column pruning does not currently discard unused columns within a grouped-bag, and in such cases there will not be data size reduction happening because of the use of in-map combiner.

The MR-compiler will not be used in addition to the hash-based combiner. There will be a flag to switch between in-map combiner and MR-combiner for sometime, so that we can easily compare the performance.

Query Plan
A new class HashAggOptimizer based on current CombinerOptimizer will generate the in-map plan. The reduce side of the plan will be same as in case of current combiner. HashAggOptimizer will create a POHashAgg operator, instead of creating a POForeach for the combiner. The POHashAgg will have a plan to generate the key, to compute the input values to the agg function (calling UDF.getInitial().exec() ), and the plan for calling UDF.getIntermediate().exec().

Plan execution
1. Evaluating aggregate results
For every new input record, POHashAgg will do the following -

  1. compute the key using the plan for the key
  2. lookup the hashmap for entry corresponding to key
  3. compute the initial plan on the values to compute inputs to the UDF.getIntermediate().exec().
  4. if the key is a key not seen in previous iteration (currentKey), or if the currentKey has many values ( > 100 (configurable)), for each aggregate plan, create a bag with the existing values and new value, and compute the aggregated value.

2. Memory Management
Similar to the current strategy of InternalCachedBag, find the average size of the entries and estimate the size held by current number of entries. If the size exceeds the internal cached bag size threshold, it will write a portion of the hashmap entries to output.

  • No labels