Status
...
Motivation
FLIP-27 sources are non-trivial to implement. At the same time, it is frequently necessary to generate arbitrary events with a "mock" source. This need arises both for Flink users, in demo and PoC projects, and for Flink developers when writing tests. So far, the go-to solution has been to use pre-FLIP-27 APIs and implement data generators as SourceFunctions.
While the new FLIP-27 Source interface introduces important additional functionality, it comes with significant complexity that makes it hard for Flink users to implement drop-in replacements for the SourceFunction-based data generators. Meanwhile, SourceFunction is effectively superseded by the Source interface and will eventually need to be deprecated. To fill this gap, this FLIP proposes introducing a generic data generator source based on the FLIP-27 API.
...
```java
package org.apache.flink.api.common.io.ratelimiting;

import java.io.Serializable;

/** The interface that can be used to throttle execution of methods. */
public interface RateLimiter extends Serializable {

    /**
     * Acquire method is a blocking call that is intended to be used in places where it is required
     * to limit the rate at which results are produced or other functions are called.
     *
     * @return The number of milliseconds this call blocked its caller.
     * @throws InterruptedException The interrupted exception.
     */
    int acquire() throws InterruptedException;
}
```
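To illustrate the contract of acquire(), here is a minimal sketch of one possible implementation. It is not part of the proposal: FixedIntervalRateLimiter and its permitsPerSecond parameter are hypothetical names, and the RateLimiter interface is redeclared locally only so that the snippet compiles on its own. The sketch assumes a single calling thread and spaces calls at a fixed interval.

```java
import java.io.Serializable;

// Local copy of the proposed interface so the sketch compiles without Flink.
interface RateLimiter extends Serializable {
    int acquire() throws InterruptedException;
}

/**
 * Hypothetical limiter that lets at most one acquire() call through per fixed interval.
 * Not thread-safe: it assumes that a single thread calls acquire().
 */
class FixedIntervalRateLimiter implements RateLimiter {
    private final long intervalNanos;

    // System.nanoTime() is only meaningful within one JVM, so the timing state is
    // transient and re-initialized on the first call after deserialization.
    private transient boolean started;
    private transient long nextFreeNanos;

    FixedIntervalRateLimiter(double permitsPerSecond) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be positive");
        }
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
    }

    @Override
    public int acquire() throws InterruptedException {
        long now = System.nanoTime();
        if (!started) {
            nextFreeNanos = now;
            started = true;
        }
        // How long the caller has to wait until its slot becomes free (0 if it already is).
        long waitNanos = Math.max(0L, nextFreeNanos - now);
        // Reserve the following slot one interval after this call is allowed to proceed.
        nextFreeNanos = now + waitNanos + intervalNanos;
        if (waitNanos > 0) {
            Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
        }
        // Planned blocking time, rounded down to whole milliseconds.
        return (int) (waitNanos / 1_000_000L);
    }
}
```

With permitsPerSecond set to 100, consecutive calls are spaced roughly 10 ms apart, and the first call returns without blocking.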
...
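```java
package org.apache.flink.api.connector.source.lib.util;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.io.ratelimiting.RateLimiter;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;

/** Wraps a SourceReader and calls the RateLimiter before every poll of the wrapped reader. */
@Experimental
public class RateLimitedSourceReader<E, SplitT extends SourceSplit>
        implements SourceReader<E, SplitT> {

    private final SourceReader<E, SplitT> sourceReader;
    private final RateLimiter rateLimiter;

    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter rateLimiter) {
        this.sourceReader = sourceReader;
        this.rateLimiter = rateLimiter;
    }

    @Override
    public void start() {
        sourceReader.start();
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
        // Block until the rate limiter allows the next call, then delegate to the wrapped reader.
        rateLimiter.acquire();
        return sourceReader.pollNext(output);
    }

    ...
}
```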
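The decorator above throttles first and delegates second, so acquire() is called once per pollNext(). The following sketch shows that behavior without a Flink dependency. All types in it are simplified, hypothetical stand-ins (SimpleReader, ListReader, RateLimitedReader, CountingRateLimiter) and not Flink APIs; RateLimiter is redeclared locally so the snippet is self-contained.

```java
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Local copy of the proposed interface so the sketch compiles without Flink.
interface RateLimiter extends Serializable {
    int acquire() throws InterruptedException;
}

// Hypothetical, simplified stand-in for SourceReader: emits at most one record per call.
interface SimpleReader<E> {
    /** Returns true if more records may follow, false once the input is exhausted. */
    boolean pollNext(Consumer<E> output) throws Exception;
}

// Stand-in reader that emits the elements of a list one at a time.
class ListReader<E> implements SimpleReader<E> {
    private final Iterator<E> elements;

    ListReader(List<E> elements) {
        this.elements = elements.iterator();
    }

    @Override
    public boolean pollNext(Consumer<E> output) {
        if (elements.hasNext()) {
            output.accept(elements.next());
        }
        return elements.hasNext();
    }
}

// Decorator mirroring the structure of RateLimitedSourceReader: throttle, then delegate.
class RateLimitedReader<E> implements SimpleReader<E> {
    private final SimpleReader<E> reader;
    private final RateLimiter rateLimiter;

    RateLimitedReader(SimpleReader<E> reader, RateLimiter rateLimiter) {
        this.reader = reader;
        this.rateLimiter = rateLimiter;
    }

    @Override
    public boolean pollNext(Consumer<E> output) throws Exception {
        rateLimiter.acquire();
        return reader.pollNext(output);
    }
}

// Test double that counts acquire() calls instead of blocking.
class CountingRateLimiter implements RateLimiter {
    int calls;

    @Override
    public int acquire() {
        calls++;
        return 0;
    }
}
```

Because the limiter is injected, the wrapped reader stays unaware of throttling, and a non-blocking limiter such as the counting one can be substituted in tests.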
...
- Unit tests will be added to verify the behavior of the Source's splits in relation to the SourceReader
- Integration tests will be added to verify correct functioning with different levels of parallelism
POC Branch:
https://github.com/apache/flink/compare/master...afedulov:flink:FLINK-27919-generator-source
...