Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion thread

...

...

...

...


JIRA

...

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-27919

...

Release1.17


Motivation

FLIP-27 sources are non-trivial to implement. At the same time, it is frequently required to generate arbitrary events with a "mock" source. Such requirement arises both for Flink users, in the scope of demo/PoC projects, and for Flink developers when writing tests. The go-to solution for these purposes so far was using pre-FLIP-27 APIs and implementing data generators as SourceFunctions
While the new FLIP-27 Source interface introduces important additional functionality, it comes with significant complexity that presents a hurdle for Flink users for implementing drop-in replacements of the SourceFunction-based data generators.  Meanwhile, SourceFunction is effectively superseded by the Source interface and needs to be eventually deprecated. To fill this gap, this FLIP proposes the introduction of a generic data generator source based on the FLIP-27 API

Since it is frequently required to control the rate at which generated events are produced, this FLIP also expands the basic events generation functionality with native support for rate limiting. 

Public Interfaces

A new class with the following API will be introduced. Under the hood it, wraps, and delegates to the NumberSequenceSource utilities. 

Code Block
languagejava
titleDataGeneratorSource
package org.apache.flink.api.connector.source.lib;

/**
 * A data source that produces generators N events of an arbitrary type in parallel. 
 * This source is useful for
 * testing and for cases that just need a stream of N events of any kind.
 *
 * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
 * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is
 * limited to one, this will produce one sequence in order.
 *
 * <p>This source is always bounded. For very long sequences user may want to consider executing 
 * the application in a streaming manner, because, despite the fact that the produced stream is bounded, 
 * the end bound is pretty far away.
 */

@Public
public class DataGeneratorSource<T>
 DataGeneratorSource<OUT>                 
		implements Source<
                       implements Source<T, GeneratorSequenceSplit<T>, Collection<GeneratorSequenceSplit<T>>>OUT,
                        NumberSequenceSource.NumberSequenceSplit,
                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
                ResultTypeQueryable<T>ResultTypeQueryable<OUT> {    

    
     /**
     * Creates a new {@code DataGeneratorSource} that produces @{@codecode count} records in
     * parallel.
     *
     * @param generatorFunction The factory for instantiating the generator function readers of 
     *        type SourceReader<OUT, NumberSequenceSplit>.
     * @param count The count number of events to be produced.
     * @param typeInfo The type infoinformation of the returned events.
     */ 
    public DataGeneratorSource(
            MapFunction<LongSourceReaderFactory<OUT, T>NumberSequenceSplit> generatorFunction,sourceReaderFactory,
            long count, TypeInformation<T>
            TypeInformation<OUT> typeInfo) {
        	...this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
        this.typeInfo = checkNotNull(typeInfo);
        this.numberSource = new NumberSequenceSource(0, count);
    }


Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clientsConfiguration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

...

     /**
     * Creates a new {@code DataGeneratorSource} that produces @{code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            GeneratorFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) {...}
    }
    

     /**
     * Creates a new {@code DataGeneratorSource} that produces @{code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param sourceRatePerSecond The maximum number of events per seconds that this generator aims
     *     to produce. This is a target number for the whole source and the individual parallel
     *     source instances automatically adjust their rate taking based on the {@code
     *     sourceRatePerSecond} and the source parallelism.
     * @param typeInfo The type information of the returned events.
     */     
    public DataGeneratorSource(
            GeneratorFunction<Long, OUT> generatorFunction,
            long count,
            double sourceRatePerSecond,
            TypeInformation<OUT> typeInfo) {...}


Where GeneratorFunction supports initialization of class fields via the open() method with access to the local SourceReaderContext.

Code Block
languagejava
titleGeneratorFunction
@Public
public interface GeneratorFunction<T, O> extends Function {

    /**
     * Initialization method for the function. It is called once before the actual working process
     * methods.
     */
    default void open(SourceReaderContext readerContext) throws Exception {}

    /** Tear-down method for the function. */
    default void close() throws Exception {}

    O map(T value) throws Exception;
}


A new SourceReaderFactory interface is introduced.

Code Block
languagejava
titleSourceReaderFactory
public interface SourceReaderFactory<OUT, SplitT extends SourceSplit> extends Serializable {
    SourceReader<OUT, SplitT> newSourceReader(SourceReaderContext readerContext);
}

The generator source delegates the SourceReaders' creation to the factory.

Code Block
languagejava
titleDataGeneratorSource
@Public
public class DataGeneratorSource<OUT>                 
		implements Source<
                        OUT,
                        NumberSequenceSource.NumberSequenceSplit,
                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
                ResultTypeQueryable<OUT> {   

    private final SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory;
 
    @Override
    public SourceReader<OUT, NumberSequenceSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        return sourceReaderFactory.newSourceReader(readerContext);
    }
}


Proposed Changes

In order to deliver convenient rate-limiting functionality to the users of the new API, a small addition to the SourceReaderContext is required.

The sum of rates of all parallel readers has to approximate the optional user-defined sourceRatePerSecond parameter. Currently, there is no way for the SourceReaders to acquire the current parallelism of the job they are part of. To overcome this limitation, this FLIP proposes an extension of the SourceReaderContext interface with the currentParallelism() method:


Code Block
languagejava
titleSourceReaderContext
package org.apache.flink.api.connector.source;

/** The class that expose some context from runtime to the {@link SourceReader}. */
@Public
public interface SourceReaderContext {
	...         
	/**
     * Get the current parallelism of this Source.
     *
     * @return the parallelism of the Source.
     */
    int currentParallelism(); 
}


With the parallelism accessible via SourceReaderContext, initialization of the rate-limiting data generating readers can be taken care of by the SourceReaderFactories. For example:


Code Block
languagejava
titleGeneratorSourceReaderFactory
public class GeneratorSourceReaderFactory<OUT>
        implements SourceReaderFactory<OUT, NumberSequenceSource.NumberSequenceSplit> {

    public GeneratorSourceReaderFactory(
            GeneratorFunction<Long, OUT> generatorFunction, long sourceRatePerSecond){...}

    @Override
    public SourceReader<OUT, NumberSequenceSource.NumberSequenceSplit> newSourceReader(
            SourceReaderContext readerContext) {
        if (sourceRatePerSecond > 0) {
            int parallelism = readerContext.currentParallelism();
            RateLimiter rateLimiter = new GuavaRateLimiter(sourceRatePerSecond, parallelism);
            return new RateLimitedSourceReader<>(
                    new GeneratingIteratorSourceReader<>(readerContext, generatorFunction),
                    rateLimiter);
        } else {
            return new GeneratingIteratorSourceReader<>(readerContext, generatorFunction);
        }
    }
}


Where RateLimiter

Code Block
languagejava
titleRateLimiter
/** The interface that can be used to throttle execution of methods. */
interface RateLimiter extends Serializable {

    /**
     * Acquire method is a blocking call that is intended to be used in places where it is required
     * to limit the rate at which results are produced or other functions are called.
     *
     * @return The number of milliseconds this call blocked its caller.
     * @throws InterruptedException The interrupted exception.
     */
    int acquire() throws InterruptedException;
}

---

It is desirable to reuse the functionality of IteratorSourceReader for cases where the input data type is different from the output (IN: Long from the wrapped NumberSequenceSplit, OUT: the result of applying GeneratorFunction<Long, OUT>  provided by the user). For that purpose, the following changes are proposed:

  • New IteratorSourceReaderBase is introduced parameterized with both in and out data types generics.
  • All methods apart from pollNext() from the IteratorSourceReader are "pulled-up" to the *Base class
  • IteratorSourceReader API remains the same while implementing IteratorSourceReaderBase where input and output types are the same
  • New GeneratingIteratorSourceReader is introduced where input and output types are different (the result of applying GeneratorFunction)
  • GeneratingIteratorSourceReader initializes the GeneratorFunction (if needed), by calling open() method within its start() method.

Code Block
languagejava
titleIteratorSourceReaderBase
package org.apache.flink.api.connector.source.lib.util;

@Experimental
abstract class IteratorSourceReaderBase<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        implements SourceReader<O, SplitT> {...}


Reader:

Code Block
languagejava
titleIteratorSourceReader
package org.apache.flink.api.connector.source.lib.util;

@Public
public class IteratorSourceReader<
                E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, E, IterT, SplitT> {

    public IteratorSourceReader(SourceReaderContext context) {
        super(context);
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) {...}

}


Code Block
languagejava
titleGeneratingIteratorSourceReader
package org.apache.flink.api.connector.source.lib.util;

@Experimental
public class GeneratingIteratorSourceReader<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {

    public GeneratingIteratorSourceReader(
            SourceReaderContext context, GeneratorFunction<E, O> generatorFunction) {...} 

    @Override
    public InputStatus pollNext(ReaderOutput<O> output)  {...} 

    
}


RateLimitedSourceReader wraps another SourceReader (delegates to its methods) while rate-limiting the pollNext() calls.

Code Block
languagejava
titleRateLimitedSourceReader
package org.apache.flink.api.connector.source.lib.util;

@Experimental
class RateLimitedSourceReader<E, SplitT extends SourceSplit>
        implements SourceReader<E, SplitT> {

    private final SourceReader<E, SplitT> sourceReader;
    private final RateLimiter rateLimiter;

    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter rateLimiter) {
        this.sourceReader = sourceReader;
        this.rateLimiter = rateLimiter;
    }

    @Override
    public void start() {
        sourceReader.start();
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
        rateLimiter.acquire();
        return sourceReader.pollNext(output);
    }
  ...
}

Usage: 

The envisioned usage for functions that do not contain any class fields that need initialization looks like this:

Code Block
languagejava
titleusage
int count = 1000;
int sourceRatePerSecond = 2;
GeneratorFunction<Long, String> generator = index -> "Event from index: " + index;
DataGeneratorSource<String> source = new DataGeneratorSource<>(generator, count, sourceRatePerSecond, Types.STRING);
DataStreamSource<String> watermarked =
                  env.fromSource(
                        source,
                        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)),
                        "watermarked");

Scenarios, where GeneratorFunction requires initialization of non-serializable fields, is supported as follows:

Code Block
languagejava
titleusage
GeneratorFunction<Long, String> generator =
     new GeneratorFunction<Long, String>() {

      transient SourceReaderMetricGroup sourceReaderMetricGroup;

      @Override
      public void open(SourceReaderContext readerContext) {
      	  sourceReaderMetricGroup = readerContext.metricGroup();
      }

      @Override
      public String map(Long value) {
          return "Generated: >> "
                 + value.toString()
                 + "; local metric group: "
                 + sourceReaderMetricGroup.hashCode();
            }
       };
DataGeneratorSource<String> source = new DataGeneratorSource<>(generator, count, sourceRatePerSecond, Types.STRING);

Remarks:

  • It is up for discussion an addition of a utility method to StreamExecutionEnvironment with default watermarking might also be desirable (similar to env.fromSequence(long from, long to) ).
  • To be able to reuse the existing functionality of NumberSequenceSource it is required to change the visibility of NumberSequenceSource.CheckpointSerializer from private to package-private. 

Compatibility, Deprecation, and Migration Plan

This feature is a stepping stone toward deprecating the SourceFunction API (see this discussion). 

  1. After this feature is introduced, it will be documented and promoted as the recommended way to write data generators.
  2. A list of Flink tests that currently use the SourceFunction API will be compiled and follow-up tickets for migration will be created.

Test Plan

  • Unit tests will be added to verify the behavior of Source's Splits in relation to the SourceReader
  • Integration tests will be added to verify correct functioning with different levels of parallelism