You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 21 Next »

Status

Current state: "Under Discussion"

Discussion thread: https://lists.apache.org/thread/7gjxto1rmkpff4kl54j8nlg5db2rqhkt

JIRA: Unable to render Jira issues macro, execution error.

Released: TBD

Motivation

FLIP-27 sources are non-trivial to implement. At the same time, it is frequently required to generate arbitrary events with a "mock" source. Such requirement arises both for Flink users, in the scope of demo/PoC projects, and for Flink developers when writing tests. The go-to solution for these purposes so far was using pre-FLIP-27 APIs and implementing data generators as SourceFunctions
While the new FLIP-27 Source interface introduces important additional functionality, it comes with significant complexity that presents a hurdle for Flink users for implementing drop-in replacements of the SourceFunction-based data generators.  Meanwhile, SourceFunction is effectively superseded by the Source interface and needs to be eventually deprecated. To fill this gap, this FLIP proposes the introduction of a generic data generator source based on the FLIP-27 API. 

Since it is frequently required to control the rate at which generated events are produced, this FLIP also expands the basic events generation functionality with native support for rate limiting. 

Public Interfaces

A new class with the following API will be introduced. Under the hood it, wraps, and delegates to the NumberSequenceSource utilities. 

DataGeneratorSource
package org.apache.flink.api.connector.source.lib;

/**
 * A data source that produces generators N events of an arbitrary type in parallel. 
 * This source is useful for testing and for cases that just need a stream of N events of any kind.
 *
 * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
 * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is
 * limited to one, this will produce one sequence in order.
 *
 * <p>This source is always bounded. For very long sequences user may want to consider executing 
 * the application in a streaming manner, because, despite the fact that the produced stream is bounded, 
 * the end bound is pretty far away.
 */

@Public
public class DataGeneratorSource<OUT>                 
		implements Source<
                        OUT,
                        NumberSequenceSource.NumberSequenceSplit,
                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
                ResultTypeQueryable<OUT> {    

    /**
     * Creates a new {@code DataGeneratorSource} that produces <code>count</code> records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param typeInfo The type information of the returned events.
     */
    public DataGeneratorSource(
            MapFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) {
        this.typeInfo = checkNotNull(typeInfo);
        this.generatorFunction = checkNotNull(generatorFunction);
        this.numberSource = new NumberSequenceSource(0, count);
    }
    

     /**
     * Creates a new {@code DataGeneratorSource} that produces <code>count</code> records in
     * parallel.
     *
     * @param generatorFunction The generator function that receives index numbers and translates
     *     them into events of the output type.
     * @param count The number of events to be produced.
     * @param sourceRatePerSecond The maximum number of events per seconds that this generator aims
     *     to produce. This is a target number for the whole source and the individual parallel
     *     source instances automatically adjust their rate taking based on the {@code
     *     sourceRatePerSecond} and the source parallelism.
     * @param typeInfo The type information of the returned events.
     */     
    public DataGeneratorSource(
            MapFunction<Long, OUT> generatorFunction,
            long count,
            long sourceRatePerSecond,
            TypeInformation<OUT> typeInfo);
}


Proposed Changes

The sum of rates of all parallel readers has to approximate the optional user-defined sourceRatePerSecond parameter. Currently, there is no way for the SourceReaders to acquire the current parallelism of the job they are part of. In order to overcome this limitation, this FLIP proposes an extension of the SourceReaderContext interface with the getRuntimeContext method:


SourceReaderContext
package org.apache.flink.api.connector.source;

/** The class that expose some context from runtime to the {@link SourceReader}. */
@Public
public interface SourceReaderContext {
	...
     /**
     * Gets the context that contains information about the readers runtime, such as the parallelism
     * of the source.
     *
     * @return The runtime context of the source reader.
     */
    RuntimeContext getRuntimeContext();
}

The respective RuntimeContext is available in the SourceOperator and can be easily provisioned during the anonymous SourceReaderContext initialization in its initReader() method.

With the runtime context accessible via SourceReaderContext, initialization of the data generating readers based on the user-provided generatorFunction could look as follows:


DataGeneratorSource#createrReader()
   @Override
    public SourceReader<OUT, NumberSequenceSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        if (maxPerSecond > 0) {
            int parallelism = readerContext.getRuntimeContext().getNumberOfParallelSubtasks();
            RateLimiter rateLimiter = new GuavaRateLimiter(maxPerSecond, parallelism);
            return new RateLimitedSourceReader<>(
                    new MappingIteratorSourceReader<>(readerContext, generatorFunction),
                    rateLimiter);
        } else {
            return new MappingIteratorSourceReader<>(readerContext, generatorFunction);
        }
    }

Where RateLimiter

RateLimiter
package org.apache.flink.api.common.io.ratelimiting;

/** The interface that can be used to throttle execution of methods. */
public interface RateLimiter extends Serializable {

    /**
     * Acquire method is a blocking call that is intended to be used in places where it is required
     * to limit the rate at which results are produced or other functions are called.
     *
     * @return The number of milliseconds this call blocked its caller.
     * @throws InterruptedException The interrupted exception.
     */
    int acquire() throws InterruptedException;
}

---

It is desirable to reuse the functionality of IteratorSourceReader for cases where the input data type is different from the output (IN: Long from the wrapped NumberSequenceSplit, OUT: the result of applying MapFunction<Long, OUT> function provided by the user). For that purpose, the following changes are proposed:

  • New IteratorSourceReaderBase is introduced parameterized with both in and out data types generics.
  • All methods apart from pollNext() from the IteratorSourceReader are "pulled-up" to the *Base class
  • IteratorSourceReader API remains the same while implementing IteratorSourceReaderBase where input and output types are the same
  • New MappingIteratorSourceReader is introduced where input and output types are different (result of applying the MapFunction)

IteratorSourceReaderBase
@Experimental
abstract class IteratorSourceReaderBase<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        implements SourceReader<O, SplitT> {...}



Error rendering macro 'code': Invalid value specified for parameter 'com.atlassian.confluence.ext.code.render.InvalidValueException'
@Public
public class IteratorSourceReader<
                E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, E, IterT, SplitT> {

    public IteratorSourceReader(SourceReaderContext context) {
        super(context);
    }


    @Override
    public InputStatus pollNext(ReaderOutput<E> output) {...}





This FLIP introduces a new DataGeneratorSource class.

The envisioned usage looks like this:

usage
MapFunction<Long, String> generator = index -> "Event from index: " + index;
DataGeneratorSource<String> source = new DataGeneratorSource<>(generator, 10, Types.STRING);
DataStreamSource<String> watermarked =
                  env.fromSource(
                        source,
                        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)),
                        "watermarked");

It is up for discussion if an additional utility method of StreamExecutionEnvironment with default watermarking might also be desirable (similar to env.fromSequence(long from, long to) ).

Compatibility, Deprecation, and Migration Plan

This feature is a stepping stone toward deprecating the SourceFunction API (see this discussion). 

  1. After this feature is introduced, it will be documented and promoted as the recommended way to write data generators.
  2. A list of Flink tests that currently use the SourceFunction API will be compiled and follow-up tickets for migration will be created.

Test Plan

  • Unit tests will be added to verify the behavior of Source's Splits in relation to the SourceReader
  • Integration tests will be added to verify correct functioning with different levels of parallelism

Rejected Alternatives

It is possible to use a NumberSequenceSource followed by a map function to achieve similar results, however, this has two disadvantages:

  • It introduces another level of indirection and is less intuitive to use
  • It does not promote best practices of assigning watermarks (see this discussion)


  • No labels