...
Code Block | ||||
---|---|---|---|---|
| ||||
@Experimental
public class MappingIteratorSourceReader<
        E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {
    public MappingIteratorSourceReader(
            SourceReaderContext context, MapFunction<E, O> generatorFunction) {...}
    @Override
    public InputStatus pollNext(ReaderOutput<O> output) {...}
} |
RateLimitedSourceReader wraps another SourceReader (delegates to its methods) while rate-limiting the pollNext() calls.
Code Block | ||
---|---|---|
| ||
@Experimental
public class RateLimitedSourceReader<E, SplitT extends SourceSplit>
        implements SourceReader<E, SplitT> {
    private final SourceReader<E, SplitT> sourceReader;
    private final RateLimiter rateLimiter;
    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter rateLimiter) {
        checkNotNull(sourceReader);
        checkNotNull(rateLimiter);
        this.sourceReader = sourceReader;
        this.rateLimiter = rateLimiter;
    }
    @Override
    public void start() {
        sourceReader.start();
    }
    @Override
    public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
        rateLimiter.acquire();
        return sourceReader.pollNext(output);
    }
    ...
} |
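The delegation pattern above can be tried out without any Flink dependencies. The following is only a minimal sketch under stated assumptions: `SimpleReader`, `SimpleRateLimiter`, `IteratorReader`, `FixedIntervalRateLimiter` and `drain` are hypothetical stand-ins invented for illustration, not Flink's `SourceReader` or `RateLimiter` interfaces, and the blocking `acquire()` simply mirrors the call shown in `pollNext()` above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class RateLimitedReaderSketch {

    /** Hypothetical, simplified stand-in for a source reader (not Flink's SourceReader). */
    interface SimpleReader<E> {
        /** Emits at most one element; returns false once the input is exhausted. */
        boolean pollNext(Consumer<E> output);
    }

    /** Hypothetical blocking rate limiter: acquire() returns once a permit is available. */
    interface SimpleRateLimiter {
        void acquire();
    }

    /** Emits the elements of an iterator, one per pollNext() call. */
    static final class IteratorReader<E> implements SimpleReader<E> {
        private final Iterator<E> iterator;

        IteratorReader(Iterator<E> iterator) {
            this.iterator = iterator;
        }

        @Override
        public boolean pollNext(Consumer<E> output) {
            if (!iterator.hasNext()) {
                return false;
            }
            output.accept(iterator.next());
            return true;
        }
    }

    /** Grants one permit per fixed interval. Single-threaded use only (assumption). */
    static final class FixedIntervalRateLimiter implements SimpleRateLimiter {
        private final long intervalNanos;
        private long nextFreeNanos = System.nanoTime();

        FixedIntervalRateLimiter(long intervalMillis) {
            this.intervalNanos = TimeUnit.MILLISECONDS.toNanos(intervalMillis);
        }

        @Override
        public void acquire() {
            long waitNanos = nextFreeNanos - System.nanoTime();
            if (waitNanos > 0) {
                try {
                    TimeUnit.NANOSECONDS.sleep(waitNanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for a permit", e);
                }
            }
            // Schedule the next permit one interval after this one was granted.
            nextFreeNanos = Math.max(nextFreeNanos, System.nanoTime()) + intervalNanos;
        }
    }

    /** Wraps another reader and acquires a permit before every delegated pollNext() call. */
    static final class RateLimitedReader<E> implements SimpleReader<E> {
        private final SimpleReader<E> delegate;
        private final SimpleRateLimiter rateLimiter;

        RateLimitedReader(SimpleReader<E> delegate, SimpleRateLimiter rateLimiter) {
            this.delegate = delegate;
            this.rateLimiter = rateLimiter;
        }

        @Override
        public boolean pollNext(Consumer<E> output) {
            rateLimiter.acquire();
            return delegate.pollNext(output);
        }
    }

    /** Polls the reader until it reports exhaustion and collects everything it emitted. */
    static <E> List<E> drain(SimpleReader<E> reader) {
        List<E> out = new ArrayList<>();
        while (reader.pollNext(out::add)) {
            // Each iteration emits at most one element into `out`.
        }
        return out;
    }

    public static void main(String[] args) {
        SimpleReader<Integer> reader = new RateLimitedReader<>(
                new IteratorReader<>(Arrays.asList(1, 2, 3).iterator()),
                new FixedIntervalRateLimiter(100)); // roughly one poll per 100 ms
        System.out.println(drain(reader)); // prints [1, 2, 3]
    }
}
```

Note that in this sketch the permit is acquired before every poll, including the final one that only reports exhaustion, so the limiter bounds the rate of `pollNext()` calls rather than the rate of emitted records. The two coincide only if each call emits exactly one record.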
Usage:
The envisioned usage looks like this:
...