...
Motivation
Currently, to address the above use case, the Flink job must run in stream mode and emit one record (i.e. the page-id with its page-view count) for each incoming record. If 100 page-view events arrive within 2 minutes, the Flink job emits 100 records. However, since the user only needs the result once every 2 minutes, emitting all 100 records is unnecessary and wasteful. The job only needs to emit 1 record rather than 100, which can significantly reduce network bandwidth usage and the load on the downstream operator/storage.
To improve performance and reduce resource usage of Flink jobs, we propose supporting a user-configurable flush interval, which adjusts how often operators with idempotent semantics (e.g. aggregation over keyed inputs) emit intermediate results. This config effectively lets users trade off the data freshness of the job results against the cost of achieving that freshness.
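The difference in output volume can be illustrated with a minimal plain-Java sketch. It does not use any Flink API; the class name `PageViewFlushSketch` and its methods are hypothetical and only model the idea of keeping the latest result per key until a flush.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration (no Flink dependency) of per-record emission vs. buffered flush. */
public class PageViewFlushSketch {

    /** Models today's stream mode: every incoming event emits an updated "pageId=count". */
    public static List<String> emitPerRecord(List<String> pageIds) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String pageId : pageIds) {
            long updated = counts.merge(pageId, 1L, Long::sum);
            emitted.add(pageId + "=" + updated);
        }
        return emitted;
    }

    /** Models a flush interval: only the latest count per key is kept, then emitted once. */
    public static List<String> emitOnFlush(List<String> pageIds) {
        Map<String, Long> counts = new LinkedHashMap<>();
        Map<String, Long> buffer = new LinkedHashMap<>();
        for (String pageId : pageIds) {
            long updated = counts.merge(pageId, 1L, Long::sum);
            buffer.put(pageId, updated); // overrides the previously buffered result for this key
        }
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            emitted.add(e.getKey() + "=" + e.getValue());
        }
        return emitted;
    }

    public static void main(String[] args) {
        // 100 page-view events for the same page within one flush interval.
        List<String> events = Collections.nCopies(100, "page-1");
        System.out.println(emitPerRecord(events).size()); // 100
        System.out.println(emitOnFlush(events));          // [page-1=100]
    }
}
```

Overriding the buffered value is only safe because the aggregation result for a key fully supersedes the previous one, which is the idempotent-semantics requirement stated above.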
Public API
1. Add job-level configuration execution.flush.max-interval
...
2. Add job-level configuration execution.flush.memory-buffer-only
Name: execution.flush.memory-buffer-only
Type: Boolean
Default value: false
Description: Whether only the in-memory cache part of state backends will be used to buffer outputs of operators with idempotent semantics before flush. If true, the buffered outputs will also be flushed when the in-memory cache is full in addition to the periodic flush specified in execution.flush.max-interval. This configuration reduces filesystem and remote state IO but increases the flushing frequency and workload on downstream operators.
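How the two options could interact when deciding to flush can be sketched in plain Java as follows. This is an illustration under assumptions, not the proposed implementation: `FlushPolicySketch` is a hypothetical class, the interval is assumed to be a duration, and `maxCacheEntries` stands in for the capacity of the in-memory cache.

```java
import java.time.Duration;

/** Hypothetical model of the flush decision described by the two configurations. */
public class FlushPolicySketch {
    private final Duration maxInterval;     // models execution.flush.max-interval (type assumed)
    private final boolean memoryBufferOnly; // models execution.flush.memory-buffer-only
    private final int maxCacheEntries;      // assumed capacity of the in-memory cache

    public FlushPolicySketch(Duration maxInterval, boolean memoryBufferOnly, int maxCacheEntries) {
        this.maxInterval = maxInterval;
        this.memoryBufferOnly = memoryBufferOnly;
        this.maxCacheEntries = maxCacheEntries;
    }

    /** Returns true if the buffered outputs should be flushed now. */
    public boolean shouldFlush(Duration sinceLastFlush, int bufferedEntries) {
        // The periodic flush always applies.
        boolean intervalElapsed = sinceLastFlush.compareTo(maxInterval) >= 0;
        // A full cache forces a flush only when buffering is restricted to memory.
        boolean cacheFull = memoryBufferOnly && bufferedEntries >= maxCacheEntries;
        return intervalElapsed || cacheFull;
    }

    public static void main(String[] args) {
        FlushPolicySketch policy = new FlushPolicySketch(Duration.ofMinutes(2), true, 1000);
        System.out.println(policy.shouldFlush(Duration.ofSeconds(30), 10));   // false
        System.out.println(policy.shouldFlush(Duration.ofSeconds(30), 1000)); // true
    }
}
```

With `memoryBufferOnly` set to false, a full cache does not force a flush in this model, which reflects the trade-off in the description: fewer flushes, but more filesystem and remote state IO.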
...
```java
/**
 * An {@link Output} that buffers the latest output results from an operator in the state backend.
 *
 * <ul>
 *   <li>If the output result is a StreamRecord, the latest record (of each key, if the operator is
 *       applied on keyed stream, same as below) would be buffered. This means if there has existed
 *       a record (with the same key) in buffer, the record will be overridden by the newly arrived
 *       record.
 *   <li>If the output result is a Watermark, it would be directly emitted if the buffer is still
 *       empty. Otherwise, it would be buffered until flush. Only the latest watermark would be
 *       buffered.
 *   <li>If the output result is a WatermarkStatus, it would be directly emitted if the buffer is
 *       still empty. Otherwise, it would be buffered until flush. Only the latest watermark status
 *       would be buffered.
 *   <li>If the output result is a LatencyMarker, it would be directly emitted if the buffer is
 *       still empty. Otherwise, it would be buffered until flush. Only the earliest latency marker
 *       of each subtask would be buffered.
 * </ul>
 *
 * <p>The output would trigger its flush operation when any of the following conditions is met.
 *
 * <ul>
 *   <li>execution.flush.max-interval has passed since last flush.
 *   <li>execution.flush.memory-buffer-only is enabled and the number of cached records has reached
 *       state.backend.cache.max-entries-num.
 *   <li>{@link #notifyNothingAvailable()} is triggered.
 * </ul>
 *
 * <p>When flush is triggered, all StreamElements in the buffer would be emitted to the actual
 * output, and the buffer would be cleared. If there is no watermark and stream records do not
 * contain timestamp, the buffered stream records and latency marker would be emitted in a random
 * order. Otherwise, elements would be emitted in the following order.
 *
 * <ul>
 *   <li>All stream records whose timestamp is smaller than or equal to the watermark.
 *   <li>The watermark status and watermark, in the order they were buffered.
 *   <li>All stream records whose timestamp is larger than the watermark.
 *   <li>The latency marker.
 * </ul>
 */
@Internal
public class BufferOutput<T> implements Output<StreamRecord<T>> {
    private final Output<StreamRecord<T>> output;

    /** @param output The actual output to flush buffered records to. */
    public BufferOutput(
            ...
            Output<StreamRecord<T>> output
    ) {
        this.output = output;
    }

    /**
     * Notifies the output that no data is currently available, but more data might be available in
     * future again.
     */
    public void notifyNothingAvailable() {
        ...
    }

    private void flush() {
        for (...) {
            output.collect(...);
        }
    }
}
```
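The emission order on flush described in the Javadoc can be sketched in plain Java as follows. This is a simplified model under assumptions, not the actual `BufferOutput` logic: `FlushOrderSketch` and `Rec` are hypothetical names, elements are represented as strings, and the watermark status is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of the flush order of buffered elements. */
public class FlushOrderSketch {

    /** Stand-in for a buffered StreamRecord: a value plus its event timestamp. */
    public static final class Rec {
        final String value;
        final long timestamp;

        public Rec(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /**
     * Returns the buffered elements in flush order. A null watermark or latency marker means
     * that none was buffered.
     */
    public static List<String> flushOrder(List<Rec> records, Long watermark, String latencyMarker) {
        List<String> out = new ArrayList<>();
        if (watermark == null) {
            // No watermark buffered: any order is allowed; this sketch keeps buffer order.
            for (Rec r : records) {
                out.add("record:" + r.value);
            }
        } else {
            // 1. Records at or before the watermark.
            for (Rec r : records) {
                if (r.timestamp <= watermark) {
                    out.add("record:" + r.value);
                }
            }
            // 2. The watermark itself.
            out.add("watermark:" + watermark);
            // 3. Records after the watermark.
            for (Rec r : records) {
                if (r.timestamp > watermark) {
                    out.add("record:" + r.value);
                }
            }
        }
        // 4. The latency marker goes last.
        if (latencyMarker != null) {
            out.add("latencyMarker:" + latencyMarker);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> buffered = Arrays.asList(new Rec("b", 15L), new Rec("a", 5L));
        // Prints [record:a, watermark:10, record:b, latencyMarker:m]
        System.out.println(flushOrder(buffered, 10L, "m"));
    }
}
```

Emitting records at or before the watermark ahead of it keeps the watermark's contract for downstream operators: no record emitted after the watermark carries an older timestamp.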
execution.flush.max-interval is not null.
The operator reports isOutputIdempotent = true
Analysis
Performance benefits and overheads brought by this FLIP
...
2) Support configuring state backend behavior at module granularity
While this FLIP proposes to store buffered outputs in state backends, state backends provide some capabilities that the output buffer does not need. For example, it needs neither TTL nor RocksDB compaction, as it will be periodically cleared. If we could disable these functions, the throughput of output buffering would be further improved. However, Flink currently supports global configurations that apply to all states in a Flink job, so we cannot disable TTL or compaction for the output buffer without also affecting other state accesses in operators.
A possible solution to this problem is to support configuring state backends' behavior at module granularity. For example, we could provide an optimization hint to state backends that some states will be frequently cleared, so that the RocksDB state backend could use this information to disable compaction on these states.
3) Deprecate table.exec.mini-batch.* configurations after both FLIP-369 and FLIP-365 have been implemented.
...