Status

Current state: ACCEPTED

Discussion thread

JIRA: SAMZA-1804

...

Samza jobs using the High Level API currently specify their inputs and outputs partly in code (i.e., the streamIds), and partly in configuration (e.g., system name, physical name and everything else). Complexity of configuring these inputs and outputs has been a long-standing issue for Samza users. We want to allow users to describe their systems, inputs, outputs and their properties entirely in code. To do this, we propose new System and Stream "descriptors" APIs. Now that StreamApplication/TaskApplication APIs for Samza are available (SEP-13), these may be used for describing inputs and outputs in the Low Level API as well.

...

  1. Create a new SystemDescriptor instance and optionally configure it.
  2. Obtain a new InputDescriptor<I> and OutputDescriptor<O> using the SystemDescriptor above and optionally configure them.
  3. Call StreamApplicationDescriptor#getInputStream(InputDescriptor<I>) to get the MessageStream<I> and StreamApplicationDescriptor#getOutputStream(OutputDescriptor<O>) to get the OutputStream<O>.
  4. Apply transforms on the input MessageStream and produce the results to OutputStream.
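
The four steps above can be sketched with simplified stand-in classes. Everything prefixed with Demo below is hypothetical and exists only to make the flow runnable in isolation; it is not the real Samza API, and the in-memory test input stands in for a real system consumer. The stream names and the "kafka" system name are illustrative assumptions as well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified stand-ins for illustration; NOT the real Samza classes.
class DemoInputDescriptor<I> {
  final String streamId;
  final String systemName;
  DemoInputDescriptor(String streamId, String systemName) {
    this.streamId = streamId;
    this.systemName = systemName;
  }
}

class DemoOutputDescriptor<O> {
  final String streamId;
  final String systemName;
  DemoOutputDescriptor(String streamId, String systemName) {
    this.streamId = streamId;
    this.systemName = systemName;
  }
}

class DemoSystemDescriptor {
  final String systemName;
  DemoSystemDescriptor(String systemName) { this.systemName = systemName; }

  <I> DemoInputDescriptor<I> getInputDescriptor(String streamId) {
    return new DemoInputDescriptor<>(streamId, systemName);
  }

  <O> DemoOutputDescriptor<O> getOutputDescriptor(String streamId) {
    return new DemoOutputDescriptor<>(streamId, systemName);
  }
}

class DemoOutputStream<O> {
  final List<O> sent = new ArrayList<>(); // records what was produced
}

class DemoMessageStream<M> {
  private final List<M> messages;
  DemoMessageStream(List<M> messages) { this.messages = messages; }

  <R> DemoMessageStream<R> map(Function<? super M, ? extends R> fn) {
    List<R> out = new ArrayList<>();
    for (M m : messages) {
      out.add(fn.apply(m));
    }
    return new DemoMessageStream<>(out);
  }

  void sendTo(DemoOutputStream<M> output) { output.sent.addAll(messages); }
}

class DemoAppDescriptor {
  // In-memory test data keyed by streamId (assumption: replaces a real consumer).
  private final Map<String, List<?>> inputs = new HashMap<>();

  void addTestInput(String streamId, List<?> messages) { inputs.put(streamId, messages); }

  @SuppressWarnings("unchecked")
  <I> DemoMessageStream<I> getInputStream(DemoInputDescriptor<I> descriptor) {
    return new DemoMessageStream<>((List<I>) inputs.getOrDefault(descriptor.streamId, new ArrayList<>()));
  }

  <O> DemoOutputStream<O> getOutputStream(DemoOutputDescriptor<O> descriptor) {
    return new DemoOutputStream<>();
  }
}

class DescriptorFlowSketch {
  static List<Integer> run(List<String> pageViews) {
    DemoSystemDescriptor system = new DemoSystemDescriptor("kafka");                 // step 1
    DemoInputDescriptor<String> in = system.getInputDescriptor("page-views");        // step 2
    DemoOutputDescriptor<Integer> out = system.getOutputDescriptor("view-lengths");

    DemoAppDescriptor app = new DemoAppDescriptor();
    app.addTestInput("page-views", pageViews);
    DemoMessageStream<String> input = app.getInputStream(in);                        // step 3
    DemoOutputStream<Integer> output = app.getOutputStream(out);

    input.map(String::length).sendTo(output);                                        // step 4
    return output.sent;
  }

  public static void main(String[] args) {
    System.out.println(run(Arrays.asList("home", "checkout"))); // prints [4, 8]
  }
}
```

The point of the sketch is only the shape of the calls: the message types flow from the descriptors into the streams, so no streamId strings or casts appear in the processing logic.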

...


Users will not be required to describe everything in code; they will have the option to do so. They may describe as many of their system/stream properties in code as they like, and the rest may be in configuration. System/Stream properties from user provided descriptors will be converted to their corresponding configurations and merged with the user provided configuration. Descriptor generated properties will override those provided by the user in configuration.
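
The merge rule can be illustrated with a minimal sketch. The class and method names here are hypothetical and not part of Samza, and the config values are made up for illustration; the sketch only shows the stated precedence, where descriptor generated values win over user provided configuration on conflicting keys.

```java
import java.util.HashMap;
import java.util.Map;

class DescriptorConfigMerge {
  // Hypothetical helper (not a Samza API): merges user provided configuration
  // with descriptor generated configuration.
  static Map<String, String> merge(Map<String, String> userConfig,
                                   Map<String, String> descriptorConfig) {
    Map<String, String> merged = new HashMap<>(userConfig);
    // Applied last, so descriptor generated values override user values.
    merged.putAll(descriptorConfig);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> user = new HashMap<>();
    user.put("job.name", "page-view-counter");                 // illustrative value
    user.put("streams.page-views.samza.system", "kafka-a");    // illustrative value

    Map<String, String> fromDescriptors = new HashMap<>();
    fromDescriptors.put("streams.page-views.samza.system", "kafka-b");

    Map<String, String> merged = merge(user, fromDescriptors);
    // The descriptor value replaces the user value for the shared key;
    // keys present only in the user configuration are kept as-is.
    System.out.println(merged.get("streams.page-views.samza.system"));
    System.out.println(merged.get("job.name"));
  }
}
```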


Basic requirements

...

InputTransformer functions allow System implementers to apply custom transformations to messages in their input streams before they're delivered to the High Level API StreamApplication. The primary use case of this feature is to allow systems to add additional metadata to each incoming message by extending IncomingMessageEnvelope. In this use case, the transformer is provided by the System implementers as a system level default, and is not visible to the user.

...

StreamExpander functions allow System implementers to expand a single user-provided input descriptor into multiple input descriptors, apply some custom MessageStream transforms on the expanded input streams thus obtained, and then return a new MessageStream to the High Level API StreamApplication for further processing. The primary use case of this feature is to allow system implementers to resolve a "logical" user provided input stream to multiple underlying "physical" input stream sources at execution planning time and still present a unified logical view to the user for further processing. The interface is:

Code Block (java): StreamExpander
/**
 * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor},
 * and returns a new {@link MessageStream} with the combined results. Called when {@link StreamApplicationDescriptor#getInputStream}
 * is being used to get a {@link MessageStream} using an {@link InputDescriptor} from an
 * {@link org.apache.samza.operators.descriptors.base.system.ExpandingSystemDescriptor}
 * <p>
 * This is provided by default by {@link org.apache.samza.operators.descriptors.base.system.ExpandingSystemDescriptor}
 * implementations and cannot be overridden or set on a per stream level.
 *
 * @param <OM> type of the messages in the resultant {@link MessageStream}
 */
public interface StreamExpander<OM> extends Serializable {

  /**
   * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor},
   * and returns a new {@link MessageStream} with the combined results. Called when the {@link InputDescriptor}
   * is being used to get a {@link MessageStream} using {@link StreamApplicationDescriptor#getInputStream}.
   * <p>
   * Note: Take care to avoid infinite recursion in the implementation; e.g., by ensuring that it doesn't call
   * {@link StreamApplicationDescriptor#getInputStream} with an {@link InputDescriptor} from an
   * {@link org.apache.samza.operators.descriptors.base.system.ExpandingSystemDescriptor} (like this one) again.
   *
   * @param streamApplicationDescriptor the {@link StreamApplicationDescriptor} to register the expanded sub-DAG of operators on
   * @param inputDescriptor the {@link InputDescriptor} to be expanded
   * @return the {@link MessageStream} containing the combined results of the sub-DAG of operators
   */
  MessageStream<OM> apply(StreamApplicationDescriptor streamApplicationDescriptor, InputDescriptor inputDescriptor);

}

...

These functions are executed once in the ApplicationRunner when creating/describing the StreamApplication.

We do not plan to support user provided stream level StreamExpanders.

...

In other words, the type patterns above are:


                              | getISD(id)            | getISD(id, serde)     | getISD(id, txfn)      | getISD(id, txfn, serde)
Non Transforming System       | SystemMessageType     | StreamMessageType     | StreamTransformerType | StreamTransformerType
Transforming System           | SystemTransformerType | SystemTransformerType | StreamTransformerType | StreamTransformerType
Expanding System              | SystemExpanderType    | SystemExpanderType    | SystemExpanderType    | SystemExpanderType
Expanding Transforming System | SystemExpanderType    | SystemExpanderType    | SystemExpanderType    | SystemExpanderType

Clearly, there is no single SystemDescriptor class that can satisfy the type constraints for all four cases. In fact, there are 3 different sets of interface method signatures above (Case 1-3).

...

To satisfy the requirements above, we propose the following class hierarchy of System and Stream Descriptors.

Base Classes:

  • SystemDescriptor

  • StreamDescriptor
    • InputDescriptor
    • OutputDescriptor

To provide flexibility to system implementers to customize whether and how users get input and output descriptors (e.g. advanced requirement 1), we do not enforce a particular API for getInputDescriptor and getOutputDescriptor. System implementers are free to extend the base SystemDescriptor class and add any getInputDescriptor/getOutputDescriptor variants. We do provide the following interfaces as guidelines for System implementers based on the discussion above:

...

We also provide the following generic implementations of System, InputStream and OutputStream Descriptors. If a system doesn't provide a more specific implementation, users can use these to configure any Samza properties and provide the system specific properties as a Map<String, String> of configs.

Generic (System Agnostic) Implementations:

  • GenericSystemDescriptor
  • GenericInputDescriptor
  • GenericOutputDescriptor

These APIs (WIP) for the new descriptors can be found at: https://github.com/prateekm/samza/tree/stream-descriptor/samza-api/src/main/java/org/apache/samza/operators/descriptors

Sample High Level API code (WIP) using these APIs can be found at: https://github.com/prateekm/samza/tree/stream-descriptor/samza-test/src/main/java/org/apache/samza/example

Sample implementations and usage patterns for these APIs (WIP) can be found at: https://github.com/prateekm/samza/tree/stream-descriptor/samza-api/src/test/java/org/apache/samza/operators/descriptors


The new StreamApplicationDescriptor interface with these classes is:

Code Block
public interface StreamApplicationDescriptor {
  void setDefaultSystem(SystemDescriptor<?, ?> defaultSystemDescriptor);
  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);
  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);
  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
  StreamGraph withContextManager(ContextManager contextManager);
}


Default System Descriptor and Intermediate Streams

The "default system" for a job, currently specified by setting the "job.default.system" configuration and its properties, is used for creating intermediate streams (e.g. for partitionBy and broadcast), as well as some internal streams like coordinator, checkpoint and store changelog streams, unless their system has been explicitly set using job.coordinator.system, task.checkpoint.system, or job.changelog.system, respectively.

We will add a new ApplicationDescriptor#setDefaultSystem(SystemDescriptor) API that can be used to configure the default system used by the job, for example to provide the default system name and to set and configure the default system serde. Like everything else in the Descriptors API, users should be able to set default system properties, including the serdes, in config instead of in code if they wish to. If they choose to rely on the default system's serde, or to set the serde for job.default.system in configs, instead of providing one explicitly when using partitionBy, they'll be responsible for figuring out the type of messages in the repartitioned streams and casting them correctly in their application.

We will not provide a way to set intermediate stream properties in code using a StreamDescriptor since we don't expect this to be a common use case. If users need to manually configure them, they'll have to find the intermediate stream name and set its properties in configuration.

...

This implies that, unlike other configurations, input/output stream serdes *must* be specified using descriptors, and cannot be specified in configuration. Users and SystemDescriptors may specify a KVSerde<NoOpSerde, NoOpSerde> if they don't want messages in the system/stream to be deserialized by Samza. This may be useful when the SystemConsumer/Producer handles the serialization itself.

Serdes are optional for intermediate streams. If users don't provide an explicit serde for MessageStream#partitionBy, and don't set a default system for the application, they must set the serde for job.default.system and its system-level serdes in their configuration instead. If a default system descriptor is set and its serde is used for intermediate streams, it must be a KVSerde.

Stream Configuration Generation

During StreamApplication#init(StreamApplicationDescriptor), users create System and StreamDescriptors, and get input/output streams for them using StreamApplicationDescriptor#getInputStream(InputDescriptor) and StreamApplicationDescriptor#getOutputStream(OutputDescriptor). The StreamApplicationDescriptor tracks all Input/OutputDescriptors and the default SystemDescriptor used by the application. When the planner runs, it generates configurations for the input/output streams in the following way:

...

Step 1: Create an instance of the StreamApplicationDescriptor and the StreamApplication, and call StreamApplication#init(StreamApplicationDescriptor).

Step 2: Collect all the user provided System/StreamDescriptors from the StreamApplicationDescriptor.

Step 3: Merge the user provided configuration (application.cfg) with the Descriptor generated configuration (System/Input/OutputDescriptor#toConfig). Descriptor generated configuration has higher precedence and overrides user provided configuration.

Step 4: Run the planning phase using this merged configuration. The planner internally creates StreamSpec instances based on this merged configuration, mutates these StreamSpecs to set additional inferred stream properties, creates/validates internal streams using them, and then merges StreamSpec generated configuration with previously merged configuration from Step 3. StreamSpec generated configuration has higher precedence.

...

Note: System and Stream descriptor generated configuration has higher precedence than user provided configuration (from cfg2). We will need to offer a mechanism to override these descriptor generated configs in cfg2. This is useful, for example, to support fabric/instance level overrides. This can currently be done by specifying them with a special jobs.job-name.* override scope. For example, the value for config "foo.bar" generated in step 4 can be overridden using the config "jobs.job-name.foo.bar".
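
The override scope can be illustrated with a small sketch. The helper below is hypothetical and is not the code Samza uses to apply these overrides; it only shows the intended effect, assuming that any user config key of the form jobs.job-name.<key> replaces the generated value for <key>, and that overrides scoped to other job names are ignored.

```java
import java.util.HashMap;
import java.util.Map;

class JobOverrideScope {
  // Hypothetical helper (not a Samza API): applies "jobs.<job-name>.<key>"
  // entries from the user configuration on top of the generated configuration.
  static Map<String, String> applyOverrides(String jobName,
                                            Map<String, String> generatedConfig,
                                            Map<String, String> userConfig) {
    String prefix = "jobs." + jobName + ".";
    Map<String, String> result = new HashMap<>(generatedConfig);
    for (Map.Entry<String, String> entry : userConfig.entrySet()) {
      String key = entry.getKey();
      // Only keys in this job's override scope, with a non-empty remainder, apply.
      if (key.startsWith(prefix) && key.length() > prefix.length()) {
        result.put(key.substring(prefix.length()), entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> generated = new HashMap<>();
    generated.put("foo.bar", "from-descriptor");

    Map<String, String> user = new HashMap<>();
    user.put("jobs.job-name.foo.bar", "from-override");
    user.put("jobs.other-job.foo.bar", "ignored"); // scoped to a different job

    System.out.println(applyOverrides("job-name", generated, user).get("foo.bar"));
  }
}
```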

...

Compatibility, Deprecation, and Migration Plan

The changes in the StreamApplicationDescriptor interface are backwards incompatible. Specifically, instead of getting an input/output stream using a streamId, users will now have to get an input/output stream using SystemDescriptors and StreamDescriptors.

...