You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

Status

Current state: UNDER DISCUSSION

Discussion thread

JIRA:  Unable to render Jira issues macro, execution error.

Motivation

Samza jobs using the High Level API currently specify their inputs and outputs partly in code (i.e., the streamIds), and partly in configuration (e.g., system name, physical name and everything else). Complexity of configuring these inputs and outputs has been a long standing issue for Samza users. We want to allow users to describe their system, inputs, outputs and their properties entirely in code. To do this, we propose new System and Stream "descriptors" APIs. Once the new StreamApplication/TaskApplication APIs for Samza are available (SEP-13), these will be usable for describing inputs and outputs in the Low Level API as well.

We want to support the following use cases:

  1. Allow users to specify their system and stream properties in code instead of configuration.
  2. Let System implementers set reasonable defaults for their systems and streams (e.g., enforce Avro as the message serde).
  3. Support advanced use cases like delivering incoming message metadata to the application, and logical to physical stream expansion.

Requirements

User Experience

The general user workflow should be:

  1. Create a new SystemDescriptor instance and optionally configure it.
  2. Create new InputDescriptor<I> and OutputDescriptor<O> using the SystemDescriptor above and optionally configure them.
  3. Call StreamGraph#getInputStream(InputDescriptor<I>) to get the MessageStream<I> and StreamGraph#getOutputStream(OutputDescriptor<O>) to get the OutputStream<O>.
  4. Apply transforms on the input MessageStream and produce the results to OutputStream.

As an example:

Usage Example
KafkaSystemDescriptor<KV<String, GenericRecord>> ksd = new KafkaSystemDescriptor<>("local") // system name 
    .withConsumerZkConnect("localhost:80")
    .withProducerBootstrapServers(Collections.singletonList("localhost:81"))
    .withSamzaFetchThreshold(10000)
    .withProducerConfigs(ImmutableMap.of(...));

KafkaInputDescriptor<KV<String, GenericRecord>> isd = ksd.getInputDescriptor("input") // stream-id 
    .withPhysicalName("my-input-stream") // physical name 
    .withBootstrap(true); 
KafkaOutputDescriptor<KV<String, GenericRecord>> osd = ksd.getOutputDescriptor("output"); // stream-id 

MessageStream<KV<String, GenericRecord>> is = graph.getInputStream(isd); 
OutputStream<KV<String, GenericRecord>> os = graph.getOutputStream(osd); 

is.map(...).filter(...).sendTo(os);


Users will not be required to describe everything in code - they'll have an option to do so. They may describe as much of their their system/stream properties in code as they like, and the rest may be in configuration. System/Stream properties from user provided descriptors will be converted to their corresponding configurations and merged with the user provided configuration. Descriptor generated properties will override those provided by the user in configuration.

For simple cases, where system configuration is either unnecessary or provided by default, System implementers may provide convenience factory methods for Input/OutputDescriptors so that they can be obtained using just the system name. For example;

Usage Example At LinkedIn
KafkaInputDescriptor<KV<String, GenericRecord>> pvd = KafkaInputDescriptor.from("PageViewEvent", "tracking"); // properties for the "tracking" system are provided in configuration
KafkaOutputDescriptor<KV<String, PageViewCount>> pvcd = KafkaInputDescriptor.from("PageViewCount", "metrics"); // properties for the "metrics" system are provided in configuration

MessageStream<KV<String, GenericRecord>> pvs = graph.getInputStream(pvd); 
OutputStream<KV<String, PageViewCount>> pvcs = graph.getOutputStream(pvcd); 

pvs.window(...).map(...).sendTo(pvcs);

Basic requirements

The basic requirements for the new APIs are:

  1. Users should be able to set any Samza properties for their input and output streams using these descriptors (e.g., bootstrap status, input priority, offset management, serdes etc.).
  2. System implementers should be able to provide custom implementations of these descriptors that support additional system-specific properties. For example, a KafkaSystemDescriptor may allow setting Kafka producer and consumer properties. 
    Users should chose the extended descriptor for their system if one is available. Else, they may use the Samza-provided GenericSystemDescriptor/GenericInputDescriptor/GenericOutputDescriptor implementations and provide system specific properties as the appropriate key-value configurations.
  3. Users should be able to use a fluent API for configuring these descriptors.
  4. The types of MessageStream/OutputStream created using these descriptors should match their contents. E.g., for simple descriptors, the type of Message/OutputStream should be determined by the system or stream level Serdes, whether provided by the system as a default, or configured by the user explicitly.
  5. Users should be able to provide a default SystemDescriptor which should be used to configure the properties of the system used for creating intermediate streams in the application. This is equivalent to setting and configuring job.default.system and its properties.

Advanced requirements

We will allow System implementers to extend these Descriptor implementations so that they can provide the following additional features to users:

System Specific Serdes

System implementers should be able to dictate a specific serialization format for messages in their streams. For example, if a system enforces Avro as the message format, its input and output descriptors should have a KV<String, GenericRecord> message type by default. The API will always allow users to provide a Stream level serde for input/output streams that may be different than the System specific serde. System implementors may choose to disallow this by throwing RuntimeException in these methods.

Input Transformers

InputTransformer functions allow System implementers to apply custom transformations to messages in their input streams before they're delivered to the application. The primary use case of this feature is to allow systems to add additional metadata to each incoming message by extending IncomingMessageEnvelope. In this use case, the transformer is provided by the System implementors as a system level default, and is not visible to the user.

The interface is:

InputStreamTransformer
/**
 * Transforms an {@link IncomingMessageEnvelope} with deserialized key and message to a message of type {@code OM}
 * which is delivered to the {@code MessageStream}.
 * <p>
 * May be provided by default by a {@code TransformingSystemDescriptor}, or set on a stream level on an
 * {@code InputStream}.
 *
 * @param <OM> type of the transformed message
 */
public interface InputTransformer<OM> extends InitableFunction, ClosableFunction, Serializable {

 /**
  * Transforms the provided {@link IncomingMessageEnvelope} with deserialized key and message into another message
  * which is delivered to the {@code MessageStream}.
  *
  * @param ime the {@link IncomingMessageEnvelope} to be transformed
  * @return the transformed message
  */
  OM apply(IncomingMessageEnvelope ime);

}


The InputTransformer function is applied at runtime on each IncomingMessageEnvelope after the key and value have been deserialized. For a InputDescriptor with a InputTransformer, the type of messages in their MessageStreams will always be the result type of the InputTransformer function. I.e., providing stream level serdes should not change the type of the MessageStream generated from the descriptor, since the InputTransformer is applied after deserialization and overrides the Serde type.

Users may optionally override the system default transformer or provide one for an input stream themselves. This might be useful, for example, if they need access to some information in the IncomingMessageEnvelope like the offset or the message timestamp.

Stream Expanders

StreamExpander functions allow System implementors to expand a single user-provided input descriptor into multiple input descriptors, apply some custom MessageStream transforms on the expanded input streams thus obtained, and then return a new MessageStream to the application for further processing. The primary use case of this feature is to allow system implementors to resolve a "logical" user provided input stream to multiple underlying "physical" input stream sources at execution planning time and still present a unified logical view to the user for further processing. The interface is:

StreamExpander
/**
 * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamGraph},
 * and returns a new {@link MessageStream} with the combined results. Called when {@link StreamGraph#getInputStream} 
 * is being used to get a {@link MessageStream} using an {@link InputDescriptor} from an 
 * {@link org.apache.samza.operators.descriptors.base.system.ExpandingSystemDescriptor}
 * <p>
 * This is provided by default by {@link org.apache.samza.operators.descriptors.base.system.ExpandingSystemDescriptor}
 * implementations and can not be overridden or set on a per stream level.
 *
 * @param <OM> type of the messages in the resultant {@link MessageStream}
 */
public interface StreamExpander<OM> extends Serializable {

 /**
  * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamGraph},
  * and returns a new {@link MessageStream} with the combined results. Called when the {@link InputDescriptor}
  * is being used to get an {@link MessageStream} using {@link StreamGraph#getInputStream}.
  * <p>
  * Note: Take care to avoid infinite recursion in the implementation; e.g., by ensuring that it doesn't call
  * {@link StreamGraph#getInputStream} with an {@link InputDescriptor} from the an 
  * {@link org.apache.samza.operators.descriptors.base.system.ExpandingSystemDescriptor} (like this one) again.
  *
  * @param streamGraph the {@link StreamGraph} to register the expanded sub-DAG of operators on
  * @param inputDescriptor the {@link InputDescriptor} to be expanded
  * @return the {@link MessageStream} containing the combined results of the sub-DAG of operators
  */
  MessageStream<OM> apply(StreamGraph streamGraph, InputDescriptor inputDescriptor);

}


Unlike InputTransformers, these functions cannot be provided or overridden by users. These functions are executed once in the ApplicationRunner when creating/describing the StreamGraph.

Similar to a SystemDescriptor with a InputTransformer, the type of the messages in their MessageStream (but not OutputStream) obtained from their InputDescriptors will always be the result type of the StreamExpander. Same parameterization rules apply.

Proposed Implementation

Base Classes and Type Bounds

The requirements above impose some constraints on the number and type parameters of the new API classes:

  1. A Fluent API for describing inputs and outputs: This requires each super class method to return an instance of the concrete sub-class it was called on for further chaining. This means that each super class needs to carry the type of its extending class in its type parameters. E.g.:

    Descriptor Sub-type
    public abstract class SystemDescriptor<SystemMessageType, SubClass extends SystemDescriptor<SystemMessageType, SubClass>> 
    public final class GenericSystemDescriptor<SystemMessageType> extends SystemDescriptor<SystemMessageType, GenericSystemDescriptor<SystemMessageType>>
  2. Support both System and Stream level serdes: If users or system implementers set a default serde for a SystemDescriptor, any Input/OutputDescriptors created from it should inherit the default serde type. However Samza also allows users to provide stream level serdes that override the system level serde. If users provide a Stream level serde, the Input/OutputDescriptors should have the same type as the serde. The simplest way to do this is to require stream-level serdes to be provided while creating the Input/OutputDescriptors. 

    BaseSystemDescriptor API
    class SystemDescriptor<SystemMessageType> {
        public InputDescriptor<A> getInputDescriptor(String id) // case 1 
        public InputDescriptor<B> getInputDescriptor(String id, Serde<StreamMessageType> s) // case 2
    } 
  3. Stream level serde should not affect InputDescriptor type when used with StreamTransformer. This is in conflict with (or an exception to) the requirement above. For a InputDescriptor with a InputTransformer, the type of messages in their MessageStreams will always be the result type of the InputTransformer function. I.e., providing system/stream level serdes should not affect the type of the descriptor, since the InputTransformer is applied after deserialization and overrides the Serde type. User provided StreamTransformer overrides System provided default StreamTransformer. 

    class SystemDescriptor<SystemMessageType> {
        ... // 1 and 2 as above, plus:
        public InputDescriptor<C> getInputDescriptor(String id, InputTransformer<StreamTransformerType> t) // case 3 
        public InputDescriptor<D> getInputDescriptor(String id, InputTransformer<StreamTransformerType> t, Serde<StreamMessageType> s) // case 4
    }

    While we will consider the use case of stream-level InputTransformer functions in the API design here, we can choose to not add them to the API until we have concrete use cases. This will simplify the SystemDescriptor so that users have 2 ways of getting an input stream from a system descriptor instead of 4.


  4. System or stream level serde should not affect InputDescriptor type when used with StreamExpander. The type of such InputDescriptors will always be the result type of the StreamExpander function.

The constraints above imply that the desired InputDescriptor types for the methods above are:

  1. For non-transforming System:
    A = SystemMessageType, B = StreamMessageType, C = StreamTransformerType, D = StreamTransformerType
  2. For System with default InputTransformer<SystemTransformerType>:
    A = SystemTransformerType, B = SystemTransformerType, C = StreamTransformerType, D = StreamTransformerType
  3. For System with StreamExpander<SystemExpanderType>:
    A = SystemExpanderType, B = SystemExpanderType, C = SystemExpanderType, D = SystemExpanderType
  4. For System with InputTransformer<SystemTransformerType> and StreamExpander<SystemExpanderType>:
    A = SystemExpanderType, B = SystemExpanderType, C = SystemExpanderType, D = SystemExpanderType

In other words, the type patterns above are:


getISD(id)getISD(id, serde)getISD(id, txfn)getISD(id, txfn, serde)
Non Transforming System

SystemMessageType

StreamMessageTypeStreamTransformerTypeStreamTransformerType
Transforming SystemSystemTransformerTypeSystemTransformerTypeStreamTransformerTypeStreamTransformerType
Expanding SystemSystemExpanderTypeSystemExpanderTypeSystemExpanderTypeSystemExpanderType
Expanding Transforming SystemSystemExpanderTypeSystemExpanderTypeSystemExpanderType

SystemExpanderType

Clearly, there are no single SystemDescriptor class that can satisfy the type constraints for all four cases. In fact, there are 3 different sets of interface method signatures above (Case 1-3).


Note that if we add new features that affects the message type of InputDescriptors, we may need yet another set of getInputStream methods. In general there will have to be 2^n ways of getting an InputDescriptor to support all combinations of features, where n is the number of features where the type of the input stream is affected by the feature. Currently n == 1 if we only support stream level serde overrides, or 2 if we support stream level transform functions as well. This might be a concern for future API evolution, but its not as bad as it appears to be because:

  1. This is only an issue for features that affect the type of messages in the stream. For other features, their properties/functions can be set on the Input/OutputDescriptor after it has been created. E.g., delete committed messages is a new feature that can be used directly with the InputDescriptor since it doesn't affect the type of messages in the stream.
  2. The variations in the ways of getting an InputDescriptor apply to the SystemDescriptor base classes, not the InputDescriptor classes. Also, the new API methods will be in addition to the methods proposed here. This means that when we add these methods for the new feature:
    1. The feature may only be necessary for a specific system implementation, in which case we could introduce a new SystemDescriptor and only add those methods there. All existing code will be unchanged.
    2. If the feature does need to be supported by all existing systems, the new methods would be added to the existing abstract base classes, which could provide a default implementation. Any existing user code that doesn't use the feature will be remain unchanged.

Because of these reasons, we consider this API to be a reasonable tradeoff for providing a type safe fluent descriptor API to our users.

Proposed APIs

To satisfy the requirements above, we propose the following class hierarchy of System and Stream Descriptors.

Base Classes:

We also provide the following generic implementations of System, InputStream and OutputStream Descriptors. If a system doesn't provide a more specific implementation, users can use these to configure any samza properties and provide the system specific properties as a Map<String, String> of configs.

Generic (System Agnostic) Implementations:

These APIs (WIP) for the new descriptors can be found at: https://github.com/prateekm/samza/tree/stream-descriptor/samza-api/src/main/java/org/apache/samza/operators/descriptors

Sample High Level API code (WIP) using these APIs can be found at: https://github.com/prateekm/samza/tree/stream-descriptor/samza-test/src/main/java/org/apache/samza/example

Sample implementations and usage patterns for these APIs (WIP) can be found at: https://github.com/prateekm/samza/tree/stream-descriptor/samza-api/src/test/java/org/apache/samza/operators/descriptors

The new StreamGraph interface with these classes is:

public interface StreamGraph {
  void setDefaultSystem(SystemDescriptor<?, ?> defaultSystemDescriptor);
  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);
  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);
  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
  StreamGraph withContextManager(ContextManager contextManager);
}


Default System Descriptor and Intermediate Streams

The "default system" for a job, currently specified by setting the "job.default.system" configuration and its properties, is used for creating intermediate streams (for partitionBy), as well as some internal streams like coordinator, checkpoint and store changelog streams (unless their system has been explicitly set using job.coordinator.system, task.checkpoint.system, or job.changelog.system respectively).

We will add a new StreamGraph#setDefaultSystem(SystemDescriptor) API that can be used to configure the default system used by the job. For example, to provide the default system name and the default system serde. Like everything else in the Descriptors API, user should be able to set default system properties in config instead of in code, including the serdes, if they wish to. If they choose to rely on the default system's serde or setting the serde for job.default.system in configs instead of providing one explicitly when using partitionBy, they'll be responsible for figuring out the type of messages in the repartitioned streams and casting them correctly in their application.

We will not provide a way to set intermediate stream properties in code using a StreamDescriptor since we don't expect this to be a common use case. If users need to manually configure them, they'll have to find the intermediate stream name and set its properties in configuration.

This default system will also not be used to create input and output streams. I.e., users will always have to associate an input/output stream explicitly with a system name, either by getting them from a system descriptor, or by providing the name when creating the InputDescriptor using the InputStream#from convenience factory methods.

Serdes For Streams

Every InputDescriptor and OutputDescriptor in the DAG must have a non-null Serde instance associated with it. This serde may be:

  1. Inherited from their SystemDescriptor, if they were created using a SystemDescriptor with a default serde, or
  2. Provided explicitly when creating the stream, either using SystemDescriptor#getInputStream(String, Serde) etc, or using the InputDescriptor#from convenience factory methods.

This implies that, unlike other configurations, input/output stream serdes *must* be specified using descriptors, and cannot be specified in configuration. Users and SystemDescriptors may specify an KVSerde<NoOpSerde, NoOpSerde> if they don't want messages in the system/stream to be deserialized by Samza. This is useful when the System Consumer/Producer handle the serialization themselves.

Serdes are optional for intermediate streams. If users don't provide an explicit serde for MessageStream#partitionBy, and don't set a default system for the application, they must set the serde for job.default.system and its system-level serdes in their configuration instead. If a default system descriptor is set and its serde is used for intermediate streams, it must be a KVSerde.

Stream Configuration Generation

During StreamApplication#init(StreamGraph), users create System and StreamDescriptors, and get input/output streams for them using StreamGraph#getInputStream(StreamDescriptor). The StreamGraph tracks all Input/OutputDescriptors and the default SystemDescriptor used by the application. When the planner runs, it generates configurations for the input output streams in the following way:

Planning inputs: User provided configuration, User provided System and Stream Descriptors.

Step 1: Create an instance of the StreamGraph and the StreamApplication, and call StreamApplication#init(StreamGraph)

Step 2: Collect all the user provided System/StreamDescriptors from the StreamGraph.

Step 3: Merge the user provided configuration (application.cfg) with the Descriptor generated configuration (System/Input/OutputDescriptor#toConfig). Descriptor generated configuration has higher precedence and overrides user provided configuration.

Step 4: Run the planning phase using this merged configuration. The planner internally creates StreamSpec instances based on this merged configuration, mutates these StreamSpecs to set additional inferred stream properties, creates/validates internal streams using them, and then merges StreamSpec generated configuration with previously merged configuration from Step 3. StreamSpec generated configuration has higher precedence.

This is the final configuration that gets sent to the executing containers.

Note: System and Stream descriptor generated configuration have higher precedence than user provided configuration (from cfg2). We will need to offer a mechanism to override these descriptor generated configs in cfg2. This is useful, for example, to support fabric/instance level overrides. This can currently be done by specifying them with a special jobs.job-id.* override scope. For example, the value for config "foo.bar" generated in step 4 can be overridden using the config "jobs.job-id.foo.bar".

Note: In the case where there are "expanded" streams, configs will be generated for the "expanded" streams only, not the initial logical stream.

Default Config Values and Descriptors

Some configurations have default values associated with them. E.g., default value of "streams.stream-id.samza.bootstrap" is false. Similarly, users may get default values from some configurations from another source (e.g., app-def). 

As described above, user provided descriptors are used to generate system and stream configurations during planning. Note that in Step 3 above any configurations generated using the descriptor override any configurations provided by the user.

For these reasons, if the user hasn't explicitly provided a default value for a property in the descriptor, the descriptor will not generate any configuration for the property, not even the default value. In the example above, if the user has not explicitly set bootstrap status for a stream as true or false, the correct behavior is for InputDescriptor#toConfig() to not generate the "streams.stream-id.samza.bootstrap" configuration at all, not to set it to false.

By convention, all such fields are marked as Optional<T> to clarify that they may not have been set by the user.

Compatibility, Deprecation, and Migration Plan

The changes in the StreamGraph interface are backwards incompatible. Specifically, instead of getting an input/output stream using a streamId, users will now have to get an input/output stream using StreamDescriptors.

The migration is as simple as changing:

from: graph.getInputStream(streamId, serde);
to:   graph.getInputStream(GenericInputDescriptor.from(streamId, systemName, serde));

Rest of the code remains the same, and users can migrate to specifying more of their input/output properties using descriptors instead of configuration over time.

  • No labels