

Discussion thread: -
Vote thread: -
JIRA: -
Release: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...


Motivation

In scenarios where output records represent updates to existing values, some use cases only require the output to be updated at minute-level latency. For instance, a user may want to track the top 5 most-viewed tweets and refresh the result every 2 minutes. Currently, Flink emits newly generated output results as soon as possible after it finishes processing the input records, because it has no knowledge of the update latency the use case actually requires (i.e., 2 minutes). This unnecessarily limits the throughput of a Flink job and consumes more network and computation resources than needed.

To address this limitation, this FLIP proposes support for a user-configurable flush interval. With this configuration, Flink can improve throughput by buffering output records and emitting only the latest updates at the user-defined interval. Certain operators, such as non-window aggregations, could then achieve much higher throughput by reducing the total number of records they emit.
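The effect of emitting only the latest update per key can be illustrated with a small simulation in plain Java (no Flink dependency). It replays 1,000 hypothetical view-count updates for 5 tweets over about 10 minutes and counts how many records reach downstream, once with immediate emission and once with a 2-minute flush interval. The class and method names are illustrative only, a `LinkedHashMap` stands in for keyed state in the state backend, and, as a simplifying assumption, flushes are triggered by the timestamps of incoming updates rather than by a timer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free simulation of per-key buffering with a maximum flush interval. */
public class FlushIntervalSimulation {

    /** At time timestampMs, the view count of the given key is value. */
    static final class Update {
        final long timestampMs;
        final String key;
        final long value;

        Update(long timestampMs, String key, long value) {
            this.timestampMs = timestampMs;
            this.key = key;
            this.value = value;
        }
    }

    /** 1,000 updates, one every 600 ms (about 10 minutes), round-robin over 5 tweets. */
    static List<Update> sampleUpdates() {
        List<Update> updates = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            updates.add(new Update(i * 600L, "tweet-" + (i % 5), i / 5 + 1));
        }
        return updates;
    }

    /**
     * Returns how many records reach downstream. A null interval means every update is emitted
     * immediately; otherwise only the latest value per key is buffered and flushed whenever an
     * interval boundary passes, plus once at the end of the input.
     */
    static int countEmitted(List<Update> updates, Long flushIntervalMs) {
        if (flushIntervalMs == null) {
            return updates.size();
        }
        Map<String, Long> buffer = new LinkedHashMap<>(); // stands in for keyed state
        int emitted = 0;
        long nextFlushMs = flushIntervalMs;
        for (Update u : updates) {
            // Flush once for every interval boundary that has passed before this update.
            while (u.timestampMs >= nextFlushMs) {
                emitted += buffer.size();
                buffer.clear();
                nextFlushMs += flushIntervalMs;
            }
            buffer.put(u.key, u.value); // a newer value for the same key replaces the older one
        }
        return emitted + buffer.size(); // final flush
    }

    public static void main(String[] args) {
        List<Update> updates = sampleUpdates();
        System.out.println("no buffering:      " + countEmitted(updates, null));
        System.out.println("2-minute interval: " + countEmitted(updates, 120_000L));
    }
}
```

Running `main` prints 1000 for immediate emission and 25 for the 2-minute interval: each of the five 2-minute windows emits one record per tweet instead of every intermediate count.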


Public API

...

1. Add the job-level configuration execution.max-flush-interval

  • Name: execution.max-flush-interval

  • Type: Duration

  • Default value: null

  • Description: 

  • If this configuration is not null, operators with idempotent output buffer their output results in the state backend and periodically flush the latest collected output (per key, if the operator's upstream is a keyed stream) to downstream operators, such that the time between flushes does not exceed the configured interval. This optimization improves throughput by emitting fewer intermediate results, at the cost of increased latency and state size.



2. Add OperatorAttributesBuilder and OperatorAttributes so that operator developers can specify operator attributes usable for performance optimization, and add the property isOutputIdempotent to OperatorAttributes and the Function interface to indicate whether the buffering and flushing optimization can be enabled on an operator.

```java
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Experimental;

/** The builder class for {@link OperatorAttributes}. */
@Experimental
public class OperatorAttributesBuilder {
    private boolean isOutputIdempotent = false;

    /** @see OperatorAttributes#isOutputIdempotent */
    public OperatorAttributesBuilder setOutputIdempotent(boolean isOutputIdempotent) {...}

    public OperatorAttributes build() {...}
}
```

...

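```java
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
```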
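The sketch below shows how an operator might declare idempotent output through the proposed builder, and how the runtime could combine that attribute with execution.max-flush-interval. It uses simplified, Flink-free stand-ins: the body of OperatorAttributes, the reduced OperatorFactory interface, RunningCountFactory, and the shouldBufferOutput helper are all hypothetical and only mirror the signatures above. The decision rule follows the configuration description: buffering applies only when the interval is set and the operator's output is idempotent.

```java
import java.time.Duration;

/** Hypothetical, Flink-free stand-ins mirroring the proposed operator-attribute API. */
public class OperatorAttributesSketch {

    /** Simplified stand-in for OperatorAttributes; its body is an assumption. */
    static final class OperatorAttributes {
        private final boolean isOutputIdempotent;

        OperatorAttributes(boolean isOutputIdempotent) {
            this.isOutputIdempotent = isOutputIdempotent;
        }

        boolean isOutputIdempotent() {
            return isOutputIdempotent;
        }
    }

    /** Mirrors the proposed builder: idempotency defaults to false. */
    static final class OperatorAttributesBuilder {
        private boolean isOutputIdempotent = false;

        OperatorAttributesBuilder setOutputIdempotent(boolean isOutputIdempotent) {
            this.isOutputIdempotent = isOutputIdempotent;
            return this;
        }

        OperatorAttributes build() {
            return new OperatorAttributes(isOutputIdempotent);
        }
    }

    /** Reduced stand-in for StreamOperatorFactory, keeping only the proposed default method. */
    interface OperatorFactory {
        default OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder().build();
        }
    }

    /** Hypothetical factory whose operator re-emits the latest value per key (a running count). */
    static final class RunningCountFactory implements OperatorFactory {
        @Override
        public OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder().setOutputIdempotent(true).build();
        }
    }

    /** Hypothetical helper: buffer only if an interval is configured and the output is idempotent. */
    static boolean shouldBufferOutput(Duration maxFlushInterval, OperatorFactory factory) {
        return maxFlushInterval != null && factory.getOperatorAttributes().isOutputIdempotent();
    }

    public static void main(String[] args) {
        OperatorFactory plain = new OperatorFactory() {};
        OperatorFactory runningCount = new RunningCountFactory();
        Duration interval = Duration.ofMinutes(2);
        System.out.println(shouldBufferOutput(interval, plain)); // false
        System.out.println(shouldBufferOutput(interval, runningCount)); // true
        System.out.println(shouldBufferOutput(null, runningCount)); // false
    }
}
```

Because the default getOperatorAttributes() builds attributes with isOutputIdempotent set to false, existing operator factories keep emitting results immediately unless they explicitly opt in.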

Proposed Changes

We propose adding a subclass of the Output interface as follows. When an operator outputs results through this class, the latest results are buffered in the state backend until flush() is invoked. A similar subclass of Output will also be introduced in the Table/SQL API; it likewise emits the latest StreamRecord, with additional support for Insert/Delete/Update semantics.

```java
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;

/**
 * An {@link Output} that buffers the latest output results from an operator in the state backend.
 *
 * <ul>
 *   <li>If the output result is a StreamRecord, the latest record (of each key, if the operator is
 *       applied on a keyed stream; the same applies below) is buffered. If a record with the same
 *       key already exists in the buffer, it is overwritten by the newly arrived record.
 *   <li>If the output result is a Watermark, it is emitted directly if the buffer is empty.
 *       Otherwise, it is buffered until flush. Only the latest watermark is buffered.
 *   <li>If the output result is a WatermarkStatus, it is emitted directly if the buffer is empty.
 *       Otherwise, it is buffered until flush. Only the latest watermark status is buffered.
 *   <li>If the output result is a LatencyMarker, it is emitted directly if the buffer is empty.
 *       Otherwise, it is buffered until flush. All latency markers are buffered.
 * </ul>
 *
 * <p>When {@link #flush()} is invoked, all StreamElements in the buffer are emitted to the actual
 * output, and the buffer is cleared. If there is no watermark and the stream records do not carry
 * timestamps, the buffered stream records and latency markers are emitted in an arbitrary order.
 * Otherwise, elements are emitted in the following order:
 *
 * <ul>
 *   <li>All stream records whose timestamp is less than or equal to the watermark.
 *   <li>The watermark status and watermark, in the order they were buffered.
 *   <li>All stream records whose timestamp is greater than the watermark.
 *   <li>All latency markers.
 * </ul>
 */
@Internal
public class BufferOutput<T> implements Output<StreamRecord<T>> {
    private final Output<StreamRecord<T>> output;

    /** @param output The actual output to flush buffered records to. */
    public BufferOutput(
            ...
            Output<StreamRecord<T>> output) {
        this.output = output;
    }

    @Override
    public void collect(StreamRecord<T> record) {...}

    @Override
    public void close() {...}

    @Override
    public void emitWatermark(Watermark mark) {...}

    @Override
    public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {...}

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {...}

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {...}

    /** Emits all buffered elements to the actual output in the order described above. */
    public void flush() {
        for (...) {
            output.collect(...);
        }
    }
}
```
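The buffering and flush-ordering rules in the BufferOutput Javadoc can be modeled without Flink as follows. This is a hypothetical sketch, not the proposed implementation: stream elements are represented as strings, a `LinkedHashMap` stands in for keyed state, and every record is assumed to carry a timestamp. Two further choices are assumptions not fixed by the Javadoc: "empty" means no buffered element of any kind, and when no watermark is buffered, all records are emitted before the watermark status.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free model of the BufferOutput buffering and flush-ordering rules. */
public class BufferOutputSketch {

    /** A buffered record value together with its timestamp. */
    private static final class Buffered {
        final String value;
        final long timestamp;

        Buffered(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final List<String> downstream = new ArrayList<>(); // stands in for the actual output
    private final Map<String, Buffered> records = new LinkedHashMap<>(); // latest record per key
    private final Map<String, String> control = new LinkedHashMap<>(); // watermark/status, buffering order
    private final List<String> latencyMarkers = new ArrayList<>(); // all markers are kept
    private Long watermark; // latest buffered watermark, or null if none

    private boolean isEmpty() {
        return records.isEmpty() && control.isEmpty() && latencyMarkers.isEmpty();
    }

    public void collect(String key, String value, long timestamp) {
        records.put(key, new Buffered(value, timestamp)); // replaces an older record for the key
    }

    public void emitWatermark(long mark) {
        if (isEmpty()) {
            downstream.add("watermark(" + mark + ")");
            return;
        }
        watermark = mark;
        control.remove("watermark"); // re-insert so iteration order reflects the latest buffering
        control.put("watermark", "watermark(" + mark + ")");
    }

    public void emitWatermarkStatus(String status) {
        if (isEmpty()) {
            downstream.add("status(" + status + ")");
            return;
        }
        control.remove("status");
        control.put("status", "status(" + status + ")");
    }

    public void emitLatencyMarker(String marker) {
        if (isEmpty()) {
            downstream.add("latency(" + marker + ")");
            return;
        }
        latencyMarkers.add("latency(" + marker + ")");
    }

    public void flush() {
        long wm = watermark == null ? Long.MAX_VALUE : watermark; // assumption when no watermark
        records.forEach((key, r) -> { if (r.timestamp <= wm) downstream.add(format(key, r)); });
        downstream.addAll(control.values());
        records.forEach((key, r) -> { if (r.timestamp > wm) downstream.add(format(key, r)); });
        downstream.addAll(latencyMarkers);
        records.clear();
        control.clear();
        latencyMarkers.clear();
        watermark = null;
    }

    private static String format(String key, Buffered r) {
        return "record(" + key + "=" + r.value + "@" + r.timestamp + ")";
    }

    public List<String> downstream() {
        return downstream;
    }

    public static void main(String[] args) {
        BufferOutputSketch out = new BufferOutputSketch();
        out.emitWatermark(50); // nothing buffered yet: emitted directly
        out.collect("a", "1", 100);
        out.collect("b", "1", 120);
        out.collect("a", "2", 200); // overwrites the buffered record for key "a"
        out.emitLatencyMarker("m1"); // buffered, emitted last
        out.emitWatermark(150); // buffered until flush
        out.flush();
        System.out.println(out.downstream());
    }
}
```

In `main`, the first watermark is emitted immediately because nothing is buffered yet. The record for key `a` is overwritten, so only its latest value is flushed, and it follows watermark 150 because its timestamp (200) is later than the watermark. The program prints `[watermark(50), record(b=1@120), watermark(150), record(a=2@200), latency(m1)]`.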

...