...

In DataStream API V1, requesting an execution environment directly instantiates the underlying implementation class, which forces jobs to depend on non-API modules. In DataStream API V2, we want user jobs to depend only on a pure API module. Therefore, the concrete environment implementation is created through reflection.

Code Block (Java): ExecutionEnvironment.java
/**
 * The ExecutionEnvironment is the context in which a program is executed.
 *
 * <p>The environment provides methods to create a DataStream and control the job execution.
 */
public interface ExecutionEnvironment {
    /**
     * Get the execution environment instance.
     *
     * @return A {@link ExecutionEnvironment} instance.
     */
    static ExecutionEnvironment getExecutionEnvironment() throws ReflectiveOperationException {
        // Return the environment instance by reflection.
    }

    /**
     * Create and attach a data stream with the specific source to this environment.
     *
     * @param source the source of the data stream.
     * @param watermarkStrategy the watermark strategy of this source.
     * @param sourceName the name of this source.
     * @return A data stream with the specific source.
     */
    <OUT> NonKeyedPartitionStream<OUT> fromSource(
            Source<OUT, ?, ?> source,
            WatermarkStrategy<OUT> watermarkStrategy,
            String sourceName);

    /** Execute and submit the job attached to this environment. */
    void execute() throws Exception;
}
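The reflective lookup behind `getExecutionEnvironment` can be illustrated with a minimal, self-contained sketch. Everything named here (`SketchEnvironment`, `SketchEnvironmentImpl`, `describe`, and the "Impl" suffix convention) is hypothetical and only stands in for the real API and runtime classes, which this document does not spell out. The point it shows is that the interface resolves its implementation by class name at run time, so it carries no compile-time reference to the implementation type.

```java
import java.lang.reflect.Constructor;

/** Demo entry point: obtains an environment without referencing the implementation type. */
class ReflectionFactoryDemo {
    public static void main(String[] args) throws ReflectiveOperationException {
        SketchEnvironment env = SketchEnvironment.getInstance();
        System.out.println(env.describe());
    }
}

/** Hypothetical stand-in for the API-side interface; not the real ExecutionEnvironment. */
interface SketchEnvironment {
    String describe();

    /** Looks up the implementation by name, so this interface has no compile-time link to it. */
    static SketchEnvironment getInstance() throws ReflectiveOperationException {
        // Assumed naming convention for this sketch: "<interface name>Impl".
        String implName = SketchEnvironment.class.getName() + "Impl";
        Class<? extends SketchEnvironment> implClass =
                Class.forName(implName).asSubclass(SketchEnvironment.class);
        Constructor<? extends SketchEnvironment> constructor = implClass.getDeclaredConstructor();
        constructor.setAccessible(true);
        return constructor.newInstance();
    }
}

/** Hypothetical implementation; in a real layout it would live in a separate runtime module. */
class SketchEnvironmentImpl implements SketchEnvironment {
    @Override
    public String describe() {
        return "created via reflection";
    }
}
```

In this sketch both types sit in one file for convenience. Splitting them into an API module and an implementation module would leave the lookup code unchanged, which is the property the design relies on.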

Currently, only FLIP-27 based sources are supported. The stream returned by the `fromSource` method is a NonKeyedPartitionStream by default. If the job has a clear key selection strategy, a keyBy partitioning can be applied afterwards. The connector part will be explained in more detail in a future FLIP.

...

The process function describes the data processing logic and is the key part that users implement in their jobs. All user-defined process functions share a base interface that contains life cycle methods such as open and close. It also contains some common methods related to state and watermark; we omit them here for simplicity and will introduce them in the corresponding sub-FLIPs.

Code Block (Java): ProcessFunction.java
/** This is the base interface for all user-defined process functions. */
public interface ProcessFunction extends Function {
    /**
     * Initialization method for the function. It is called before the actual working methods (like
     * processRecord) and thus suitable for one time setup work.
     *
     * <p>By default, this method does nothing.
     *
     * @throws Exception Implementations may forward exceptions, which are caught by the runtime.
     *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
     *     decide whether to retry the task execution.
     */
    default void open() throws Exception {}

    /**
     * Tear-down method for the user code. It is called after the last call to the main working
     * methods (e.g. processRecord).
     *
     * <p>This method can be used for clean up work.
     *
     * @throws Exception Implementations may forward exceptions, which are caught by the runtime.
     *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
     *     decide whether to retry the task execution.
     */
    default void close() throws Exception {}

    // Methods related to state and watermark are omitted here.
}
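The life cycle contract above (open before the working methods, close after the last one) can be sketched as follows. This is an illustration under stated assumptions: the `ProcessFunction` interface below is a local copy reduced to the two hooks and does not extend `Function`, while `AuditingFunction`, `processRecord`, and the driver in `LifecycleDemo` are hypothetical and merely imitate what a runtime would do.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical driver that imitates the runtime: open, working methods, then close. */
class LifecycleDemo {
    public static void main(String[] args) {
        System.out.println(runLifecycle());
    }

    static List<String> runLifecycle() {
        AuditingFunction fn = new AuditingFunction();
        fn.open();
        try {
            fn.processRecord("a");
            fn.processRecord("b");
        } finally {
            // close() runs even if a working method fails.
            fn.close();
        }
        return fn.events();
    }
}

/** Local copy of the two life cycle hooks only; not the real ProcessFunction. */
interface ProcessFunction {
    default void open() throws Exception {}

    default void close() throws Exception {}
}

/** Hypothetical function that allocates a buffer in open() and releases it in close(). */
class AuditingFunction implements ProcessFunction {
    private final List<String> events = new ArrayList<>();
    private StringBuilder buffer;

    @Override
    public void open() {
        buffer = new StringBuilder(); // one-time setup work
        events.add("open");
    }

    void processRecord(String record) {
        buffer.append(record);
        events.add("process:" + record);
    }

    @Override
    public void close() {
        events.add("close:" + buffer); // clean-up work sees everything processed so far
        buffer = null;
    }

    List<String> events() {
        return events;
    }
}
```

Because both hooks have empty default bodies, a function that needs no setup or clean-up can simply leave them out.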

Collector

Before introducing the specific process functions, we first introduce the Collector interface, which is responsible for collecting processed data.

Code Block (Java): Collector.java
/** This interface is responsible for collecting data to the output stream. */
public interface Collector<OUT> {
    /**
     * Collect record to output stream.
     *
     * @param record to be collected.
     */
    void collect(OUT record);

    /**
     * Collect record to output stream with the given timestamp.
     *
     * @param record to be collected.
     * @param timestamp of the processed data.
     */
    void collect(OUT record, long timestamp);
}
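To make the two `collect` variants concrete, here is a minimal sketch of a collector that buffers records in memory, for example to inspect what a process function emitted in a unit test. The `Collector` interface is a local copy of the one above so that the sketch compiles on its own; `ListCollector` and its use of `null` to mark "no explicit timestamp" are assumptions made for illustration, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;

class CollectorDemo {
    public static void main(String[] args) {
        ListCollector<String> out = new ListCollector<>();
        out.collect("a");      // no explicit timestamp
        out.collect("b", 42L); // explicit timestamp
        System.out.println(out.records() + " " + out.timestamps());
    }
}

/** Local copy of the Collector interface shown above. */
interface Collector<OUT> {
    void collect(OUT record);

    void collect(OUT record, long timestamp);
}

/** Hypothetical helper that keeps emitted records and their timestamps in lists. */
class ListCollector<OUT> implements Collector<OUT> {
    private final List<OUT> records = new ArrayList<>();
    // A null entry marks a record emitted without a timestamp (a choice made for this sketch).
    private final List<Long> timestamps = new ArrayList<>();

    @Override
    public void collect(OUT record) {
        records.add(record);
        timestamps.add(null);
    }

    @Override
    public void collect(OUT record, long timestamp) {
        records.add(record);
        timestamps.add(timestamp);
    }

    List<OUT> records() {
        return records;
    }

    List<Long> timestamps() {
        return timestamps;
    }
}
```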

OneInputStreamProcessFunction

...