
Discussion thread: https://lists.apache.org/thread/byfp762r34h6hvox6gcnrgfdnsqqqx6h
Vote thread: -
JIRA: -
Release: -


Table of Contents

Motivation

There are use-cases where a job's results need to be emitted only at minute-level latency instead of millisecond-level latency. For instance, a user might want to process an unbounded stream of page-view events from Kafka and compute the most-viewed page and its page-view count once every 2 minutes.

Currently, in order to address the above use-case, the Flink job has to run in stream mode and emit one record (i.e. the page-id with its page-view count) for each incoming record. If there are 100 page-view events in 2 minutes, the Flink job will emit 100 records. However, given that the user only needs the result once every 2 minutes, it is unnecessary and wasteful to emit these 100 records. Instead, the job only needs to emit 1 record instead of 100 records, which can significantly reduce network bandwidth usage and reduce the load on the downstream operator/storage.

In order to improve performance and reduce the resource usage of Flink jobs, we propose to support a user-configurable flush interval, which can be used to adjust the interval of emitting intermediate results for operators with idempotent semantics (e.g. aggregation on keyed inputs). This config effectively allows users to trade off between the data freshness of the job results and the cost of achieving this data freshness.

Public API

Note that part of the API and implementation introduced in this FLIP relies on FLIP-369. The design proposed in this FLIP can still achieve a performance improvement without FLIP-369, while with FLIP-369 it can bring further optimization.


1. Add job-level configuration execution.flush.max-interval

  • Name: execution.flush.max-interval

  • Type: Duration

  • Default value: null

  • Description: If this configuration is not null, operators with idempotent semantics will emit output records (for keys whose values have been updated) only periodically (up to the specified flush interval) instead of emitting one output record for each input record. This config allows operators to reduce output bandwidth usage and increase throughput, at the cost of increased latency and state size.


2. Add job-level configuration execution.flush.memory-buffer-only

The "in-memory cache of state backends" mentioned below is introduced in FLIP-369.

  • Name: execution.flush.memory-buffer-only

  • Type: Boolean

  • Default value: false

  • Description: Whether only the in-memory cache part of state backends will be used to buffer outputs of operators with idempotent semantics before flush. If true, the buffered outputs will also be flushed when the in-memory cache is full in addition to the periodic flush specified in execution.flush.max-interval. This configuration reduces filesystem and remote state IO but increases the flushing frequency and workload on downstream operators.
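As an illustration, the two options above could be set on a job as in the sketch below. This is a minimal sketch that assumes the configuration keys are adopted exactly as proposed in this FLIP; until the FLIP is accepted, the key names may still change.

Code Block
languagejava
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: assumes the proposed keys "execution.flush.max-interval" and
// "execution.flush.memory-buffer-only" are adopted as-is.
Configuration conf = new Configuration();
conf.setString("execution.flush.max-interval", "2 min");
conf.setString("execution.flush.memory-buffer-only", "true");

// The job-level configuration is passed to the execution environment as usual.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);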


3. Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that can be used for performance optimization, and add the property isOutputIdempotent to OperatorAttributes and the Function interface to indicate whether the operator's output has idempotent semantics.


Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    private boolean isOutputIdempotent = false;
 
    /** @see OperatorAttributes#isOutputIdempotent */
    public OperatorAttributesBuilder setOutputIdempotent(boolean isOutputIdempotent) {...}

    public OperatorAttributes build() {...}
}


Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/**
 * OperatorAttributes element provides information about the operator that can be
 * used for performance optimization.
 */
@PublicEvolving
public class OperatorAttributes {
    /**
     * Whether this operator is idempotent. An operator is idempotent if one of the following
     * conditions is met:
     *
     * <ul>
     *   <li>Its output records are keyed. And when there are multiple output records for the same
     *       key, the job result is correct if this operator only emits the last record of these records.
     *   <li>Its output records are not keyed. And when there are multiple output records, the job
     *       result is correct if this operator only emits the last record of these records.
     * </ul>
     */
    public boolean isOutputIdempotent() {...}
}


Code Block
languagejava
public interface Function extends java.io.Serializable {

    /**
     * Whether this operator is idempotent. An operator is idempotent if one of the following
     * conditions is met:
     *
     * <ul>
     *   <li>Its output records are keyed. And when there are multiple output records for the same
     *       key, the job result is correct if this operator only emits the last record of these records.
     *   <li>Its output records are not keyed. And when there are multiple output records, the job
     *       result is correct if this operator only emits the last record of these records.
     * </ul>
     */
    default public boolean isOutputIdempotent() {
        return false;
    }
}
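As a usage sketch (hypothetical, assuming the default method above is added to Function), a user-defined ReduceFunction whose downstream only needs the latest partial result per key could opt in as follows.

Code Block
languagejava
import org.apache.flink.api.common.functions.ReduceFunction;

// Hypothetical usage sketch: a ReduceFunction that declares its output idempotent,
// assuming Function#isOutputIdempotent() is added as proposed above.
ReduceFunction<Long> sum = new ReduceFunction<Long>() {
    @Override
    public Long reduce(Long value1, Long value2) {
        return value1 + value2;
    }

    @Override
    public boolean isOutputIdempotent() {
        // For each key, only the latest partial sum matters to the downstream,
        // so intermediate results may be buffered and overwritten before flush.
        return true;
    }
};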


4. Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}



Code Block
languagejava
@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
 
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
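To illustrate how these pieces fit together (a hypothetical sketch, not the actual implementation; the userFunction field is an assumption made for illustration), an operator that wraps an idempotent user function could forward the flag as follows.

Code Block
languagejava
// Hypothetical sketch: an operator reporting its attributes via the proposed API.
// The userFunction field is illustrative and not part of this FLIP.
@Override
public OperatorAttributes getOperatorAttributes() {
    return new OperatorAttributesBuilder()
            .setOutputIdempotent(userFunction.isOutputIdempotent())
            .build();
}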

Proposed Changes

We propose to add an implementation of the Output interface as follows. When operators output results through this class, it will buffer the latest results in the state backend until flush() is invoked. A similar implementation of Output will also be introduced in the Table/SQL API, which also supports emitting the latest StreamRecord while providing additional support for the Insert/Delete/Update semantics.


Code Block
languagejava
/**
 * An {@link Output} that buffers the latest output results from an operator in the state backend.
 *
 * <ul>
 *   <li>If the output result is a StreamRecord, the latest record (of each key, if the operator is
 *       applied on a keyed stream, same as below) would be buffered. This means if there has existed
 *       a record (with the same key) in the buffer, the record will be overridden by the newly arrived
 *       record.
 *   <li>If the output result is a Watermark, it would be directly emitted if the buffer is still
 *       empty. Otherwise, it would be buffered until flush. Only the latest watermark would be
 *       buffered.
 *   <li>If the output result is a WatermarkStatus, it would be directly emitted if the buffer is
 *       still empty. Otherwise, it would be buffered until flush. Only the latest watermark status
 *       would be buffered.
 *   <li>If the output result is a LatencyMarker, it would be directly emitted if the buffer is
 *       still empty. Otherwise, it would be buffered until flush. Only the earliest latency marker
 *       of each subtask would be buffered.
 * </ul>
 *
 * <p>The output would trigger its flush operation when any of the following conditions is met.
 *
 * <ul>
 *   <li>execution.flush.max-interval has passed since the last flush.
 *   <li>execution.flush.memory-buffer-only is enabled and the number of cached records has reached
 *       state.backend.cache.max-entries-num.
 *   <li>{@link #notifyNothingAvailable()} is triggered.
 * </ul>
 *
 * <p>When flush is triggered, all StreamElements in the buffer would be emitted to the actual
 * output, and the buffer would be cleared. If there is no watermark and the stream records do not
 * contain timestamps, the buffered stream records and latency markers would be emitted in a random
 * order. Otherwise, the buffered elements would be emitted in the following order.
 *
 * <ul>
 *   <li>All stream records whose timestamp is smaller than or equal to the watermark.
 *   <li>The watermark status and watermark, in the order they were buffered.
 *   <li>All stream records whose timestamp is larger than the watermark.
 *   <li>The latency markers.
 * </ul>
 */
@Internal
public class BufferOutput<T> implements Output<StreamRecord<T>> {
    private final Output<StreamRecord<T>> output;
 
    /** @param output The actual output to flush buffered records to. */
    public BufferOutput(
      ...
      Output<StreamRecord<T>> output
    ) {
        this.output = output;
    }

    @Override
    public void collect(StreamRecord<T> record) {...}

    @Override
    public void close() {...}

    @Override
    public void emitWatermark(Watermark mark) {...}

    @Override
    public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {...}

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {...}

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {...}

    /**
     * Notifies the output that no data is currently available, but more data might be available in
     * the future again.
     */
    public void notifyNothingAvailable() {...}

    private void flush() {
        for (...) {
            output.collect(...)
        }
    }
}

If all of the following conditions are met, OperatorChain would decorate the original mainOutput variable with BufferOutput before using it to create and set up the operator.

  • execution.flush.max-interval is not null.

  • The operator is applied on a keyed stream, or the parallelism of the operator is 1.

  • The operator reports isOutputIdempotent = true.

If buffering is enabled according to the requirements above, StreamTask would order the OperatorChain to trigger BufferOutput#notifyNothingAvailable when the input queue is temporarily empty or when the end of data is reached.
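The decoration itself could look roughly like the sketch below. This is illustrative only; the variable names and the exact wiring inside OperatorChain are assumptions rather than the actual implementation.

Code Block
languagejava
// Illustrative sketch, not the actual OperatorChain code: wrap the main output
// with BufferOutput only when all of the conditions listed above hold.
Output<StreamRecord<OUT>> mainOutput = ...;
OperatorAttributes attributes = operatorFactory.getOperatorAttributes();

if (flushMaxInterval != null
        && attributes.isOutputIdempotent()
        && (inputIsKeyed || parallelism == 1)) {
    mainOutput = new BufferOutput<>(/* state backend handles, etc. */ mainOutput);
}
// The (possibly wrapped) output is then used to create and set up the operator.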


Analysis

Performance benefits and overheads brought by this FLIP

If the optimizations proposed in this FLIP are enabled on an operator, its downstream operators would be able to reduce the computation, storage and network resources needed to process the intermediate output results generated by the operator. Suppose the operator used to generate N output records for each key within one execution.flush.max-interval on average; the downstream operators would now only need 1/N of the original resources to handle the incoming data stream, or could increase their throughput by N times with the same resources.

On the other hand, this feature would incur the following downsides / overheads:

  • The end-to-end latency of intermediate results will be higher. This is because each operator might buffer the intermediate results for up to the configured interval before emitting the intermediate result. This will negatively affect the data freshness of the job's output.
  • State size will be larger. This is because the corresponding operators will need to buffer / merge output records by key before emitting those outputs periodically.
  • There is higher state backend access overhead. Each buffering operation would bring a read & write state access. Note that this overhead can be avoided by enabling execution.flush.memory-buffer-only.

Built-in operators and functions that would be affected by this FLIP

With regard to the scenarios in which the optimizations proposed in this FLIP can take effect, the group-reduce function seems to be the only one among Flink's built-in functions that can utilize these optimizations so far. This would be achieved by having the following operator/function report isOutputIdempotent=true.

  • StreamGroupedReduceOperator

  • GroupAggFunction

After this is applied, the following public APIs would be able to utilize these optimizations when they are applied on non-windowed streams (a usage sketch follows after the lists below).

  • DataStream API

    • KeyedStream#reduce

    • KeyedStream#sum

    • KeyedStream#min

    • KeyedStream#max

    • KeyedStream#minBy

    • KeyedStream#maxBy

  • Table API (similar for SQL)

    • BaseExpressions#distinct

    • BaseExpressions#sum

    • BaseExpressions#sum0

    • BaseExpressions#min

    • BaseExpressions#max

    • BaseExpressions#count

    • BaseExpressions#avg

    • BaseExpressions#firstValue

    • BaseExpressions#lastValue

    • BaseExpressions#listAgg

    • BaseExpressions#stddevPop

    • BaseExpressions#stddevSamp

    • BaseExpressions#varPop

    • BaseExpressions#varSamp

    • BaseExpressions#collect
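As an end-to-end sketch tying this back to the Motivation section, the page-view job could be written as below. It assumes the proposed execution.flush.max-interval option is adopted as-is, and it replaces the Kafka source with a small sample source for brevity; with buffering enabled, KeyedStream#sum would emit at most one updated count per page at each flush instead of one record per input event.

Code Block
languagejava
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PageViewCountExample {
    public static void main(String[] args) throws Exception {
        // Sketch only: the config key below is the one proposed in this FLIP.
        Configuration conf = new Configuration();
        conf.setString("execution.flush.max-interval", "2 min");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // One (pageId, 1L) element per page-view event; a Kafka source in practice.
        DataStream<Tuple2<String, Long>> pageViews =
                env.fromElements(
                        Tuple2.of("home", 1L), Tuple2.of("docs", 1L), Tuple2.of("home", 1L));

        // KeyedStream#sum is one of the listed APIs that can benefit from buffering.
        pageViews.keyBy(view -> view.f0).sum(1).print();

        env.execute("page-view-count");
    }
}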


Compatibility, Deprecation, and Migration Plan

The design proposed in this FLIP is backward compatible.

No deprecation or migration plan is needed.

Future Work

1) Add an extra mechanism to upper-bound the increase in end-to-end latency.

When a Flink job contains multiple operators in series and more than one of them report isOutputIdempotent=true, the latency introduced by each operator might accumulate, and the overall latency overhead could be execution.flush.max-interval * num-operators. This problem is now alleviated by triggering flush when input channels are empty, as described in the "Proposed Changes" section.

If the resulting increase in end-to-end latency is still typically much higher than execution.flush.max-interval, we can introduce a control flow message that travels through the job graph in topological order and triggers a flush on each StreamTask, and resolve the accumulation of the buffering latency with this message.


2) Support configuring state backend behavior at module granularity

While this FLIP proposes to store buffered outputs in state backends, some abilities provided by state backends are not needed by the output buffer. For example, it does not need TTL or RocksDB compaction, as it will be periodically cleared. If we could disable these functions, the throughput of output buffering would be further improved. However, Flink currently only supports global configurations that apply to all states in a job, so we cannot disable TTL or compaction for the output buffer without influencing other state accesses in operators as well.

A possible solution to this problem is to support configuring state backends' behavior at module granularity. For example, we could provide an optimization hint to state backends that some states will be frequently cleared, so the RocksDB state backend could use this information to disable compaction on these states.


3) Deprecate table.exec.mini-batch.* configurations after both FLIP-369 and FLIP-365 have been implemented.


table.exec.mini-batch brings two optimizations: reduced state backend access and a reduced number of output records. The design proposed in this FLIP and FLIP-369 can cover these optimizations with the following additional advantages:

...