...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink offers only limited observability of its checkpoint and recovery processes.

For checkpointing, the Flink WebUI provides a very detailed overview, which works well in many use cases. However, it is problematic when one is operating multiple Flink clusters, or when the cluster or JobManager (JM) dies. Additionally, there are a couple of metrics (like lastCheckpointDuration or lastCheckpointSize), but those metrics have a couple of issues:

  • They are reported and refreshed periodically, depending on the MetricReporter settings, which does not take the checkpointing frequency into account.

    • If the checkpointing interval is longer than the metric reporting interval, we report the same values multiple times.

    • If the checkpointing interval is shorter than the metric reporting interval, we effectively drop the metrics of some checkpoints at random.
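Both effects can be reproduced with a small simulation. The sketch below is not Flink code: it assumes a hypothetical gauge that always holds the id of the latest completed checkpoint, with checkpoint n completing at n × checkpoint interval and the reporter polling at k × reporting interval.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation (not Flink code) of a "last checkpoint" gauge polled by a reporter.
 * Checkpoint n (n >= 1) completes at n * checkpointIntervalMs; the reporter polls at
 * k * reportIntervalMs (k >= 1). A sampled value of 0 means no checkpoint has completed yet.
 */
public class GaugeSamplingDemo {

    /** Returns the checkpoint id the gauge exposes at each reporter poll. */
    static List<Long> sample(long checkpointIntervalMs, long reportIntervalMs, long durationMs) {
        List<Long> reported = new ArrayList<>();
        for (long t = reportIntervalMs; t <= durationMs; t += reportIntervalMs) {
            // At poll time t the gauge only holds the latest completed checkpoint.
            reported.add(t / checkpointIntervalMs);
        }
        return reported;
    }

    public static void main(String[] args) {
        // Checkpoint every 60 s, report every 10 s: checkpoint 1 is reported 6 times.
        System.out.println(sample(60_000, 10_000, 120_000));
        // prints [0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 2]

        // Checkpoint every 10 s, report every 60 s: checkpoints 1-5 and 7-11 are never seen.
        System.out.println(sample(10_000, 60_000, 120_000));
        // prints [6, 12]
    }
}
```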

For recovery, even the most basic metrics and Flink WebUI support are missing. Moreover, since recovery happens even less frequently than checkpointing, exposing recovery statistics as periodically reported metrics would make the problem of needlessly reporting the same values over and over even worse.

In this FLIP I’m proposing to add support for reporting traces/spans (see the example image "Traces") and to use this mechanism to report checkpointing and recovery traces.

Related work:

  • [FLINK-23411] Expose Flink checkpoint details metrics - ASF JIRA
  • [FLINK-33071] Log checkpoint statistics - ASF JIRA
  • [FLINK-7894] Improve metrics around fine-grained recovery and associated checkpointing behaviors - ASF JIRA

Public Interfaces

Code Block
import org.apache.flink.annotation.Experimental;

import java.util.Map;

/**
 * A Span represents something that happened in Flink at a certain point in time and that will be
 * reported to a {@link org.apache.flink.traces.reporter.TraceReporter}.
 *
 * <p>Currently we don't support traces with multiple spans. Each span is self-contained and
 * represents things like a checkpoint or recovery.
 */
@Experimental
public interface Span {

    static SpanBuilder builder(String scope, String name) {
        return new SpanBuilder(scope, name);
    }

    String getScope();

    String getName();

    long getStartTsMillis();

    long getEndTsMillis();

    /**
     * Currently, returned values can be of type String, Long or Double; more types may be added
     * in the future.
     */
    Map<String, Object> getAttributes();
}
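The interface relies on a SpanBuilder that this section does not show. A minimal sketch of what such a builder could look like follows, assuming a fluent API with overloaded setAttribute methods. SpanSketch, SimpleSpan and this SpanBuilder are hypothetical illustrations, not the Flink implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch only: SpanSketch, SimpleSpan and this SpanBuilder are not Flink classes. */
public class SpanSketch {

    /** Mirrors the proposed Span interface, minus the Flink @Experimental annotation. */
    public interface Span {
        static SpanBuilder builder(String scope, String name) {
            return new SpanBuilder(scope, name);
        }

        String getScope();
        String getName();
        long getStartTsMillis();
        long getEndTsMillis();
        Map<String, Object> getAttributes();
    }

    /** Fluent builder; the setAttribute overloads accept only String, long and double values. */
    public static final class SpanBuilder {
        private final String scope;
        private final String name;
        private final Map<String, Object> attributes = new HashMap<>();
        private long startTsMillis;
        private long endTsMillis;

        SpanBuilder(String scope, String name) {
            this.scope = scope;
            this.name = name;
        }

        public SpanBuilder setStartTsMillis(long startTsMillis) {
            this.startTsMillis = startTsMillis;
            return this;
        }

        public SpanBuilder setEndTsMillis(long endTsMillis) {
            this.endTsMillis = endTsMillis;
            return this;
        }

        public SpanBuilder setAttribute(String key, String value) {
            attributes.put(key, value);
            return this;
        }

        public SpanBuilder setAttribute(String key, long value) {
            attributes.put(key, value); // stored as Long
            return this;
        }

        public SpanBuilder setAttribute(String key, double value) {
            attributes.put(key, value); // stored as Double
            return this;
        }

        /** Takes an unmodifiable copy so later builder calls cannot change a built span. */
        public Span build() {
            Map<String, Object> snapshot = Collections.unmodifiableMap(new HashMap<>(attributes));
            return new SimpleSpan(scope, name, startTsMillis, endTsMillis, snapshot);
        }
    }

    /** Immutable value object returned by SpanBuilder#build. */
    private static final class SimpleSpan implements Span {
        private final String scope;
        private final String name;
        private final long startTsMillis;
        private final long endTsMillis;
        private final Map<String, Object> attributes;

        SimpleSpan(String scope, String name, long start, long end, Map<String, Object> attrs) {
            this.scope = scope;
            this.name = name;
            this.startTsMillis = start;
            this.endTsMillis = end;
            this.attributes = attrs;
        }

        @Override public String getScope() { return scope; }
        @Override public String getName() { return name; }
        @Override public long getStartTsMillis() { return startTsMillis; }
        @Override public long getEndTsMillis() { return endTsMillis; }
        @Override public Map<String, Object> getAttributes() { return attributes; }
    }
}
```

With these overloads, `setAttribute("checkpointId", 42L)` stores a Long, while passing a boolean does not compile, so the documented String/Long/Double restriction is enforced at compile time rather than checked at runtime.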

...

Code Block
// Report one span per checkpoint, created via the Span.builder(scope, name) factory.
metricGroup.addSpan(
        Span.builder(CheckpointStatsTracker.class.getCanonicalName(), "Checkpoint")
                .setStartTsMillis(checkpointStats.getTriggerTimestamp())
                .setEndTsMillis(checkpointStats.getLatestAckTimestamp())
                .setAttribute("checkpointId", checkpointStats.getCheckpointId())
                .setAttribute("fullSize", checkpointStats.getStateSize())
                .setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize())
                .setAttribute("checkpointStatus", checkpointStats.getStatus().name()));
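Unlike a polled gauge, a span is pushed once, at the moment the checkpoint finishes. The self-contained sketch below models that flow under simplifying assumptions: SpanDispatcher stands in for the metric group, CheckpointSpan for a span carrying a few of the attributes above, and reporters are plain callbacks. None of these names are Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch (not Flink code) of push-based span reporting. */
public class PushReportingDemo {

    /** Simplified span with a few of the attributes attached in the example above. */
    static final class CheckpointSpan {
        final long checkpointId;
        final long startTsMillis;
        final long endTsMillis;
        final String status;

        CheckpointSpan(long checkpointId, long startTsMillis, long endTsMillis, String status) {
            this.checkpointId = checkpointId;
            this.startTsMillis = startTsMillis;
            this.endTsMillis = endTsMillis;
            this.status = status;
        }
    }

    /** Stands in for the metric group: forwards every added span to all registered reporters. */
    static final class SpanDispatcher {
        private final List<Consumer<CheckpointSpan>> reporters = new ArrayList<>();

        void register(Consumer<CheckpointSpan> reporter) {
            reporters.add(reporter);
        }

        void addSpan(CheckpointSpan span) {
            // Delivered synchronously, exactly once per reporter, when the event happens.
            for (Consumer<CheckpointSpan> reporter : reporters) {
                reporter.accept(span);
            }
        }
    }

    /** Simulates checkpoints 1..count, triggered every intervalMs and each taking durationMs. */
    static List<CheckpointSpan> run(int count, long intervalMs, long durationMs) {
        SpanDispatcher dispatcher = new SpanDispatcher();
        List<CheckpointSpan> received = new ArrayList<>();
        dispatcher.register(received::add);
        for (int id = 1; id <= count; id++) {
            long triggerTs = id * intervalMs;
            dispatcher.addSpan(new CheckpointSpan(id, triggerTs, triggerTs + durationMs, "COMPLETED"));
        }
        return received;
    }

    public static void main(String[] args) {
        for (CheckpointSpan s : run(3, 10_000, 1_200)) {
            System.out.println("checkpoint " + s.checkpointId + " took "
                    + (s.endTsMillis - s.startTsMillis) + " ms, status " + s.status);
        }
    }
}
```

Because delivery is tied to the checkpoint event rather than to a reporting interval, twelve checkpoints yield exactly twelve spans, with no duplicates and no gaps.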

Immediate

...

follow-up work

  1. Add an OpenTelemetry-based TraceReporter (FLIP-385).

  2. Allow state backends to report more detailed recovery stats (FLIP-386).

    1. For example, for incremental RocksDB (RocksDBIncrementalRestoreOperation) we could easily measure separately the time to download files from remote storage and the time to load those files into the local RocksDB instance. However, this would require changing the state backend API.
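One way to capture such per-phase durations is to time each phase separately and expose the results as span attributes. The sketch below assumes hypothetical phase names and attribute keys (`downloadDurationMs`, `loadDurationMs`) and uses placeholder lambdas. It does not touch RocksDBIncrementalRestoreOperation or any real state backend API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sketch: time restore phases separately and expose them as span attributes. */
public class RestorePhaseTimer {

    private final Map<String, Object> attributes = new LinkedHashMap<>();

    /** Runs one phase and records its wall-clock duration under "<phase>DurationMs". */
    public <T> T measure(String phase, Supplier<T> action) {
        long startNanos = System.nanoTime();
        try {
            return action.get();
        } finally {
            // Recorded even if the phase throws, so failed restores still report timings.
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            attributes.put(phase + "DurationMs", elapsedMs); // stored as Long
        }
    }

    public Map<String, Object> getAttributes() {
        return attributes;
    }

    public static void main(String[] args) {
        RestorePhaseTimer timer = new RestorePhaseTimer();
        // Placeholders for downloading files from remote storage and loading them into RocksDB.
        int downloadedFiles = timer.measure("download", () -> 3);
        timer.measure("load", () -> downloadedFiles);
        System.out.println(timer.getAttributes().keySet());
        // prints [downloadDurationMs, loadDurationMs]
    }
}
```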

Mid/long term

...

follow-up work

  1. Present aggregated subtask checkpoint metrics (Sync Duration, Async Duration, Alignment Duration, Start Delay).

    1. For now, this is omitted for checkpointing (but included for recovery), to minimize the scope of this FLIP and because users can already access those metrics, for example from the Flink WebUI.

  2. For checkpointing and recovery traces, create separate spans for each subtask within the same single checkpoint (or recovery) trace.
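For the first item, one option is to collapse per-subtask values into a fixed set of attributes on the single span. The sketch below assumes a hypothetical naming scheme (`<metric>Min`, `<metric>Max`, `<metric>Avg`) and uses `java.util.LongSummaryStatistics`. It is an illustration, not the proposed design.

```java
import java.util.LinkedHashMap;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.stream.LongStream;

/** Hypothetical sketch: collapse per-subtask durations into a few span attributes. */
public class SubtaskAggregation {

    /** Adds min/max/avg attributes for one metric; the naming scheme is illustrative only. */
    static void addAggregated(Map<String, Object> attributes, String metric, long[] perSubtask) {
        LongSummaryStatistics stats = LongStream.of(perSubtask).summaryStatistics();
        if (stats.getCount() == 0) {
            return; // no subtask reported this metric
        }
        attributes.put(metric + "Min", stats.getMin()); // Long
        attributes.put(metric + "Max", stats.getMax()); // Long
        attributes.put(metric + "Avg", stats.getAverage()); // Double
    }

    public static void main(String[] args) {
        Map<String, Object> attributes = new LinkedHashMap<>();
        addAggregated(attributes, "syncDurationMs", new long[] {12, 30, 18});
        addAggregated(attributes, "alignmentDurationMs", new long[] {0, 250, 5});
        System.out.println(attributes);
        // prints {syncDurationMsMin=12, syncDurationMsMax=30, syncDurationMsAvg=20.0,
        //         alignmentDurationMsMin=0, alignmentDurationMsMax=250, alignmentDurationMsAvg=85.0}
    }
}
```

Aggregating keeps the span size independent of job parallelism and stays within the String/Long/Double attribute types. Per-subtask spans, as in the second item, would instead preserve which subtask was slow.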

...