
Samza jobs using the High Level API currently specify their inputs and outputs partly in code (i.e., the streamIds), and partly in configuration (e.g., system name, physical name and everything else). The complexity of configuring these inputs and outputs has been a long-standing issue for Samza users. We want to allow users to describe their systems, inputs, outputs and their properties entirely in code. To do this, we propose new System and Stream "descriptors" APIs. Now that the StreamApplication/TaskApplication APIs for Samza are available (SEP-13), these descriptors may be used for describing inputs and outputs in the Low Level API as well.

...

  1. Create a new SystemDescriptor instance and optionally configure it.
  2. Obtain an InputDescriptor<I> and an OutputDescriptor<O> from the SystemDescriptor above and optionally configure them.
  3. Call StreamGraph#getInputStream(InputDescriptor<I>) to get the MessageStream<I> and StreamGraph#getOutputStream(OutputDescriptor<O>) to get the OutputStream<O>.
  4. Apply transforms on the input MessageStream and send the results to the OutputStream.
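The four steps can be traced end to end with the sketch below. It uses hypothetical, simplified stand-in classes (SystemDescriptor, InputDescriptor, OutputDescriptor, MessageStream, OutputStream and a FakeStreamGraph that serves in-memory data). These stand-ins are not the real Samza API, whose signatures are richer and generic over message types. Here all messages are Strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified stand-ins for the proposed descriptor API (not the real Samza classes).
public class DescriptorFlowSketch {
  public static class SystemDescriptor {
    public final String systemName;
    public SystemDescriptor(String systemName) { this.systemName = systemName; }
    public InputDescriptor getInputDescriptor(String streamId) { return new InputDescriptor(streamId, this); }
    public OutputDescriptor getOutputDescriptor(String streamId) { return new OutputDescriptor(streamId, this); }
  }

  public static class InputDescriptor {
    public final String streamId;
    public final SystemDescriptor system;
    InputDescriptor(String streamId, SystemDescriptor system) { this.streamId = streamId; this.system = system; }
  }

  public static class OutputDescriptor {
    public final String streamId;
    public final SystemDescriptor system;
    OutputDescriptor(String streamId, SystemDescriptor system) { this.streamId = streamId; this.system = system; }
  }

  // In-memory MessageStream stand-in supporting map and sendTo.
  public static class MessageStream<M> {
    final List<M> messages;
    MessageStream(List<M> messages) { this.messages = messages; }
    public <R> MessageStream<R> map(Function<M, R> fn) {
      List<R> out = new ArrayList<>();
      for (M m : messages) out.add(fn.apply(m));
      return new MessageStream<>(out);
    }
    public void sendTo(OutputStream<M> output) { output.sent.addAll(messages); }
  }

  // Collects whatever is sent to it.
  public static class OutputStream<M> {
    public final List<M> sent = new ArrayList<>();
  }

  // Stand-in for StreamGraph: records which streams were requested and serves fake input data by streamId.
  public static class FakeStreamGraph {
    public final List<String> usedStreamIds = new ArrayList<>();
    private final Map<String, List<String>> fakeInputs;
    public FakeStreamGraph(Map<String, List<String>> fakeInputs) { this.fakeInputs = fakeInputs; }
    public MessageStream<String> getInputStream(InputDescriptor d) {
      usedStreamIds.add(d.streamId);
      return new MessageStream<>(fakeInputs.getOrDefault(d.streamId, new ArrayList<>()));
    }
    public OutputStream<String> getOutputStream(OutputDescriptor d) {
      usedStreamIds.add(d.streamId);
      return new OutputStream<>();
    }
  }

  public static List<String> run(FakeStreamGraph graph) {
    SystemDescriptor kafka = new SystemDescriptor("kafka");                   // Step 1
    InputDescriptor in = kafka.getInputDescriptor("page-views");              // Step 2
    OutputDescriptor out = kafka.getOutputDescriptor("page-views-upper");
    MessageStream<String> input = graph.getInputStream(in);                   // Step 3
    OutputStream<String> output = graph.getOutputStream(out);
    input.map(String::toUpperCase).sendTo(output);                            // Step 4
    return output.sent;
  }

  public static void main(String[] args) {
    Map<String, List<String>> data = new HashMap<>();
    data.put("page-views", Arrays.asList("home", "cart"));
    System.out.println(run(new FakeStreamGraph(data)));  // [HOME, CART]
  }
}
```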

...


Users will not be required to describe everything in code; they'll have the option to do so. They may describe as much of their system/stream properties in code as they like, and provide the rest in configuration. System/Stream properties from user-provided descriptors will be converted to their corresponding configurations and merged with the user-provided configuration. Descriptor-generated properties will override those provided by the user in configuration.
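The conversion and override rule can be sketched as follows. The sketch assumes an input descriptor emits Samza's stream-scoped keys `streams.<streamId>.samza.system` and `streams.<streamId>.samza.physical.name`. The helper names `inputDescriptorToConfig` and `merge` are hypothetical, and the exact set of keys a real descriptor generates may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: descriptor properties become config entries that override user config.
public class DescriptorConfigMerge {
  // Converts a few input descriptor properties into stream-scoped config keys.
  public static Map<String, String> inputDescriptorToConfig(String streamId, String systemName, String physicalName) {
    Map<String, String> config = new HashMap<>();
    config.put("streams." + streamId + ".samza.system", systemName);
    config.put("streams." + streamId + ".samza.physical.name", physicalName);
    return config;
  }

  // User config is the base; descriptor-generated entries win on conflicting keys.
  public static Map<String, String> merge(Map<String, String> userConfig, Map<String, String> descriptorConfig) {
    Map<String, String> merged = new HashMap<>(userConfig);
    merged.putAll(descriptorConfig);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> user = new HashMap<>();
    user.put("streams.page-views.samza.physical.name", "PageViewEventOld");
    user.put("streams.page-views.samza.reset.offset", "true");
    Map<String, String> merged = merge(user, inputDescriptorToConfig("page-views", "kafka", "PageViewEvent"));
    System.out.println(merged.get("streams.page-views.samza.physical.name"));  // PageViewEvent
    System.out.println(merged.get("streams.page-views.samza.reset.offset"));   // true
  }
}
```

Properties that only the user sets, such as the reset offset here, pass through unchanged. Only conflicting keys are replaced by descriptor values.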


Basic requirements

...

InputTransformer functions allow System implementers to apply custom transformations to messages in their input streams before they're delivered to the High Level API StreamApplication. The primary use case of this feature is to allow systems to add additional metadata to each incoming message by extending IncomingMessageEnvelope. In this use case, the transformer is provided by the System implementers as a system-level default, and is not visible to the user.
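A system-level default transformer might look like the sketch below. It assumes an InputTransformer maps an incoming envelope to the message type the application sees. `Envelope` is a minimal stand-in for IncomingMessageEnvelope, and `MessageWithOffset` and `ATTACH_OFFSET` are hypothetical names. Rather than subclassing the envelope, this sketch simply surfaces one piece of envelope metadata (the offset) alongside the message.

```java
import java.util.function.Function;

// Hypothetical sketch of a system-level default InputTransformer.
public class InputTransformerSketch {
  // Minimal stand-in for IncomingMessageEnvelope (the real class carries more fields).
  public static class Envelope {
    public final String offset;
    public final Object key;
    public final Object message;
    public Envelope(String offset, Object key, Object message) {
      this.offset = offset;
      this.key = key;
      this.message = message;
    }
  }

  // Assumed shape: turns an envelope into the message type delivered to the application.
  public interface InputTransformer<OM> extends Function<Envelope, OM> {}

  // Message type a system implementer might expose to carry extra metadata.
  public static class MessageWithOffset {
    public final String offset;
    public final Object value;
    public MessageWithOffset(String offset, Object value) {
      this.offset = offset;
      this.value = value;
    }
  }

  // System-provided default: attaches the envelope's offset to each message.
  public static final InputTransformer<MessageWithOffset> ATTACH_OFFSET =
      envelope -> new MessageWithOffset(envelope.offset, envelope.message);

  public static void main(String[] args) {
    MessageWithOffset m = ATTACH_OFFSET.apply(new Envelope("42", "user-1", "clicked"));
    System.out.println(m.offset + " " + m.value);  // 42 clicked
  }
}
```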

...

StreamExpander functions allow System implementers to expand a single user-provided input descriptor into multiple input descriptors, apply some custom MessageStream transforms on the expanded input streams thus obtained, and then return a new MessageStream to the High Level API StreamApplication for further processing. The primary use case of this feature is to allow system implementers to resolve a "logical" user-provided input stream to multiple underlying "physical" input stream sources at execution planning time, while still presenting a unified logical view to the user for further processing. The interface is:

...

These functions are executed once in the ApplicationRunner when creating/describing the StreamApplication.

We do not plan to support user provided stream level StreamExpanders.
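The expand-then-merge idea behind StreamExpanders can be sketched as below. The `StreamExpander` shape here is assumed for illustration and is not the interface referenced above. It takes the logical stream id and a function that obtains physical input streams. The `"<logicalId>-<cluster>"` naming scheme and `perClusterExpander` are hypothetical. `MessageStream` is an in-memory stand-in whose `mergeAll` just concatenates, so no cross-stream ordering should be assumed in general.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: one logical input resolved to several physical inputs, merged back into one stream.
public class StreamExpanderSketch {
  // In-memory MessageStream stand-in.
  public static class MessageStream<M> {
    public final List<M> messages;
    public MessageStream(List<M> messages) { this.messages = messages; }

    // This stand-in simply concatenates streams; a real merge makes no ordering promise across inputs.
    public static <M> MessageStream<M> mergeAll(List<MessageStream<M>> streams) {
      List<M> all = new ArrayList<>();
      for (MessageStream<M> s : streams) all.addAll(s.messages);
      return new MessageStream<>(all);
    }
  }

  // Assumed shape: logical stream id plus a way to obtain physical input streams.
  public interface StreamExpander {
    MessageStream<String> expand(String logicalStreamId, Function<String, MessageStream<String>> getInputStream);
  }

  // Example expander: one physical stream per cluster, named "<logicalId>-<cluster>".
  public static StreamExpander perClusterExpander(List<String> clusters) {
    return (logicalId, getInputStream) -> {
      List<MessageStream<String>> physical = new ArrayList<>();
      for (String cluster : clusters) {
        physical.add(getInputStream.apply(logicalId + "-" + cluster));
      }
      return MessageStream.mergeAll(physical);
    };
  }

  public static void main(String[] args) {
    Map<String, List<String>> fake = new HashMap<>();
    fake.put("orders-east", Arrays.asList("o1"));
    fake.put("orders-west", Arrays.asList("o2", "o3"));
    MessageStream<String> merged = perClusterExpander(Arrays.asList("east", "west"))
        .expand("orders", id -> new MessageStream<>(fake.get(id)));
    System.out.println(merged.messages);  // [o1, o2, o3]
  }
}
```

The application only ever sees the single merged stream for "orders". This matches the "unified logical view" described above.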

...

To satisfy the requirements above, we propose the following class hierarchy of System and Stream Descriptors.

Base Classes:

  • SystemDescriptor

  • StreamDescriptor
    • InputDescriptor
    • OutputDescriptor
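The sketch below mirrors this hierarchy in a simplified form. It shows how a system implementer might subclass SystemDescriptor and expose its own `getInputDescriptor` variants. All class names and signatures are hypothetical simplifications, and `ExampleSystemDescriptor` is an invented system-specific subclass.

```java
// Hypothetical, simplified mirror of the proposed hierarchy; real signatures differ.
public class DescriptorHierarchySketch {
  public abstract static class SystemDescriptor {
    public final String systemName;
    protected SystemDescriptor(String systemName) { this.systemName = systemName; }
  }

  public abstract static class StreamDescriptor {
    public final String streamId;
    public final SystemDescriptor system;
    protected StreamDescriptor(String streamId, SystemDescriptor system) {
      this.streamId = streamId;
      this.system = system;
    }
  }

  public static class InputDescriptor extends StreamDescriptor {
    public final String physicalName;
    InputDescriptor(String streamId, SystemDescriptor system, String physicalName) {
      super(streamId, system);
      this.physicalName = physicalName;
    }
  }

  public static class OutputDescriptor extends StreamDescriptor {
    OutputDescriptor(String streamId, SystemDescriptor system) { super(streamId, system); }
  }

  // A system implementer's subclass decides which factory methods to expose.
  public static class ExampleSystemDescriptor extends SystemDescriptor {
    public ExampleSystemDescriptor() { super("example"); }

    // Variant 1: the physical name defaults to the stream id.
    public InputDescriptor getInputDescriptor(String streamId) {
      return new InputDescriptor(streamId, this, streamId);
    }

    // Variant 2: the caller supplies a separate physical name.
    public InputDescriptor getInputDescriptor(String streamId, String physicalName) {
      return new InputDescriptor(streamId, this, physicalName);
    }

    public OutputDescriptor getOutputDescriptor(String streamId) {
      return new OutputDescriptor(streamId, this);
    }
  }

  public static void main(String[] args) {
    ExampleSystemDescriptor sys = new ExampleSystemDescriptor();
    InputDescriptor in = sys.getInputDescriptor("clicks", "ClickEvent");
    System.out.println(in.system.systemName + " " + in.streamId + " " + in.physicalName);  // example clicks ClickEvent
  }
}
```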

To provide flexibility to system implementers to customize whether and how users get input and output descriptors (e.g. advanced requirement 1), we do not enforce a particular API for getInputDescriptor and getOutputDescriptor. System implementers are free to extend the base SystemDescriptor class and add any getInputDescriptor/getOutputDescriptor variants. We do provide the following interfaces as guidelines for System implementers based on the discussion above:

...

We also provide the following generic implementations of System, InputStream and OutputStream Descriptors. If a system doesn't provide a more specific implementation, users can use these to configure any Samza properties, and provide the system-specific properties as a Map<String, String> of configs.

Generic (System Agnostic) Implementations:

  • GenericSystemDescriptor
  • GenericInputDescriptor
  • GenericOutputDescriptor
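The sketch below shows how system-agnostic descriptors could pass system-specific properties through as raw configs. It assumes they are prefixed with `systems.<systemName>.` and `streams.<streamId>.` respectively. The factory class `com.example.ExampleSystemFactory` and the `example.endpoint` property are hypothetical. The constructors and `with...Configs` methods are illustrative only and are not the actual GenericSystemDescriptor/GenericInputDescriptor API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of system-agnostic descriptors with pass-through configs.
public class GenericDescriptorSketch {
  public static class GenericSystemDescriptor {
    public final String systemName;
    private final String factoryClassName;
    private final Map<String, String> systemConfigs = new HashMap<>();

    public GenericSystemDescriptor(String systemName, String factoryClassName) {
      this.systemName = systemName;
      this.factoryClassName = factoryClassName;
    }

    public GenericSystemDescriptor withSystemConfigs(Map<String, String> configs) {
      systemConfigs.putAll(configs);
      return this;
    }

    // One Samza-level property plus pass-through system-specific properties.
    public Map<String, String> toConfig() {
      Map<String, String> config = new HashMap<>();
      config.put("systems." + systemName + ".samza.factory", factoryClassName);
      systemConfigs.forEach((k, v) -> config.put("systems." + systemName + "." + k, v));
      return config;
    }
  }

  public static class GenericInputDescriptor {
    private final String streamId;
    private final GenericSystemDescriptor system;
    private final Map<String, String> streamConfigs = new HashMap<>();

    public GenericInputDescriptor(String streamId, GenericSystemDescriptor system) {
      this.streamId = streamId;
      this.system = system;
    }

    public GenericInputDescriptor withStreamConfigs(Map<String, String> configs) {
      streamConfigs.putAll(configs);
      return this;
    }

    // Binds the stream to its system and passes through stream-specific properties.
    public Map<String, String> toConfig() {
      Map<String, String> config = new HashMap<>();
      config.put("streams." + streamId + ".samza.system", system.systemName);
      streamConfigs.forEach((k, v) -> config.put("streams." + streamId + "." + k, v));
      return config;
    }
  }

  public static void main(String[] args) {
    Map<String, String> sysProps = new HashMap<>();
    sysProps.put("example.endpoint", "localhost:1234");
    GenericSystemDescriptor sys =
        new GenericSystemDescriptor("my-system", "com.example.ExampleSystemFactory").withSystemConfigs(sysProps);
    GenericInputDescriptor input = new GenericInputDescriptor("events", sys);
    System.out.println(sys.toConfig().get("systems.my-system.example.endpoint"));  // localhost:1234
    System.out.println(input.toConfig().get("streams.events.samza.system"));      // my-system
  }
}
```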

These APIs (WIP) for the new descriptors can be found at: https://github.com/prateekm/samza/tree/stream-descriptor/samza-api/src/main/java/org/apache/samza/operators/descriptors

Sample High Level API code (WIP) using these APIs can be found at: https://github.com/prateekm/samza/tree/stream-descriptor/samza-test/src/main/java/org/apache/samza/example

Sample implementations and usage patterns for these APIs (WIP) can be found at: https://github.com/prateekm/samza/tree/stream-descriptor/samza-api/src/test/java/org/apache/samza/operators/descriptors

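The new StreamApplicationDescriptor interface with these classes is:

```java
// Proposed interface (WIP); descriptor, stream and table types are those described above.
public interface StreamApplicationDescriptor {
  // Sets the system used for intermediate streams and, unless overridden, internal streams.
  void setDefaultSystem(SystemDescriptor<?, ?> defaultSystemDescriptor);
  // Returns the MessageStream for the input described by the descriptor.
  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);
  // Returns the OutputStream for the output described by the descriptor.
  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);
  // Returns the Table described by the table descriptor.
  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
  StreamApplicationDescriptor withContextManager(ContextManager contextManager);
}
```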


Default System Descriptor and Intermediate Streams

The "default system" for a job, currently specified by setting the "job.default.system" configuration and its properties, is used for creating intermediate streams (e.g., for partitionBy and broadcast). It is also used for some internal streams such as the coordinator, checkpoint and store changelog streams, unless their system has been explicitly set using job.coordinator.system, task.checkpoint.system, or job.changelog.system respectively.
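The fallback for internal streams can be modelled with the simplified sketch below. The config keys come from the paragraph above, but `resolveSystem` is a hypothetical helper. Samza's own config classes may implement this lookup differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified model of the default-system fallback for internal streams.
public class DefaultSystemResolution {
  // An explicit per-stream-type system key wins; otherwise fall back to job.default.system.
  public static Optional<String> resolveSystem(Map<String, String> config, String explicitSystemKey) {
    String explicit = config.get(explicitSystemKey);
    if (explicit != null) {
      return Optional.of(explicit);
    }
    return Optional.ofNullable(config.get("job.default.system"));
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("job.default.system", "kafka");
    config.put("job.changelog.system", "kafka-changelog");
    System.out.println(resolveSystem(config, "job.coordinator.system"));  // Optional[kafka]
    System.out.println(resolveSystem(config, "job.changelog.system"));    // Optional[kafka-changelog]
  }
}
```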

We will add a new ApplicationDescriptor#setDefaultSystem(SystemDescriptor) API that can be used to configure the default system used by the job. Like everything else in the Descriptors API, users can set default system properties in config instead of in code if they wish to.

We will not provide a way to set intermediate stream properties in code using a StreamDescriptor since we don't expect this to be a common use case. If users need to manually configure them, they'll have to find the intermediate stream name and set its properties in configuration.

...

This implies that, unlike other configurations, input/output stream serdes *must* be specified using descriptors, and cannot be specified in configuration. Users and SystemDescriptors may specify a KVSerde<NoOpSerde, NoOpSerde> if they don't want messages in the system/stream to be deserialized by Samza. This may be useful when the SystemConsumer/SystemProducer handles the serialization itself.

Serdes are optional for intermediate streams. If users don't provide an explicit serde for MessageStream#partitionBy, and don't set a default system for the application, they must set job.default.system and its system-level serdes in their configuration instead. If a default system descriptor is set and its serde is used for intermediate streams, it must be a KVSerde.
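These rules amount to a resolution order for an intermediate stream's serde, sketched below. `Serde`, `KVSerde` and `StringSerde` are local stand-ins rather than Samza's classes (the real KVSerde is generic). `resolve` and `Source` are hypothetical names. A `null` `defaultSystemSerde` stands for "no default system descriptor serde in code".

```java
// Hypothetical sketch of choosing the serde source for an intermediate (partitionBy) stream.
public class IntermediateSerdeResolution {
  public interface Serde {}
  public static class KVSerde implements Serde {}
  public static class StringSerde implements Serde {}

  public enum Source { EXPLICIT, DEFAULT_SYSTEM_DESCRIPTOR, CONFIG }

  // explicitSerde: serde passed to partitionBy, or null.
  // defaultSystemSerde: serde of the default SystemDescriptor set in code, or null.
  public static Source resolve(Serde explicitSerde, Serde defaultSystemSerde) {
    if (explicitSerde != null) {
      return Source.EXPLICIT;
    }
    if (defaultSystemSerde != null) {
      // A default system serde used for intermediate streams must be a KVSerde.
      if (!(defaultSystemSerde instanceof KVSerde)) {
        throw new IllegalArgumentException("Default system serde for intermediate streams must be a KVSerde");
      }
      return Source.DEFAULT_SYSTEM_DESCRIPTOR;
    }
    // Nothing in code: job.default.system and its serdes must come from configuration.
    return Source.CONFIG;
  }

  public static void main(String[] args) {
    System.out.println(resolve(new StringSerde(), null));  // EXPLICIT
    System.out.println(resolve(null, new KVSerde()));      // DEFAULT_SYSTEM_DESCRIPTOR
    System.out.println(resolve(null, null));               // CONFIG
  }
}
```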

Stream Configuration Generation

During StreamApplication#init(StreamApplicationDescriptor), users create System and Stream Descriptors. They get input/output streams for them using StreamApplicationDescriptor#getInputStream(InputDescriptor) and StreamApplicationDescriptor#getOutputStream(OutputDescriptor). The StreamApplicationDescriptor tracks all Input/OutputDescriptors and the default SystemDescriptor used by the application. When the planner runs, it generates configurations for the input and output streams in the following way:

...

Step 1: Create an instance of the StreamApplicationDescriptor and the StreamApplication, and call StreamApplication#init(StreamApplicationDescriptor).

Step 2: Collect all the user-provided System/Stream Descriptors from the StreamApplicationDescriptor.

Step 3: Merge the user provided configuration (application.cfg) with the Descriptor generated configuration (System/Input/OutputDescriptor#toConfig). Descriptor generated configuration has higher precedence and overrides user provided configuration.

Step 4: Run the planning phase using this merged configuration. The planner internally creates StreamSpec instances based on this merged configuration, mutates these StreamSpecs to set additional inferred stream properties, creates/validates internal streams using them, and then merges StreamSpec generated configuration with previously merged configuration from Step 3. StreamSpec generated configuration has higher precedence.
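The precedence in Steps 3 and 4 is a layered merge in which later layers win, as the simplified model below shows. The keys and values are illustrative placeholders rather than real Samza config keys. `mergeLowestPrecedenceFirst` is a hypothetical helper, not planner code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the layered config merge in Steps 3 and 4.
public class PlannerConfigLayers {
  // Layers are applied in order, so later layers override earlier ones on conflicting keys.
  public static Map<String, String> mergeLowestPrecedenceFirst(List<Map<String, String>> layers) {
    Map<String, String> merged = new HashMap<>();
    for (Map<String, String> layer : layers) {
      merged.putAll(layer);
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> userConfig = new HashMap<>();        // application.cfg
    userConfig.put("key.a", "user");
    userConfig.put("key.b", "user");
    Map<String, String> descriptorConfig = new HashMap<>();  // System/Input/OutputDescriptor#toConfig
    descriptorConfig.put("key.a", "descriptor");
    descriptorConfig.put("key.c", "descriptor");
    Map<String, String> streamSpecConfig = new HashMap<>();  // generated from StreamSpecs in Step 4
    streamSpecConfig.put("key.c", "streamspec");

    Map<String, String> merged =
        mergeLowestPrecedenceFirst(Arrays.asList(userConfig, descriptorConfig, streamSpecConfig));
    System.out.println(merged.get("key.a") + " " + merged.get("key.b") + " " + merged.get("key.c"));
    // descriptor user streamspec
  }
}
```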

...

Note: System and Stream descriptor generated configuration have higher precedence than user provided configuration (from cfg2). We will need to offer a mechanism to override these descriptor generated configs in cfg2. This is useful, for example, to support fabric/instance level overrides. This can currently be done by specifying them with a special jobs.job-name.* override scope. For example, the value for config "foo.bar" generated in step 4 can be overridden using the config "jobs.job-name.foo.bar".
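A `jobs.<job-name>.*` override can be modelled by stripping the scope prefix and re-applying the value over the generated config. The sketch below shows this under that assumption. `applyJobScopedOverrides` and the job name `my-job` are hypothetical, and Samza's actual override rewriting may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of job-scoped config overrides ("jobs.<job-name>.<key>" overrides "<key>").
public class JobScopedOverrides {
  public static Map<String, String> applyJobScopedOverrides(Map<String, String> config, String jobName) {
    String prefix = "jobs." + jobName + ".";
    Map<String, String> result = new HashMap<>(config);
    for (Map.Entry<String, String> e : config.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        // Strip the scope prefix and let the scoped value win over the generated one.
        result.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("foo.bar", "generated");
    config.put("jobs.my-job.foo.bar", "override");
    System.out.println(applyJobScopedOverrides(config, "my-job").get("foo.bar"));  // override
  }
}
```

Scoped keys for other job names are left untouched, so one configuration can carry overrides for several jobs.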

...