Status

Discussion thread: https://lists.apache.org/thread/ksv71m7rvcwslonw07h2qnw77zpqozvh
Vote thread: https://lists.apache.org/thread/l7s3fjn0x855y6lyj9bcqwz2f2pb3mgw
Release: 1.16

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The existing DataStream#executeAndCollect family of methods provides the means to feed the contents of a stream back to the client. This is particularly useful for testing workflows, since it allows you to easily validate the output without having to write it to a file/DB and read it back.

It is, however, limited to collecting a single stream and forces the immediate execution of the job. This makes it less useful for testing applications that have multiple sinks; a user currently has to split the application into N different applications to test all paths from sources to sinks.

This is purely an API limitation; the goal is to slightly extend the API to support collecting an arbitrary number of streams.

Public Interfaces

The DataStream class will be extended with two variants of a collectAsync method, both of which give access to a CloseableIterator over the elements in the stream.

/**
 * Sets up the collection of the elements in the given {@link DataStream}, and returns an
 * iterator over the collected elements that can be used to retrieve elements once the job
 * execution has started.
 *
 * @return iterator over the contained elements
 */
@Experimental
public CloseableIterator<T> collectAsync();

/**
 * Sets up the collection of the elements in the given {@link DataStream}, which can be
 * retrieved later via the given {@link Collector}.
 *
 * @param collector a collector that can be used to retrieve the elements
 */
@Experimental
public void collectAsync(Collector<T> collector);

Proposed Changes

In practice the proposal is to split executeAndCollect into two distinct steps: one that sets up the collect sink and returns an iterator to the user (to be used later), and another that triggers the job and sets the job client.

Step 1

This step is covered by the methods added to DataStream. Both methods do the same thing; they differ only in their usage pattern.

CloseableIterator<T> collectAsync()

This is the obvious variant, meant for cases where operations are directly applied on a given DataStream without any indirection. This will generally cover production usages or unit tests.

void collectAsync(Collector<T> collector)

This variant is meant for cases where users have organized their code such that production and test code share the same workflow definition, with sources and sinks passed in as parameters. Since the collect sink requires the preceding DataStream, a common pattern is to use a Consumer<DataStream<?>> to inject the sink (see below). In this case the first variant would be exceedingly annoying to use, as you would have to store the returned iterator in an atomic reference to get it out of the lambda function.

static void defineWorkflow(Source<Event> source, Consumer<DataStream<Event>> sinkFactory)
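
To illustrate why the first variant is awkward with this pattern, here is a minimal, self-contained sketch that uses only the JDK. SketchStream is a hypothetical stand-in for DataStream, a plain List stands in for the Source, and the iterator is a plain java.util.Iterator rather than a CloseableIterator; none of these are Flink classes. The point is only the AtomicReference needed to get the iterator out of the lambda.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class AtomicReferenceWorkaround {
    /** Workflow shared by production and test code; the sink is injected by the caller. */
    static void defineWorkflow(List<String> source, Consumer<SketchStream<String>> sinkFactory) {
        sinkFactory.accept(new SketchStream<>(source));
    }

    /** Test-side code: the iterator has to be carried out of the lambda via a holder. */
    static Iterator<String> collectFromWorkflow(List<String> source) {
        AtomicReference<Iterator<String>> holder = new AtomicReference<>();
        defineWorkflow(source, stream -> holder.set(stream.collectAsync()));
        return holder.get();
    }

    public static void main(String[] args) {
        Iterator<String> output = collectFromWorkflow(List.of("a", "b"));
        while (output.hasNext()) {
            System.out.println(output.next());
        }
    }
}

/** Hypothetical stand-in for DataStream (assumption for this sketch, not Flink's class). */
final class SketchStream<T> {
    private final List<T> elements;

    SketchStream(List<T> elements) {
        this.elements = elements;
    }

    /** Models the first variant: returns an iterator over the stream's elements. */
    Iterator<T> collectAsync() {
        return elements.iterator();
    }
}
```

The holder exists only because a lambda cannot assign to a local variable of the enclosing method.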


The newly added Collector class serves as an accessor for the elements.

/**
 * This class acts as an accessor to elements collected via {@link #collectAsync(Collector)}.
 *
 * @param <T> the element type
 */
@Experimental
public static class Collector<T> {
    private CloseableIterator<T> iterator;

    @Internal
    void setIterator(CloseableIterator<T> iterator) {
        this.iterator = iterator;
    }

    /**
     * Returns an iterator over the collected elements. The returned iterator must only be used
     * once the job execution was triggered.
     *
     * <p>This method will always return the same iterator instance.
     *
     * @return iterator over collected elements
     */
    public CloseableIterator<T> getOutput() {
        Preconditions.checkNotNull(iterator, "The job execution was not yet started.");
        return iterator;
    }
}
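
The intended usage of the second variant can be sketched with JDK-only stand-ins. ModelCollector mirrors the shape of the Collector above but holds a plain java.util.Iterator, and CollectorStream is a hypothetical substitute for DataStream. The sketch assumes that the iterator is handed to the collector at the moment the sink is attached; the proposal text does not spell out when setIterator is called.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

final class CollectorVariantSketch {
    /** Workflow shared by production and test code; the sink is injected by the caller. */
    static void defineWorkflow(List<String> source, Consumer<CollectorStream<String>> sinkFactory) {
        sinkFactory.accept(new CollectorStream<>(source));
    }

    public static void main(String[] args) {
        // The collector is created outside the lambda, so no holder object is needed.
        ModelCollector<String> collector = new ModelCollector<>();
        defineWorkflow(List.of("a", "b"), stream -> stream.collectAsync(collector));

        Iterator<String> output = collector.getOutput();
        while (output.hasNext()) {
            System.out.println(output.next());
        }
    }
}

/** Simplified stand-in for the proposed Collector (assumption for this sketch). */
final class ModelCollector<T> {
    private Iterator<T> iterator;

    void setIterator(Iterator<T> iterator) {
        this.iterator = iterator;
    }

    /** Fails fast if no iterator was set, analogous to the checkNotNull in the proposal. */
    Iterator<T> getOutput() {
        return Objects.requireNonNull(iterator, "The job execution was not yet started.");
    }
}

/** Hypothetical stand-in for DataStream (not Flink's class). */
final class CollectorStream<T> {
    private final List<T> elements;

    CollectorStream(List<T> elements) {
        this.elements = elements;
    }

    /** Models the second variant: assumed to set the iterator when the sink is attached. */
    void collectAsync(ModelCollector<T> collector) {
        collector.setIterator(elements.iterator());
    }
}
```

Compared with an AtomicReference, the collector gives the holder a name and a clear failure message when it is read too early.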

Step 2

The following will be added to the StreamExecutionEnvironment:

    private static final List<CollectResultIterator<?>> collectIterators = new ArrayList<>();

    @Internal
    public void registerCollectIterator(CollectResultIterator<?> iterator) {
        collectIterators.add(iterator);
    }

This method will be called from DataStream#collectAsync, and will allow us to automatically set the job client for each CollectResultIterator within executeAsync.
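
The interplay of the two steps can be modeled with a small JDK-only sketch. ModelEnvironment, ModelResultIterator and ModelJobClient are hypothetical stand-ins for StreamExecutionEnvironment, CollectResultIterator and JobClient. The fake results, the fail-fast check before execution, and clearing the list after submission are assumptions of this model, not statements about Flink's behavior.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class TwoStepCollectSketch {
    public static void main(String[] args) {
        ModelEnvironment env = new ModelEnvironment();
        // Step 1: set up any number of collect sinks; nothing runs yet.
        Iterator<String> first = env.collectAsync("sinkA");
        Iterator<String> second = env.collectAsync("sinkB");
        // Step 2: a single execution wires the job client into every iterator.
        env.executeAsync();
        first.forEachRemaining(System.out::println);
        second.forEachRemaining(System.out::println);
    }
}

/** Hypothetical job client that can serve the collected results of a sink. */
interface ModelJobClient {
    List<String> fetchResults(String sinkName);
}

/** Stand-in for CollectResultIterator: unusable in this model until a job client is set. */
final class ModelResultIterator implements Iterator<String> {
    private final String sinkName;
    private Iterator<String> results; // null until setJobClient is called

    ModelResultIterator(String sinkName) {
        this.sinkName = sinkName;
    }

    void setJobClient(ModelJobClient client) {
        this.results = client.fetchResults(sinkName).iterator();
    }

    @Override
    public boolean hasNext() {
        if (results == null) {
            throw new IllegalStateException("Job client not set; was the job executed?");
        }
        return results.hasNext();
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return results.next();
    }
}

/** Stand-in for StreamExecutionEnvironment. */
final class ModelEnvironment {
    private final List<ModelResultIterator> collectIterators = new ArrayList<>();

    /** Creates an iterator for a sink and registers it for the next execution. */
    ModelResultIterator collectAsync(String sinkName) {
        ModelResultIterator iterator = new ModelResultIterator(sinkName);
        collectIterators.add(iterator);
        return iterator;
    }

    /** Pretends to submit the job, then hands the job client to every registered iterator. */
    ModelJobClient executeAsync() {
        ModelJobClient client = sinkName -> List.of(sinkName + "-1", sinkName + "-2");
        for (ModelResultIterator iterator : collectIterators) {
            iterator.setJobClient(client);
        }
        collectIterators.clear(); // assumption: each iterator belongs to one execution
        return client;
    }
}
```

Because registration happens inside collectAsync, the user never has to pass the job client around; one execution call is enough to serve all collected streams.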


No further changes are necessary since this is purely an API construct; the underlying functionality already exists.

Compatibility, Deprecation, and Migration Plan

This is purely an API extension around existing functionality. Current usages will not be impacted.

Test Plan

The underlying functionality is already tested. We'll only require a few unit tests for the newly added methods.

Rejected Alternatives