Status
...
...
...
Jira: FLINK-27919 (ASF JIRA)
...
Motivation
FLIP-27 sources are non-trivial to implement. At the same time, it is frequently necessary to generate arbitrary events with a "mock" source. This requirement arises both for Flink users, in the scope of demo/PoC projects, and for Flink developers when writing tests. So far, the go-to solution for these purposes has been to use pre-FLIP-27 APIs and implement data generators as SourceFunctions.
While the new FLIP-27 Source interface introduces important additional functionality, it comes with significant complexity that makes it hard for Flink users to implement drop-in replacements for SourceFunction-based data generators. Meanwhile, SourceFunction is effectively superseded by the Source interface and eventually needs to be deprecated. To fill this gap, this FLIP proposes introducing a generic data generator source based on the FLIP-27 API.
...
Code Block |
---|
language | java |
---|
title | DataGeneratorSource |
---|
|
package org.apache.flink.api.connector.source.lib;

/**
 * A data source that produces N events of an arbitrary type in parallel.
 * This source is useful for testing and for cases that just need a stream of N events of any kind.
 *
 * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
 * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is
 * limited to one, this will produce one sequence in order.
 *
 * <p>This source is always bounded. For very long sequences, users may want to execute the
 * application in streaming mode because, although the produced stream is bounded, the end bound
 * is far away.
 */
@Public
public class DataGeneratorSource<OUT>
        implements Source<
                        OUT,
                        NumberSequenceSource.NumberSequenceSplit,
                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
                ResultTypeQueryable<OUT> {

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param sourceReaderFactory The factory for instantiating the readers of
     *     type SourceReader<OUT, NumberSequenceSplit>.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
            long count,
            TypeInformation<OUT> typeInfo) {
        this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
        this.typeInfo = checkNotNull(typeInfo);
        // NumberSequenceSource bounds are inclusive on both ends, hence count - 1.
        this.numberSource = new NumberSequenceSource(0, count - 1);
    }

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            GeneratorFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) {...}

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param sourceRatePerSecond The maximum number of events per second that this generator aims
     *     to produce. This is a target number for the whole source; the individual parallel
     *     source instances automatically adjust their rate based on the {@code
     *     sourceRatePerSecond} and the source parallelism.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            GeneratorFunction<Long, OUT> generatorFunction,
            long count,
            double sourceRatePerSecond,
            TypeInformation<OUT> typeInfo) {...}
} |
Where GeneratorFunction supports initialization of class fields via the open() method with access to the local SourceReaderContext.
Code Block |
---|
language | java |
---|
title | GeneratorFunction |
---|
|
@Public
public interface GeneratorFunction<T, O> extends Function {

    /**
     * Initialization method for the function. It is called once before the actual working process
     * methods.
     */
    default void open(SourceReaderContext readerContext) throws Exception {}

    /** Tear-down method for the function. */
    default void close() throws Exception {}

    O map(T value) throws Exception;
} |
A new SourceReaderFactory interface is introduced.
Code Block |
---|
language | java |
---|
title | SourceReaderFactory |
---|
|
public interface SourceReaderFactory<OUT, SplitT extends SourceSplit> extends Serializable {
    SourceReader<OUT, SplitT> newSourceReader(SourceReaderContext readerContext);
} |
The generator source delegates the SourceReaders' creation to the factory.
Code Block |
---|
language | java |
---|
title | DataGeneratorSource |
---|
|
@Public
public class DataGeneratorSource<OUT>
        implements Source<
                        OUT,
                        NumberSequenceSource.NumberSequenceSplit,
                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
                ResultTypeQueryable<OUT> {

    private final SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory;

    @Override
    public SourceReader<OUT, NumberSequenceSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        return sourceReaderFactory.newSourceReader(readerContext);
    }
} |
Proposed Changes
In order to deliver convenient rate-limiting functionality to the users of the new API, a small addition to the SourceReaderContext is required.
The sum of rates of all parallel readers has to approximate the optional user-defined sourceRatePerSecond parameter. Currently, there is no way for the SourceReaders to acquire the current parallelism of the job they are part of. To overcome this limitation, this FLIP proposes an extension of the SourceReaderContext interface with the currentParallelism() method:
Code Block |
---|
language | java |
---|
title | SourceReaderContext |
---|
|
package org.apache.flink.api.connector.source;
/** The class that exposes some context from the runtime to the {@link SourceReader}. */
@Public
public interface SourceReaderContext {
...
/**
* Get the current parallelism of this Source.
*
* @return the parallelism of the Source.
*/
int currentParallelism();
} |
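The relationship between the source-wide target rate and the parallelism can be illustrated with a small, Flink-independent sketch. The class and method names (`RateSplit`, `perReaderRate`) are hypothetical and not part of this proposal, and the even split is an assumption that is merely consistent with the requirement that the readers' rates sum to approximately sourceRatePerSecond.

```java
/** Hypothetical helper, not part of the FLIP: splits a source-wide target rate evenly. */
public class RateSplit {

    /** Returns the events-per-second budget of one reader out of {@code parallelism} readers. */
    static double perReaderRate(double sourceRatePerSecond, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return sourceRatePerSecond / parallelism;
    }

    public static void main(String[] args) {
        double sourceRatePerSecond = 100.0;
        // In Flink, this value would come from readerContext.currentParallelism().
        int parallelism = 4;
        double perReader = perReaderRate(sourceRatePerSecond, parallelism);
        System.out.println("per-reader rate: " + perReader);
        System.out.println("sum over all readers: " + perReader * parallelism);
    }
}
```

Each reader only needs its own parallelism value to compute its share, which is why a single accessor on the reader context is sufficient.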
With the parallelism accessible via SourceReaderContext, initialization of the rate-limiting data generating readers can be taken care of by the SourceReaderFactories. For example:
Code Block |
---|
language | java |
---|
title | GeneratorSourceReaderFactory |
---|
|
public class GeneratorSourceReaderFactory<OUT>
        implements SourceReaderFactory<OUT, NumberSequenceSource.NumberSequenceSplit> {

    public GeneratorSourceReaderFactory(
            GeneratorFunction<Long, OUT> generatorFunction, long sourceRatePerSecond) {...}

    @Override
    public SourceReader<OUT, NumberSequenceSource.NumberSequenceSplit> newSourceReader(
            SourceReaderContext readerContext) {
        if (sourceRatePerSecond > 0) {
            int parallelism = readerContext.currentParallelism();
            RateLimiter rateLimiter = new GuavaRateLimiter(sourceRatePerSecond, parallelism);
            return new RateLimitedSourceReader<>(
                    new GeneratingIteratorSourceReader<>(readerContext, generatorFunction),
                    rateLimiter);
        } else {
            return new GeneratingIteratorSourceReader<>(readerContext, generatorFunction);
        }
    }
} |
Where RateLimiter is the following interface:
Code Block |
---|
language | java |
---|
title | RateLimiter |
---|
|
/** The interface that can be used to throttle execution of methods. */
interface RateLimiter extends Serializable {

    /**
     * Acquire method is a blocking call that is intended to be used in places where it is required
     * to limit the rate at which results are produced or other functions are called.
     *
     * @return The number of milliseconds this call blocked its caller.
     * @throws InterruptedException The interrupted exception.
     */
    int acquire() throws InterruptedException;
} |
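To make the contract of acquire() concrete, here is a minimal sketch of one possible implementation that uses only the JDK. `FixedIntervalRateLimiter` and the helper `acquireTimes` are hypothetical names introduced for illustration; this is not the GuavaRateLimiter referenced above, and the fixed-interval strategy is an assumption rather than the behavior of the proposed implementation.

```java
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

public class RateLimiterSketch {

    /** Mirrors the interface shown above. */
    interface RateLimiter extends Serializable {
        int acquire() throws InterruptedException;
    }

    /** Hypothetical stand-in: grants one permit per fixed interval, per reader. */
    static class FixedIntervalRateLimiter implements RateLimiter {
        private final long intervalNanos;
        // Transient: System.nanoTime() values are only meaningful within one JVM.
        private transient boolean started;
        private transient long nextFreeNanos;

        FixedIntervalRateLimiter(double sourceRatePerSecond, int parallelism) {
            if (sourceRatePerSecond <= 0 || parallelism <= 0) {
                throw new IllegalArgumentException("rate and parallelism must be positive");
            }
            double perReaderRate = sourceRatePerSecond / parallelism;
            this.intervalNanos = (long) (1_000_000_000L / perReaderRate);
        }

        @Override
        public int acquire() throws InterruptedException {
            long now = System.nanoTime();
            if (!started) {
                started = true;
                nextFreeNanos = now; // the first permit is granted immediately
            }
            long waitNanos = Math.max(0L, nextFreeNanos - now);
            if (waitNanos > 0) {
                TimeUnit.NANOSECONDS.sleep(waitNanos);
            }
            // The next permit becomes available one interval after this one.
            nextFreeNanos = now + waitNanos + intervalNanos;
            return (int) TimeUnit.NANOSECONDS.toMillis(waitNanos);
        }
    }

    /** Calls acquire() n times and returns the reported blocking time of each call. */
    static int[] acquireTimes(RateLimiter limiter, int n) {
        int[] waits = new int[n];
        try {
            for (int i = 0; i < n; i++) {
                waits[i] = limiter.acquire();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return waits;
    }

    public static void main(String[] args) {
        // 1000 events/s over 2 readers: each reader is granted one permit every 2 ms.
        int[] waits = acquireTimes(new FixedIntervalRateLimiter(1000, 2), 5);
        for (int w : waits) {
            System.out.println("blocked for ~" + w + " ms");
        }
    }
}
```

The limiter takes the parallelism in its constructor, in the same way as the GuavaRateLimiter call shown earlier, so each reader throttles itself to its share of the overall rate.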
---
It is desirable to reuse the functionality of IteratorSourceReader for cases where the input data type differs from the output type (IN: Long from the wrapped NumberSequenceSplit, OUT: the result of applying the user-provided GeneratorFunction<Long, OUT>). For that purpose, the following changes are proposed:
- A new IteratorSourceReaderBase is introduced, parameterized with both the input and the output data types.
- All methods of IteratorSourceReader apart from pollNext() are "pulled up" to the *Base class.
- The IteratorSourceReader API remains the same; it extends IteratorSourceReaderBase with identical input and output types.
- A new GeneratingIteratorSourceReader is introduced for cases where the input and output types differ (the output is the result of applying the GeneratorFunction).
- GeneratingIteratorSourceReader initializes the GeneratorFunction (if needed) by calling its open() method from within its own start() method.
Code Block |
---|
language | java |
---|
title | IteratorSourceReaderBase |
---|
|
package org.apache.flink.api.connector.source.lib.util;
@Experimental
abstract class IteratorSourceReaderBase<
E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
implements SourceReader<O, SplitT> {...} |
Reader:
Code Block |
---|
language | java |
---|
title | IteratorSourceReader |
---|
|
package org.apache.flink.api.connector.source.lib.util;

@Public
public class IteratorSourceReader<
                E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, E, IterT, SplitT> {

    public IteratorSourceReader(SourceReaderContext context) {
        super(context);
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) {...}
} |
Code Block |
---|
language | java |
---|
title | GeneratingIteratorSourceReader |
---|
|
package org.apache.flink.api.connector.source.lib.util;

@Experimental
public class GeneratingIteratorSourceReader<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {

    public GeneratingIteratorSourceReader(
            SourceReaderContext context, GeneratorFunction<E, O> generatorFunction) {...}

    @Override
    public InputStatus pollNext(ReaderOutput<O> output) {...}
} |
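The refactoring described above can be sketched without any Flink dependencies. All types below (`ReaderBase`, `PlainReader`, `GeneratingReader`) are simplified, hypothetical stand-ins for IteratorSourceReaderBase, IteratorSourceReader and GeneratingIteratorSourceReader; split handling, checkpointing and the open()/close() lifecycle are deliberately left out, and a `java.util.function.Function` takes the place of GeneratorFunction.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.LongStream;

public class ReaderHierarchySketch {

    /** Stand-in for the *Base class: shared state and logic, two type parameters. */
    abstract static class ReaderBase<E, O> {
        protected final Iterator<E> iterator;

        ReaderBase(Iterator<E> iterator) {
            this.iterator = iterator;
        }

        /** Shared logic that both subclasses reuse. */
        boolean hasNext() {
            return iterator.hasNext();
        }

        /** The only method left to the subclasses, like pollNext() in the proposal. */
        abstract boolean pollNext(List<O> output);
    }

    /** Input and output types are the same: elements are passed through unchanged. */
    static class PlainReader<E> extends ReaderBase<E, E> {
        PlainReader(Iterator<E> iterator) {
            super(iterator);
        }

        @Override
        boolean pollNext(List<E> output) {
            if (!hasNext()) {
                return false;
            }
            output.add(iterator.next());
            return true;
        }
    }

    /** Input and output types differ: each element is mapped before it is emitted. */
    static class GeneratingReader<E, O> extends ReaderBase<E, O> {
        private final Function<E, O> generatorFunction;

        GeneratingReader(Iterator<E> iterator, Function<E, O> generatorFunction) {
            super(iterator);
            this.generatorFunction = generatorFunction;
        }

        @Override
        boolean pollNext(List<O> output) {
            if (!hasNext()) {
                return false;
            }
            output.add(generatorFunction.apply(iterator.next()));
            return true;
        }
    }

    /** Maps the indices 0..count-1 to events, mimicking a single reader over one split. */
    static List<String> generate(long count) {
        Iterator<Long> indices = LongStream.range(0, count).boxed().iterator();
        GeneratingReader<Long, String> reader =
                new GeneratingReader<>(indices, index -> "Event from index: " + index);
        List<String> output = new ArrayList<>();
        while (reader.pollNext(output)) {
            // keep polling until the iterator is exhausted
        }
        return output;
    }

    public static void main(String[] args) {
        generate(3).forEach(System.out::println);
    }
}
```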
RateLimitedSourceReader wraps another SourceReader (delegates to its methods) while rate-limiting the pollNext() calls.
Code Block |
---|
language | java |
---|
title | RateLimitedSourceReader |
---|
|
package org.apache.flink.api.connector.source.lib.util;

@Experimental
class RateLimitedSourceReader<E, SplitT extends SourceSplit>
        implements SourceReader<E, SplitT> {

    private final SourceReader<E, SplitT> sourceReader;
    private final RateLimiter rateLimiter;

    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter rateLimiter) {
        this.sourceReader = sourceReader;
        this.rateLimiter = rateLimiter;
    }

    @Override
    public void start() {
        sourceReader.start();
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
        rateLimiter.acquire();
        return sourceReader.pollNext(output);
    }
    ...
} |
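The delegation pattern used by RateLimitedSourceReader can be tried out in isolation with the following sketch. `Reader` and `Limiter` are hypothetical, heavily simplified stand-ins for SourceReader and RateLimiter, and the counting limiter exists only to make the number of acquire() calls observable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RateLimitedReaderSketch {

    /** Simplified stand-in for SourceReader: emits at most one element per call. */
    interface Reader<E> {
        boolean pollNext(List<E> output) throws Exception;
    }

    /** Simplified stand-in for RateLimiter. */
    interface Limiter {
        int acquire() throws InterruptedException;
    }

    /** Wraps another reader and acquires a permit before every delegated pollNext(). */
    static class RateLimitedReader<E> implements Reader<E> {
        private final Reader<E> delegate;
        private final Limiter limiter;

        RateLimitedReader(Reader<E> delegate, Limiter limiter) {
            this.delegate = delegate;
            this.limiter = limiter;
        }

        @Override
        public boolean pollNext(List<E> output) throws Exception {
            limiter.acquire();
            return delegate.pollNext(output);
        }
    }

    /** Drains the input through a rate-limited reader; returns the number of acquire() calls. */
    static int drain(List<String> input, List<String> output) {
        Iterator<String> iterator = input.iterator();
        Reader<String> plain =
                out -> {
                    if (!iterator.hasNext()) {
                        return false;
                    }
                    out.add(iterator.next());
                    return true;
                };
        int[] acquireCalls = {0};
        // A limiter that never blocks and only counts how often it was asked for a permit.
        Limiter countingLimiter =
                () -> {
                    acquireCalls[0]++;
                    return 0;
                };
        Reader<String> limited = new RateLimitedReader<>(plain, countingLimiter);
        try {
            while (limited.pollNext(output)) {
                // keep polling until the delegate reports that it is exhausted
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return acquireCalls[0];
    }

    public static void main(String[] args) {
        List<String> output = new ArrayList<>();
        int calls = drain(Arrays.asList("a", "b"), output);
        System.out.println(output + " emitted with " + calls + " acquire() calls");
    }
}
```

Because the permit is acquired before delegating, the final poll that finds the delegate exhausted also passes through the limiter.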
Usage:
The envisioned usage for functions that do not contain any class fields that need initialization looks like this:
Code Block |
---|
|
int count = 1000;
int sourceRatePerSecond = 2;
GeneratorFunction<Long, String> generator = index -> "Event from index: " + index;
DataGeneratorSource<String> source = new DataGeneratorSource<>(generator, count, sourceRatePerSecond, Types.STRING);
DataStreamSource<String> watermarked =
env.fromSource(
source,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)),
"watermarked"); |
Scenarios in which the GeneratorFunction requires initialization of non-serializable fields are supported as follows:
Code Block |
---|
|
GeneratorFunction<Long, String> generator =
        new GeneratorFunction<Long, String>() {

            transient SourceReaderMetricGroup sourceReaderMetricGroup;

            @Override
            public void open(SourceReaderContext readerContext) {
                sourceReaderMetricGroup = readerContext.metricGroup();
            }

            @Override
            public String map(Long value) {
                return "Generated: >> "
                        + value.toString()
                        + "; local metric group: "
                        + sourceReaderMetricGroup.hashCode();
            }
        };
DataGeneratorSource<String> source = new DataGeneratorSource<>(generator, count, sourceRatePerSecond, Types.STRING); |
- It is up for discussion whether a utility method on StreamExecutionEnvironment with default watermarking would also be desirable (similar to env.fromSequence(long from, long to)).
- To reuse the existing functionality of NumberSequenceSource, the visibility of NumberSequenceSource.CheckpointSerializer has to be changed from private to package-private.
Compatibility, Deprecation, and Migration Plan
This feature is a stepping stone toward deprecating the SourceFunction API (see this discussion).
- After this feature is introduced, it will be documented and promoted as the recommended way to write data generators.
- A list of Flink tests that currently use the SourceFunction API will be compiled, and follow-up tickets for migration will be created.
Test Plan
- Unit tests will be added to verify the behavior of the Source's splits in relation to the SourceReader.
- Integration tests will be added to verify correct functioning with different levels of parallelism.
Rejected Alternatives
It is possible to use a NumberSequenceSource followed by a map function to achieve similar results; however, this has two disadvantages:
- It introduces another level of indirection and is less intuitive to use.
- It does not promote best practices of assigning watermarks (see this discussion).
POC Branch:
https://github.com/apache/flink/compare/master...afedulov:flink:FLINK-27919-generator-source