...

The files for the examples in this guide can be found in the hello world folder of the source code repository.

...

We also define the parameters passed to CREATE AGGREGATE:

  • SFUNC
    • The name of the state transition function to be called for each input row. The state transition function, avg_var_transition in this example, is defined in the same file avg_var.sql_in and implemented later in C++.
  • FINALFUNC
    • The name of the final function called to compute the aggregate's result after all input rows have been traversed. The final function, avg_var_final in this example, is defined in the same file avg_var.sql_in and implemented later in C++.
  • PREFUNC
    • The name of the merge function called to combine the aggregate's state values after each segment, or partition, of data has been traversed. The merge function is needed for distributed datasets on Greenplum and HAWQ. For PostgreSQL, the data is not distributed, and the merge function is not necessary. For completeness, we implement a merge function called avg_var_merge_states in this guide.
  • INITCOND
    • The initial condition for the state value. In this example, it is an all-zero double array corresponding to the values of mean, variance, and the number of rows, respectively.

The transition, merge, and final functions are defined in the same file avg_var.sql_in as the aggregate function. More details about those functions can be found in the PostgreSQL documentation.
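The following is a minimal, standalone C++ sketch of the order in which these functions are conceptually invoked on segmented data. Each segment starts from INITCOND and folds in its rows with SFUNC. PREFUNC then combines the per-segment states, and FINALFUNC turns the combined state into the result.

The driver `run_aggregate`, the toy `SumCount` state, and `mean_of_segments` are hypothetical illustrations, not MADlib or PostgreSQL APIs. `SumCount` computes a plain mean rather than avg_var's mean and variance. Real executors also schedule these calls differently.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical driver that mimics how a DBMS evaluates a user-defined
// aggregate over data split into segments (partitions).
template <class State, class Result>
Result run_aggregate(const std::vector<std::vector<double>>& segments,
                     const State& initcond,
                     const std::function<State(const State&, double)>& sfunc,
                     const std::function<State(const State&, const State&)>& prefunc,
                     const std::function<Result(const State&)>& finalfunc) {
    std::vector<State> partials;
    for (const auto& rows : segments) {
        State s = initcond;          // INITCOND: each segment starts fresh
        for (double x : rows) {
            s = sfunc(s, x);         // SFUNC: called once per input row
        }
        partials.push_back(s);
    }
    // PREFUNC: combine the per-segment states into a single state.
    State combined = partials.empty() ? initcond : partials.front();
    for (std::size_t i = 1; i < partials.size(); ++i) {
        combined = prefunc(combined, partials[i]);
    }
    return finalfunc(combined);      // FINALFUNC: called once at the end
}

// Hypothetical toy state for a plain mean: running sum and row count.
struct SumCount {
    double sum;
    double count;
};

// Mean of all rows, computed through the aggregate protocol above.
double mean_of_segments(const std::vector<std::vector<double>>& segments) {
    return run_aggregate<SumCount, double>(
        segments, SumCount{0.0, 0.0},
        [](const SumCount& s, double x) { return SumCount{s.sum + x, s.count + 1.0}; },
        [](const SumCount& a, const SumCount& b) {
            return SumCount{a.sum + b.sum, a.count + b.count};
        },
        [](const SumCount& s) { return s.sum / s.count; });
}
```

With this sketch, `mean_of_segments({{1.0, 2.0}, {3.0}, {4.0, 5.0, 6.0}})` returns 3.5. A single segment holding all six rows gives the same value. Producing the same answer however the rows are partitioned is what a correct merge function has to guarantee.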

3. Implement the functions in C++

...

Here the AnyType class is used both for passing data from the DBMS to the C++ function and for returning values from C++. Refer to TypeTraits_impl.hpp for more details.

Transition function

 
Code Block
languagecpp
AnyType
avg_var_transition::run(AnyType& args) {
    // get current state value 
    AvgVarTransitionState<MutableArrayHandle<double> > state = args[0];
    // get current row value
    double x = args[1].getAs<double>();
    double d = (x - state.avg);

    // online update mean
    state.avg += d / static_cast<double>(state.numRows + 1);
    double new_d = (x - state.avg);
    double a = static_cast<double>(state.numRows) / static_cast<double>(state.numRows + 1);

    // online update variance
    state.var = state.var * a + d * new_d / static_cast<double>(state.numRows + 1);
    state.numRows ++;
    return state;
}

...

  • There are two arguments for avg_var_transition, as specified in avg_var.sql_in. The first one is an array of SQL double type, corresponding to the current mean, variance, and number of rows traversed. The second one is a double representing the current tuple value.
  • We will describe class AvgVarTransitionState later. Basically, it takes args[0], a SQL double array, passes the data to the appropriate C++ types, and stores them in the state instance.
  • Both the mean and the variance are updated in an online manner to avoid accumulating a large intermediate sum.
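The online update can be checked outside the database. This standalone sketch assumes the state is a plain `std::array<double, 3>` laid out as {avg, var, numRows}, mirroring the SQL double array. `transition`, `fold`, and `two_pass` are hypothetical helpers, not part of MADlib. `transition` repeats the arithmetic of avg_var_transition, and `two_pass` computes the same statistics the textbook way so the two results can be compared.

```cpp
#include <array>
#include <vector>

// Assumed state layout, mirroring avg_var's double array: {avg, var, numRows}.
using AvgVarState = std::array<double, 3>;

// Online update written to match avg_var_transition. It tracks the
// population variance (sum of squared deviations divided by n).
AvgVarState transition(AvgVarState state, double x) {
    double& avg = state[0];
    double& var = state[1];
    double& numRows = state[2];

    double d = x - avg;                           // deviation from the old mean
    avg += d / (numRows + 1.0);                   // online update mean
    double new_d = x - avg;                       // deviation from the new mean
    double a = numRows / (numRows + 1.0);
    var = var * a + d * new_d / (numRows + 1.0);  // online update variance
    numRows += 1.0;
    return state;
}

// Fold all rows, starting from the all-zero INITCOND.
AvgVarState fold(const std::vector<double>& rows) {
    AvgVarState state{0.0, 0.0, 0.0};
    for (double x : rows) {
        state = transition(state, x);
    }
    return state;
}

// Two-pass reference: compute the mean first, then the squared deviations.
AvgVarState two_pass(const std::vector<double>& rows) {
    if (rows.empty()) return AvgVarState{0.0, 0.0, 0.0};
    double n = static_cast<double>(rows.size());
    double sum = 0.0;
    for (double x : rows) sum += x;
    double mean = sum / n;
    double ss = 0.0;
    for (double x : rows) ss += (x - mean) * (x - mean);
    return AvgVarState{mean, ss / n, n};
}
```

For the rows {2, 4, 4, 4, 5, 5, 7, 9}, both `fold` and `two_pass` yield a mean of 5, a variance of 4, and a count of 8, up to floating-point rounding. The online version never holds a running sum of squares, which is the point of the design.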

Merge function

...

  • Again, the arguments contained in AnyType& args are defined in avg_var.sql_in.
  • The details are hidden in the method of class AvgVarTransitionState that overloads operator +=.

...

Below is the method that overloads operator += for the bridging class AvgVarTransitionState:

Code Block
languagecpp
/**
 * @brief Merge with another State object
 *
 * We update mean and variance in an online fashion
 * to avoid accumulating a large intermediate sum.
 */
template <class OtherHandle>
AvgVarTransitionState &operator+=(
    const AvgVarTransitionState<OtherHandle> &inOtherState) {

    if (mStorage.size() != inOtherState.mStorage.size())
        throw std::logic_error("Internal error: Incompatible transition "
                               "states");

    // Row counts can exceed 65535, so keep a 64-bit count here;
    // a uint16_t would silently truncate large counts.
    uint64_t numRows_ = static_cast<uint64_t>(inOtherState.numRows);
    // Merging an empty state is a no-op; this also avoids 0/0 below
    // when both states are empty.
    if (numRows_ == 0)
        return *this;

    double avg_ = inOtherState.avg;
    double var_ = inOtherState.var;
    double totalNumRows = static_cast<double>(numRows + numRows_);
    double p = static_cast<double>(numRows) / totalNumRows;
    double p_ = static_cast<double>(numRows_) / totalNumRows;
    double totalAvg = avg * p + avg_ * p_;
    double a = avg - totalAvg;
    double a_ = avg_ - totalAvg;

    // Pooled (population) variance: the weighted within-state variances
    // plus the spread of each state's mean around the combined mean.
    numRows += numRows_;
    var = p * var + p_ * var_ + p * a * a + p_ * a_ * a_;
    avg = totalAvg;
    return *this;
}
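The merge formula can be tested the same way. Merging the states of two disjoint sets of rows should reproduce the statistics of their union.

This standalone sketch uses the same assumed {avg, var, numRows} layout. `merge` and `stats` are hypothetical helpers: `merge` applies the pooled mean/variance formula of the operator above, with an explicit guard for empty states, and `stats` computes the statistics directly.

```cpp
#include <array>
#include <vector>

// Assumed state layout, as in avg_var: {avg, var, numRows}.
using AvgVarState = std::array<double, 3>;

// Pooled mean and population variance of two disjoint groups of rows.
AvgVarState merge(const AvgVarState& left, const AvgVarState& right) {
    if (right[2] == 0.0) return left;   // nothing to merge (also avoids 0/0)
    if (left[2] == 0.0) return right;

    double total = left[2] + right[2];
    double p = left[2] / total;         // weight of the left group
    double p_ = right[2] / total;       // weight of the right group
    double totalAvg = left[0] * p + right[0] * p_;
    double a = left[0] - totalAvg;      // left mean's offset from the combined mean
    double a_ = right[0] - totalAvg;    // right mean's offset from the combined mean

    double var = p * left[1] + p_ * right[1] + p * a * a + p_ * a_ * a_;
    return AvgVarState{totalAvg, var, total};
}

// Direct two-pass statistics for a group of rows, used as a reference.
AvgVarState stats(const std::vector<double>& rows) {
    if (rows.empty()) return AvgVarState{0.0, 0.0, 0.0};  // same as INITCOND
    double n = static_cast<double>(rows.size());
    double sum = 0.0;
    for (double x : rows) sum += x;
    double mean = sum / n;
    double ss = 0.0;
    for (double x : rows) ss += (x - mean) * (x - mean);
    return AvgVarState{mean, ss / n, n};
}
```

For example, merging the states of {1, 2, 3} and {7, 9} gives a mean of 4.4 and a variance of 9.44. This matches `stats({1, 2, 3, 7, 9})` up to rounding.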

...

5. Running the new module

Now let's run an example using the new module. We use the patients dataset from the MADlib Quick Start Guide for Users. From the psql terminal, the result below shows that half of the 20 patients have had second heart attacks within 1 year (yes = 1):

Code Block
languagesql
SELECT madlib.avg_var(second_attack) FROM patients;

    -- ************ --
    --    Result    --
    -- ************ --
    +-------------------+
    | avg_var           |
    |-------------------|
    | [0.5, 0.25, 20.0] |
    +-------------------+
-- (average, variance, count) --
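As an arithmetic check of this output, assume that second_attack is a 0/1 column holding ten 1s and ten 0s, as the text above states. The mean and the population variance are then:

```latex
\bar{x} = \frac{1}{20}\sum_{i=1}^{20} x_i = \frac{10}{20} = 0.5,
\qquad
\sigma^2 = \frac{1}{20}\sum_{i=1}^{20} (x_i - \bar{x})^2
         = \frac{10\,(1 - 0.5)^2 + 10\,(0 - 0.5)^2}{20} = 0.25.
```

The value 0.25 corresponds to dividing by n = 20. The sample variance, which divides by n - 1 = 19, would instead be 5/19 ≈ 0.263.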

 

...

The files for the above exercise can be found in the hello world folder of the source code repository.

Adding An Iterative UDF

...