...

Code Block
languagejava
titleDataGeneratorSource
package org.apache.flink.api.connector.source.lib;

/**
 * A data source that produces N events of an arbitrary type in parallel.
 * This source is useful for testing and for cases that just need a stream of N events of any kind.
 *
 * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
 * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is
 * limited to one, this will produce one sequence in order.
 *
 * <p>This source is always bounded. For very long sequences, users may want to consider executing
 * the application in a streaming manner: although the produced stream is bounded, its end bound
 * may be very far away.
 */
@Public
public class DataGeneratorSource<OUT>
        implements Source<
                        OUT,
                        NumberSequenceSource.NumberSequenceSplit,
                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
                ResultTypeQueryable<OUT> {

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param sourceReaderFactory The factory that creates the {@code SourceReader} instances
     *     of this source.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
            long count,
            TypeInformation<OUT> typeInfo) {
        this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
        this.typeInfo = checkNotNull(typeInfo);
        this.numberSource = new NumberSequenceSource(0, count);
    }

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            GeneratorFunction<Long, OUT> generatorFunction,
            long count,
            TypeInformation<OUT> typeInfo) {...}

    /**
     * Creates a new {@code DataGeneratorSource} that produces {@code count} records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param sourceRatePerSecond The maximum number of events per second that this generator aims
     *     to produce. This is a target number for the whole source; the individual parallel
     *     source instances automatically adjust their rate based on the {@code
     *     sourceRatePerSecond} and the source parallelism.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            GeneratorFunction<Long, OUT> generatorFunction,
            long count,
            long sourceRatePerSecond,
            TypeInformation<OUT> typeInfo) {...}
}


Here, GeneratorFunction supports the initialization of local fields with access to the local SourceReaderContext:

Code Block
languagejava
titleGeneratorFunction
@Public
public interface GeneratorFunction<T, O> extends Function {

    /**
     * Initialization method for the function. It is called once before the actual working process
     * methods.
     */
    default void open(SourceReaderContext readerContext) throws Exception {}

    /** Tear-down method for the function. */
    default void close() throws Exception {}

    O map(T value) throws Exception;
}
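
To illustrate why open() is useful, here is a minimal, self-contained sketch of a generator that builds a per-subtask prefix once during initialization instead of on every map() call. The ReaderContext and Generator interfaces below are hypothetical stand-ins that only mirror the shape of SourceReaderContext and GeneratorFunction so that the example compiles without Flink on the classpath; TaggingGenerator is likewise an illustrative name, not part of this proposal.

```java
final class GeneratorFunctionSketch {

    /** Hypothetical stand-in for the reader context; only the subtask index is modelled. */
    interface ReaderContext {
        int getIndexOfSubtask();
    }

    /** Hypothetical stand-in mirroring the shape of the proposed GeneratorFunction. */
    interface Generator<T, O> {
        default void open(ReaderContext readerContext) throws Exception {}

        default void close() throws Exception {}

        O map(T value) throws Exception;
    }

    /** Tags every event with the subtask that produced it. */
    static final class TaggingGenerator implements Generator<Long, String> {
        // Depends on the reader context, so it is set in open() rather than in a constructor.
        private String prefix;

        @Override
        public void open(ReaderContext readerContext) {
            prefix = "subtask-" + readerContext.getIndexOfSubtask() + ": event ";
        }

        @Override
        public String map(Long index) {
            return prefix + index;
        }
    }

    public static void main(String[] args) throws Exception {
        TaggingGenerator generator = new TaggingGenerator();
        generator.open(() -> 3); // pretend this reader runs as subtask 3
        System.out.println(generator.map(42L));
        generator.close();
    }
}
```

With the real interfaces, the same pattern would apply to any field that depends on the reader's runtime context, such as a subtask-specific seed.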


A new SourceReaderFactory interface is introduced.

Code Block
languagejava
titleSourceReaderFactory
public interface SourceReaderFactory<OUT, SplitT extends SourceSplit> extends Serializable {
    SourceReader<OUT, SplitT> newSourceReader(SourceReaderContext readerContext);
}


Proposed Changes

The sum of rates of all parallel readers has to approximate the optional user-defined sourceRatePerSecond parameter. Currently, there is no way for the SourceReaders to acquire the current parallelism of the job they are part of. In order to overcome this limitation, this FLIP proposes an extension of the SourceReaderContext interface with the currentParallelism() method:

...

Code Block
languagejava
titleDataGeneratorSource#createReader()
    @Override
    public SourceReader<OUT, NumberSequenceSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        if (maxPerSecond > 0) {
            // Rate limiting requested: derive this reader's share from the current parallelism.
            int parallelism = readerContext.currentParallelism();
            RateLimiter rateLimiter = new GuavaRateLimiter(maxPerSecond, parallelism);
            return new RateLimitedSourceReader<>(
                    new GeneratingIteratorSourceReader<>(readerContext, generatorFunction),
                    rateLimiter);
        } else {
            // No rate limit: emit as fast as the reader is polled.
            return new GeneratingIteratorSourceReader<>(readerContext, generatorFunction);
        }
    }
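
The arithmetic behind the rate split can be sketched as follows. This assumes the simplest possible policy, an even division of the global target across all readers; the FLIP text only requires that the per-reader rates sum to approximately sourceRatePerSecond, so the PerReaderRate helper below is a hypothetical illustration rather than the actual limiter logic.

```java
/** Hypothetical helper, not part of the proposal: splits a global target rate across readers. */
final class PerReaderRate {

    private PerReaderRate() {}

    /** Returns the events-per-second budget of one reader when the rate is split evenly. */
    static double perReaderRate(long sourceRatePerSecond, int parallelism) {
        if (sourceRatePerSecond <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("rate and parallelism must be positive");
        }
        // Floating-point division keeps the sum over all readers equal to the global target
        // even when the rate is not a multiple of the parallelism.
        return (double) sourceRatePerSecond / parallelism;
    }

    public static void main(String[] args) {
        long sourceRatePerSecond = 10;
        int parallelism = 4;
        double rate = perReaderRate(sourceRatePerSecond, parallelism);
        System.out.println("per reader: " + rate + ", total: " + rate * parallelism);
    }
}
```

An integer division would silently drop the remainder (10 / 4 would give each reader 2 events per second, 8 in total), which is why the sketch uses a fractional rate.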

...

  • A new IteratorSourceReaderBase is introduced, parameterized with generics for both the input and the output data types.
  • All methods of IteratorSourceReader apart from pollNext() are pulled up into the *Base class.
  • The IteratorSourceReader API remains the same; it now builds on IteratorSourceReaderBase with identical input and output types.
  • A new GeneratingIteratorSourceReader is introduced for the case where input and output types differ (the output is the result of applying the GeneratorFunction).

...

Code Block
languagejava
titleGeneratingIteratorSourceReader
package org.apache.flink.api.connector.source.lib.util;

@Experimental
public class GeneratingIteratorSourceReader<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {

    public GeneratingIteratorSourceReader(
            SourceReaderContext context, GeneratorFunction<E, O> generatorFunction) {...}

    @Override
    public InputStatus pollNext(ReaderOutput<O> output) {...}
}


RateLimitedSourceReader wraps another SourceReader (delegates to its methods) while rate-limiting the pollNext() calls.
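
The wrapping pattern can be sketched in isolation as follows. Reader and Limiter are hypothetical, heavily reduced stand-ins for SourceReader and RateLimiter (a real reader has more methods, all of which would simply be forwarded); the sketch only shows that a permit is acquired before each delegated pollNext() call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RateLimitedReaderSketch {

    /** Hypothetical stand-in for the polling part of a source reader. */
    interface Reader<T> {
        /** Emits at most one record into {@code out}; returns false once exhausted. */
        boolean pollNext(List<T> out);
    }

    /** Hypothetical stand-in for a rate limiter; acquire() returns once a call is allowed. */
    interface Limiter {
        void acquire();
    }

    /** Wraps another reader and asks the limiter for a permit before every poll. */
    static final class RateLimitedReader<T> implements Reader<T> {
        private final Reader<T> delegate;
        private final Limiter limiter;

        RateLimitedReader(Reader<T> delegate, Limiter limiter) {
            this.delegate = delegate;
            this.limiter = limiter;
        }

        @Override
        public boolean pollNext(List<T> out) {
            limiter.acquire(); // a real limiter would block here to enforce the rate
            return delegate.pollNext(out);
        }
    }

    public static void main(String[] args) {
        Iterator<String> events = List.of("a", "b", "c").iterator();
        Reader<String> plain = out -> {
            if (!events.hasNext()) {
                return false;
            }
            out.add(events.next());
            return true;
        };

        // A counting limiter stands in for a blocking one so that the demo runs instantly.
        AtomicInteger permits = new AtomicInteger();
        Reader<String> limited = new RateLimitedReader<>(plain, permits::incrementAndGet);

        List<String> out = new ArrayList<>();
        while (limited.pollNext(out)) {
            // keep polling until the wrapped reader is exhausted
        }
        System.out.println(out + " emitted, permits acquired: " + permits.get());
    }
}
```

Because the limiter sits in front of the delegate, the wrapped reader needs no knowledge of rate limiting, which is what allows createReader() to apply it only when a rate is configured.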

...

Code Block
languagejava
titleusage
int count = 1000;
int sourceRatePerSecond = 2;
GeneratorFunction<Long, String> generator = index -> "Event from index: " + index;
DataGeneratorSource<String> source = new DataGeneratorSource<>(generator, count, sourceRatePerSecond, Types.STRING);
DataStreamSource<String> watermarked =
        env.fromSource(
                source,
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)),
                "watermarked");

...