...

The sum of the rates of all parallel readers has to approximate the optional user-defined sourceRatePerSecond parameter. Currently, there is no way for the SourceReaders to acquire the current parallelism of the job they are part of. To overcome this limitation, this FLIP proposes extending the SourceReaderContext interface with the currentParallelism() method:


SourceReaderContext (Java)
package org.apache.flink.api.connector.source;

/** The class that exposes some context from runtime to the {@link SourceReader}. */
@Public
public interface SourceReaderContext {
    ...

    /**
     * Get the current parallelism of this Source.
     *
     * @return the parallelism of the Source.
     */
    int currentParallelism();
}

The parallelism can be retrieved in the SourceOperator via the RuntimeContext and can easily be provided when the anonymous SourceReaderContext is initialized in the operator's initReader() method.
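The wiring described above can be sketched with simplified stand-in types. This is only an illustration, not the actual SourceOperator code: `SubtaskInfo` stands in for the part of the RuntimeContext that is needed, `ReaderContext` stands in for the extended SourceReaderContext, and `ReaderContextWiring` and `createReaderContext` are hypothetical names. The method getNumberOfParallelSubtasks() mirrors the RuntimeContext method of the same name.

```java
class ReaderContextWiring {

    /** Stand-in for the slice of Flink's RuntimeContext used here (assumption). */
    public interface SubtaskInfo {
        int getNumberOfParallelSubtasks();
    }

    /** Stand-in for the proposed SourceReaderContext extension (assumption). */
    public interface ReaderContext {
        int currentParallelism();
    }

    /**
     * Mimics what initReader() could do: wrap the runtime information
     * in an anonymous reader context that exposes the parallelism.
     */
    public static ReaderContext createReaderContext(final SubtaskInfo runtimeInfo) {
        return new ReaderContext() {
            @Override
            public int currentParallelism() {
                // Delegate to the runtime information the operator already holds.
                return runtimeInfo.getNumberOfParallelSubtasks();
            }
        };
    }
}
```

A reader created with such a context never touches the runtime information directly; it only sees the single value it needs.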

With the parallelism accessible via SourceReaderContext, initialization of the data-generating readers based on the user-provided generatorFunction could look as follows:

...

DataGeneratorSource#createReader() (Java)
    @Override
    public SourceReader<OUT, NumberSequenceSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        if (maxPerSecond > 0) {
            // Rate limiting requested: the limiter needs the parallelism to derive this reader's share.
            int parallelism = readerContext.currentParallelism();
            RateLimiter rateLimiter = new GuavaRateLimiter(maxPerSecond, parallelism);
            return new RateLimitedSourceReader<>(
                    new MappingIteratorSourceReader<>(readerContext, generatorFunction),
                    rateLimiter);
        } else {
            return new MappingIteratorSourceReader<>(readerContext, generatorFunction);
        }
    }
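
The text does not spell out how the rate limiter uses the parallelism. One plausible reading, sketched below as an assumption, is an even split: each reader is limited to maxPerSecond / parallelism, so that the per-reader rates add up to the requested overall rate. `RateSplitSketch`, `perReaderRate` and `totalRate` are hypothetical names used only for this illustration.

```java
class RateSplitSketch {

    /** Per-reader share of the overall rate, assuming an even split across subtasks. */
    static double perReaderRate(double maxPerSecond, int parallelism) {
        if (maxPerSecond <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("rate and parallelism must be positive");
        }
        return maxPerSecond / parallelism;
    }

    /** Sum of the rates of all readers; it should approximate the requested overall rate. */
    static double totalRate(double maxPerSecond, int parallelism) {
        double sum = 0.0;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            sum += perReaderRate(maxPerSecond, parallelism);
        }
        return sum;
    }
}
```

For example, with a requested rate of 100 records per second and a parallelism of 4, each reader would be limited to 25 records per second under this assumption.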

...