Status

...



Discussion thread

...

...

Jira

FLINK-27919

...

Release

1.17


Motivation

FLIP-27 sources are non-trivial to implement. At the same time, it is frequently necessary to generate arbitrary events with a "mock" source. This need arises both for Flink users, in demo and PoC projects, and for Flink developers writing tests. So far, the go-to solution has been to use the pre-FLIP-27 APIs and implement data generators as SourceFunctions.
While the FLIP-27 Source interface introduces important additional functionality, it also brings significant complexity, which makes it hard for Flink users to implement drop-in replacements for SourceFunction-based data generators. Meanwhile, SourceFunction is effectively superseded by the Source interface and will eventually need to be deprecated. To fill this gap, this FLIP proposes introducing a generic data generator source based on the FLIP-27 API.

...

DataGeneratorSource (Java):
package org.apache.flink.api.connector.source.lib;

import static org.apache.flink.util.Preconditions.checkNotNull;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

import java.util.Collection;

/**
 * A data source that produces N events of an arbitrary type in parallel.
 * This source is useful for testing and for cases that just need a stream of N events of any kind.
 *
 * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
 * source readers. Each sub-sequence is produced in order. Consequently, if the parallelism is
 * limited to one, this source produces one sequence in order.
 *
 * <p>This source is always bounded. For very long sequences, users may want to consider running
 * the application in streaming mode because, although the produced stream is bounded, its end
 * may be far away.
 */
@Public
public class DataGeneratorSource<OUT>
        implements Source<
                        OUT,
                        NumberSequenceSource.NumberSequenceSplit,
                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
                ResultTypeQueryable<OUT> {

    private final SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory;
    private final TypeInformation<OUT> typeInfo;
    private final NumberSequenceSource numberSource;

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param sourceReaderFactory The factory for instantiating the readers of type
     *     {@code SourceReader<OUT, NumberSequenceSplit>}.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
            long count,
            TypeInformation<OUT> typeInfo) {
        this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
        this.typeInfo = checkNotNull(typeInfo);
        // NumberSequenceSource treats both bounds as inclusive, so [0, count - 1]
        // yields exactly count indices.
        this.numberSource = new NumberSequenceSource(0, count - 1);
    }

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            GeneratorFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) {...}

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param sourceRatePerSecond The maximum number of events per second that this generator aims
     *     to produce. This is a target number for the whole source; the individual parallel
     *     source instances automatically adjust their rate based on {@code sourceRatePerSecond}
     *     and the source parallelism.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            GeneratorFunction<Long, OUT> generatorFunction,
            long count,
            double sourceRatePerSecond,
            TypeInformation<OUT> typeInfo) {...}

    // Source and ResultTypeQueryable methods omitted.
}

...
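The class Javadoc says the index sequence is split into one ordered sub-sequence per parallel reader, and that a generator function turns indices into events. The Flink-free sketch below models only that idea; it is not the proposed implementation. It assumes each reader receives a contiguous range, uses `java.util.function.LongFunction` as a stand-in for `GeneratorFunction<Long, OUT>`, and the names `GeneratorSketch`, `splitRange`, and `generate` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

/** Hypothetical, Flink-free model of splitting an index range across parallel readers. */
class GeneratorSketch {

    /**
     * Splits [0, count) into {@code parallelism} contiguous ranges {from, toExclusive}. Range
     * sizes differ by at most one; the first (count % parallelism) ranges get one extra index.
     */
    static List<long[]> splitRange(long count, int parallelism) {
        if (count < 0 || parallelism <= 0) {
            throw new IllegalArgumentException("count must be >= 0 and parallelism > 0");
        }
        List<long[]> ranges = new ArrayList<>();
        long base = count / parallelism;
        long remainder = count % parallelism;
        long from = 0;
        for (int i = 0; i < parallelism; i++) {
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new long[] {from, from + size});
            from += size;
        }
        return ranges;
    }

    /** Emulates one reader: walks its range in order and maps every index to an event. */
    static <OUT> List<OUT> generate(long[] range, LongFunction<OUT> generatorFunction) {
        List<OUT> events = new ArrayList<>();
        for (long index = range[0]; index < range[1]; index++) {
            events.add(generatorFunction.apply(index));
        }
        return events;
    }

    public static void main(String[] args) {
        // Stand-in for a GeneratorFunction<Long, String>.
        LongFunction<String> generatorFunction = index -> "Number: " + index;
        List<long[]> ranges = splitRange(10, 3);
        for (int reader = 0; reader < ranges.size(); reader++) {
            System.out.println("reader " + reader + ": " + generate(ranges.get(reader), generatorFunction));
        }
    }
}
```

With `count = 10` and three readers, the ranges are [0, 4), [4, 7), and [7, 10); each reader emits its indices in order, and with a single reader the whole sequence comes out in order. Flink's `NumberSequenceSource` may divide the range differently; the sketch only illustrates the ordering guarantee.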

RateLimiter (Java):
package org.apache.flink.api.common.io.ratelimiting;

import java.io.Serializable;

/** The interface that can be used to throttle execution of methods. */
public interface RateLimiter extends Serializable {

    /**
     * Blocks the caller until the next operation is allowed to proceed. Intended for places where
     * the rate at which results are produced, or at which other functions are called, needs to
     * be limited.
     *
     * @return The number of milliseconds this call blocked its caller.
     * @throws InterruptedException If the calling thread is interrupted while blocked.
     */
    int acquire() throws InterruptedException;
}

...
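The interface leaves the limiting strategy open. As one possible illustration, here is a minimal, Flink-free sketch of a limiter that spaces permits evenly and derives a per-reader rate from a source-wide target, in the spirit of the `sourceRatePerSecond` Javadoc of `DataGeneratorSource`. `RateLimiterSketch`, `SleepingRateLimiter`, and `forSubtask` are hypothetical names; the interface is redeclared locally so the file compiles without Flink, and dividing the rate evenly among readers is an assumption, not the proposed implementation.

```java
import java.io.Serializable;

/** Flink-free sketch of a rate limiter with the shape of the proposed RateLimiter interface. */
class RateLimiterSketch {

    /** Local copy of the proposed interface so that this file compiles on its own. */
    interface RateLimiter extends Serializable {
        int acquire() throws InterruptedException;
    }

    /** Hypothetical limiter that hands out permits at fixed intervals. Not thread-safe. */
    static final class SleepingRateLimiter implements RateLimiter {
        private final long intervalNanos;
        private boolean started;
        private long nextPermitNanos;

        SleepingRateLimiter(double permitsPerSecond) {
            if (!(permitsPerSecond > 0)) {
                throw new IllegalArgumentException("permitsPerSecond must be positive");
            }
            this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
        }

        /** Assumption: the source-wide target rate is divided evenly among parallel readers. */
        static SleepingRateLimiter forSubtask(double sourceRatePerSecond, int parallelism) {
            return new SleepingRateLimiter(sourceRatePerSecond / parallelism);
        }

        @Override
        public int acquire() throws InterruptedException {
            long now = System.nanoTime();
            if (!started) {
                started = true;
                nextPermitNanos = now; // the first permit is granted immediately
            }
            long waitNanos = Math.max(0L, nextPermitNanos - now);
            if (waitNanos > 0) {
                Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
            }
            // Schedule the next permit one interval later. If the caller fell behind schedule,
            // restart from "now" so that idle time is not saved up as a burst.
            nextPermitNanos = (waitNanos > 0 ? nextPermitNanos : now) + intervalNanos;
            // Report how long this call actually blocked, in milliseconds.
            return (int) ((System.nanoTime() - now) / 1_000_000L);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // A source-wide target of 40 events/s with parallelism 4 gives 10 permits/s per reader.
        RateLimiter limiter = SleepingRateLimiter.forSubtask(40, 4);
        long start = System.nanoTime();
        for (int i = 0; i < 5; i++) {
            limiter.acquire();
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
        System.out.println("5 permits took " + elapsedMs + " ms"); // roughly 400 ms or more
    }
}
```

Sleeping on the calling thread keeps the sketch simple and matches the blocking contract of `acquire()`; a limiter shared by several threads would additionally need synchronization.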

GeneratingIteratorSourceReader (Java):
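package org.apache.flink.api.connector.source.lib.util;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;

import java.util.Iterator;

/**
 * A source reader that takes the elements of {@code IteratorSourceSplit}s and converts each of
 * them with the given {@code GeneratorFunction} before emitting the result.
 */
@Experimental
public class GeneratingIteratorSourceReader<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {

    public GeneratingIteratorSourceReader(
            SourceReaderContext context, GeneratorFunction<E, O> generatorFunction) {...}

    @Override
    public InputStatus pollNext(ReaderOutput<O> output) {...}
}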

...
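The per-call behavior of a generating reader can be modeled without Flink: each `pollNext` call takes the next element of the current split's iterator, converts it with the generator function, and emits the result. The sketch below is a simplified, hypothetical model with a single split: `java.util.function.Function` stands in for `GeneratorFunction`, a `Consumer` stands in for `ReaderOutput`, and the local enum mirrors two constants of Flink's `InputStatus`. Switching between multiple splits and reporting `NOTHING_AVAILABLE` are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.LongStream;

/** Hypothetical, Flink-free model of a reader that maps split elements before emitting them. */
class GeneratingReaderSketch {

    /** Mirrors the two InputStatus constants this sketch needs. */
    enum InputStatus { MORE_AVAILABLE, END_OF_INPUT }

    /** Reads one split's iterator and converts each element with the generator function. */
    static final class GeneratingReader<E, O> {
        private final Iterator<E> splitIterator;
        private final Function<E, O> generatorFunction;

        GeneratingReader(Iterator<E> splitIterator, Function<E, O> generatorFunction) {
            this.splitIterator = splitIterator;
            this.generatorFunction = generatorFunction;
        }

        /** Emits at most one converted element per call and reports whether more remain. */
        InputStatus pollNext(Consumer<O> output) {
            if (!splitIterator.hasNext()) {
                return InputStatus.END_OF_INPUT;
            }
            output.accept(generatorFunction.apply(splitIterator.next()));
            return splitIterator.hasNext() ? InputStatus.MORE_AVAILABLE : InputStatus.END_OF_INPUT;
        }
    }

    public static void main(String[] args) {
        Iterator<Long> indices = LongStream.range(0, 3).boxed().iterator();
        GeneratingReader<Long, String> reader = new GeneratingReader<>(indices, i -> "event-" + i);
        List<String> collected = new ArrayList<>();
        InputStatus status;
        do {
            status = reader.pollNext(collected::add);
        } while (status == InputStatus.MORE_AVAILABLE);
        System.out.println(collected); // [event-0, event-1, event-2]
    }
}
```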

RateLimitedSourceReader (Java):
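package org.apache.flink.api.connector.source.lib.util;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.io.ratelimiting.RateLimiter;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;

/** Wraps a {@link SourceReader} and throttles its polling with a {@link RateLimiter}. */
@Experimental
public class RateLimitedSourceReader<E, SplitT extends SourceSplit>
        implements SourceReader<E, SplitT> {

    private final SourceReader<E, SplitT> sourceReader;
    private final RateLimiter rateLimiter;

    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter rateLimiter) {
        this.sourceReader = sourceReader;
        this.rateLimiter = rateLimiter;
    }

    @Override
    public void start() {
        sourceReader.start();
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
        // Block until the rate limiter grants a permit, then let the wrapped reader emit.
        rateLimiter.acquire();
        return sourceReader.pollNext(output);
    }

    // Remaining SourceReader methods omitted.
}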

...
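`RateLimitedSourceReader` is a decorator: it delegates polling to the wrapped reader and only puts a call to `acquire()` in front of it. The hypothetical, Flink-free sketch below uses stand-in interfaces `Reader` and `RateLimiter` plus a non-blocking `CountingRateLimiter` test double to show that exactly one permit is requested per `pollNext` call. If the wrapped reader emits at most one record per call, as the inner reader here does (an assumption of this sketch), the limiter's permit rate also bounds the record rate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, Flink-free model of the rate-limiting decorator. */
class RateLimitedReaderSketch {

    enum InputStatus { MORE_AVAILABLE, END_OF_INPUT }

    /** Minimal stand-in for SourceReader: only the polling method matters here. */
    interface Reader<E> {
        InputStatus pollNext(Consumer<E> output) throws Exception;
    }

    /** Same shape as the proposed RateLimiter interface. */
    interface RateLimiter {
        int acquire() throws InterruptedException;
    }

    /** Asks the limiter for a permit before every poll of the wrapped reader. */
    static final class RateLimitedReader<E> implements Reader<E> {
        private final Reader<E> delegate;
        private final RateLimiter rateLimiter;

        RateLimitedReader(Reader<E> delegate, RateLimiter rateLimiter) {
            this.delegate = delegate;
            this.rateLimiter = rateLimiter;
        }

        @Override
        public InputStatus pollNext(Consumer<E> output) throws Exception {
            rateLimiter.acquire(); // may block; the wrapped reader is unaware of throttling
            return delegate.pollNext(output);
        }
    }

    /** Test double: never blocks, only counts how many permits were requested. */
    static final class CountingRateLimiter implements RateLimiter {
        int acquired;

        @Override
        public int acquire() {
            acquired++;
            return 0;
        }
    }

    public static void main(String[] args) throws Exception {
        // Inner reader that emits 0, 1, 2, one value per poll.
        int[] next = {0};
        Reader<Integer> inner = output -> {
            output.accept(next[0]++);
            return next[0] < 3 ? InputStatus.MORE_AVAILABLE : InputStatus.END_OF_INPUT;
        };
        CountingRateLimiter limiter = new CountingRateLimiter();
        Reader<Integer> reader = new RateLimitedReader<>(inner, limiter);
        List<Integer> out = new ArrayList<>();
        while (reader.pollNext(out::add) == InputStatus.MORE_AVAILABLE) {
            // keep polling until the inner reader reports END_OF_INPUT
        }
        System.out.println(out + " after " + limiter.acquired + " permits"); // [0, 1, 2] after 3 permits
    }
}
```

Keeping throttling in a wrapper means any reader, not only the generating one, can be rate-limited without changing its own logic.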

  • Unit tests will be added to verify the behavior of the source's splits in relation to the SourceReader.
  • Integration tests will be added to verify correct behavior at different levels of parallelism.

POC Branch:

https://github.com/apache/flink/compare/master...afedulov:flink:FLINK-27919-generator-source

...