



Discussion thread







FLIP-27 sources are non-trivial to implement. At the same time, users frequently need a "mock" source that generates arbitrary events. This need arises both for Flink users building demo/PoC projects and for Flink developers writing tests. So far, the go-to solution for these purposes has been to use the pre-FLIP-27 APIs and implement data generators as SourceFunctions.
While the new FLIP-27 Source interface introduces important additional functionality, its significant complexity makes it hard for Flink users to implement drop-in replacements for the SourceFunction-based data generators. Meanwhile, SourceFunction is effectively superseded by the Source interface and needs to be deprecated eventually. To fill this gap, this FLIP proposes the introduction of a generic data generator source based on the FLIP-27 API.


Code Block
package org.apache.flink.api.connector.source.lib;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkNotNull;

// SourceReaderFactory and GeneratorFunction are the new interfaces proposed by this FLIP.

/**
 * A data source that produces N events of an arbitrary type in parallel.
 * This source is useful for testing and for cases that just need a stream of N events of any kind.
 *
 * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
 * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is
 * limited to one, this will produce one sequence in order.
 *
 * <p>This source is always bounded. For very long sequences, users may want to consider executing
 * the application in streaming mode because, although the produced stream is bounded, the end
 * bound is far away.
 */
public class DataGeneratorSource<OUT>
        implements Source<OUT, NumberSequenceSplit, Collection<NumberSequenceSplit>>,
                ResultTypeQueryable<OUT> {

    private final SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory;
    private final TypeInformation<OUT> typeInfo;
    private final NumberSequenceSource numberSource;

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param sourceReaderFactory The factory for instantiating the readers of type
     *     {@code SourceReader<OUT, NumberSequenceSplit>}.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
            long count,
            TypeInformation<OUT> typeInfo) {
        this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
        this.typeInfo = checkNotNull(typeInfo);
        // NumberSequenceSource bounds are inclusive: [0, count - 1] yields exactly count indices.
        this.numberSource = new NumberSequenceSource(0, count - 1);
    }

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            GeneratorFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) {...}

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param sourceRatePerSecond The maximum number of events per second that this generator aims
     *     to produce. This is a target number for the whole source; the individual parallel
     *     source instances automatically adjust their rate based on {@code sourceRatePerSecond}
     *     and the source parallelism.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            GeneratorFunction<Long, OUT> generatorFunction,
            long count,
            double sourceRatePerSecond,
            TypeInformation<OUT> typeInfo) {...}

    // Source and ResultTypeQueryable methods (getBoundedness(), createReader(), ...) omitted.
}
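The splitting behavior described in the class Javadoc can be sketched in plain Java without Flink dependencies. This is a minimal sketch under assumptions: `DataGeneratorSketch` and its simplified `GeneratorFunction` are hypothetical stand-ins (the proposed `GeneratorFunction` may declare checked exceptions), and the even, contiguous split of the indices 0..count-1 across readers is an illustration, not necessarily the exact arithmetic `NumberSequenceSource` uses.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, dependency-free sketch of how a generator source could distribute indices. */
public class DataGeneratorSketch {

    /** Simplified stand-in for the proposed GeneratorFunction (no checked exceptions here). */
    @FunctionalInterface
    public interface GeneratorFunction<T, O> {
        O map(T value);
    }

    /**
     * Splits the indices 0..count-1 into {@code parallelism} contiguous ranges (one per reader)
     * and applies the generator function to every index, keeping each range in order.
     */
    public static <O> List<List<O>> generate(
            GeneratorFunction<Long, O> generator, long count, int parallelism) {
        List<List<O>> perReader = new ArrayList<>();
        for (int reader = 0; reader < parallelism; reader++) {
            // Reader i owns the half-open range [i * count / p, (i + 1) * count / p).
            long start = reader * count / parallelism;
            long end = (reader + 1) * count / parallelism;
            List<O> events = new ArrayList<>();
            for (long index = start; index < end; index++) {
                events.add(generator.map(index));
            }
            perReader.add(events);
        }
        return perReader;
    }

    public static void main(String[] args) {
        List<List<String>> output = generate(index -> "Number: " + index, 10, 3);
        for (int i = 0; i < output.size(); i++) {
            System.out.println("reader " + i + ": " + output.get(i));
        }
    }
}
```

Running `main` prints one line per reader: reader 0 receives indices 0 to 2, reader 1 receives 3 to 5 and reader 2 receives 6 to 9, each in order. With `parallelism` set to 1, the single reader emits the whole sequence in order, matching the Javadoc.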


Code Block

import java.io.Serializable;

/** The interface that can be used to throttle execution of methods. */
public interface RateLimiter extends Serializable {

    /**
     * Blocks until the caller is allowed to proceed. Intended for places where the rate at
     * which results are produced, or at which other functions are called, must be limited.
     *
     * @return The number of milliseconds this call blocked its caller.
     * @throws InterruptedException If the calling thread is interrupted while blocked.
     */
    int acquire() throws InterruptedException;
}
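One possible blocking implementation of this interface hands out permits at a fixed interval of 1 / permitsPerSecond seconds. This is a hypothetical sketch: `SleepingRateLimiter` and `perSubtaskRate` are not part of the proposal, and dividing the global `sourceRatePerSecond` evenly by the parallelism is only an assumption about how the parallel instances "automatically adjust their rate".

```java
import java.io.Serializable;

/** Local copy of the proposed interface so that this sketch compiles on its own. */
interface RateLimiter extends Serializable {
    int acquire() throws InterruptedException;
}

/**
 * Hypothetical sketch of a blocking RateLimiter that spaces permits evenly in time, i.e. one
 * permit every 1 / permitsPerSecond seconds.
 */
public class SleepingRateLimiter implements RateLimiter {

    private final long intervalNanos;
    // Transient: System.nanoTime() values are meaningless after deserialization in another JVM.
    private transient boolean initialized;
    private transient long nextPermitNanos; // earliest time at which the next permit is granted

    public SleepingRateLimiter(double permitsPerSecond) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be positive");
        }
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
    }

    /** Hypothetical helper: each parallel reader targets an equal share of the total rate. */
    public static double perSubtaskRate(double sourceRatePerSecond, int parallelism) {
        return sourceRatePerSecond / parallelism;
    }

    @Override
    public int acquire() throws InterruptedException {
        long now = System.nanoTime();
        if (!initialized) {
            nextPermitNanos = now; // the first permit is granted immediately
            initialized = true;
        }
        long waitNanos = Math.max(0L, nextPermitNanos - now);
        if (waitNanos > 0) {
            Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
        }
        // The next permit is due one interval after this one (or after now, if the caller was idle).
        nextPermitNanos = Math.max(nextPermitNanos, now) + intervalNanos;
        // Report how long this call actually blocked, in milliseconds.
        return (int) ((System.nanoTime() - now) / 1_000_000L);
    }
}
```

Under this assumption, a target of 1000 events per second at parallelism 4 would give each reader a limiter with 250 permits per second. Sleeping inside `acquire()` keeps the sketch simple, but it blocks the calling thread for the whole wait.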


Code Block
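package org.apache.flink.api.connector.source.lib.util;

import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;

import java.util.Iterator;

/** A source reader that translates the elements of its iterator splits via a GeneratorFunction. */
public class GeneratingIteratorSourceReader<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {

    public GeneratingIteratorSourceReader(
            SourceReaderContext context, GeneratorFunction<E, O> generatorFunction) {...}

    public InputStatus pollNext(ReaderOutput<O> output) {...}
}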
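The non-blocking `pollNext` contract of such a reader can be illustrated without Flink: each call takes at most one element from the split's iterator, maps it through the generator function, emits it and reports whether more input may follow. This is a simplified sketch: `GeneratingReaderSketch` and its `Status` enum are hypothetical stand-ins for the reader and Flink's `InputStatus`, a `List` replaces `ReaderOutput`, and split assignment and availability signalling are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical, dependency-free sketch of the poll loop of a generating iterator reader. */
public class GeneratingReaderSketch<E, O> {

    /** Simplified stand-in for Flink's InputStatus (NOTHING_AVAILABLE omitted). */
    public enum Status { MORE_AVAILABLE, END_OF_INPUT }

    private final Iterator<E> iterator; // elements of the single split assigned to this reader
    private final Function<E, O> generatorFunction;

    public GeneratingReaderSketch(Iterator<E> iterator, Function<E, O> generatorFunction) {
        this.iterator = iterator;
        this.generatorFunction = generatorFunction;
    }

    /** Emits at most one generated record per call and never blocks. */
    public Status pollNext(List<O> output) {
        if (!iterator.hasNext()) {
            return Status.END_OF_INPUT;
        }
        output.add(generatorFunction.apply(iterator.next()));
        return Status.MORE_AVAILABLE;
    }

    public static void main(String[] args) {
        GeneratingReaderSketch<Long, String> reader =
                new GeneratingReaderSketch<>(
                        Arrays.asList(0L, 1L, 2L).iterator(), i -> "event-" + i);
        List<String> output = new ArrayList<>();
        while (reader.pollNext(output) == Status.MORE_AVAILABLE) {
            // The runtime keeps polling while more input is available.
        }
        System.out.println(output); // prints [event-0, event-1, event-2]
    }
}
```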



Code Block
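package org.apache.flink.api.connector.source.lib.util;

import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;

/** A {@link SourceReader} decorator that throttles the wrapped reader using a RateLimiter. */
public class RateLimitedSourceReader<E, SplitT extends SourceSplit>
        implements SourceReader<E, SplitT> {

    private final SourceReader<E, SplitT> sourceReader;
    private final RateLimiter rateLimiter;

    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter rateLimiter) {
        this.sourceReader = sourceReader;
        this.rateLimiter = rateLimiter;
    }

    @Override
    public void start() {
        sourceReader.start();
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
        rateLimiter.acquire(); // block until a permit is available
        return sourceReader.pollNext(output);
    }

    // isAvailable, addSplits, notifyNoMoreSplits, snapshotState and close delegate likewise.
}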
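`RateLimitedSourceReader` is a decorator: it implements the same interface as the reader it wraps and only inserts a call to `acquire()` before each delegated `pollNext`. The dependency-free sketch below shows that wiring; `Poller`, `Limiter` and `rateLimited` are hypothetical simplifications of `SourceReader`, `RateLimiter` and the wrapper, and the counting limiter never blocks so that the call order can be observed.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the decorator idea behind RateLimitedSourceReader. */
public class RateLimitedPollerSketch {

    /** Simplified stand-in for SourceReader#pollNext: returns true while more input may follow. */
    @FunctionalInterface
    public interface Poller {
        boolean pollNext() throws Exception;
    }

    /** Simplified stand-in for the proposed RateLimiter. */
    @FunctionalInterface
    public interface Limiter {
        int acquire() throws InterruptedException;
    }

    /** Wraps a poller so that every poll first waits for a permit, then delegates. */
    public static Poller rateLimited(Poller delegate, Limiter limiter) {
        return () -> {
            limiter.acquire();
            return delegate.pollNext();
        };
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger permits = new AtomicInteger();
        AtomicInteger records = new AtomicInteger();
        // Inner "reader" that emits one record per poll and reports the end after 5 records.
        Poller inner = () -> records.incrementAndGet() < 5;
        // Counting limiter that never blocks, so the wiring can be observed directly.
        Limiter countingLimiter = () -> {
            permits.incrementAndGet();
            return 0;
        };
        Poller throttled = rateLimited(inner, countingLimiter);
        while (throttled.pollNext()) {
            // keep polling until the inner poller reports the end of input
        }
        System.out.println(permits.get() + " permits for " + records.get() + " records");
    }
}
```

Running `main` prints `5 permits for 5 records`. Assuming each poll emits at most one record, as in a generating reader, throttling polls effectively throttles records.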


  • Unit tests will be added to verify the behavior of the Source's splits in relation to the SourceReader.
  • Integration tests will be added to verify correct functioning with different levels of parallelism.

POC Branch:
