...

Code Block
languagejava
titleDataGeneratorSource
@Public
public class DataGeneratorSource<OUT>
        implements Source<
                        OUT,
                        NumberSequenceSource.NumberSequenceSplit,
                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
                ResultTypeQueryable<OUT> {

    // Reader creation is delegated to a pluggable SourceReaderFactory.
    private final SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory;

    @Override
    public SourceReader<OUT, NumberSequenceSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        return sourceReaderFactory.newSourceReader(readerContext);
    }

    // Remaining Source and ResultTypeQueryable methods omitted.
}

Proposed Changes

In order to deliver convenient rate-limiting functionality to the users of the new API, a small addition to the SourceReaderContext is required.

The sum of rates of all parallel readers has to approximate the optional user-defined sourceRatePerSecond parameter. Currently, there is no way for the SourceReaders to acquire the current parallelism of the job they are part of. To overcome this limitation, this FLIP proposes an extension of the SourceReaderContext interface with the currentParallelism() method:


Code Block
languagejava
titleSourceReaderContext
package org.apache.flink.api.connector.source;

/** The class that exposes some context from runtime to the {@link SourceReader}. */
@Public
public interface SourceReaderContext {
    ...
    /**
     * Get the current parallelism of this Source.
     *
     * @return the parallelism of the Source.
     */
    int currentParallelism();
}


With the parallelism accessible via SourceReaderContext, initialization of the rate-limiting data generating readers can be taken care of by the SourceReaderFactories. For example:


Code Block
languagejava
titleGeneratorSourceReaderFactory
public class GeneratorSourceReaderFactory<OUT>
        implements SourceReaderFactory<OUT, NumberSequenceSource.NumberSequenceSplit> {

    private final GeneratorFunction<Long, OUT> generatorFunction;
    private final long sourceRatePerSecond;

    public GeneratorSourceReaderFactory(
            GeneratorFunction<Long, OUT> generatorFunction, long sourceRatePerSecond) {...}

    @Override
    public SourceReader<OUT, NumberSequenceSource.NumberSequenceSplit> newSourceReader(
            SourceReaderContext readerContext) {
        if (sourceRatePerSecond > 0) {
            // The limiter is given the parallelism so that the total rate is split across readers.
            int parallelism = readerContext.currentParallelism();
            RateLimiter rateLimiter = new GuavaRateLimiter(sourceRatePerSecond, parallelism);
            return new RateLimitedSourceReader<>(
                    new GeneratingIteratorSourceReader<>(readerContext, generatorFunction),
                    rateLimiter);
        } else {
            // No rate limit requested: records are generated without throttling.
            return new GeneratingIteratorSourceReader<>(readerContext, generatorFunction);
        }
    }
}


The parallelism can be retrieved in the SourceOperator via the RuntimeContext and can be easily provisioned during the anonymous SourceReaderContext initialization in its initReader() method.
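A Flink-free sketch of this idea follows. `MiniReaderContext`, `MiniSourceOperator`, and `ParallelismDemo` are hypothetical names, and the operator receives its parallelism through a constructor instead of a real RuntimeContext. It shows an anonymous context capturing the parallelism, and a reader deriving an equal share of the global rate from it so that the shares of all readers sum to sourceRatePerSecond.

```java
/** Hypothetical stand-in for SourceReaderContext, reduced to the proposed method. */
interface MiniReaderContext {
    int currentParallelism();
}

/** Hypothetical stand-in for SourceOperator; here the parallelism is passed in directly. */
class MiniSourceOperator {
    private final int parallelism;

    MiniSourceOperator(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Mirrors the idea of initReader(): expose runtime information via an anonymous context. */
    MiniReaderContext initReaderContext() {
        return new MiniReaderContext() {
            @Override
            public int currentParallelism() {
                return parallelism; // captured from the enclosing operator
            }
        };
    }
}

class ParallelismDemo {
    /** Each reader takes an equal share, so the shares of all readers sum to the global rate. */
    static double perReaderRate(double sourceRatePerSecond, MiniReaderContext context) {
        return sourceRatePerSecond / context.currentParallelism();
    }

    public static void main(String[] args) {
        MiniReaderContext context = new MiniSourceOperator(4).initReaderContext();
        double share = perReaderRate(10.0, context);
        // 4 readers x 2.5 records/s = 10 records/s in total.
        System.out.println("per-reader rate: " + share);
        System.out.println("total rate: " + share * context.currentParallelism());
    }
}
```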



The RateLimiter used by the factory is defined as follows:

Code Block
languagejava
titleRateLimiter
package org.apache.flink.api.common.io.ratelimiting;

import java.io.Serializable;

/** The interface that can be used to throttle execution of methods. */
public interface RateLimiter extends Serializable {

    /**
     * Acquire method is a blocking call that is intended to be used in places where it is required
     * to limit the rate at which results are produced or other functions are called.
     *
     * @return The number of milliseconds this call blocked its caller.
     * @throws InterruptedException The interrupted exception.
     */
    int acquire() throws InterruptedException;
}
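For illustration only, the contract above could be satisfied by a simple sleep-based limiter. This is a sketch under stated assumptions, not the GuavaRateLimiter referenced by the factory: the `RateLimiter` interface is redeclared locally so the example compiles without Flink, and `SleepingRateLimiter` is a hypothetical name. Each parallel instance spaces its permits 1 / (sourceRatePerSecond / parallelism) seconds apart.

```java
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/** Local copy of the proposed RateLimiter contract, so this sketch compiles without Flink. */
interface RateLimiter extends Serializable {
    int acquire() throws InterruptedException;
}

/**
 * Hypothetical sleep-based limiter: each of the parallel instances hands out
 * sourceRatePerSecond / parallelism permits per second, evenly spaced.
 */
class SleepingRateLimiter implements RateLimiter {
    private final long intervalNanos;
    // System.nanoTime() values are only meaningful within one JVM, so they are not serialized.
    private transient boolean started;
    private transient long nextFreeSlotNanos;

    SleepingRateLimiter(double sourceRatePerSecond, int parallelism) {
        if (sourceRatePerSecond <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("rate and parallelism must be positive");
        }
        double perInstanceRate = sourceRatePerSecond / parallelism;
        this.intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / perInstanceRate);
    }

    long intervalNanos() {
        return intervalNanos;
    }

    @Override
    public synchronized int acquire() throws InterruptedException {
        long now = System.nanoTime();
        // Anchor the schedule on first use; also re-anchor if the caller fell behind,
        // so that unused permits are not saved up and released as a burst later.
        if (!started || nextFreeSlotNanos - now < 0) {
            nextFreeSlotNanos = now;
            started = true;
        }
        long waitNanos = nextFreeSlotNanos - now;
        nextFreeSlotNanos += intervalNanos;
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
        // Report how long the caller was blocked, as the interface's Javadoc describes.
        return (int) TimeUnit.NANOSECONDS.toMillis(waitNanos);
    }
}

class RateLimiterDemo {
    public static void main(String[] args) throws InterruptedException {
        // 20 records/s shared by 2 readers: each reader gets 10 permits/s, i.e. one per 100 ms.
        RateLimiter limiter = new SleepingRateLimiter(20, 2);
        for (int i = 0; i < 3; i++) {
            System.out.println("permit " + i + " blocked for " + limiter.acquire() + " ms");
        }
    }
}
```

Re-anchoring when the caller falls behind is a deliberate choice in this sketch: it keeps a slow downstream from turning idle time into a later burst, at the cost of undershooting the target rate in that situation.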

---

It is desirable to reuse the functionality of IteratorSourceReader for cases where the input data type differs from the output data type (IN: Long from the wrapped NumberSequenceSplit; OUT: the result of applying the user-provided GeneratorFunction<Long, OUT>). For that purpose, the following changes are proposed:

  • A new IteratorSourceReaderBase is introduced, parameterized with generics for both the input and output data types.
  • All methods of IteratorSourceReader apart from pollNext() are "pulled up" to the *Base class.
  • The IteratorSourceReader API remains the same; it now extends IteratorSourceReaderBase with identical input and output types.
  • A new GeneratingIteratorSourceReader is introduced for the case where input and output types differ (the output is the result of applying the GeneratorFunction).
  • GeneratingIteratorSourceReader initializes the GeneratorFunction (if needed) by calling its open() method within the reader's start() method.

Code Block
languagejava
titleIteratorSourceReaderBase
package org.apache.flink.api.connector.source.lib.util;

@Experimental
abstract class IteratorSourceReaderBase<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        implements SourceReader<O, SplitT> {...}


IteratorSourceReader:

Code Block
languagejava
titleIteratorSourceReader
package org.apache.flink.api.connector.source.lib.util;

@Public
public class IteratorSourceReader<
                E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, E, IterT, SplitT> {

    public IteratorSourceReader(SourceReaderContext context) {
        super(context);
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) {...}
}


Code Block
languagejava
titleGeneratingIteratorSourceReader
package org.apache.flink.api.connector.source.lib.util;

@Experimental
public class GeneratingIteratorSourceReader<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {

    public GeneratingIteratorSourceReader(
            SourceReaderContext context, GeneratorFunction<E, O> generatorFunction) {...} 

    @Override
    public InputStatus pollNext(ReaderOutput<O> output)  {...} 

    
}
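The division of labour between the three classes can be sketched in plain Java. `IteratorReaderBase`, `IdentityIteratorReader`, and `GeneratingIteratorReader` are hypothetical simplifications (no splits, no ReaderOutput, and null signals the end of input, so elements must be non-null): the iteration logic is pulled up into the base class, while each subclass implements only pollNext(), either emitting elements unchanged or applying a generator function.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical, simplified analogue of IteratorSourceReaderBase<E, O, ...>. */
abstract class IteratorReaderBase<E, O> {
    private final Iterator<E> iterator;

    IteratorReaderBase(Iterator<E> iterator) {
        this.iterator = iterator;
    }

    /** Shared, "pulled-up" logic: the next input element, or null once the input is exhausted. */
    protected final E nextInput() {
        return iterator.hasNext() ? iterator.next() : null;
    }

    /** The only method each subclass implements, as in the proposal. */
    abstract O pollNext();
}

/** Input type equals output type, like IteratorSourceReader. */
class IdentityIteratorReader<E> extends IteratorReaderBase<E, E> {
    IdentityIteratorReader(Iterator<E> iterator) {
        super(iterator);
    }

    @Override
    E pollNext() {
        return nextInput();
    }
}

/** Output is produced from each input element, like GeneratingIteratorSourceReader. */
class GeneratingIteratorReader<E, O> extends IteratorReaderBase<E, O> {
    private final Function<E, O> generatorFunction;

    GeneratingIteratorReader(Iterator<E> iterator, Function<E, O> generatorFunction) {
        super(iterator);
        this.generatorFunction = generatorFunction;
    }

    @Override
    O pollNext() {
        E input = nextInput();
        return input == null ? null : generatorFunction.apply(input);
    }
}

class ReaderHierarchyDemo {
    /** Polls until the reader signals the end of input (null). */
    static <O> List<O> drain(IteratorReaderBase<?, O> reader) {
        List<O> out = new ArrayList<>();
        for (O next = reader.pollNext(); next != null; next = reader.pollNext()) {
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> indices = List.of(0L, 1L, 2L);
        System.out.println(drain(new IdentityIteratorReader<Long>(indices.iterator())));
        System.out.println(drain(new GeneratingIteratorReader<Long, String>(
                indices.iterator(), index -> "Event from index: " + index)));
    }
}
```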


RateLimitedSourceReader wraps another SourceReader (delegates to its methods) while rate-limiting the pollNext() calls.

...

Code Block
languagejava
titleRateLimitedSourceReader
package org.apache.flink.api.connector.source.lib.util;

@Experimental
public class RateLimitedSourceReader<E, SplitT extends SourceSplit>
        implements SourceReader<E, SplitT> {

    private final SourceReader<E, SplitT> sourceReader;
    private final RateLimiter rateLimiter;

    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter rateLimiter) {
        this.sourceReader = sourceReader;
        this.rateLimiter = rateLimiter;
    }

    @Override
    public void start() {
        sourceReader.start();
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
        // Block until a permit is available, then delegate to the wrapped reader.
        rateLimiter.acquire();
        return sourceReader.pollNext(output);
    }

    // Remaining SourceReader methods delegate to sourceReader.
    ...
}
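The decorator structure of RateLimitedSourceReader can be checked in isolation. This minimal sketch uses hypothetical `MiniReader` and `MiniRateLimiter` interfaces in place of Flink's SourceReader and the proposed RateLimiter, plus a fake limiter that never blocks but records each call, to show that start() is passed straight through while every pollNext() first acquires a permit.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, minimal reader contract standing in for Flink's SourceReader. */
interface MiniReader<T> {
    void start();

    T pollNext() throws Exception;
}

/** Hypothetical stand-in for the proposed RateLimiter: acquire() blocks until a permit is free. */
interface MiniRateLimiter {
    int acquire() throws InterruptedException;
}

/** Decorator in the spirit of RateLimitedSourceReader: delegate everything, throttle pollNext(). */
class RateLimitedMiniReader<T> implements MiniReader<T> {
    private final MiniReader<T> delegate;
    private final MiniRateLimiter rateLimiter;

    RateLimitedMiniReader(MiniReader<T> delegate, MiniRateLimiter rateLimiter) {
        this.delegate = delegate;
        this.rateLimiter = rateLimiter;
    }

    @Override
    public void start() {
        delegate.start(); // plain delegation, no throttling
    }

    @Override
    public T pollNext() throws Exception {
        rateLimiter.acquire(); // wait for a permit before producing the next record
        return delegate.pollNext();
    }
}

class DecoratorDemo {
    /** Records the order of calls seen by the wrapped reader and the limiter. */
    static List<String> runTwoPolls() throws Exception {
        List<String> calls = new ArrayList<>();
        MiniReader<Long> counting =
                new MiniReader<Long>() {
                    private long next;

                    @Override
                    public void start() {
                        calls.add("start");
                    }

                    @Override
                    public Long pollNext() {
                        calls.add("poll");
                        return next++;
                    }
                };
        // Fake limiter: never blocks, only records that it was consulted.
        MiniRateLimiter recordingLimiter = () -> {
            calls.add("acquire");
            return 0;
        };
        MiniReader<Long> reader = new RateLimitedMiniReader<>(counting, recordingLimiter);
        reader.start();
        reader.pollNext();
        reader.pollNext();
        return calls;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runTwoPolls());
    }
}
```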

Usage:

The envisioned usage for functions that do not contain any class fields that need initialization looks like this:

Code Block
languagejava
titleusage
int count = 1000;
int sourceRatePerSecond = 2;
GeneratorFunction<Long, String> generator = index -> "Event from index: " + index;
DataGeneratorSource<String> source =
        new DataGeneratorSource<>(generator, count, sourceRatePerSecond, Types.STRING);
DataStreamSource<String> watermarked =
        env.fromSource(
                source,
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)),
                "watermarked");

Scenarios where the GeneratorFunction requires initialization of non-serializable fields are supported as follows:

Code Block
languagejava
titleusage
GeneratorFunction<Long, String> generator =
        new GeneratorFunction<Long, String>() {

            // Not serializable, so it is initialized in open() on the task side.
            transient SourceReaderMetricGroup sourceReaderMetricGroup;

            @Override
            public void open(SourceReaderContext readerContext) throws Exception {
                sourceReaderMetricGroup = readerContext.metricGroup();
            }

            @Override
            public String map(Long value) {
                return "Generated: >> "
                        + value.toString()
                        + "; local metric group: "
                        + sourceReaderMetricGroup.hashCode();
            }
        };
DataGeneratorSource<String> source =
        new DataGeneratorSource<>(generator, count, sourceRatePerSecond, Types.STRING);
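The reason the metric group is held in a transient field and assigned in open() can be reproduced without Flink. In this sketch, `MiniGeneratorFunction`, `MetricsHandle`, and `TaggingGenerator` are hypothetical; Java serialization stands in for shipping the function to the task, and calling open() after deserialization mimics what GeneratingIteratorSourceReader is proposed to do in its start() method.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical analogue of GeneratorFunction: serializable, with an optional open() hook. */
interface MiniGeneratorFunction<T, O> extends Serializable {
    default void open() throws Exception {}

    O map(T value) throws Exception;
}

/** Hypothetical non-serializable resource, e.g. a handle to a metric group. */
class MetricsHandle {
    private final String name;

    MetricsHandle(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

class GeneratorLifecycleDemo {
    static class TaggingGenerator implements MiniGeneratorFunction<Long, String> {
        // transient: skipped by serialization, so it must be created in open() after shipping.
        private transient MetricsHandle metrics;

        @Override
        public void open() {
            metrics = new MetricsHandle("reader-metrics");
        }

        @Override
        public String map(Long value) {
            return "Generated: >> " + value + "; local metric group: " + metrics.name();
        }
    }

    /** Serialization round trip, standing in for shipping the function to a task. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T object) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MiniGeneratorFunction<Long, String> shipped = roundTrip(new TaggingGenerator());
        shipped.open(); // the step the proposal assigns to GeneratingIteratorSourceReader#start()
        System.out.println(shipped.map(42L));
    }
}
```

If the handle were created eagerly in a constructor and the field were not transient, writeObject would fail with a NotSerializableException, since MetricsHandle does not implement Serializable.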

Remarks:

  • It is up for discussion whether a utility method with default watermarking should also be added to StreamExecutionEnvironment (similar to env.fromSequence(long from, long to)).
  • To be able to reuse the existing functionality of NumberSequenceSource, the visibility of NumberSequenceSource.CheckpointSerializer has to be changed from private to package-private.

Compatibility, Deprecation, and Migration Plan

...

  • Unit tests will be added to verify the behavior of Source's Splits in relation to the SourceReader
  • Integration tests will be added to verify correct functioning with different levels of parallelism

Rejected Alternatives

It is possible to use a NumberSequenceSource followed by a map function to achieve similar results; however, this approach has two disadvantages:

  • It introduces another level of indirection and is less intuitive to use
  • It does not promote best practices of assigning watermarks (see this discussion)

POC Branch:

https://github.com/apache/flink/compare/master...afedulov:flink:FLINK-27919-generator-source

Latest version: DataGeneratorSourceV3 DataGeneratorSourceV4