NOTE: The features described in this tutorial are Technology Preview (Complete But Not Tested) features in Trafodion Release 1.1.

This is a tutorial on how to write user-defined functions (UDFs) in C++ or Java.

 

Types of UDFs

Trafodion supports several types of UDFs:

  • Scalar UDFs take scalar input parameters and return a single scalar value or a tuple consisting of multiple values. They can be used in SQL expressions, for example in the SELECT list or the WHERE clause.
  • Table-valued UDFs (TVUDFs) take scalar input parameters and return a table-valued output. They appear in the FROM clause.
  • Table-mapping UDFs (TMUDFs) are a generalization of table-valued UDFs. They are similar to Map and Reduce operators, like the FROM ... MAP ... REDUCE syntax in HiveQL. They take scalar parameters and optional table-valued inputs, as we will see below.

Step 1a: Write a TMUDF in C++

Currently, we cover only TMUDFs in this tutorial. Note that this covers table-valued UDFs, which are TMUDFs with zero table-valued inputs.

To write a TMUDF, include the header file sqludr.h and provide at least three things:

  • A class that is derived from tmudr::UDR.
  • A factory method with a given name that creates an object of your class.
  • The run-time method that implements the UDF as a virtual method of your class.

See C++ Interface for Trafodion UDRs for the documentation of the UDR C++ interface.

#include "sqludr.h"

using namespace tmudr;

// Step 1: derive a class from tmudr::UDR

class HelloWorldUDF : public UDR
{
public:

  // override the runtime method
  virtual void processData(UDRInvocationInfo &info,
                           UDRPlanInfo &plan);
};

// Step 2: Create a factory method

extern "C" UDR * HELLO_WORLD()
{
  return new HelloWorldUDF();
}

// Step 3: Write the actual UDF code

void HelloWorldUDF::processData(UDRInvocationInfo &info,
                                UDRPlanInfo &plan)
{
  // set the output column
  info.out().setString(0, "Hello world!");

  // produce a single output row
  emitRow(info);
}

Then compile the code into a shared library (DLL). This assumes that your code is in a file named HelloWorld.cpp:

g++ -g -I$MY_SQROOT/export/include/sql -fPIC -c -o HelloWorld.o HelloWorld.cpp
g++ -shared -rdynamic -o libhelloworld.so HelloWorld.o

Step 1b: Alternatively, write the UDF in Java

This is not yet supported. You can write stored procedures in Java, though.

Step 2: Create a library and a table mapping UDF in SQL

drop function helloworld;
drop library helloworldlib;

create library helloworldlib file '<put directory of your DLL here>/libhelloworld.so';

create table_mapping function helloworld()
   returns (col1 char(40))
   external name 'HELLO_WORLD' -- name of the factory method
   language cpp
   library helloworldlib;

Step 3: Use the function

select * from udf(helloworld());

Let's look at a few more details:

explain select * from udf(helloworld());
showddl table_mapping function helloworld;
get table_mapping functions for library helloworldlib;

A more complex example: Sessionize

Next, we will look at an example that justifies adding several methods of the TMUDF compiler interface. We want to write a generic "Sessionize" TMUDF with the following properties:

  • The Sessionize TMUDF will take one table-valued input and it will find the unique user sessions in that input. The ORDER BY clause for this table-valued input specifies the timestamp column. The input table can also have a PARTITION BY clause, to sessionize multiple users at the same time.
  • We want to return all the columns of the input table; these are called pass-through columns. The UDF also generates additional columns for the session_id and a sequence number.
  • We want this TMUDF to be generic and usable on any table. This means that we can't specify the output columns when we create the UDF; this has to be done at compile time.
  • The session timeout is specified as a scalar parameter.
  • We want to run the TMUDF in parallel, of course.

To show some advanced features of the UDR interface, we also want to eliminate any pass-through columns that are not required by the query and we want to be able to optimize the handling of some predicates as well as generate uniqueness constraints to enable optimizations.

If we wrote this as a MapReduce job, we would have an empty mapper, use the user id column as the key for the reducer, and sort the rows on the timestamp column to make it easy for the reducer to recognize the sessions. Next, we'll see how to do this with a TMUDF in Trafodion. Note that a similar UDF is part of regression test trafodion/core/sql/regress/udr/TEST001.

Here are the methods implemented for the Sessionize UDF and the factory function:

#include <cstdio> // for sscanf(), used in processData() below
#include "sqludr.h"

using namespace tmudr;

// Step 1: derive a class from UDR

class Sessionize : public UDR
{
public:

  // determine output columns dynamically at compile time
  void describeParamsAndColumns(UDRInvocationInfo &info);

  // eliminate unused columns and help with predicate pushdown
  void describeDataflowAndPredicates(UDRInvocationInfo &info);

  // generate constraints for the table-valued result
  void describeConstraints(UDRInvocationInfo &info);

  // override the runtime method
  void processData(UDRInvocationInfo &info,
                   UDRPlanInfo &plan);
};

// Step 2: Create a factory method

extern "C" UDR * SESSIONIZE()
{
  return new Sessionize();
}

Optional: Determine input and result parameters of the UDF dynamically

To specify output columns at compile time and to validate some information, we implement this virtual method:

void Sessionize::describeParamsAndColumns(UDRInvocationInfo &info)
{
  // First, validate PARTITION BY and ORDER BY columns

  // Make sure we have exactly one table-valued input, otherwise
  // generate a compile error
  if (info.getNumTableInputs() != 1)
    throw UDRException(38000,
                       "%s must be called with one table-valued input",
                       info.getUDRName().data());

  // check whether there is a PARTITION BY for the input table
  // that specifies the single partitioning column we support
  const PartitionInfo &queryPartInfo = info.in().getQueryPartitioning();

  if (queryPartInfo.getType() != PartitionInfo::PARTITION ||
      queryPartInfo.getNumEntries() != 1)
    throw UDRException(
         38001,
         "Expecting a PARTITION BY clause with a single column for the input table.");

  // check whether there is an ORDER BY for the input table,
  // indicating the timestamp column
  const OrderInfo &queryOrderInfo = info.in().getQueryOrdering();

  if (queryOrderInfo.getNumEntries() != 1 ||
      queryOrderInfo.getOrderType(0) != OrderInfo::ASCENDING)
    throw UDRException(
         38002,
         "Expecting an ORDER BY with a single ascending column for the input table, indicating the timestamp column");
 
  // make sure the timestamp column is of an exact numeric type
  int tsCol = queryOrderInfo.getColumnNum(0);
  const TypeInfo &tsType = info.in().getColumn(tsCol).getType();

  if (tsType.getSQLTypeSubClass() != TypeInfo::EXACT_NUMERIC_TYPE)
    throw UDRException(38003, "The ORDER BY of the input table must be on an exact numeric column");

  // the scalar parameter is defined in the DDL and does not need to be checked

  // Second, define the output parameters

  // add the columns for session id and sequence number
  // (sequence_no is a unique sequence number within the session)
  info.out().addLongColumn("SESSION_ID");  // column number 0
  info.out().addLongColumn("SEQUENCE_NO"); // column number 1
 
  // Make all the input table columns also output columns,
  // those are called "pass-through" columns. The default
  // parameters of this method add all the columns of the
  // first input table.
  info.addPassThruColumns();

  // set the function type, sessionize behaves like a reducer in
  // MapReduce. Session ids are local within rows that share the
  // same id column value.
  info.setFuncType(UDRInvocationInfo::REDUCER);
}

Optional: Eliminate unneeded columns and push predicates down

Implementing this method will allow the TMUDF writer to answer the following questions:

  • Question 1: Here is a list of the output columns that are required by this query. This is a subset of the output columns we saw in the previous step. Given that, do you want to eliminate some of the output columns? Also, can you eliminate columns of the child tables?
  • Question 2: Here is a list of predicates that need to be evaluated on the result of the TMUDF. Given that, would you like to evaluate some of these inside the TMUDF or do you want to evaluate some of them on the child tables before that data even reaches you?

In the default method, no unused columns are eliminated. The default answer to handling predicates depends on the function type. For TMUDFs of type MAPPER, all predicates on pass-through columns are pushed down, since a mapper does not carry any state between rows. For type REDUCER, only predicates on the PARTITION BY columns are pushed down, if they are declared as pass-through columns. A reducer carries no state between partitions and since such predicates eliminate entire partitions, doing so should not interfere with a reducer. For the default function type GENERIC, no predicates are pushed down.
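
The default rules above can be summarized as a small decision function. This is only a sketch of the rules as stated in this tutorial, not the Trafodion implementation. The FuncType enum, the PredicateDesc struct, and the pushedDownByDefault() function are hypothetical names introduced for illustration and are not part of sqludr.h.

```cpp
#include <cassert>

// Hypothetical model of the three function types named in the text.
enum FuncType { GENERIC, MAPPER, REDUCER };

// Hypothetical description of one predicate on the UDF result.
struct PredicateDesc
{
  bool onPassThruColumnsOnly;    // references only pass-through columns
  bool onPartitionByColumnsOnly; // references only PARTITION BY columns
};

// Returns true if, by the default rules described above, the
// predicate would be pushed down to the table-valued input.
bool pushedDownByDefault(FuncType type, const PredicateDesc &pred)
{
  switch (type)
    {
    case MAPPER:
      // a mapper carries no state between rows
      return pred.onPassThruColumnsOnly;
    case REDUCER:
      // only predicates that eliminate entire partitions are safe
      return pred.onPassThruColumnsOnly && pred.onPartitionByColumnsOnly;
    case GENERIC:
    default:
      return false;
    }
}
```

Under this model, a predicate on a non-key pass-through column is pushed down for a MAPPER but not for a REDUCER, which is the gap that the Sessionize UDF narrows with its own logic.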

For our Sessionize function, we will eliminate any unused pass-through columns, for efficiency. We can also consider which predicates could be pushed down through the Sessionize function to its input table, in addition to the default behavior of a REDUCER type function. Could we push down predicates that reference arbitrary columns of the input table? Generally, no, since those could eliminate rows, and a missing row could cause us to split a session into two. If, however, the query does not reference the session id or sequence number columns, we can allow such a pushdown, since the session id is irrelevant in that case. This sample TMUDF also accepts a simple predicate of the form SESSION_ID < <const> for evaluation inside the UDF, to demonstrate that feature. All this code is purely a performance optimization, with possibly very big improvements, but the UDF will work fine without it.
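
To see why pushing down a predicate on an arbitrary column can change the session ids, consider the following standalone sketch. It does not use the UDR interface. The sessionIds() function is a hypothetical helper that handles a single user and assumes ascending timestamps.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: compute a session id for each timestamp of
// one user. A gap larger than the timeout starts a new session.
std::vector<long> sessionIds(const std::vector<long> &timeStamps,
                             long timeout)
{
  assert(timeout >= 0);

  std::vector<long> ids;
  long current = 1;

  for (std::size_t i = 0; i < timeStamps.size(); i++)
    {
      if (i > 0 && timeStamps[i] - timeStamps[i-1] > timeout)
        current++;
      ids.push_back(current);
    }
  return ids;
}
```

With a timeout of 30, the timestamps 100, 120, 140 form a single session. If a pushed-down predicate removes the row with timestamp 120 before the UDF sees it, the remaining rows 100 and 140 are 40 apart, so the row with timestamp 140 now gets session id 2 instead of 1. This only matters when the query actually looks at the generated columns.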

Here is the code to implement eliminating unused columns and predicate push-down:

void Sessionize::describeDataflowAndPredicates(UDRInvocationInfo &info)
{
  // Start with the default behavior for a reducer, pushing down
  // any predicates on the key/id column.
  UDR::describeDataflowAndPredicates(info);

  // Make sure we don't require any unused passthru columns
  // from the child/input table. NOTE: This can change the
  // column numbers for our id and timestamp columns!
  info.setUnusedPassthruColumns();

  bool generatedColsAreUsed =
    (info.out().getColumn(0).getUsage() == ColumnInfo::USED ||
     info.out().getColumn(1).getUsage() == ColumnInfo::USED);

  // Walk through predicates and find additional ones to push down
  // or to evaluate locally
  for (int p=0; p<info.getNumPredicates(); p++)
    {
      if (!generatedColsAreUsed)
        {
          // If session_id/sequence_no are not used in the query, then
          // we can push all predicates to the children.
          info.setPredicateEvaluationCode(p, PredicateInfo::EVALUATE_IN_CHILD);
        }
      else if (info.isAComparisonPredicate(p))
        {
          // For demo purposes, accept predicates of the
          // form "session_id < const" to be evaluated in the UDF.
          const ComparisonPredicateInfo &cpi = info.getComparisonPredicate(p);

          if (cpi.getColumnNumber() == 0 /* SESSION_ID */ &&
              cpi.getOperator() == PredicateInfo::LESS &&
              cpi.hasAConstantValue())
            info.setPredicateEvaluationCode(p, PredicateInfo::EVALUATE_IN_UDF);
        }
    }
}

Optional: Help the optimizer by providing constraints

By providing simple constraints on the output of a TMUDF, the UDF writer can enable powerful optimizations in the Trafodion compiler. For example, the compiler can eliminate GROUP BY operators if the GROUP BY columns are already unique.

For our Sessionize UDF, the following code propagates certain constraints for pass-through columns and provides a unique key for the output of the UDF:

void Sessionize::describeConstraints(UDRInvocationInfo &info)
{
  // The sessionize UDF produces at most one result row for every input
  // row it reads. This means it can propagate certain constraints on
  // its input tables to the result.
  info.propagateConstraintsFor1To1UDFs(false);

  UniqueConstraintInfo uc;
  int idColNum = info.in().getQueryPartitioning().getColumnNum(0);

  // The partitioning columns of the input table, together with
  // session id and sequence_no, form a unique key. Generate a
  // uniqueness constraint for that.
  for (int c=0; c<info.out().getNumColumns(); c++)
    if (info.out().getColumn(c).getProvenance().getInputColumnNum() == idColNum)
      uc.addColumn(c);
  uc.addColumn(0); // the session id is always column #0
  uc.addColumn(1); // the sequence number is always column #1
  info.out().addUniquenessConstraint(uc);

}

Optional: Help the optimizer by providing row count and cost estimates

This interface is not yet supported.

Optional: Specify the degree of parallelism

With this interface, the TMUDF writer can help answer the following question:

  • Question: Can this TMUDF be executed in parallel? If so, what would be a good degree of parallelism?
  • Default answer: If the TMUDF takes one table valued input and if the function type is MAPPER or REDUCER, then we will assume that it can execute in parallel, with the degree of parallelism determined by the optimizer. Otherwise, we will execute it serially.

The reason for this default is that we assume the TMUDF writer is familiar with MapReduce. MapReduce assumes that we don't carry any state between rows in a mapper or between different keys in a reducer. This allows us to enable parallel execution by default.
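
The default answer can be written down as a one-line rule. The following is a sketch of that rule as described in this tutorial, not code taken from Trafodion. The FuncType enum and the parallelByDefault() function are hypothetical names used only for illustration.

```cpp
#include <cassert>

// Hypothetical model of the three function types named in the text.
enum FuncType { GENERIC, MAPPER, REDUCER };

// Returns true if the TMUDF would be considered for parallel
// execution by default: exactly one table-valued input and a
// function type of MAPPER or REDUCER. The actual degree of
// parallelism is then chosen by the optimizer.
bool parallelByDefault(FuncType type, int numTableInputs)
{
  assert(numTableInputs >= 0);
  return numTableInputs == 1 && (type == MAPPER || type == REDUCER);
}
```

For Sessionize, which has one table-valued input and sets the function type to REDUCER, this rule yields parallel execution.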

For the Sessionize TMUDF, the default behavior works fine. We just need to make sure that rows with the same user column value get processed by a single instance of the TMUDF. Since we specified the user column through the PARTITION BY syntax, this is handled automatically by the Trafodion engine.

Optional: Specify ordering and partitioning of the query plan

This interface is not yet supported. For the Sessionize UDF, this interface is not required.

Optional: Pass data from compile-time to run-time interface

Sometimes, the TMUDF does complex analysis at compile time. The run-time method might need some of this information. For example, a TMUDF that reads data from MySQL might construct the necessary SQL query in the compiler interface. It would then need to pass this query to the runtime instance(s). This can be done here.

For the Sessionize TMUDF this is not necessary.

Runtime logic for Sessionize

With the compiler methods in place, the run-time method implements the actual logic of the function:

void Sessionize::processData(UDRInvocationInfo &info,
                             UDRPlanInfo &plan)
{
  int userIdColNum    = info.in().getQueryPartitioning().getColumnNum(0);
  int timeStampColNum = info.in().getQueryOrdering().getColumnNum(0);
  long timeout        = info.par().getLong(0);

  // variables needed for computing the session id
  long lastTimeStamp = 0;
  std::string lastUserId;
  long currSessionId = 1;
  long currSequenceNo = 1;
  int maxSessionId = 999999999;

  if (info.getNumPredicates() > 0)
    {
      // based on the describeDataflowAndPredicates() method, this must be
      // a predicate of the form SESSION_ID < const that we need
      // to evaluate inside this method
      std::string maxValue = info.getComparisonPredicate(0).getConstValue();

      sscanf(maxValue.c_str(), "%d", &maxSessionId);
    }

  // loop over input rows
  while (getNextRow(info))
  {
    long timeStamp = info.in(0).getLong(timeStampColNum);
    std::string userId = info.in(0).getString(userIdColNum);

    // check for a change of the user id
    if (lastUserId != userId)
      {
        // reset timestamp check and start over with session id 1
        lastTimeStamp = 0;
        currSessionId = 1;
        currSequenceNo = 1;
        lastUserId = userId;
      }

    // check for expiry of the session timeout
    long tsDiff = timeStamp - lastTimeStamp;

    if (tsDiff > timeout && lastTimeStamp > 0)
      {
        currSessionId++;
        currSequenceNo = 1;
      }
    else if (tsDiff < 0)
      throw UDRException(
           38100,
           "Got negative or descending timestamps %ld, %ld",
           lastTimeStamp, timeStamp);

    lastTimeStamp = timeStamp;

    // this evaluates the SQL predicate on SESSION_ID
    if (currSessionId < maxSessionId)
      {
        // produce session_id and sequence_no output columns
        info.out().setLong(0, currSessionId);
        info.out().setLong(1, currSequenceNo);

        // produce the remaining columns and emit the row
        info.copyPassThruData();
        emitRow(info);
        currSequenceNo++;
      }
  }
}
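
The session logic of processData() can also be tried outside of Trafodion. The following standalone sketch mirrors the loop above on an in-memory vector, without the SESSION_ID predicate. It does not use sqludr.h. The InRow and OutRow structs and the sessionizeRows() function are hypothetical names, and the input is assumed to be grouped by user id and sorted by ascending timestamp within each user, which is what PARTITION BY and ORDER BY provide to the real UDF.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins for the input and output rows of the UDF.
struct InRow  { std::string userId; long timeStamp; };
struct OutRow { long sessionId; long sequenceNo; InRow passThru; };

// Assign session ids and sequence numbers like processData() does.
std::vector<OutRow> sessionizeRows(const std::vector<InRow> &rows,
                                   long timeout)
{
  assert(timeout >= 0);

  std::vector<OutRow> result;
  std::string lastUserId;
  long lastTimeStamp = 0;
  long currSessionId = 1;
  long currSequenceNo = 1;

  for (const InRow &row : rows)
    {
      // a new user starts over with session id 1
      if (lastUserId != row.userId)
        {
          lastTimeStamp = 0;
          currSessionId = 1;
          currSequenceNo = 1;
          lastUserId = row.userId;
        }

      long tsDiff = row.timeStamp - lastTimeStamp;

      // a gap longer than the timeout starts a new session
      if (tsDiff > timeout && lastTimeStamp > 0)
        {
          currSessionId++;
          currSequenceNo = 1;
        }
      else if (tsDiff < 0)
        throw std::invalid_argument("negative or descending timestamp");

      lastTimeStamp = row.timeStamp;

      // emit the generated columns plus the pass-through row
      result.push_back(OutRow{currSessionId, currSequenceNo, row});
      currSequenceNo++;
    }
  return result;
}
```

For example, with a timeout of 30 and the rows (a, 100), (a, 110), (a, 200), (b, 50), (b, 55), the third row starts session 2 for user a, and the rows of user b start again at session 1.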

Additional considerations

Debugging UDF code

To debug a UDF, attach a debugger to the process that is executing your code. We recommend that you use a debug build of Trafodion, which enables a few helpful features. In the sqludr.h file, you'll find an enum DebugFlags that enables them, but only with a debug build. These flags are set via a CONTROL QUERY DEFAULT in SQL:

cqd UDR_DEBUG_FLAGS '<num>'

Set <num> to the sum (expressed as a decimal number) of all the flags you want to set. To do any of these tasks, you should log on to a Trafodion node, using the trafodion user id, and invoke the UDF through the sqlci tool. Otherwise, you will not be able to see information that is printed out, and it will be harder to clean up after the debug session.

Here is a short description of what some of these flags do:

  • DEBUG_INITIAL_RUN_TIME_LOOP_ONE (1): Loop in the C++ or Java code right before entering the run-time interface. If the UDF is executed in parallel, loop only in the first instance. Note that this could be on any of the Trafodion nodes. To find the instance, you can log on as trafodion on the cluster and issue this command: pdsh $MY_NODES top -b -n 1 | grep tdm_udrserv. This should identify the looping tdm_udrserv process. Attach a debugger to the looping process and use it to force the process out of the loop, then continue as you wish. If you don't have the Trafodion source code handy, the way to exit the loop is by setting debugLoop to 2 (debugLoop = 2). This has to be done on the right line (the first of the 2 lines of the loop).
  • DEBUG_INITIAL_RUN_TIME_LOOP_ALL (2): This flag will put all the parallel run-time instances into a debug loop. You'll have to free them all, otherwise your SQL statement will be stuck.
  • DEBUG_INITIAL_COMPILE_TIME_LOOP (4): This will loop when calling the compile-time interface. Note that the compile-time interface is invoked in a different process.
  • DEBUG_LOAD_MSG_LOOP (8): This will loop in the C++ code very early on, to debug problems that occur in the earliest stages. Hopefully, UDF writers will not need it.
  • TRACE_ROWS (16): Prints out rows as they are read and produced by the UDF.
  • PRINT_* flags (64 and 128): These print various pieces of information.
  • VALIDATE_WALLS (256): This will put a "wall" (a bit pattern with a conspicuous value) before and after the output data buffer and it will check whether these patterns get corrupted. If so, it will raise an exception.
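
As a worked example of combining flags, the following sketch adds up two of the values listed above and builds the corresponding CQD text. The enum is a local copy of the values given in this tutorial, under the assumption that they match enum DebugFlags in the sqludr.h of your release. The makeDebugCqd() function is a hypothetical helper, and the trailing semicolon is added on the assumption that the statement is typed into sqlci.

```cpp
#include <cassert>
#include <string>

// Local copies of the flag values listed in this tutorial.
// Assumption: they match enum DebugFlags in sqludr.h.
enum TutorialDebugFlags
{
  DEBUG_INITIAL_RUN_TIME_LOOP_ONE = 1,
  DEBUG_INITIAL_RUN_TIME_LOOP_ALL = 2,
  DEBUG_INITIAL_COMPILE_TIME_LOOP = 4,
  DEBUG_LOAD_MSG_LOOP             = 8,
  TRACE_ROWS                      = 16,
  VALIDATE_WALLS                  = 256
};

// Hypothetical helper: build the CQD statement for a sum of flags,
// expressed as a decimal number.
std::string makeDebugCqd(int flags)
{
  assert(flags >= 0);
  return "cqd UDR_DEBUG_FLAGS '" + std::to_string(flags) + "';";
}
```

Tracing rows and validating the buffer walls at the same time means 16 + 256 = 272, so makeDebugCqd(TRACE_ROWS | VALIDATE_WALLS) yields cqd UDR_DEBUG_FLAGS '272';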

Debugging UDFs with a Trafodion release build

In some cases you will have to use a release build (usually from downloads.trafodion.org). In that case, it is still a good idea to log on as the trafodion id and use the sqlci tool. Alternatively, you can connect with a tool like trafci or another ODBC/JDBC client and use the DCS web GUI to find out which server you are connected to. For the UDR compiler interface, debug the sqlci tool directly or the mxosrvr process connected to your ODBC/JDBC client. For the runtime interface, debug the tdm_udrserv process whose parent is your sqlci/mxosrvr. With any of these methods you will need to be able to run commands under the trafodion id.
