...

  1. Create a new SystemDescriptor instance and optionally configure it.
  2. Obtain a new InputDescriptor<I> and OutputDescriptor<O> using the SystemDescriptor above and optionally configure them.
  3. Call StreamApplicationDescriptor#getInputStream(InputDescriptor<I>) to get the MessageStream<I> and StreamApplicationDescriptor#getOutputStream(OutputDescriptor<O>) to get the OutputStream<O>.
  4. Apply transforms on the input MessageStream and produce the results to OutputStream.
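
The order of these four steps can be sketched with a toy model. Every class below is a simplified, hypothetical stand-in written for this illustration, not the actual Samza type, and the stream names ("page-views", "view-lengths") are made up. The stand-in MessageStream only records which calls were made instead of processing messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Toy model of the four steps. All nested types are stand-ins, NOT the Samza classes. */
public class DescriptorFlowSketch {

  static final class SystemDescriptor {
    final String systemName;
    SystemDescriptor(String systemName) { this.systemName = systemName; }

    <I> InputDescriptor<I> getInputDescriptor(String streamId) {
      return new InputDescriptor<>(streamId, this);
    }

    <O> OutputDescriptor<O> getOutputDescriptor(String streamId) {
      return new OutputDescriptor<>(streamId, this);
    }
  }

  static final class InputDescriptor<I> {
    final String streamId;
    final SystemDescriptor system;
    InputDescriptor(String streamId, SystemDescriptor system) {
      this.streamId = streamId;
      this.system = system;
    }
  }

  static final class OutputDescriptor<O> {
    final String streamId;
    final SystemDescriptor system;
    OutputDescriptor(String streamId, SystemDescriptor system) {
      this.streamId = streamId;
      this.system = system;
    }
  }

  static final class OutputStream<O> {
    final String streamId;
    OutputStream(String streamId) { this.streamId = streamId; }
  }

  /** Records the chain of calls as text; the function passed to map is never invoked. */
  static final class MessageStream<M> {
    final List<String> plan;
    MessageStream(List<String> plan) { this.plan = plan; }

    <R> MessageStream<R> map(Function<? super M, ? extends R> fn) {
      plan.add("map");
      return new MessageStream<>(plan);
    }

    void sendTo(OutputStream<M> out) {
      plan.add("sendTo:" + out.streamId);
    }
  }

  /** Stand-in for the application descriptor: hands out streams and tracks what was requested. */
  static final class StreamApplicationDescriptor {
    final List<String> plan = new ArrayList<>();

    <I> MessageStream<I> getInputStream(InputDescriptor<I> d) {
      plan.add("input:" + d.system.systemName + "." + d.streamId);
      return new MessageStream<>(plan);
    }

    <O> OutputStream<O> getOutputStream(OutputDescriptor<O> d) {
      plan.add("output:" + d.system.systemName + "." + d.streamId);
      return new OutputStream<>(d.streamId);
    }
  }

  static List<String> describe() {
    SystemDescriptor kafka = new SystemDescriptor("kafka");                       // step 1
    InputDescriptor<String> in = kafka.getInputDescriptor("page-views");         // step 2
    OutputDescriptor<Integer> out = kafka.getOutputDescriptor("view-lengths");   // step 2
    StreamApplicationDescriptor app = new StreamApplicationDescriptor();
    MessageStream<String> views = app.getInputStream(in);                        // step 3
    OutputStream<Integer> lengths = app.getOutputStream(out);                    // step 3
    views.map(String::length).sendTo(lengths);                                   // step 4
    return app.plan;
  }
}
```

The point of the sketch is the dependency order: stream descriptors are obtained from a system descriptor, and streams are obtained only by passing those descriptors to the application descriptor, never by a bare streamId.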

...

StreamExpander
/**
 * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor},
 * and returns a new {@link MessageStream} with the combined results. Called when {@link StreamApplicationDescriptor#getInputStream}
 * is being used to get a {@link MessageStream} using an {@link InputDescriptor} from an
 * {@link org.apache.samza.operators.descriptors.base.system.ExpandingSystemDescriptor}.
 * <p>
 * This is provided by default by {@link org.apache.samza.operators.descriptors.base.system.ExpandingSystemDescriptor}
 * implementations and cannot be overridden or set on a per-stream level.
 *
 * @param <OM> type of the messages in the resultant {@link MessageStream}
 */
public interface StreamExpander<OM> extends Serializable {

 /**
  * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor},
  * and returns a new {@link MessageStream} with the combined results. Called when the {@link InputDescriptor}
  * is being used to get a {@link MessageStream} using {@link StreamApplicationDescriptor#getInputStream}.
  * <p>
  * Note: Take care to avoid infinite recursion in the implementation; e.g., by ensuring that it doesn't call
  * {@link StreamApplicationDescriptor#getInputStream} with an {@link InputDescriptor} from an
  * {@link org.apache.samza.operators.descriptors.base.system.ExpandingSystemDescriptor} (like this one) again.
  *
  * @param streamApplicationDescriptor the {@link StreamApplicationDescriptor} to register the expanded sub-DAG of operators on
  * @param inputDescriptor the {@link InputDescriptor} to be expanded
  * @return the {@link MessageStream} containing the combined results of the sub-DAG of operators
  */
  MessageStream<OM> apply(StreamApplicationDescriptor streamApplicationDescriptor, InputDescriptor inputDescriptor);

}

...

During StreamApplication#init(StreamApplicationDescriptor), users create SystemDescriptors and StreamDescriptors, and get input/output streams for them using StreamApplicationDescriptor#getInputStream(InputDescriptor) and StreamApplicationDescriptor#getOutputStream(OutputDescriptor). The StreamApplicationDescriptor tracks all InputDescriptors and OutputDescriptors, as well as the default SystemDescriptor used by the application. When the planner runs, it generates configurations for the input and output streams in the following way:

...

Step 2: Collect all the user-provided SystemDescriptors and StreamDescriptors from the StreamApplicationDescriptor.

Step 3: Merge the user-provided configuration with the descriptor-generated configuration.
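
As a rough illustration of Step 3, the merge can be pictured as combining two string-to-string maps. The sketch below is an assumption-laden example, not the planner's actual code: the class and both helper methods are hypothetical, the key patterns are shown only as plausible examples of stream configuration, and this excerpt does not state which side wins on a key collision, so the helper makes the precedence an explicit choice of the caller.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of merging user-provided and descriptor-generated configuration. */
public class DescriptorConfigMergeSketch {

  /** Hypothetical helper: the kind of entries a descriptor might generate for one stream. */
  static Map<String, String> generatedStreamConfig(String systemName, String streamId, String physicalName) {
    Map<String, String> config = new LinkedHashMap<>();
    config.put("streams." + streamId + ".samza.system", systemName);
    config.put("streams." + streamId + ".samza.physical.name", physicalName);
    return config;
  }

  /**
   * Returns a new map holding every entry of lowerPriority, with entries from
   * higherPriority overwriting it on key collisions. Neither input is modified.
   * Which configuration plays which role is an assumption left to the caller.
   */
  static Map<String, String> merge(Map<String, String> lowerPriority, Map<String, String> higherPriority) {
    Map<String, String> merged = new LinkedHashMap<>(lowerPriority);
    merged.putAll(higherPriority);
    return merged;
  }
}
```

For example, calling merge(userConfig, generatedStreamConfig("kafka", "page-views", "PageViewEvent")) keeps unrelated user keys such as a job name and lets the generated stream entries replace any user-supplied values under the same keys. Swapping the two arguments gives the opposite precedence.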

...

Compatibility, Deprecation, and Migration Plan

The changes in the StreamApplicationDescriptor interface are backwards incompatible. Specifically, instead of getting an input/output stream using a streamId, users must now get it using SystemDescriptors and StreamDescriptors.

...