THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.flink.api.connector.source.lib.util;
@Experimental
public class RateLimitedSourceReader<E, SplitT extends SourceSplit>
implements SourceReader<E, SplitT> {
private final SourceReader<E, SplitT> sourceReader;
private final RateLimiter rateLimiter;
/**
 * Creates a reader that wraps {@code sourceReader} and consults {@code rateLimiter}
 * before each poll is forwarded to it.
 *
 * @param sourceReader the reader that calls are delegated to; must not be {@code null}
 * @param rateLimiter the limiter acquired before every poll; must not be {@code null}
 * @throws NullPointerException if either argument is {@code null}
 */
public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter rateLimiter) {
    // Fail fast here rather than with an NPE on the first start()/pollNext() call,
    // where the faulty construction site is no longer on the stack.
    this.sourceReader = java.util.Objects.requireNonNull(sourceReader, "sourceReader");
    this.rateLimiter = java.util.Objects.requireNonNull(rateLimiter, "rateLimiter");
}
/** Starts the wrapped reader; no rate limiting is applied to this call. */
@Override
public void start() {
    this.sourceReader.start();
}
/**
 * Acquires from the rate limiter and then forwards the poll to the wrapped reader.
 *
 * @param output the output handed through unchanged to the wrapped reader
 * @return the status reported by the wrapped reader's {@code pollNext}
 * @throws Exception if the wrapped reader's {@code pollNext} (or the limiter) throws
 */
@Override
public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
    // NOTE(review): if acquire() blocks until capacity is available, this stalls the
    // thread calling pollNext -- confirm against the RateLimiter contract.
    this.rateLimiter.acquire();
    final InputStatus delegateStatus = this.sourceReader.pollNext(output);
    return delegateStatus;
}
...
} |
...