Status
...
...
...
JIRA: FLINK-27919 (ASF JIRA)
...
Motivation
FLIP-27 sources are non-trivial to implement. At the same time, it is frequently necessary to generate arbitrary events with a "mock" source. This requirement arises both for Flink users, in the scope of demo/PoC projects, and for Flink developers when writing tests. So far, the go-to solution for these purposes has been to use the pre-FLIP-27 APIs and implement data generators as SourceFunctions.
While the new FLIP-27 Source interface introduces important additional functionality, it also comes with significant complexity, which makes it hard for Flink users to implement drop-in replacements for SourceFunction-based data generators. Meanwhile, SourceFunction is effectively superseded by the Source interface and needs to be deprecated eventually. To fill this gap, this FLIP proposes introducing a generic data generator source based on the FLIP-27 API.
...
DataGeneratorSource
```java
package org.apache.flink.api.connector.source.lib;

import static org.apache.flink.util.Preconditions.checkNotNull;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

import java.util.Collection;

/**
 * A data source that produces N events of an arbitrary type in parallel.
 * This source is useful for testing and for cases that just need a stream of N events of any kind.
 *
 * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
 * source readers. Each sub-sequence is produced in order. Consequently, if the parallelism is
 * limited to one, this source produces a single sequence in order.
 *
 * <p>This source is always bounded. For very long sequences, users may want to consider executing
 * the application in streaming mode because, although the produced stream is bounded, the end
 * bound is far away.
 */
@Public
public class DataGeneratorSource<OUT>
        implements Source<
                        OUT,
                        NumberSequenceSource.NumberSequenceSplit,
                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
                ResultTypeQueryable<OUT> {

    private final SourceReaderFactory<OUT, NumberSequenceSource.NumberSequenceSplit>
            sourceReaderFactory;
    private final TypeInformation<OUT> typeInfo;
    private final NumberSequenceSource numberSource;

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param sourceReaderFactory The factory for instantiating the readers of type
     *     {@code SourceReader<OUT, NumberSequenceSplit>}.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            SourceReaderFactory<OUT, NumberSequenceSource.NumberSequenceSplit> sourceReaderFactory,
            long count,
            TypeInformation<OUT> typeInfo) {
        this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
        this.typeInfo = checkNotNull(typeInfo);
        // NumberSequenceSource includes both bounds, so [0, count - 1] yields exactly count indices.
        this.numberSource = new NumberSequenceSource(0, count - 1);
    }

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            GeneratorFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) {...}

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param sourceRatePerSecond The maximum number of events per second that this generator aims
     *     to produce. This is a target for the whole source; the individual parallel source
     *     instances automatically adjust their rate based on {@code sourceRatePerSecond} and the
     *     source parallelism.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            GeneratorFunction<Long, OUT> generatorFunction,
            long count,
            double sourceRatePerSecond,
            TypeInformation<OUT> typeInfo) {...}
}
```
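To make the splitting behavior described in the class Javadoc concrete, here is a minimal, Flink-independent sketch in plain Java. It assumes that the index space [0, count) is cut into contiguous, near-equal sub-ranges, one per parallel reader, and that each reader applies a generator function to its indices in order. The names `GeneratorSplitSketch`, `IndexRange`, `split` and `generate` are hypothetical stand-ins for `NumberSequenceSource` splits and `GeneratorFunction`; the exact split boundaries Flink computes may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

/**
 * Hypothetical sketch, not Flink's API: partitions the index space [0, count) into contiguous
 * sub-ranges (one per parallel reader) and maps each index to an event with a generator function.
 */
final class GeneratorSplitSketch {

    /** Half-open index range [from, to), standing in for a NumberSequenceSplit. */
    static final class IndexRange {
        private final long from;
        private final long to;

        IndexRange(long from, long to) {
            this.from = from;
            this.to = to;
        }

        long from() { return from; }

        long to() { return to; }

        @Override
        public String toString() { return "[" + from + ", " + to + ")"; }
    }

    private GeneratorSplitSketch() {}

    /** Splits [0, count) into `parallelism` contiguous ranges whose sizes differ by at most one. */
    static List<IndexRange> split(long count, int parallelism) {
        if (count < 0 || parallelism <= 0) {
            throw new IllegalArgumentException("count must be >= 0 and parallelism > 0");
        }
        List<IndexRange> ranges = new ArrayList<>(parallelism);
        long base = count / parallelism;
        long remainder = count % parallelism;
        long from = 0;
        for (int i = 0; i < parallelism; i++) {
            // The first `remainder` ranges each take one extra index.
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new IndexRange(from, from + size));
            from += size;
        }
        return ranges;
    }

    /** Produces the events of one range, in index order, by applying the generator to each index. */
    static <OUT> List<OUT> generate(IndexRange range, LongFunction<OUT> generator) {
        List<OUT> events = new ArrayList<>();
        for (long i = range.from(); i < range.to(); i++) {
            events.add(generator.apply(i));
        }
        return events;
    }
}
```

For example, `split(10, 3)` yields `[0, 4)`, `[4, 7)` and `[7, 10)`, and applying `n -> "event-" + n` to the second range yields `event-4`, `event-5`, `event-6`. With parallelism 1 there is a single range, so all events come out in index order, as the Javadoc states.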
...
RateLimiter
```java
package org.apache.flink.api.common.io.ratelimiting;

import java.io.Serializable;

/** The interface that can be used to throttle execution of methods. */
public interface RateLimiter extends Serializable {

    /**
     * Blocks the caller until it is allowed to proceed. This method is intended for places where
     * the rate at which results are produced, or at which other functions are called, must be
     * limited.
     *
     * @return The number of milliseconds this call blocked its caller.
     * @throws InterruptedException If the calling thread is interrupted while blocked.
     */
    int acquire() throws InterruptedException;
}
```
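The interface leaves the throttling policy open. As a hedged illustration, here is a hypothetical fixed-interval implementation in plain Java that spaces permits evenly. It derives a per-instance rate by dividing a global target evenly across parallel instances, which is one plausible reading of how the `sourceRatePerSecond` parameter of `DataGeneratorSource` is shared between parallel source instances. The interface is re-declared locally so the sketch compiles without Flink; `FixedIntervalRateLimiter` and its constructor are assumptions, not part of the proposal.

```java
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/** Local copy of the proposed interface, so that the sketch compiles without Flink. */
interface RateLimiter extends Serializable {
    int acquire() throws InterruptedException;
}

/**
 * Hypothetical fixed-interval limiter: grants one permit every 1 / perInstanceRate seconds,
 * where perInstanceRate = globalRatePerSecond / parallelism (an assumed, even split).
 * Not thread-safe: intended as one instance per reader.
 */
final class FixedIntervalRateLimiter implements RateLimiter {
    private static final long serialVersionUID = 1L;

    private final long intervalNanos;
    // Scheduling state is transient: after deserialization the schedule simply restarts.
    private transient long nextPermitNanos; // System.nanoTime() of the next available permit
    private transient boolean started;

    FixedIntervalRateLimiter(double globalRatePerSecond, int parallelism) {
        if (!(globalRatePerSecond > 0) || parallelism <= 0) {
            throw new IllegalArgumentException("rate and parallelism must be positive");
        }
        double perInstanceRate = globalRatePerSecond / parallelism;
        this.intervalNanos = Math.max(1L, (long) (TimeUnit.SECONDS.toNanos(1) / perInstanceRate));
    }

    @Override
    public int acquire() throws InterruptedException {
        long now = System.nanoTime();
        if (!started) {
            // The first permit is granted immediately.
            started = true;
            nextPermitNanos = now;
        }
        long waitNanos = Math.max(0L, nextPermitNanos - now);
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
        // Advance from the planned slot to keep the average rate, or from "now" if the caller
        // is late, so that idle time does not accumulate a burst of stored permits.
        long base = (nextPermitNanos - now > 0) ? nextPermitNanos : now;
        nextPermitNanos = base + intervalNanos;
        return (int) TimeUnit.NANOSECONDS.toMillis(waitNanos);
    }
}
```

With `new FixedIntervalRateLimiter(200.0, 2)`, each of the two instances is paced at 100 permits per second, i.e. one permit every 10 ms. Fixed spacing is only one possible policy; a token bucket that tolerates short bursts would be a reasonable alternative.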
...
GeneratingIteratorSourceReader
```java
package org.apache.flink.api.connector.source.lib.util;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;

import java.util.Iterator;

@Experimental
public class GeneratingIteratorSourceReader<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {

    public GeneratingIteratorSourceReader(
            SourceReaderContext context, GeneratorFunction<E, O> generatorFunction) {...}

    @Override
    public InputStatus pollNext(ReaderOutput<O> output) {...}
}
```
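`GeneratingIteratorSourceReader` pairs an iterator over split elements with a `GeneratorFunction` that turns each element into an output record. The plain-Java sketch below isolates that mapping step, assuming one output record per input element. `MappingIterator` is a hypothetical helper, and `java.util.function.Function` stands in for Flink's `GeneratorFunction`, which may carry additional methods not modelled here.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical sketch: lazily applies a generator function to each element of an iterator. */
final class MappingIterator<E, O> implements Iterator<O> {
    private final Iterator<E> source;
    private final Function<? super E, ? extends O> generator;

    MappingIterator(Iterator<E> source, Function<? super E, ? extends O> generator) {
        this.source = source;
        this.generator = generator;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public O next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        // One output record per input element, produced only when requested.
        return generator.apply(source.next());
    }
}
```

In reader terms, each `pollNext` call would emit the result of one `next()` call, so records are generated on demand rather than materialized up front.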
...
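RateLimitedSourceReader
```java
package org.apache.flink.api.connector.source.lib.util;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.io.ratelimiting.RateLimiter;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;

/** Wraps a {@link SourceReader} and throttles each {@code pollNext} call with a {@link RateLimiter}. */
@Experimental
public class RateLimitedSourceReader<E, SplitT extends SourceSplit>
        implements SourceReader<E, SplitT> {

    private final SourceReader<E, SplitT> sourceReader;
    private final RateLimiter rateLimiter;

    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter rateLimiter) {
        this.sourceReader = sourceReader;
        this.rateLimiter = rateLimiter;
    }

    @Override
    public void start() {
        sourceReader.start();
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
        // Block until the rate limiter grants a permit, then let the wrapped reader emit.
        rateLimiter.acquire();
        return sourceReader.pollNext(output);
    }

    // ...
}
```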
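`RateLimitedSourceReader` is a decorator: it blocks on the `RateLimiter` before each `pollNext` and delegates to the wrapped reader. A Flink-free sketch of the same pattern follows, with hypothetical `SimpleReader` and `Throttle` interfaces standing in for `SourceReader` and `RateLimiter`. It shows the design benefit: the wrapped reader needs no knowledge of throttling, and the limiter can be swapped, for example for a counting stub in tests.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for SourceReader: emits at most one record per poll. */
interface SimpleReader<E> {
    /** Returns true if more records may be available after this call. */
    boolean pollNext(List<E> output) throws Exception;
}

/** Hypothetical stand-in for RateLimiter. */
interface Throttle {
    void acquire() throws InterruptedException;
}

/** Decorator: throttles every poll, then delegates to the wrapped reader. */
final class ThrottledReader<E> implements SimpleReader<E> {
    private final SimpleReader<E> delegate;
    private final Throttle throttle;

    ThrottledReader(SimpleReader<E> delegate, Throttle throttle) {
        this.delegate = delegate;
        this.throttle = throttle;
    }

    @Override
    public boolean pollNext(List<E> output) throws Exception {
        throttle.acquire(); // block first, so records leave the reader at the limiter's pace
        return delegate.pollNext(output);
    }
}

/** Reader over an in-memory iterator, used to exercise the decorator. */
final class IteratorReader<E> implements SimpleReader<E> {
    private final Iterator<E> it;

    IteratorReader(Iterator<E> it) {
        this.it = it;
    }

    @Override
    public boolean pollNext(List<E> output) {
        if (it.hasNext()) {
            output.add(it.next());
        }
        return it.hasNext();
    }
}
```

Because throttling happens once per poll, the effective rate is measured in poll calls; a reader that emits several records per poll would need a limiter that accounts for that.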
...
- Unit tests will be added to verify the behavior of the Source's splits in relation to the SourceReader
- Integration tests will be added to verify correct functioning with different levels of parallelism
POC Branch:
https://github.com/apache/flink/compare/master...afedulov:flink:FLINK-27919-generator-source
...