...

Code Block
language: java
title: SourceReaderContext
package org.apache.flink.api.connector.source;

/** An interface that exposes some context from the runtime to the {@link SourceReader}. */
@Public
public interface SourceReaderContext {
	...
    /**
     * Gets the context that contains information about the reader's runtime, such as the parallelism
     * of the source.
     *
     * @return The runtime context of the source reader.
     */
    RuntimeContext getRuntimeContext();
}

...

Code Block
language: java
title: DataGeneratorSource#createReader()
    @Override
    public SourceReader<OUT, NumberSequenceSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        if (maxPerSecond > 0) {
            int parallelism = readerContext.getRuntimeContext().getNumberOfParallelSubtasks();
            RateLimiter rateLimiter = new GuavaRateLimiter(maxPerSecond, parallelism);
            return new RateLimitedSourceReader<>(
                    new MappingIteratorSourceReader<>(readerContext, generatorFunction),
                    rateLimiter);
        } else {
            return new MappingIteratorSourceReader<>(readerContext, generatorFunction);
        }
    }

Where RateLimiter is defined as follows:

Code Block
language: java
title: RateLimiter
collapse: true
package org.apache.flink.api.common.io.ratelimiting;

import java.io.Serializable;

/** The interface that can be used to throttle execution of methods. */
public interface RateLimiter extends Serializable {

    /**
     * A blocking call intended for places where the rate at which results are produced, or other
     * functions are called, must be limited.
     *
     * @return The number of milliseconds this call blocked its caller.
     * @throws InterruptedException If the calling thread is interrupted while blocked.
     */
    int acquire() throws InterruptedException;
}
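
As a rough illustration of how such an interface might be implemented, the sketch below spaces permits evenly in time. It is not the GuavaRateLimiter referenced in the proposal: the class name EvenlySpacedRateLimiter, the nanosPerPermit() accessor, and the equal split of maxPerSecond across the parallel subtasks are assumptions made for this example, and the RateLimiter interface is redeclared locally only so that the snippet compiles without Flink.

```java
import java.io.Serializable;

/** Local copy of the proposed interface so the sketch compiles without Flink. */
interface RateLimiter extends Serializable {
    int acquire() throws InterruptedException;
}

/** Hypothetical limiter (not part of the FLIP) that hands out evenly spaced permits. */
final class EvenlySpacedRateLimiter implements RateLimiter {
    private static final long serialVersionUID = 1L;

    private final long nanosPerPermit;
    // System.nanoTime() values are only meaningful inside one JVM, so the schedule
    // is transient and re-initialized on the first acquire() after deserialization.
    private transient long nextFreeNanos;
    private transient boolean started;

    EvenlySpacedRateLimiter(long maxPerSecond, int parallelism) {
        if (maxPerSecond <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("rate and parallelism must be positive");
        }
        // Assumption: the global rate is split equally among the parallel subtasks.
        double ratePerSubtask = (double) maxPerSecond / parallelism;
        this.nanosPerPermit = (long) (1_000_000_000L / ratePerSubtask);
    }

    long nanosPerPermit() {
        return nanosPerPermit;
    }

    @Override
    public synchronized int acquire() throws InterruptedException {
        long now = System.nanoTime();
        if (!started) {
            nextFreeNanos = now;
            started = true;
        }
        long waitNanos = Math.max(0L, nextFreeNanos - now);
        // Schedule the following permit one interval after this one is granted.
        nextFreeNanos = (waitNanos > 0 ? nextFreeNanos : now) + nanosPerPermit;
        if (waitNanos > 0) {
            Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
        }
        // Report the time this call had to wait, in whole milliseconds.
        return (int) (waitNanos / 1_000_000L);
    }
}

final class RateLimiterDemo {
    public static void main(String[] args) throws InterruptedException {
        // 100 records/s over 4 subtasks: each subtask waits about 40 ms between permits.
        RateLimiter limiter = new EvenlySpacedRateLimiter(100, 4);
        for (int i = 0; i < 3; i++) {
            System.out.println("permit " + i + " blocked for ~" + limiter.acquire() + " ms");
        }
    }
}
```

With this division, the combined output of all subtasks stays close to maxPerSecond as long as every subtask has data to emit. A production implementation would more likely delegate to an existing library limiter, as the GuavaRateLimiter name in the proposal suggests.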

---

It is desirable to reuse the functionality of IteratorSourceReader for cases where the input data type differs from the output type (IN: the Long values from the wrapped NumberSequenceSplit; OUT: the result of applying the user-provided MapFunction<Long, OUT>). To that end, the following changes are proposed:

  • A new IteratorSourceReaderBase is introduced, parameterized with both the input and output data types.
  • All methods of IteratorSourceReader except pollNext() are pulled up into the *Base class.
  • The IteratorSourceReader API remains the same; it extends IteratorSourceReaderBase with identical input and output types.
  • A new MappingIteratorSourceReader is introduced for the case where the input and output types differ (the output being the result of applying the MapFunction).

Code Block
language: java
title: IteratorSourceReaderBase
abstract class IteratorSourceReaderBase<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        implements SourceReader<O, SplitT> {...}


Code Block
language: java
title: IteratorSourceReader
@Public
public class IteratorSourceReader<
                E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, E, IterT, SplitT> {

    public IteratorSourceReader(SourceReaderContext context) {
        super(context);
    }


    @Override
    public InputStatus pollNext(ReaderOutput<E> output) {...}
}
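
The relationship between the base class, the identity reader, and the mapping reader can be illustrated with simplified stand-ins. The sketch below does not use the Flink API and every class name in it is hypothetical. A List<O> stands in for ReaderOutput<O>, a boolean return value stands in for InputStatus, java.util.function.Function stands in for Flink's MapFunction, and split handling is omitted entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Stand-in for IteratorSourceReaderBase: iterates over E, emits O. */
abstract class IteratorReaderBaseSketch<E, O> {
    // Shared state lives in the base class, as the "pulled-up" methods would.
    protected final Iterator<E> iterator;

    IteratorReaderBaseSketch(Iterator<E> iterator) {
        this.iterator = iterator;
    }

    /** The one method left to subclasses; returns false once the input is exhausted. */
    abstract boolean pollNext(List<O> output);
}

/** Stand-in for IteratorSourceReader: input and output types are the same. */
final class IdentityReaderSketch<E> extends IteratorReaderBaseSketch<E, E> {
    IdentityReaderSketch(Iterator<E> iterator) {
        super(iterator);
    }

    @Override
    boolean pollNext(List<E> output) {
        if (!iterator.hasNext()) {
            return false;
        }
        output.add(iterator.next());
        return true;
    }
}

/** Stand-in for MappingIteratorSourceReader: applies a user function to each element. */
final class MappingReaderSketch<E, O> extends IteratorReaderBaseSketch<E, O> {
    private final Function<E, O> mapper;

    MappingReaderSketch(Iterator<E> iterator, Function<E, O> mapper) {
        super(iterator);
        this.mapper = mapper;
    }

    @Override
    boolean pollNext(List<O> output) {
        if (!iterator.hasNext()) {
            return false;
        }
        output.add(mapper.apply(iterator.next()));
        return true;
    }
}

final class MappingReaderDemo {
    public static void main(String[] args) {
        Iterator<Long> numbers = Arrays.asList(1L, 2L, 3L).iterator();
        MappingReaderSketch<Long, String> reader =
                new MappingReaderSketch<>(numbers, n -> "event-" + n);
        List<String> out = new ArrayList<>();
        while (reader.pollNext(out)) {
            // keep polling until the iterator is exhausted
        }
        System.out.println(out); // prints [event-1, event-2, event-3]
    }
}
```

Keeping the output type as a separate parameter on the base class is what lets the identity reader keep its existing public signature while the mapping reader emits a different type from the same Long-valued splits.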





This FLIP introduces a new DataGeneratorSource class.

...