FLIP-27 sources are non-trivial to implement. At the same time, it is frequently necessary to generate arbitrary events with a "mock" source. This requirement arises both for Flink users, in the scope of demo/PoC projects, and for Flink developers when writing tests. So far, the go-to solution for these purposes has been to use pre-FLIP-27 APIs and implement data generators as SourceFunctions.
While the new FLIP-27 Source interface introduces important additional functionality, it comes with significant complexity that makes it hard for Flink users to implement drop-in replacements for SourceFunction-based data generators. Meanwhile, SourceFunction is effectively superseded by the Source interface and will eventually need to be deprecated. To fill this gap, this FLIP proposes introducing a generic data generator source based on the FLIP-27 API.

Public Interfaces

The following new class will be introduced. Under the hood, it can wrap and delegate to the NumberSequenceSource utilities.

DataGeneratorSource (Java):
package org.apache.flink.api.connector.source.lib;

/**
 * A data source that produces N events of an arbitrary type in parallel. This source is useful for
 * testing and for cases that just need a stream of N events of any kind.
 *
 * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
 * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is
 * limited to one, this will produce one sequence in order.
 *
 * <p>This source is always bounded. For very long sequences, users may want to execute the
 * application in streaming mode because, although the produced stream is bounded, the end
 * bound is far away.
 */

@Public
public class DataGeneratorSource<OUT>
        implements Source<OUT, GeneratorSequenceSplit<OUT>, Collection<GeneratorSequenceSplit<OUT>>>,
                ResultTypeQueryable<OUT> {


    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and
     *                          translates them into events of the output type.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            MapFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) {
        ...
    }
}
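
The Javadoc above says that the sequence is split into as many sub-sequences as there are parallel readers, and that each sub-sequence is produced in order. The following plain-Java sketch illustrates that idea only; it is not the proposed implementation and does not use any Flink classes. The names SplitSketch, IndexRange, splitRange, and generate are hypothetical, java.util.function.LongFunction stands in for MapFunction<Long, OUT>, and the assumptions that indices start at 0 and that ranges are contiguous and near-equal in size are not stated in this FLIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

/** Plain-Java illustration only; none of these names are part of Flink or of this FLIP. */
class SplitSketch {

    /** A half-open index range [from, to) handled by one hypothetical reader. */
    static final class IndexRange {
        final long from;
        final long to;

        IndexRange(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + ")";
        }
    }

    /** Splits [0, count) into `parallelism` contiguous ranges whose sizes differ by at most one. */
    static List<IndexRange> splitRange(long count, int parallelism) {
        List<IndexRange> ranges = new ArrayList<>();
        long base = count / parallelism;
        long remainder = count % parallelism;
        long start = 0;
        for (int i = 0; i < parallelism; i++) {
            // The first `remainder` ranges take one extra index (assumed policy).
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new IndexRange(start, start + size));
            start += size;
        }
        return ranges;
    }

    /** Applies the generator to every index of one range, in ascending order. */
    static <OUT> List<OUT> generate(IndexRange range, LongFunction<OUT> generator) {
        List<OUT> events = new ArrayList<>();
        for (long i = range.from; i < range.to; i++) {
            events.add(generator.apply(i));
        }
        return events;
    }

    public static void main(String[] args) {
        LongFunction<String> generator = index -> ">>> " + index;
        for (IndexRange range : splitRange(10, 3)) {
            System.out.println(range + " -> " + generate(range, generator));
        }
    }
}
```

With a parallelism of one, splitRange returns a single range, which corresponds to the "one sequence in order" case described in the Javadoc.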

Proposed Changes

This FLIP introduces a new DataGeneratorSource class. The envisioned usage looks like this:

Usage (Java):
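// Maps each index number to an event of the output type.
MapFunction<Long, String> generator = index -> ">>> " + index;
// Bounded source that produces 10 events.
DataGeneratorSource<String> source = new DataGeneratorSource<>(generator, 10, Types.STRING);
// env is an existing StreamExecutionEnvironment.
DataStreamSource<String> watermarked =
        env.fromSource(
                source,
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)),
                "watermarked");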





It is up for discussion whether an additional utility method on StreamExecutionEnvironment with default watermarking would also be desirable (similar to env.fromSequence(long from, long to)).

Compatibility, Deprecation, and Migration Plan

This feature is a stepping stone toward deprecating the SourceFunction API (see this discussion). 

  1. After this feature is introduced, it will be documented and promoted as the recommended way to write data generators.
  2. A list of Flink tests that currently use the SourceFunction API will be compiled and follow-up tickets for migration will be created.

Test Plan

  • Unit tests will be added to verify the behavior of the Source's splits in relation to the SourceReader.
  • Integration tests will be added to verify correct functioning with different levels of parallelism.
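
The property that the parallelism tests are meant to check can be illustrated without Flink: for any parallelism, every index should be produced exactly once, and each sub-sequence should be in order. The sketch below is an assumption-laden stand-in, not the planned test code. ParallelismCheckSketch, readSubSequence, runReaders, and isCompleteAndOrdered are hypothetical names, plain threads stand in for source readers, and the index range [0, count) is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Plain-Java illustration only; threads stand in for parallel source readers. */
class ParallelismCheckSketch {

    /** Produces indices [from, to) in order, standing in for one parallel reader. */
    static List<Long> readSubSequence(long from, long to) {
        List<Long> out = new ArrayList<>();
        for (long i = from; i < to; i++) {
            out.add(i);
        }
        return out;
    }

    /** Runs `parallelism` stand-in readers concurrently and returns what each one produced. */
    static List<List<Long>> runReaders(long count, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<List<Long>>> futures = new ArrayList<>();
            for (int i = 0; i < parallelism; i++) {
                // Contiguous, non-overlapping ranges that together cover [0, count).
                long from = count * i / parallelism;
                long to = count * (i + 1) / parallelism;
                futures.add(pool.submit(() -> readSubSequence(from, to)));
            }
            List<List<Long>> results = new ArrayList<>();
            for (Future<List<Long>> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (Exception e) {
            throw new IllegalStateException("stand-in reader failed", e);
        } finally {
            pool.shutdown();
        }
    }

    /** True if each index in [0, count) appears exactly once and every sub-sequence is ascending. */
    static boolean isCompleteAndOrdered(List<List<Long>> results, long count) {
        boolean[] seen = new boolean[(int) count];
        long total = 0;
        for (List<Long> subSequence : results) {
            long previous = -1;
            for (long value : subSequence) {
                if (value <= previous || value < 0 || value >= count || seen[(int) value]) {
                    return false;
                }
                seen[(int) value] = true;
                previous = value;
                total++;
            }
        }
        return total == count;
    }
}
```

An actual integration test would run the source in a Flink job and apply a comparable check to the collected output for each parallelism level.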

Rejected Alternatives

It is possible to use a NumberSequenceSource followed by a map function to achieve similar results; however, this has two disadvantages:

  • It introduces another level of indirection and is less intuitive to use.
  • It does not allow promoting best practices for assigning watermarks (see this discussion).