
Background

Geode supports querying data from regions using a query language called OQL. Currently it does not support aggregate functions other than “count(*)”, and even count(*) is supported only without an accompanying group by clause.

Requirements

Enhance the OQL engine to support:

- Aggregate functions (AVG, MAX, MIN, COUNT, SUM)

- An associated Group By clause

- Investigation of a UDA (User Defined Aggregate) implementation

Proposal

High level changes to support the functionality:

 OQL Grammar:

  1. Support non-distinct order by

  2. Detect the Group By clause

  3. Detect Group By path expressions

  4. Detect aggregate functions in the projection attributes

Query correctness criteria:

a) If the query contains a Group By clause, the projection attributes may contain only aggregate functions and columns that are present in the Group By clause.

E.g. (valid): select col1, avg(col2) from /portfolio group by col1

E.g. (invalid): select col1, col3, avg(col2) from /portfolio group by col1     (col3 is missing from the group by clause)

b) If the query does not contain a Group By clause, its projection must either contain no aggregate functions at all or contain only aggregate functions.

E.g. (valid): select avg(col2) from /portfolio

E.g. (invalid): select col1, avg(col2) from /portfolio     (col1 must not be present in the projection attributes)
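Rules a) and b) can be expressed as a small check over the parsed projection. This is a minimal sketch assuming a hypothetical, already-parsed representation in which each projection item is either a plain column or an aggregate call; `ProjectionItem` and `GroupByValidator.isValid` are illustrative names, not part of Geode.

```java
import java.util.List;
import java.util.Set;

// Hypothetical parsed projection item: either a plain column or an aggregate call.
final class ProjectionItem {
    final String column;      // column path; for an aggregate, its argument
    final boolean aggregate;  // true for avg(...), sum(...), etc.

    ProjectionItem(String column, boolean aggregate) {
        this.column = column;
        this.aggregate = aggregate;
    }
}

final class GroupByValidator {
    /** Returns true if the projection satisfies correctness criteria a) and b). */
    static boolean isValid(List<ProjectionItem> projection, Set<String> groupBy) {
        if (!groupBy.isEmpty()) {
            // a) with group by: every non-aggregate item must be a group by column
            for (ProjectionItem p : projection) {
                if (!p.aggregate && !groupBy.contains(p.column)) {
                    return false;
                }
            }
            return true;
        }
        // b) without group by: either no aggregates at all, or only aggregates
        int aggregates = 0;
        for (ProjectionItem p : projection) {
            if (p.aggregate) {
                aggregates++;
            }
        }
        return aggregates == 0 || aggregates == projection.size();
    }
}
```

For example, `select col1, col3, avg(col2) ... group by col1` fails because `col3` is neither an aggregate nor a group by column.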

Query Types & implementation:

Implementation for queries containing a group by clause and a projection containing only columns (no aggregate functions)

 select pf.ID, pf.status as status from /portfolio pf where pf.ID > 100 group by pf.ID, status

The above query can be converted into a distinct order by query to get the result.

Transformed query => select distinct pf.ID, pf.status as status from /portfolio pf where pf.ID > 100 order by pf.ID, status

a) Replicated Region query: For a replicated region, executing the transformed query should yield the correct results.

b) Partitioned Region: Should work without change (assuming distinct order by is supported for PRs).
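The rewrite is mechanical once the query's clauses are known: add `distinct` to the projection and turn the group by list into an order by list. A minimal sketch working on already-split clause strings; `GroupByRewriter.rewrite` and its parameters are hypothetical, and a real implementation would presumably operate on the compiled query tree rather than on text.

```java
final class GroupByRewriter {
    /**
     * Rewrites "select P from F [where W] group by G" into
     * "select distinct P from F [where W] order by G".
     * Only valid when P contains no aggregate functions.
     */
    static String rewrite(String projection, String from, String where, String groupBy) {
        StringBuilder sb = new StringBuilder("select distinct ")
            .append(projection)
            .append(" from ")
            .append(from);
        if (where != null && !where.isEmpty()) {
            sb.append(" where ").append(where);
        }
        // The group by columns become the order by columns.
        return sb.append(" order by ").append(groupBy).toString();
    }
}
```

Applied to the example above, `rewrite("pf.ID, pf.status as status", "/portfolio pf", "pf.ID > 100", "pf.ID, status")` produces the transformed query.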

Implementation for queries containing a group by clause and a projection containing at least one aggregate function and zero or more columns

select pf.status, AVG(pf.ID) from /portfolio pf where pf.ID > 0 group by pf.status

Replicated Region query

a) Get the projected rows (by stripping the aggregate functions).

b) Sort the projected rows based on the Group By columns.

c) For each row of the sorted data, feed the row to the aggregate functions in the projection and maintain the aggregate.

d) Prepare the resultset at the end of the complete row scan.
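Steps a) to d) amount to a sort followed by a single streaming pass that closes a group whenever the group by value changes. A minimal sketch for `select pf.status, AVG(pf.ID) ... group by pf.status`, assuming the projected rows have already been reduced to non-null (status, ID) pairs; `SortedGroupAvg`, `Row` and `groupAvg` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SortedGroupAvg {
    // Hypothetical projected row: the group by column and the aggregate's argument.
    static final class Row {
        final String status;
        final int id;
        Row(String status, int id) { this.status = status; this.id = id; }
    }

    static Map<String, Double> groupAvg(List<Row> projected) {
        // b) sort the projected rows on the group by column
        List<Row> rows = new ArrayList<>(projected);
        rows.sort(Comparator.comparing((Row r) -> r.status));

        // c) feed each sorted row to AVG, emitting a result when the group changes
        Map<String, Double> result = new LinkedHashMap<>();
        String current = null;
        long sum = 0;
        long count = 0;
        for (Row r : rows) {
            if (current != null && !current.equals(r.status)) {
                result.put(current, (double) sum / count);
                sum = 0;
                count = 0;
            }
            current = r.status;
            sum += r.id;
            count++;
        }
        // d) flush the last group after the complete row scan
        if (current != null) {
            result.put(current, (double) sum / count);
        }
        return result;
    }
}
```

Because the rows are sorted, only one group's running state is held at a time, and the results come out in group by order.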

 

Partitioned Region query

For each bucket:

a) Get the projected rows (by stripping the aggregate functions).

b) Sort the projected rows based on the Group By columns.

c) For each sorted row, feed the row to the aggregate functions in the projection and maintain the aggregate.

d) Prepare the resultset at the end of the complete row scan.

e) For the aggregate function AVG, also keep track of the number of rows used to produce the result of AVG.

For all buckets present locally:

a) Merge the results of the individual buckets locally to get the resultant resultset. The group by columns fetched in the projection attributes are used as keys to merge and calculate the resultant resultset.

For the query node:

a) Merge the resultsets from each node in the same manner as the local bucket merge.

b) Compute the final resultset.
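Since each bucket emits at most one partial row per group, the local merge and the query-node merge can share the same keyed merge. A minimal sketch for SUM, assuming each partial result has already been reduced to a map from group by value to partial sum; `PartialResultMerger` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartialResultMerger {
    /** Merges one partial result (group value -> partial SUM) into the accumulated result. */
    static void mergeInto(Map<String, Long> target, Map<String, Long> partial) {
        // Map.merge adds the partial sum to any existing sum for the same group key.
        partial.forEach((group, partialSum) -> target.merge(group, partialSum, Long::sum));
    }

    /** Merges a list of partial results: all local buckets, or all member nodes. */
    static Map<String, Long> mergeAll(List<Map<String, Long>> partials) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> p : partials) {
            mergeInto(merged, p);
        }
        return merged;
    }
}
```

Usage: each member calls `mergeAll` over its local bucket results, and the query node calls `mergeAll` again over the per-member results. Aggregates such as AVG and SUM(distinct) need a richer partial value than a single number.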


Modifications to the current query engine and things to consider:

The current OQL engine needs to support a non-distinct order by clause. The order by comparator attributes, which currently map to runtime iterators, need to map to projection attributes, so that the order by clause can be applied on the query node without firing another order by query. Because the results from the individual bucket nodes arrive already sorted, they need to be N-way merged on the query node.
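Because each bucket's rows are already sorted on the order by attributes, the query node needs only an N-way merge rather than a full re-sort. A minimal sketch using `java.util.PriorityQueue` to hold the current head of each sorted input; `NWayMerge.mergeSorted` is a hypothetical helper, and a real implementation would presumably stream rows instead of taking fully materialized lists.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class NWayMerge {
    // Pairs the current head element of one input with the iterator it came from.
    private static final class Head<T> {
        final T value;
        final Iterator<T> source;
        Head(T value, Iterator<T> source) { this.value = value; this.source = source; }
    }

    /** Merges N lists, each already sorted by cmp, into one sorted list (duplicates kept). */
    static <T> List<T> mergeSorted(List<List<T>> sortedInputs, Comparator<? super T> cmp) {
        PriorityQueue<Head<T>> heap =
            new PriorityQueue<Head<T>>((a, b) -> cmp.compare(a.value, b.value));
        for (List<T> input : sortedInputs) {
            Iterator<T> it = input.iterator();
            if (it.hasNext()) {
                heap.add(new Head<>(it.next(), it));
            }
        }
        List<T> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            // Take the smallest head, then refill from the same input.
            Head<T> smallest = heap.poll();
            out.add(smallest.value);
            if (smallest.source.hasNext()) {
                heap.add(new Head<>(smallest.source.next(), smallest.source));
            }
        }
        return out;
    }
}
```

Duplicates are kept, matching non-distinct order by semantics. The cost is O(n log N) for n rows from N buckets, compared with O(n log n) for re-sorting everything.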

In the case of a PR, for certain types of aggregate functions, the implementation of the aggregate function on the bucket nodes would differ from its implementation on the query node.

E.g.: select col1, avg(col2) from /portfolios group by col1

The average computed for each bucket cannot simply be fed into the avg() function on the query node. The avg() function on each bucket node needs to pass both the sum and the number of elements to the query node. The query node's avg() function takes these two values as input and computes the final value of avg.
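The split between the bucket-node and query-node forms of AVG can be illustrated with a small partial-state class. Buckets accumulate a (sum, count) pair, and the query node adds the pairs before dividing. `AvgPartial` is a hypothetical illustration, not the Geode implementation.

```java
final class AvgPartial {
    private double sum;   // running sum of the values seen
    private long count;   // number of values seen

    /** Bucket node: accumulate one value. */
    void accumulate(Number value) {
        sum += value.doubleValue();
        count++;
    }

    /** Query node: combine the (sum, count) pair shipped by a bucket node. */
    void merge(AvgPartial other) {
        sum += other.sum;
        count += other.count;
    }

    /** Final AVG; null when no rows were seen. */
    Double result() {
        return count == 0 ? null : sum / count;
    }
}
```

With bucket A holding {1, 2, 3} (average 2) and bucket B holding {10} (average 10), averaging the two averages gives 6. Merging the pairs gives 16 / 4 = 4, which is the correct answer.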

Queries that involve the distinct keyword also require different implementations on the bucket nodes and the query node.

E.g.: select col1, sum(distinct col2) from /portfolios group by col1

Here it is not possible to pass the individual sums calculated on each bucket node to the query node to compute the final sum. The reason is that only the distinct values of col2 must be summed, and two buckets may contain the same value, which must then be added to the sum only once. In such a case, the sum() function on the bucket node needs to return the Set of values seen, rather than the sum of those values.
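A minimal sketch of the set-shipping approach for SUM(distinct), assuming non-null Long values. Each bucket ships its set of distinct values, and the query node takes the union and sums once at the end. `DistinctSumPartial` is a hypothetical name. The same approach would presumably apply to COUNT(distinct) and AVG(distinct), at the cost of shipping sets that can be large.

```java
import java.util.HashSet;
import java.util.Set;

final class DistinctSumPartial {
    // Distinct values seen so far; shipped to the query node instead of a sum.
    private final Set<Long> seen = new HashSet<>();

    /** Bucket node: record one value (duplicates collapse in the set). */
    void accumulate(long value) {
        seen.add(value);
    }

    /** Query node: union the value sets from each bucket. */
    void merge(DistinctSumPartial other) {
        seen.addAll(other.seen);
    }

    /** Final SUM(distinct): computed only after all sets are merged. */
    long result() {
        long total = 0;
        for (long v : seen) {
            total += v;
        }
        return total;
    }
}
```

For example, if bucket 1 sees {5, 7} and bucket 2 sees {7, 9}, adding the per-bucket distinct sums gives 12 + 16 = 28. The union {5, 7, 9} gives the correct 21.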

Built-in aggregate functions:

  • MAX: Usage: MAX(expression)
    Example: MAX(pf.ID). The expression must evaluate to a java.lang.Comparable type.
  • MIN: Usage: MIN(expression)
    Example: MIN(pf.ID). The expression must evaluate to a java.lang.Comparable type.
  • AVG: Usage: AVG(expression)
    Example: AVG(pf.ID). The expression must evaluate to a java.lang.Number type.
  • AVG: Usage: AVG(distinct expression)
  • SUM: Usage: SUM(expression)
    Example: SUM(pf.ID). The expression must evaluate to a java.lang.Number type.
  • SUM: Usage: SUM(distinct expression)
  • COUNT(*): Believed to be already supported (to be confirmed).
  • COUNT: Usage: COUNT(expression | column)
  • COUNT: Usage: COUNT(distinct expression | column)
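MAX and MIN only require their argument to be `java.lang.Comparable`, and bucket results can be merged by applying the same function to the partial results. A minimal sketch of MAX, assuming non-null values of mutually comparable types. `MaxAggregate` is hypothetical, and MIN would be the same with the comparison reversed.

```java
final class MaxAggregate {
    private Comparable<Object> max; // current maximum, null until the first value

    @SuppressWarnings("unchecked")
    void accumulate(Object value) {
        // Throws ClassCastException if the value is not Comparable.
        Comparable<Object> c = (Comparable<Object>) value;
        if (max == null || c.compareTo(max) > 0) {
            max = c;
        }
    }

    /** Merging a bucket result is just accumulating that bucket's maximum. */
    void merge(MaxAggregate other) {
        if (other.max != null) {
            accumulate(other.max);
        }
    }

    Object result() {
        return max;
    }
}
```

Unlike AVG or SUM(distinct), MAX and MIN need no extra partial state, so the bucket-node and query-node implementations can be identical.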


User Defined Aggregate Function:


Geode will provide the following interface, to be implemented by the user:

com.gemstone.gemfire.cache.query.Aggregator

The Aggregator interface will have the following methods:

public interface Aggregator {

  /**
   * Accumulates the next scalar value.
   *
   * @param value the next value to aggregate
   */
  public void accumulate(Object value);

  /**
   * Initializes the Aggregator.
   */
  public void init();

  /**
   * @return the result scalar value
   */
  public Object terminate();

  /**
   * Merges the incoming aggregator from a bucket node with the resultant
   * aggregator on the query node.
   *
   * @param otherAggregator the aggregator received from a bucket node
   */
  public void merge(Aggregator otherAggregator);
}


The implementing class must have a zero-argument constructor.
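A minimal sketch of a user-defined SUM written against the proposed interface. The interface is redeclared without its package so the example compiles on its own. `SumUDA` reuses the class name from the UDA creation example on this page, but its body is illustrative only. It assumes numeric inputs, and skipping nulls is an assumption rather than part of the proposal. Following the `merge` Javadoc, the query node is assumed to call `merge` with each bucket's aggregator and then call `terminate()` once.

```java
// Redeclared copy of the proposed com.gemstone.gemfire.cache.query.Aggregator.
interface Aggregator {
    void accumulate(Object value);
    void init();
    Object terminate();
    void merge(Aggregator otherAggregator);
}

/** Illustrative user-defined SUM. */
class SumUDA implements Aggregator {
    private double sum;

    // Required zero-argument constructor.
    public SumUDA() {}

    @Override
    public void init() {
        sum = 0;
    }

    @Override
    public void accumulate(Object value) {
        if (value != null) { // skipping nulls is an assumption of this sketch
            sum += ((Number) value).doubleValue();
        }
    }

    @Override
    public Object terminate() {
        return sum;
    }

    @Override
    public void merge(Aggregator otherAggregator) {
        // Bucket-node aggregators are assumed to be SumUDA instances; add their partial sums.
        sum += ((SumUDA) otherAggregator).sum;
    }
}
```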



Example of a query using a built-in aggregate:


select pf.ID, SUM(pos.mktValue) from /portfolios pf, pf.positions pos group by pf.ID

The above query gives the total market value of all the positions of each portfolio, identified by its ID.
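To make the expected result concrete, the same grouping can be written in plain Java over in-memory objects. `Portfolio` and `Position` are minimal hypothetical stand-ins for the domain classes the query assumes. This shows the result semantics only, not how the query engine would execute the query.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PortfolioTotals {
    static final class Position {
        final double mktValue;
        Position(double mktValue) { this.mktValue = mktValue; }
    }

    static final class Portfolio {
        final int id;
        final List<Position> positions;
        Portfolio(int id, List<Position> positions) { this.id = id; this.positions = positions; }
    }

    /** Same result as: select pf.ID, SUM(pos.mktValue) from /portfolios pf, pf.positions pos group by pf.ID */
    static Map<Integer, Double> totalMarketValue(List<Portfolio> portfolios) {
        Map<Integer, Double> totals = new TreeMap<>();
        for (Portfolio pf : portfolios) {
            // "pf, pf.positions pos" yields one row per position, so a portfolio
            // with no positions produces no row and no group.
            for (Position pos : pf.positions) {
                totals.merge(pf.id, pos.mktValue, Double::sum);
            }
        }
        return totals;
    }
}
```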


Note:

As a first cut, we will make it mandatory for the group by columns to be part of the projection attributes. (We will see whether the code to relax this constraint, i.e. to allow group by columns that are not part of the projection columns, can be implemented in a reasonable period.)


Proposed syntax for using UDAs:


Creation of a UDA:

  1. Programmatic creation:

QueryService qs = CacheUtils.getQueryService();
qs.createUDA("udaAlias", "com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$SumUDA");

  2. XML-based creation:

<uda-manager>
   <uda name="uda2" class="com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$UDACLass2"/>
   <uda name="uda3" class="com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$UDACLass3"/>
</uda-manager>



cache-9.0 XSD:


<xsd:element maxOccurs="1" minOccurs="0" name="uda-manager">
  <xsd:complexType>
    <xsd:sequence>
      <xsd:element maxOccurs="unbounded" minOccurs="0" name="uda">
        <xsd:complexType>
          <xsd:attribute name="name" type="xsd:string" use="required" />
          <xsd:attribute name="class" type="xsd:string" use="required" />
        </xsd:complexType>
      </xsd:element>
    </xsd:sequence>
  </xsd:complexType>
</xsd:element>


Example of using a UDA after it has been defined:


String queryStr = "select p.status , myUDA(p.ID) from /portfolio p where p.ID > 0 group by p.status order by p.status";


This will require us to maintain a registry of the defined aggregate functions, and we will also have to ensure that the registry is created on newly joining nodes. We can piggyback on the infrastructure through which OQL indexes get created on the peers, which I assume can also create them on newly joining nodes.
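The registry could be as simple as a concurrent map from alias to implementing class name, with a fresh instance created through the required zero-argument constructor for each evaluation. A minimal local sketch. `UdaRegistry`, its methods, and the example `CountUDA` are hypothetical. The Aggregator interface is redeclared so the sketch compiles on its own. Distributing registrations to peers and newly joining members is not covered.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Redeclared copy of the proposed Aggregator interface.
interface Aggregator {
    void accumulate(Object value);
    void init();
    Object terminate();
    void merge(Aggregator otherAggregator);
}

/** Hypothetical example UDA: counts non-null values. */
final class CountUDA implements Aggregator {
    private long count;
    public void init() { count = 0; }
    public void accumulate(Object value) { if (value != null) count++; }
    public Object terminate() { return count; }
    public void merge(Aggregator other) { count += ((CountUDA) other).count; }
}

final class UdaRegistry {
    private final Map<String, String> classNamesByAlias = new ConcurrentHashMap<>();

    /** Registers an alias; fails if the alias is already defined. */
    void register(String alias, String className) {
        if (classNamesByAlias.putIfAbsent(alias, className) != null) {
            throw new IllegalStateException("UDA already defined: " + alias);
        }
    }

    /** Creates and initializes a new aggregator via the zero-argument constructor. */
    Aggregator newInstance(String alias) throws ReflectiveOperationException {
        String className = classNamesByAlias.get(alias);
        if (className == null) {
            throw new IllegalArgumentException("Unknown UDA: " + alias);
        }
        Aggregator agg = (Aggregator) Class.forName(className)
            .getDeclaredConstructor()
            .newInstance();
        agg.init();
        return agg;
    }
}
```

Creating one instance per evaluation keeps aggregator state isolated between concurrent queries and between groups.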
