This document proposes adding Streaming UDFs to Pig. The goal of Streaming UDFs is to let users easily write UDFs in scripting languages that have no JVM implementation, or only a limited one. Examples include:

...

  • Languages without (widely-used) JVM implementations: R, Perl, ...
  • Languages with JVM implementations, but without support for C-extension modules:

...

    • Python: modules that rely on C extensions (which don't work under Jython).

...

Currently, the best way to use JVM-unfriendly code in these languages from Pig is the STREAM operator. STREAM has several limitations, however:

...

Code Block
-- syntax for define needs work to disambiguate from embedded JVM UDFs
define my_streaming_udfs `my_streaming_udfs.py` ship('my_streaming_udfs.py');

-- load up the data
searches = LOAD 'excite-small.log' USING PigStorage('\t') AS (user_id:chararray, timestamp:chararray, query:chararray);

-- execute streaming UDF on subset of fields
with_parts_of_speech = FOREACH searches 
                      GENERATE user_id, timestamp, 
                               FLATTEN(my_streaming_udfs.part_of_speech(query));
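
To make the example concrete, the user's UDF script referenced above might look something like the following. This is a hypothetical sketch: the part_of_speech implementation is a toy stand-in, and a real one would presumably lean on a C-extension-backed library, which is exactly the case streaming UDFs target.

Code Block
#!/usr/bin/env python
# my_streaming_udfs.py -- hypothetical user UDF script. The shebang line
# is significant: one option discussed under Open Questions is for Pig to
# read it to locate the right interpreter.

def part_of_speech(query):
    # Toy stand-in for a real part-of-speech tagger: tags every word as a
    # noun. A real tagger would pull in a CPython-only library.
    return [(word, 'NN') for word in query.split()]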

I think a lot of this functionality could be built on top of the existing STREAM support. The basic idea is to provide the boilerplate code in each supported target language and execute that in a streaming fashion. I think we'd need a few more things in place:

Controller Script (Language-Specific)

We'd need a per-language controller script that Pig would ship to the cluster and invoke once per UDF. It would be written in the language to be supported and would (see the sketch after this list):

  • Be launched in a separate process by Pig
  • Dynamically import the user's UDF script(s)
  • Pull in metadata about which UDF to call and how to call it (script, function name, expected fields, data types). This would likely be provided by Pig in a separate metadata file.
  • Optionally determine the expected output fields/data types from the UDF itself (TBD)
  • Open up the input stream of data on stdin
  • For each incoming tuple: pull the row, deserialize, type cast, and pass to the UDF
  • Receive the output from the UDF, serialize and pipe back to stdout
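
Here is a minimal sketch of what such a controller might look like in Python. The JSON metadata file, the tab-delimited wire format, and the field names are all assumptions for illustration; each is an open design point. A real implementation would also need error handling, support for complex types (bags, tuples, maps), and an escaping scheme for delimiters inside field values.

Code Block
#!/usr/bin/env python
# Hypothetical controller script sketch. The metadata format and the wire
# format are assumptions, not part of the proposal.
import importlib
import json
import os
import sys

# Assumed mapping from Pig types to Python casts; incomplete on purpose.
CASTS = {'chararray': str, 'int': int, 'long': int, 'double': float}

def main(metadata_path):
    # Shipped files land in the task's working directory; make sure the
    # UDF script there is importable.
    sys.path.insert(0, os.getcwd())

    # Metadata written out by Pig: which script and function to run,
    # plus the expected input field types.
    with open(metadata_path) as meta_file:
        meta = json.load(meta_file)

    # Dynamically import the user's UDF script and look up the function.
    module = importlib.import_module(meta['script'])
    udf = getattr(module, meta['function'])
    casts = [CASTS[t] for t in meta['input_types']]

    # For each incoming tuple on stdin: deserialize, type cast, call the
    # UDF, then serialize the result back to stdout for Pig to consume.
    for line in sys.stdin:
        fields = line.rstrip('\n').split('\t')
        args = [cast(value) for cast, value in zip(casts, fields)]
        result = udf(*args)
        sys.stdout.write('%s\n' % (result,))
        sys.stdout.flush()

if __name__ == '__main__':
    main(sys.argv[1])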

...

StreamingUDF EvalFunc

We'd need a new type of EvalFunc for running streaming UDFs. It would use much of the functionality of the STREAM operator, but additionally would:

  • Ship the per-language Controller Script into the cluster along with the user's UDF script
  • Write out and pass metadata about the UDF to be called to the Controller Script (a possible format is sketched after this list)
  • Exec the Controller Script (TBD how this is done, see Open Questions below)
  • Be able to receive a spec for the UDF's output schema from the controller script along with the output data itself
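
For concreteness, the metadata file Pig hands to the controller script might look something like this. It is purely illustrative: the serialization format, the field names, and the choice of JSON are all assumptions. Note that, per the lists above, the output schema would flow in the other direction, from the controller script back to the EvalFunc.

Code Block
{
  "script": "my_streaming_udfs",
  "function": "part_of_speech",
  "input_types": ["chararray"]
}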

...

  • How do we execute the stream process? In particular, how do we know where the scripting-language executable lives? We could read it from the UDF script's shebang line and pass it along to the controller script, defaulting to #!/usr/bin/env python.
  • How can we ensure that this approach will work for a good cross-section of the scripting languages we want to support?