Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleIteratorSourceReaderBase
package org.apache.flink.api.connector.source.lib.util;

@Experimental
abstract class IteratorSourceReaderBase<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        implements SourceReader<O, SplitT> {...}

...

Code Block
languagejava
titleIteratorSourceReader
package org.apache.flink.api.connector.source.lib.util;

@Public
public class IteratorSourceReader<
                E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, E, IterT, SplitT> {

    public IteratorSourceReader(SourceReaderContext context) {
        super(context);
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) {...}

}

...

Code Block
languagejava
titleMappingIteratorSourceReader
package org.apache.flink.api.connector.source.lib.util;

@Experimental
public class MappingIteratorSourceReader<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {

    public MappingIteratorSourceReader(
            SourceReaderContext context, MapFunction<E, O> generatorFunction) {...} 

    @Override
    public InputStatus pollNext(ReaderOutput<O> output)  {...} 
}

...

Code Block
languagejava
titleRateLimitedSourceReader
package org.apache.flink.api.connector.source.lib.util;

@Experimental
public class RateLimitedSourceReader<E, SplitT extends SourceSplit>
        implements SourceReader<E, SplitT> {

    private final SourceReader<E, SplitT> sourceReader;
    private final RateLimiter rateLimiter;

    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter rateLimiter) {
        this.sourceReader = sourceReader;
        this.rateLimiter = rateLimiter;
    }

    @Override
    public void start() {
        sourceReader.start();
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
        rateLimiter.acquire();
        return sourceReader.pollNext(output);
    }
  ...
}

...