Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

[This FLIP proposal is a joint work between Yunfeng Zhou  and Dong Lin ]

Table of Contents

Motivation

In scenarios where output records represent updates to existing values, there are cases in which output records are required to be updated at a minute-There are use-cases where job's results need to be emitted only at a minute level latency instead of milli-second level latency. For instance, a user may might want to track the top 5 process an unbounded stream of page-view events from Kafka and compute the most-viewed tweets and update the results page and its page-view count once every 2 minutes. Currently, Flink emits newly generated output results as soon as possible after it finishes processing input records, since it lacks knowledge of the specific update latency required for the use-case (i.e., 2 minutes). This has unneccessarily limited the throughput of a Flink job and consumed more network and computation than actually needed.

To address this limitation, we propose the inclusion of support for user-configurable flush interval in this FLIP. This configuration would enable Flink to optimize its throughput by buffering output records and emitting only the latest updates at the user-defined interval. By doing so, certain operators, such as non-window aggregations, could achieve much higher throughput by reducing the total number of records emitted by the operator.

Public API

1. Add job-level configuration execution.max-flush-interval

  • Name: execution.max-flush-interval

  • Type: Duration

  • Default value: null

  • Description: 

  • If this configuration is not null, operators with idempotent output will have their output results temporarily buffered in the state backend, and flush the latest output (of each key, if the operator's upstream is a keyed stream) that have been collected to downstream operators periodically before the configured interval passes. This optimization helps to improve throughput by reducing the output of intermediate results at the cost of increased latency and state size.

2. Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that can be used for performance optimization, and add the property isOutputIdempotent to OperatorAttribute and the Function interface to indicate whether buffering and flushing optimization can be enabled on an operator.

Currently, in order to address the above use-case, the Flink job should be run in stream mode and emit one record (i.e. the page-id with its page-view count) for each incoming record. If there are 100 page-view events in 2 minutes, the Flink job will emit 100 records. However, given that the user only needs the result once every 2 minutes, it is unnecessary and wasteful to emit these 100 records. Instead, the job only needs to emit 1 record instead of 100 records, which can significantly reduce network bandwidth usage and reduce the load for the downstream operator/storage.

In order to improve performance and reduce resource usage of Flink jobs, we propose to support user-configurable flush interval, which can be used to adjust the interval of emitting intermediate results for operators with idempotent semantics (e.g. aggregation for keyed inputs). This config effectively allows users to tradeoff between data freshness of the job results and the cost of achieving this data freshness.

Public API

1. Add job-level configuration execution.max-flush-interval

  • Name: execution.max-flush-interval

  • Type: Duration

  • Default value: null

  • Description: 

  • If this configuration is not null, operators with idempotent semantics will emit output records (for keys whose values have been updated) only periodically (up to the specified flush interval) instead of emitting one output record for each input. This config allows operators to reduce the output bandwidth usage with increased throughput, at the cost of increased latency and state size.


2. Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that can be used for performance optimization, and add the property isOutputIdempotent to OperatorAttribute and the Function interface to indicate whether the operator's output has idempotent semantics.

Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    private boolean isOutputIdempotent = false;
 
  	/** @see OperatorAttributes#isOutputIdempotent */
    public OperatorAttributesBuilder 
Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/** The builder class for {@link OperatorAttributes}. */
@Experimental
public class OperatorAttributesBuilder {
    private boolean isOutputIdempotent = false;
 
  	/** @see OperatorAttributes#isOutputIdempotent */
    public OperatorAttributesBuilder setOutputIdempotent(boolean isOutputIdempotent) {...}

    public OperatorAttributes build() {...}
}

...

Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/**
 * OperatorAttributes element provides information about the operator that can be
 * used for performance optimization.
 */
@Experimental@PublicEvolving
public class OperatorAttributes {
    /**
     * Whether thethis outputoperator resultsis ofidempotent. thisAn operator is idempotent. Toif one beof exact,the idempotencyfollowing here
     * meansconditions thatare itmet:
 would yield the same final*
 effect to collect a list* of<ul>
 output results or to
 *   <li> *Its collectoutput onlyrecords theare lastkeyed. resultAnd (ofwhen eachthere key,are ifmultiple theoutput operatorrecords isfor applied on keyed stream) in thethe same 
     * list.
    key, *
the job result is correct *if <p>Bythis settingoperator thisonly attributeemits tothe true,last itrecord offers an optimization hint that so long as a newer
     * output result is available, the older result (with the same key) can be discarded. This
of these records.
     *   <li> Its output records are not keyed. And when there are multiple output records, the job
     *     *result allowsis Flink to buffer the outputs correct of thethis operator and periodicallyonly emit the last record latestof onesthese everrecords.
     * collected. The operator's throughput can be improved in this way by reducing the total number
     * of records emitted by the operator.
     */
    public boolean isOutputIdempotent() </ul>
     */
    public boolean isOutputIdempotent() {...}
}


Code Block
languagejava
public interface Function extends java.io.Serializable {     
 
    /**
     * Whether thethis outputoperator resultsis ofidempotent. thisAn functionoperator is idempotent. if Toone beof exact,the idempotencyfollowing here
     * meansconditions thatare itmet:
 would yield the same final*
 effect to collect a list* of<ul>
 output results or to
 *   <li> *Its collectoutput onlyrecords theare lastkeyed. resultAnd (ofwhen eachthere key,are ifmultiple theoutput functionrecords isfor appliedthe on keyed stream) in thesame 
     * list.
    key, *
the job result is correct *if <p>Bythis settingoperator thisonly attributeemits tothe true,last itrecord offersof anthese optimizationrecords.
 hint that so long as* a newer
 <li> Its output records are *not outputkeyed. resultAnd iswhen available,there theare oldermultiple resultoutput (withrecords, the same key) can be discarded. This
     * allows Flink to buffer the outputs of the function and periodically emit the latest ones everjob
     *     result is correct of this operator only emit the last record of these records.
     * </ul>
     */ collected. The  function's throughput can be improved in this way by reducing the total number
    
   * of records emitted by the function.
     */
    default public boolean isOutputIdempotent() {
      return false;
    }
}

...

Code Block
languagejava
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
 
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

...

Code Block
languagejava
@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
 
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

Proposed Changes

We propose to add a subclass of the Output interface as follows. When operators output results through this class, this class will buffer the latest results in the state backend until flush() is invoked. A similar subclass of Output will also be introduced in Table/SQL API, which also supports emitting the latest StreamRecord while providing additional support to the Insert/Delete/Update semantics.

Code Block
languagejava
/**
 * An {@link Output} that buffers the latest output results from an operator in the state backend.
 *
 * <ul>
 *   <li>If the output result is a StreamRecord, the latest record (of each key, if the operator is
 *       applied on keyed stream, same as below) would be buffered. This means if there has existed
 *       a record (with the same key) in buffer, the record will be overridden by the newly arrived
 *       record.
 *   <li>If the output result is a Watermark, it would be directly emitted if the buffer is still
 *       empty. Otherwise, it would be buffered until flush. Only the latest watermark would be
 *       buffered.
 *   <li>If the output result is a WatermarkStatus, it would be directly emitted if the buffer is
 *       still empty. Otherwise, it would be buffered until flush. Only the latest watermark status
 *       would be buffered.
 *   <li>If the output result is a LatencyMarker, it would be directly emitted if the buffer is
 *       still empty. Otherwise, it would be buffered until flush. All latency marker would be
 *       buffered.
 * </ul>
 *
 * <p>When {@link #flush()} is invoked, all StreamElements in the buffer would be emitted to the
 * actual output, and the buffer would be cleared. If there is nowatermark watermark andor stream records do
 * not containcontains timestamp, the buffered stream records and latency markers would be emitted in a
 * random order. Otherwise, elements would be emitted in the following order.
 *
 * <ul>
 *   <li>All stream records whose timestamp is smaller than or equal to the watermark.
 *   <li>The watermark status and watermark, in the order they were buffered.
 *   <li>All stream records whose timestamp is larger than the watermark.
 *   <li>All latency markers.
 * </ul>
 */
@Internal
public class BufferOutput<T> implements Output<StreamRecord<T>> {
    private final Output<StreamRecord<T>> output;

    /** @param output The actual output to flush buffered records to. */
    public BufferOutput(
      ...
      Output<StreamRecord<T>> output
    ) {
        this.output = output;
    }

    @Override
    public void collect(StreamRecord<T> record) {...}

    @Override
    public void close() {...}

    @Override
    public void emitWatermark(Watermark mark) {...}

    @Override
    public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {...}

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {...}

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {...}

    public void flush() {
      for (...) {
          output.collect(...)
      }
    }
}

...

  • All input channels are empty and there have been records given to the operator since the last flush.

  • execution.max-flush-interval has passed since the last flush.


Analysis

Performance benefits and overheads brought by this FLIP

If the optimizations proposed in this FLIP is enabled on an operator, its downstream operators would be able to reduce the computation, storage and network resources needed to process the intermediate output results generated by the operator. Suppose the operator used to generate N output records for each key between a max-flush-interval on average, the downstream operators now would only need 1/N of the original resources to handle the incoming data stream, or increase its throughput by N times with the same resource set.

On the other hand, this operator would increate the end-to-end latency of the Flink job, as the latest records would only be output when flush() is triggered. In the worst case where the latest record of a key is buffered right after a flush, the latency of the record would increase by max-flush-interval. Besides, each buffering operation would bring a read & write state access.

Built-in operators and functions that would be affected by this FLIP

With regard to the scenarios in which the optimizations proposed in this FLIP can come to effect, the group-reduce function seems to be the only one among Flink's built-in functions that can utilize these optimizations so far. This would be achieved by having the following operator/function report isOutputIdempotent=true.

...

  • DataStream API

    • KeyedStream#reduce

    • KeyedStream#sum

    • KeyedStream#min

    • KeyedStream#max

    • KeyedStream#minBy

    • KeyedStream#maxBy

  • Table API (similar for SQL)

    • BaseExpressions#distinct

    • BaseExpressions#sum

    • BaseExpressions#sum0

    • BaseExpressions#min

    • BaseExpressions#max

    • BaseExpressions#count

    • BaseExpressions#avg

    • BaseExpressions#firstValue

    • BaseExpressions#lastValue

    • BaseExpressions#listAgg

    • BaseExpressions#stddevPop

    • BaseExpressions#stddevSamp

    • BaseExpressions#varPop

    • BaseExpressions#varSamp

    • BaseExpressions#collect


Compatibility

...

The design proposed in this FLIP is backward compatible.

If this FLIP and FLIP-325 could be implemented, we propose to deprecate the table.exec.mini-batch.* configurations in Table/SQL programs. As we can see, table.exec.mini-batch brings two optimizations: reduced state backend access and reduced number of output records, and the design proposed in this FLIP and FLIP-325 can cover these optimizations with the following additional advantages:

  • Less heap memory usage. The operator only needs to store the aggregated value and one output record for each unique key, rather than the full list of the original elements.

  • Better cache hit rate. Since hot key does not have to be evicted from the cache periodically (due to mini-batch processing).

  • No need to increase time of the synchronous checkpoint stage.

  • Applicable to operators in DataStream API as well, instead of only Table/SQL API.

Thus the table.exec.mini-batch.* configurations should be deprecated and replaced by the newly proposed designs.

Future Work

...

, Deprecation, and Migration Plan

The design proposed in this FLIP is backward compatible.

No deprecation or migration plan is needed.

Future Work

1) Add extra mechanism to upper-bound the increase to end-to-end latency.

When a Flink job contains multiple operators in series and more than one of them configure isOutputIdempotent=true, the end-to-end latency brought by each operator might be accumulated and the overall latency overhead could be max-flush-interval * num-operators. This problem is now alleviated by triggering flush when input channels are empty, as described in "Proposed Changes" section.

If the resulting increase to the end-to-end latency is typically much higher than max-flush-interval, we can introduce a control flow message that travels through the job graph in topological order and triggers flush on each StreamTask, and resolve the accumulation of the buffering latency with this message.


2) Deprecate table.exec.mini-batch.* configurations after both FLIP-325 and FLIP-365 has been implemented.

table.exec.mini-batch brings two optimizations: reduced state backend access and reduced number of output records. The design proposed in this FLIP and FLIP-325 can cover these optimizations with the following additional advantages:

  • Less heap memory usage. The operator only needs to store the aggregated value and one output record for each unique key, rather than the full list of the original elements.

  • Better cache hit rate. Since hot key does not have to be evicted from the cache periodically (due to mini-batch processing).

  • No need to increase time of the synchronous checkpoint stage.

  • Applicable to operators in DataStream API as well, instead of only Table/SQL API.