Status

Current state: UNDER DISCUSSION

Discussion thread

Motivation

Samza jobs using the High Level API currently specify their inputs and outputs partly in code (i.e., the streamIds), and partly in configuration (e.g., system name, physical name and everything else). The complexity of configuring these inputs and outputs has been a long-standing issue for Samza users. We want to allow users to describe their systems, inputs, outputs and their properties entirely in code. To do this, we propose new System and Stream "descriptors" APIs. Once the new StreamApplication/TaskApplication APIs for Samza are available (SEP-13), these will be usable for describing inputs and outputs in the Low Level API as well.

...

  1. Allow users to specify their system and stream properties in code instead of configuration.
  2. Let System implementers set reasonable defaults for their systems and streams (e.g., enforce Avro as the message serde).
  3. Support advanced use cases like delivering incoming message metadata to the application, and logical to physical stream expansion.

Requirements

User Experience

The general user workflow should be:

...

Usage Example At LinkedIn (Java)
KafkaInputDescriptor<KV<String, GenericRecord>> pvd = KafkaInputDescriptor.from("PageViewEvent", "tracking"); // properties for the "tracking" system are provided in configuration
KafkaOutputDescriptor<KV<String, PageViewCount>> pvcd = KafkaOutputDescriptor.from("PageViewCount", "metrics"); // properties for the "metrics" system are provided in configuration

MessageStream<KV<String, GenericRecord>> pvs = graph.getInputStream(pvd); 
OutputStream<KV<String, PageViewCount>> pvcs = graph.getOutputStream(pvcd); 

pvs.window(...).map(...).sendTo(pvcs);

Basic requirements

The basic requirements for the new APIs are:

  1. Users should be able to set any Samza properties for their input and output streams using these descriptors (e.g., bootstrap status, input priority, offset management, serdes etc.).
  2. System implementers should be able to provide custom implementations of these descriptors that support additional system-specific properties. For example, a KafkaSystemDescriptor may allow setting Kafka producer and consumer properties. 
    Users should choose the extended descriptor for their system if one is available. Otherwise, they may use the Samza-provided GenericSystemDescriptor/GenericInputDescriptor/GenericOutputDescriptor implementations and provide system-specific properties as the appropriate key-value configurations.
  3. Users should be able to use a fluent API for configuring these descriptors.
  4. The types of MessageStream/OutputStream created using these descriptors should match their contents. E.g., for simple descriptors, the type of the MessageStream/OutputStream should be determined by the system or stream level Serdes, whether provided by the system as a default, or configured by the user explicitly.
  5. Users should be able to provide a default SystemDescriptor which should be used to configure the properties of the system used for creating intermediate streams in the application. This is equivalent to setting and configuring job.default.system and its properties.
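
One way to reconcile requirements 2 and 3 is a self-typed base class, so that setters inherited from the base descriptor still return the system-specific type and the chain can continue. The sketch below only illustrates that idea. SketchInputDescriptor, SketchKafkaInputDescriptor, withConsumerAutoOffsetReset and the property keys are hypothetical names chosen for this example, not the proposed Samza classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a base input descriptor; not the Samza class.
// SubClass is the concrete descriptor type, so inherited setters can return it.
abstract class SketchInputDescriptor<M, SubClass extends SketchInputDescriptor<M, SubClass>> {
  private final String streamId;
  private final String systemName;
  private final Map<String, String> properties = new LinkedHashMap<>();

  protected SketchInputDescriptor(String streamId, String systemName) {
    this.streamId = streamId;
    this.systemName = systemName;
  }

  @SuppressWarnings("unchecked")
  private SubClass self() {
    return (SubClass) this;
  }

  // A property shared by all systems; returns the concrete subclass type.
  public SubClass withBootstrap(boolean bootstrap) {
    properties.put("bootstrap", Boolean.toString(bootstrap));
    return self();
  }

  protected void putProperty(String key, String value) {
    properties.put(key, value);
  }

  public String getStreamId() {
    return streamId;
  }

  public String getSystemName() {
    return systemName;
  }

  public Map<String, String> getProperties() {
    return new LinkedHashMap<>(properties);
  }
}

// Hypothetical system-specific descriptor that adds one extra property.
final class SketchKafkaInputDescriptor<M>
    extends SketchInputDescriptor<M, SketchKafkaInputDescriptor<M>> {

  private SketchKafkaInputDescriptor(String streamId, String systemName) {
    super(streamId, systemName);
  }

  static <M> SketchKafkaInputDescriptor<M> from(String streamId, String systemName) {
    return new SketchKafkaInputDescriptor<>(streamId, systemName);
  }

  // Illustrative system-specific setter; the key name is an assumption.
  public SketchKafkaInputDescriptor<M> withConsumerAutoOffsetReset(String value) {
    putProperty("consumer.auto.offset.reset", value);
    return this;
  }
}

final class FluentDescriptorDemo {
  public static void main(String[] args) {
    SketchKafkaInputDescriptor<String> descriptor =
        SketchKafkaInputDescriptor.<String>from("PageViewEvent", "tracking")
            .withBootstrap(true)                      // inherited setter keeps the Kafka type
            .withConsumerAutoOffsetReset("smallest"); // so a system-specific setter can follow
    System.out.println(descriptor.getProperties());
  }
}
```

Without the second type parameter, withBootstrap would return the base type and the system-specific setter could no longer be chained after it.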

Advanced requirements

We will allow System implementers to extend these Descriptor implementations so that they can provide the following additional features to users:

System Specific Serdes

System implementers should be able to dictate a specific serialization format for messages in their streams. For example, if a system enforces Avro as the message format, its input and output descriptors should have a KV<String, GenericRecord> message type by default. The API will always allow users to provide a stream-level serde for input/output streams that may differ from the system-specific serde. System implementers may choose to disallow this by throwing a RuntimeException from these methods.
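
As a rough illustration of the last point, a system-specific descriptor could override the serde setter and reject any stream-level override. The classes below are hypothetical and represent serdes by name only; they are a sketch of the pattern, not the proposed API.

```java
// Hypothetical descriptors for illustration; these are not Samza classes.
class SketchStreamDescriptor {
  private String serdeName;

  SketchStreamDescriptor(String systemDefaultSerde) {
    this.serdeName = systemDefaultSerde;
  }

  // Stream-level override of the system default serde.
  SketchStreamDescriptor withSerde(String serdeName) {
    this.serdeName = serdeName;
    return this;
  }

  String getSerdeName() {
    return serdeName;
  }
}

// A system that enforces Avro rejects any stream-level override.
final class AvroOnlyStreamDescriptor extends SketchStreamDescriptor {
  AvroOnlyStreamDescriptor() {
    super("avro");
  }

  @Override
  SketchStreamDescriptor withSerde(String serdeName) {
    // UnsupportedOperationException is a RuntimeException.
    throw new UnsupportedOperationException("This system only supports the avro serde");
  }
}

final class SerdeOverrideDemo {
  public static void main(String[] args) {
    System.out.println(new SketchStreamDescriptor("avro").withSerde("json").getSerdeName());
    try {
      new AvroOnlyStreamDescriptor().withSerde("json");
    } catch (UnsupportedOperationException e) {
      System.out.println("override rejected: " + e.getMessage());
    }
  }
}
```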

Input Transformers

InputTransformer functions allow System implementers to apply custom transformations to messages in their input streams before they're delivered to the application. The primary use case of this feature is to allow systems to add additional metadata to each incoming message by extending IncomingMessageEnvelope. In this use case, the transformer is provided by the System implementers as a system-level default, and is not visible to the user.

...

Users may optionally override the system default transformer or provide one for an input stream themselves. This might be useful, for example, if they need access to some information in the IncomingMessageEnvelope like the offset or the message timestamp.
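
The user-provided case can be sketched as a function from the incoming envelope to the type the application wants to see. The sketch assumes a transformer is simply a function of the envelope. SketchEnvelope is a reduced, hypothetical stand-in for IncomingMessageEnvelope, and SketchInputTransformer and MessageWithOffset are names invented for this example.

```java
import java.util.function.Function;

// Hypothetical, reduced stand-in for IncomingMessageEnvelope.
final class SketchEnvelope {
  final String offset;
  final Object key;
  final Object message;

  SketchEnvelope(String offset, Object key, Object message) {
    this.offset = offset;
    this.key = key;
    this.message = message;
  }
}

// Assumed transformer shape: envelope in, application-facing message out.
interface SketchInputTransformer<OM> extends Function<SketchEnvelope, OM> { }

// Application-facing type that carries the offset alongside the payload.
final class MessageWithOffset {
  final String offset;
  final Object payload;

  MessageWithOffset(String offset, Object payload) {
    this.offset = offset;
    this.payload = payload;
  }
}

final class InputTransformerDemo {
  // Exposes the envelope offset to the application together with the message.
  static final SketchInputTransformer<MessageWithOffset> WITH_OFFSET =
      envelope -> new MessageWithOffset(envelope.offset, envelope.message);

  public static void main(String[] args) {
    MessageWithOffset out = WITH_OFFSET.apply(new SketchEnvelope("42", "member-1", "page-view"));
    System.out.println(out.offset + " -> " + out.payload);
  }
}
```

With such a transformer set on an input descriptor, the message type of the resulting stream would be the transformer's result type (MessageWithOffset here) rather than the serde's type.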

Stream Expanders

StreamExpander functions allow System implementors to expand a single user-provided input descriptor into multiple input descriptors, apply some custom MessageStream transforms on the expanded input streams thus obtained, and then return a new MessageStream to the application for further processing. The primary use case of this feature is to allow system implementors to resolve a "logical" user provided input stream to multiple underlying "physical" input stream sources at execution planning time and still present a unified logical view to the user for further processing. The interface is:

...

Similar to a SystemDescriptor with an InputTransformer, the type of the messages in their MessageStream (but not OutputStream) obtained from their InputDescriptors will always be the result type of the StreamExpander. The same parameterization rules apply.

Proposed Implementation

Base Classes and Type Bounds

The requirements above impose some constraints on the number and type parameters of the new API classes:

...

For these reasons, we consider this API to be a reasonable tradeoff for providing a type-safe fluent descriptor API to our users.

Proposed APIs

To satisfy the requirements above, we propose the following class hierarchy of System and Stream Descriptors.

...

public interface StreamGraph {
  void setDefaultSystem(SystemDescriptor<?, ?> defaultSystemDescriptor);
  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);
  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);
  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
  StreamGraph withContextManager(ContextManager contextManager);
}


Default System Descriptor and Intermediate Streams

The "default system" for a job, currently specified by setting the "job.default.system" configuration and its properties, is used for creating intermediate streams (for partitionBy), as well as some internal streams like coordinator, checkpoint and store changelog streams (unless their system has been explicitly set using job.coordinator.system, task.checkpoint.system, or job.changelog.system respectively).
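
The fallback described above can be sketched as a lookup that prefers the purpose-specific key and otherwise uses job.default.system. The configuration keys are the ones named in the paragraph; the DefaultSystemResolution class and its resolve method are hypothetical helpers written for this example and are not part of Samza.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class DefaultSystemResolution {
  static final String DEFAULT_SYSTEM_KEY = "job.default.system";

  // Returns the explicitly configured system for specificKey if present,
  // otherwise the job-wide default system, otherwise empty.
  static Optional<String> resolve(Map<String, String> config, String specificKey) {
    String explicit = config.get(specificKey);
    if (explicit != null && !explicit.isEmpty()) {
      return Optional.of(explicit);
    }
    return Optional.ofNullable(config.get(DEFAULT_SYSTEM_KEY));
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("job.default.system", "kafka-default");
    config.put("task.checkpoint.system", "kafka-checkpoints");

    // No coordinator system is set, so the default system is used.
    System.out.println(resolve(config, "job.coordinator.system"));
    // The checkpoint system is set explicitly and takes precedence.
    System.out.println(resolve(config, "task.checkpoint.system"));
  }
}
```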

...

This default system will also not be used to create input and output streams. I.e., users will always have to associate an input/output stream explicitly with a system name, either by getting them from a system descriptor, or by providing the name when creating the InputDescriptor using the InputDescriptor#from convenience factory methods.

Serdes For Streams

Every InputDescriptor and OutputDescriptor in the DAG must have a non-null Serde instance associated with it. This serde may be:

...

Serdes are optional for intermediate streams. If users don't provide an explicit serde for MessageStream#partitionBy, and don't set a default system for the application, they must set the serde for job.default.system and its system-level serdes in their configuration instead. If a default system descriptor is set and its serde is used for intermediate streams, it must be a KVSerde.

Stream Configuration Generation

During StreamApplication#init(StreamGraph), users create System and Stream descriptors, and get input/output streams for them using StreamGraph#getInputStream(InputDescriptor) and StreamGraph#getOutputStream(OutputDescriptor). The StreamGraph tracks all Input/OutputDescriptors and the default SystemDescriptor used by the application. When the planner runs, it generates configurations for the input and output streams in the following way:

...

Note: In the case where there are "expanded" streams, configs will be generated for the "expanded" streams only, not the initial logical stream.

Default Config Values and Descriptors

Some configurations have default values associated with them. E.g., the default value of "streams.stream-id.samza.bootstrap" is false. Similarly, users may get default values for some configurations from another source (e.g., app-def).

...

By convention, all such fields are marked as Optional<T> to clarify that they may not have been set by the user.
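
One reading of this convention is that a descriptor only emits configuration for fields the user actually set, leaving defaults from other sources untouched. The sketch below illustrates that reading and is an assumption about the intent. OptionalFieldDescriptor is a hypothetical class; the bootstrap key follows the "streams.stream-id.samza.bootstrap" pattern quoted above, and the physical name key is assumed to follow the same pattern.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical descriptor; fields stay empty until the user sets them.
final class OptionalFieldDescriptor {
  private final String streamId;
  private Optional<Boolean> bootstrap = Optional.empty();
  private Optional<String> physicalName = Optional.empty();

  OptionalFieldDescriptor(String streamId) {
    this.streamId = streamId;
  }

  OptionalFieldDescriptor withBootstrap(boolean bootstrap) {
    this.bootstrap = Optional.of(bootstrap);
    return this;
  }

  OptionalFieldDescriptor withPhysicalName(String physicalName) {
    this.physicalName = Optional.of(physicalName);
    return this;
  }

  // Emits a key only for fields that were explicitly set.
  Map<String, String> toConfig() {
    Map<String, String> config = new LinkedHashMap<>();
    String prefix = "streams." + streamId + ".samza.";
    bootstrap.ifPresent(b -> config.put(prefix + "bootstrap", b.toString()));
    physicalName.ifPresent(n -> config.put(prefix + "physical.name", n)); // assumed key
    return config;
  }

  public static void main(String[] args) {
    // Nothing set: no keys are generated, so external defaults still apply.
    System.out.println(new OptionalFieldDescriptor("page-views").toConfig());
    // Only the explicitly set field produces a key.
    System.out.println(new OptionalFieldDescriptor("page-views").withBootstrap(true).toConfig());
  }
}
```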

Compatibility, Deprecation, and Migration Plan

The changes in the StreamGraph interface are backwards incompatible. Specifically, instead of getting an input/output stream using a streamId, users will now have to get an input/output stream using StreamDescriptors.

...