...

A new class with the following API will be introduced. Under the hood, it wraps and delegates to the NumberSequenceSource utilities.

Code Block
languagejava
titleDataGeneratorSource
package org.apache.flink.api.connector.source.lib;

/**
 * A data source that produces N events of an arbitrary type in parallel.
 * This source is useful for testing and for cases that just need a stream of N events of any kind.
 *
 * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
 * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is
 * limited to one, this will produce one sequence in order.
 *
 * <p>This source is always bounded. For very long sequences, users may want to consider executing
 * the application in a streaming manner because, although the produced stream is bounded,
 * the end bound is very far away.
 */

@Public
public class DataGeneratorSource<OUT>
        implements Source<
                        OUT,
                        NumberSequenceSource.NumberSequenceSplit,
                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
                ResultTypeQueryable<OUT> {    

    /**
     * Creates a new {@code DataGeneratorSource} that produces <code>count</code> records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            MapFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) {
        this.typeInfo = checkNotNull(typeInfo);
        this.generatorFunction = checkNotNull(generatorFunction);
        // NumberSequenceSource bounds are inclusive, so 0..count-1 yields exactly count indices.
        this.numberSource = new NumberSequenceSource(0, count - 1);
    }
    

    /**
     * Creates a new {@code DataGeneratorSource} that produces <code>count</code> records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param sourceRatePerSecond The maximum number of events per second that this generator aims
     *     to produce. This is a target number for the whole source; the individual parallel
     *     source instances automatically adjust their rate based on the {@code
     *     sourceRatePerSecond} and the source parallelism.
     * @param typeInfo The type information of the returned events.
     */     
    public DataGeneratorSource(
            MapFunction<Long, OUT> generatorFunction,
            long count,
            long sourceRatePerSecond,
            TypeInformation<OUT> typeInfo) {
     ...
    }
}
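
The Javadoc above says the sequence is split into as many ordered sub-sequences as there are parallel readers. The following plain-Java sketch illustrates that idea without any Flink dependency. The class `SubSequenceSketch` and its `split` and `generate` helpers are hypothetical names used only for illustration, and the even, contiguous split is an assumption, not necessarily how NumberSequenceSource assigns its splits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

public class SubSequenceSketch {

    /** Splits the indices 0..count-1 into at most {@code parallelism} contiguous, inclusive ranges. */
    static List<long[]> split(long count, int parallelism) {
        List<long[]> ranges = new ArrayList<>();
        long base = count / parallelism;
        long remainder = count % parallelism;
        long from = 0;
        for (int i = 0; i < parallelism; i++) {
            // The first `remainder` ranges take one extra index so that all indices are covered.
            long size = base + (i < remainder ? 1 : 0);
            if (size > 0) {
                ranges.add(new long[] {from, from + size - 1});
            }
            from += size;
        }
        return ranges;
    }

    /** Applies the generator function to every index of one range, preserving order. */
    static <T> List<T> generate(long[] range, LongFunction<T> generator) {
        List<T> out = new ArrayList<>();
        for (long i = range[0]; i <= range[1]; i++) {
            out.add(generator.apply(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // Ten events spread over three hypothetical readers.
        for (long[] range : split(10, 3)) {
            List<String> events = generate(range, i -> "event-" + i);
            System.out.println(events);
        }
    }
}
```

Each range is emitted in order, which mirrors the guarantee that every sub-sequence is produced in order, while no ordering is implied across ranges.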


Proposed Changes

The sum of rates of all parallel readers has to approximate the optional user-defined sourceRatePerSecond parameter. Currently, there is no way for the SourceReaders to acquire the current parallelism of the job they are part of. In order to overcome this limitation, this FLIP proposes an extension of the SourceReaderContext interface with the getRuntimeContext method:


Code Block
languagejava
titleSourceReaderContext
/** The interface that exposes some context from the runtime to the {@link SourceReader}. */
@Public
public interface SourceReaderContext {
    ...
    /**
     * Gets the context that contains information about the reader's runtime, such as the
     * parallelism of the source.
     *
     * @return The runtime context of the source reader.
     */
    RuntimeContext getRuntimeContext();
}

The respective RuntimeContext is available in the SourceOperator and can be easily provisioned during the anonymous SourceReaderContext initialization in its initReader() method.
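
To make the rate budget concrete, here is a minimal sketch of the arithmetic, assuming each reader simply takes an equal share of the global target. The class `RateSplitSketch` and the method `perReaderRate` are hypothetical; in the proposal the parallelism would come from the runtime context rather than from a literal.

```java
public class RateSplitSketch {

    /** Returns the share of the global rate target that a single reader should aim for. */
    static double perReaderRate(long sourceRatePerSecond, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return (double) sourceRatePerSecond / parallelism;
    }

    public static void main(String[] args) {
        long sourceRatePerSecond = 1000; // target for the whole source
        int parallelism = 4;             // assumed number of parallel readers

        double perReader = perReaderRate(sourceRatePerSecond, parallelism);
        System.out.println("per-reader rate: " + perReader);

        // Summing the per-reader rates recovers the global target.
        System.out.println("total rate: " + perReader * parallelism);
    }
}
```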

With the runtime context accessible via SourceReaderContext, initialization of the data generating readers based on the user-provided generatorFunction could look as follows:


Code Block
languagejava
titleDataGeneratorSource#createReader()
    @Override
    public SourceReader<OUT, NumberSequenceSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        if (maxPerSecond > 0) {
            int parallelism = readerContext.getRuntimeContext().getNumberOfParallelSubtasks();
            RateLimiter rateLimiter = new GuavaRateLimiter(maxPerSecond, parallelism);
            return new RateLimitedSourceReader<>(
                    new MappingIteratorSourceReader<>(readerContext, generatorFunction),
                    rateLimiter);
        } else {
            return new MappingIteratorSourceReader<>(readerContext, generatorFunction);
        }
    }
    ...

...

    }



/** The interface that can be used to throttle execution of methods. */
public interface RateLimiter extends Serializable {

    /**
     * Blocks the caller as needed to limit the rate at which results are produced or other
     * functions are called.
     *
     * @return The number of milliseconds this call blocked its caller.
     * @throws InterruptedException If the calling thread is interrupted while blocked.
     */
    int acquire() throws InterruptedException;
}
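
For illustration, the sketch below shows one way an implementation of this interface could behave. It is not the GuavaRateLimiter referenced above: `PacedRateLimiter` is a hypothetical stand-in that spaces permits evenly using only the JDK, and it redeclares the `RateLimiter` interface locally so the example compiles without Flink. The equal division of `maxPerSecond` by the parallelism is an assumption.

```java
import java.io.Serializable;

public class PacedRateLimiterSketch {

    /** Local copy of the proposed interface, so the sketch is self-contained. */
    interface RateLimiter extends Serializable {
        int acquire() throws InterruptedException;
    }

    /** Hypothetical limiter that hands out permits at evenly spaced intervals. */
    static final class PacedRateLimiter implements RateLimiter {
        private final long intervalNanos;
        // Timing state is transient: System.nanoTime() values are only meaningful within one JVM.
        private transient boolean started;
        private transient long nextFreeNanos;

        PacedRateLimiter(long maxPerSecond, int parallelism) {
            if (maxPerSecond <= 0 || parallelism <= 0) {
                throw new IllegalArgumentException("rate and parallelism must be positive");
            }
            // Assumption: each reader gets an equal share of the global rate.
            double perReaderRate = (double) maxPerSecond / parallelism;
            this.intervalNanos = (long) (1_000_000_000L / perReaderRate);
        }

        @Override
        public synchronized int acquire() throws InterruptedException {
            long now = System.nanoTime();
            if (!started) {
                started = true;
                nextFreeNanos = now;
            }
            long waitNanos = Math.max(0L, nextFreeNanos - now);
            // Schedule the next permit one interval after this one becomes available.
            nextFreeNanos = (waitNanos > 0 ? nextFreeNanos : now) + intervalNanos;
            if (waitNanos > 0) {
                Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
            }
            return (int) (waitNanos / 1_000_000L);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 100 events/s across 2 readers: this reader aims for about one event every 20 ms.
        RateLimiter limiter = new PacedRateLimiter(100, 2);
        for (int i = 0; i < 5; i++) {
            int blockedMillis = limiter.acquire();
            System.out.println("event " + i + " after blocking " + blockedMillis + " ms");
        }
    }
}
```

A reader wrapped by such a limiter would call `acquire()` before emitting each record, so the emission rate is bounded by the permit interval.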




This FLIP introduces a new DataGeneratorSource class.

...