THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.flink.api.connector.source.lib.util;
@Experimental
abstract class IteratorSourceReaderBase<
E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
implements SourceReader<O, SplitT> {...} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.flink.api.connector.source.lib.util;
@Public
public class IteratorSourceReader<
E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
extends IteratorSourceReaderBase<E, E, IterT, SplitT> {
public IteratorSourceReader(SourceReaderContext context) {
super(context);
}
@Override
public InputStatus pollNext(ReaderOutput<E> output) {...}
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.flink.api.connector.source.lib.util;
@Experimental
public class MappingIteratorSourceReader<
E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
extends IteratorSourceReaderBase<E, O, IterT, SplitT> {
public MappingIteratorSourceReader(
SourceReaderContext context, MapFunction<E, O> generatorFunction) {...}
@Override
public InputStatus pollNext(ReaderOutput<O> output) {...}
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.flink.api.connector.source.lib.util;
@Experimental
public class RateLimitedSourceReader<E, SplitT extends SourceSplit>
implements SourceReader<E, SplitT> {
private final SourceReader<E, SplitT> sourceReader;
private final RateLimiter rateLimiter;
public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter rateLimiter) {
this.sourceReader = sourceReader;
this.rateLimiter = rateLimiter;
}
@Override
public void start() {
sourceReader.start();
}
@Override
public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
rateLimiter.acquire();
return sourceReader.pollNext(output);
}
...
} |
...