...
Code Block |
---|
language | java |
---|
title | SourceReaderContext |
---|
|
package org.apache.flink.api.connector.source;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.RuntimeContext;
/** The class that exposes some context from the runtime to the {@link SourceReader}. */
@Public
public interface SourceReaderContext {
    ...
    /**
     * Gets the context that contains information about the reader's runtime, such as the
     * parallelism of the source.
     *
     * @return The runtime context of the source reader.
     */
    RuntimeContext getRuntimeContext();
} |
...
Code Block |
---|
language | java |
---|
title | DataGeneratorSource#createReader() |
---|
|
@Override
public SourceReader<OUT, NumberSequenceSplit> createReader(SourceReaderContext readerContext)
        throws Exception {
    if (maxPerSecond > 0) {
        int parallelism = readerContext.getRuntimeContext().getNumberOfParallelSubtasks();
        RateLimiter rateLimiter = new GuavaRateLimiter(maxPerSecond, parallelism);
        return new RateLimitedSourceReader<>(
                new MappingIteratorSourceReader<>(readerContext, generatorFunction),
                rateLimiter);
    } else {
        return new MappingIteratorSourceReader<>(readerContext, generatorFunction);
    }
} |
Where RateLimiter is defined as follows:
Code Block |
---|
language | java |
---|
title | RateLimiter |
---|
collapse | true |
---|
|
package org.apache.flink.api.common.io.ratelimiting;
import java.io.Serializable;
/** The interface that can be used to throttle execution of methods. */
public interface RateLimiter extends Serializable {
    /**
     * A blocking call intended for places where the rate at which results are produced, or
     * other functions are called, needs to be limited.
     *
     * @return The number of milliseconds this call blocked its caller.
     * @throws InterruptedException If the calling thread is interrupted while blocked.
     */
    int acquire() throws InterruptedException;
} |
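To illustrate the acquire() contract above, here is a minimal, self-contained sketch of a limiter that divides a global rate among parallel subtasks, in the way createReader() passes maxPerSecond and parallelism to GuavaRateLimiter. It uses neither Guava nor Flink. The names `SimpleRateLimiter`, `FixedIntervalRateLimiter` and `RateLimiterDemo` are hypothetical, and the even split of the rate across subtasks is an assumption, not something this proposal specifies.

```java
import java.io.Serializable;

/** Simplified stand-in for the proposed RateLimiter interface (hypothetical name). */
interface SimpleRateLimiter extends Serializable {
    /** Blocks until a permit is available; returns the milliseconds spent blocked. */
    int acquire() throws InterruptedException;
}

/**
 * Hypothetical limiter that hands out permits at a fixed interval.
 * Assumption: the global rate is divided evenly among the parallel subtasks.
 */
class FixedIntervalRateLimiter implements SimpleRateLimiter {
    private final long intervalNanos;
    private long nextFreeNanos; // earliest time at which the next permit may be granted

    FixedIntervalRateLimiter(long maxPerSecond, int parallelism) {
        if (maxPerSecond <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("maxPerSecond and parallelism must be positive");
        }
        double perSubtaskRate = (double) maxPerSecond / parallelism;
        this.intervalNanos = (long) (1_000_000_000L / perSubtaskRate);
        this.nextFreeNanos = System.nanoTime();
    }

    long intervalNanos() {
        return intervalNanos;
    }

    @Override
    public synchronized int acquire() throws InterruptedException {
        long now = System.nanoTime();
        long waitNanos = Math.max(0L, nextFreeNanos - now);
        // Reserve the slot that follows the one being handed out now.
        nextFreeNanos = now + waitNanos + intervalNanos;
        if (waitNanos > 0) {
            Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
        }
        return (int) (waitNanos / 1_000_000L);
    }
}

class RateLimiterDemo {
    public static void main(String[] args) throws InterruptedException {
        // 100 records/s overall across 4 subtasks: one permit every 40 ms per subtask.
        FixedIntervalRateLimiter limiter = new FixedIntervalRateLimiter(100, 4);
        for (int i = 0; i < 3; i++) {
            int blockedMillis = limiter.acquire();
            System.out.println("record " + i + " emitted after blocking " + blockedMillis + " ms");
        }
    }
}
```

In this sketch the first call returns immediately and later calls block for at most one interval. A production limiter such as Guava's also handles bursts and warm-up, which are deliberately left out here.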
---
It is desirable to reuse the functionality of IteratorSourceReader when the output type differs from the input type (IN: the Long values produced by the wrapped NumberSequenceSplit; OUT: the result of applying the user-provided MapFunction<Long, OUT>). To that end, the following changes are proposed:
- A new IteratorSourceReaderBase is introduced, parameterized with generic types for both the input and the output data.
- All methods of IteratorSourceReader except pollNext() are pulled up into IteratorSourceReaderBase.
- The IteratorSourceReader API remains unchanged; it extends IteratorSourceReaderBase with identical input and output types.
- A new MappingIteratorSourceReader is introduced for the case where the input and output types differ (the output is the result of applying the MapFunction).
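The refactoring described in the list above can be sketched without any Flink dependency. In the simplified model below, all names (`ReaderBaseSketch`, `IdentityReaderSketch`, `MappingReaderSketch`, `ReaderSketchDemo`) are hypothetical, `java.util.function.Function` stands in for Flink's MapFunction, and pollNext() returns a boolean rather than an InputStatus. It is only meant to show how the shared logic sits in the base class while each subclass supplies its own pollNext().

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical simplified base: E is the iterator element type, O the emitted type. */
abstract class ReaderBaseSketch<E, O> {
    protected final Iterator<E> iterator;

    ReaderBaseSketch(Iterator<E> iterator) {
        this.iterator = iterator;
    }

    /** Shared logic that lives in the base class. */
    boolean hasMore() {
        return iterator.hasNext();
    }

    /** The only type-dependent step: returns true if a record was emitted. */
    abstract boolean pollNext(Consumer<O> output);
}

/** Input and output types are the same, as in IteratorSourceReader. */
class IdentityReaderSketch<E> extends ReaderBaseSketch<E, E> {
    IdentityReaderSketch(Iterator<E> iterator) {
        super(iterator);
    }

    @Override
    boolean pollNext(Consumer<E> output) {
        if (!hasMore()) {
            return false;
        }
        output.accept(iterator.next());
        return true;
    }
}

/** Output is the result of applying a user function, as in MappingIteratorSourceReader. */
class MappingReaderSketch<E, O> extends ReaderBaseSketch<E, O> {
    private final Function<E, O> mapper;

    MappingReaderSketch(Iterator<E> iterator, Function<E, O> mapper) {
        super(iterator);
        this.mapper = mapper;
    }

    @Override
    boolean pollNext(Consumer<O> output) {
        if (!hasMore()) {
            return false;
        }
        output.accept(mapper.apply(iterator.next()));
        return true;
    }
}

class ReaderSketchDemo {
    public static void main(String[] args) {
        // Long values play the role of the numbers produced by NumberSequenceSplit.
        MappingReaderSketch<Long, String> reader =
                new MappingReaderSketch<>(Arrays.asList(1L, 2L, 3L).iterator(), n -> "event-" + n);
        while (reader.pollNext(System.out::println)) {
            // keep polling until the iterator is exhausted
        }
    }
}
```

Keeping pollNext() abstract means the identity reader does not need a no-op mapping function, and the existing IteratorSourceReader signature can stay as it is.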
Code Block |
---|
language | java |
---|
title | IteratorSourceReaderBase |
---|
|
abstract class IteratorSourceReaderBase<
        E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        implements SourceReader<O, SplitT> {...} |
Code Block |
---|
language | java |
---|
title | IteratorSourceReader |
---|
|
@Public
public class IteratorSourceReader<
        E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, E, IterT, SplitT> {
    public IteratorSourceReader(SourceReaderContext context) {
        super(context);
    }
    @Override
    public InputStatus pollNext(ReaderOutput<E> output) {...}
} |
This FLIP introduces a new DataGeneratorSource class.
...