

Discussion thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release: <Flink Version>


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink offers only limited observability into its checkpointing and recovery processes.

...

[FLINK-7894] Improve metrics around fine-grained recovery and associated checkpointing behaviors - ASF JIRA

Public Interfaces

Code Block
/**
 * A span represents something that happened in Flink at a certain point in time and that will be
 * reported to a {@link org.apache.flink.traces.reporter.TraceReporter}.
 *
 * <p>Currently, traces with multiple spans are not supported. Each span is self-contained and
 * represents an event such as a checkpoint or a recovery.
 */
@Experimental
public interface Span {

    public static SpanBuilder builder(String scope, String name) {
        return new SpanBuilder(scope, name);
    }

    public String getScope();
    
    public String getName();

    public long getStartTsMillis();

    public long getEndTsMillis();

    /**
     * Currently, returned values can be of type String, Long or Double; more types may be
     * added in the future.
     */
    public Map<String, Object> getAttributes();
}
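
The SpanBuilder returned by Span.builder(...) is not spelled out in this section. The following is a minimal sketch of what such a builder could look like, not the proposed implementation. The names SketchSpan and SketchSpanBuilder, the build() method, and the decision to snapshot the attributes into an unmodifiable map are assumptions made for illustration; only the setter names are taken from the usages further down this page.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the Span interface above, so that the sketch compiles on its own. */
interface SketchSpan {
    String getScope();
    String getName();
    long getStartTsMillis();
    long getEndTsMillis();
    Map<String, Object> getAttributes();
}

/** Hypothetical builder; the setter names mirror the usages in "Reported traces". */
final class SketchSpanBuilder {
    private final String scope;
    private final String name;
    private final Map<String, Object> attributes = new HashMap<>();
    private long startTsMillis;
    private long endTsMillis;

    SketchSpanBuilder(String scope, String name) {
        this.scope = scope;
        this.name = name;
    }

    SketchSpanBuilder setStartTsMillis(long startTsMillis) {
        this.startTsMillis = startTsMillis;
        return this;
    }

    SketchSpanBuilder setEndTsMillis(long endTsMillis) {
        this.endTsMillis = endTsMillis;
        return this;
    }

    // One overload per attribute type named in the Javadoc (String, Long, Double).
    SketchSpanBuilder setAttribute(String key, String value) {
        attributes.put(key, value);
        return this;
    }

    SketchSpanBuilder setAttribute(String key, long value) {
        attributes.put(key, value); // stored boxed as Long
        return this;
    }

    SketchSpanBuilder setAttribute(String key, double value) {
        attributes.put(key, value); // stored boxed as Double
        return this;
    }

    /** Assumed terminal operation: copies the builder state into an immutable span. */
    SketchSpan build() {
        final long start = startTsMillis;
        final long end = endTsMillis;
        final Map<String, Object> attrs =
                Collections.unmodifiableMap(new HashMap<>(attributes));
        return new SketchSpan() {
            @Override public String getScope() { return scope; }
            @Override public String getName() { return name; }
            @Override public long getStartTsMillis() { return start; }
            @Override public long getEndTsMillis() { return end; }
            @Override public Map<String, Object> getAttributes() { return attrs; }
        };
    }
}
```

Copying the state in build() means that reusing or mutating the builder afterwards cannot change a span that has already been handed to a reporter.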

...

Code Block
@Public
public interface MetricGroup {

    @Experimental
    default void addSpan(Span span) {}
    
    (...)
}


Proposed Changes

Reporting traces

TraceReporter would be loaded and configured the same way as MetricReporter: as a separate plugin, with the same style of configuration in flink-conf.yaml.

Implementations of TraceReporter

A simple Log4jTraceReporter will be added. This reporter would log reported traces to Flink’s JobManager log.
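
As a rough illustration of such a logging reporter, the sketch below formats a finished span into a single log line. It is only a sketch under stated assumptions: the TraceReporter interface is not shown in this section, so SketchTraceReporter, its notifyOfAddedSpan callback, the FinishedSpan holder and the log line layout are all hypothetical. It also uses java.util.logging instead of Log4j so that it runs without extra dependencies.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;

/** Hypothetical, simplified holder for the data of a finished span. */
final class FinishedSpan {
    final String scope;
    final String name;
    final long startTsMillis;
    final long endTsMillis;
    final Map<String, Object> attributes;

    FinishedSpan(
            String scope,
            String name,
            long startTsMillis,
            long endTsMillis,
            Map<String, Object> attributes) {
        this.scope = scope;
        this.name = name;
        this.startTsMillis = startTsMillis;
        this.endTsMillis = endTsMillis;
        this.attributes = attributes;
    }
}

/** Hypothetical reporter callback; the real TraceReporter may look different. */
interface SketchTraceReporter {
    void notifyOfAddedSpan(FinishedSpan span);
}

/** Logs every reported span as one line (java.util.logging stands in for Log4j). */
final class LoggingTraceReporterSketch implements SketchTraceReporter {
    private static final Logger LOG =
            Logger.getLogger(LoggingTraceReporterSketch.class.getName());

    /** Builds the log line; attributes are sorted by key to keep the output stable. */
    static String format(FinishedSpan span) {
        return "Reported span: scope=" + span.scope
                + ", name=" + span.name
                + ", startTsMillis=" + span.startTsMillis
                + ", endTsMillis=" + span.endTsMillis
                + ", attributes=" + new TreeMap<>(span.attributes);
    }

    @Override
    public void notifyOfAddedSpan(FinishedSpan span) {
        LOG.info(format(span));
    }
}
```

Keeping the formatting in a separate static method makes the log layout easy to check in a unit test without capturing logger output.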

An OpenTelemetry-based TraceReporter will be proposed in a separate FLIP.

Reported traces

Recovery

The recovery trace would initially consist of a single span, with values aggregated (sum and max) across all subtasks:

...

Code Block
... {
    (...)
    SpanBuilder span =
            Span.builder(CheckpointStatsTracker.class.getCanonicalName(), "JobInitialization")
                    .setStartTsMillis(jobInitializationMetrics.getStartTs())
                    .setEndTsMillis(jobInitializationMetrics.getEndTs())
                    .setAttribute(
                            "initializationStatus",
                            jobInitializationMetrics.getStatus().name());
    setDurationSpanAttribute(span, jobInitializationMetrics.getMailboxStart());
    setDurationSpanAttribute(span, jobInitializationMetrics.getReadOutput());
    setDurationSpanAttribute(span, jobInitializationMetrics.getInitializeState());
    setDurationSpanAttribute(span, jobInitializationMetrics.getGateRestore());
    if (jobInitializationMetrics.getCheckpointId() != JobInitializationMetrics.UNSET) {
        span.setAttribute("checkpointId", jobInitializationMetrics.getCheckpointId());
    }
    if (jobInitializationMetrics.getStateSize() != JobInitializationMetrics.UNSET) {
        span.setAttribute("fullSize", jobInitializationMetrics.getStateSize());
    }
}

private void setDurationSpanAttribute(
        SpanBuilder span, JobInitializationMetrics.SumMaxDuration duration) {
    span.setAttribute("max" + duration.getName(), duration.getMax());
    span.setAttribute("sum" + duration.getName(), duration.getSum());
}
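
The sum/max aggregation behind JobInitializationMetrics.SumMaxDuration is not shown here. The sketch below illustrates one way such an accumulator could fold per-subtask durations into the two values that end up as span attributes. SumMaxDurationSketch, addDuration, RecoveryAttributesSketch and the example name "ReadOutputDurationMs" are hypothetical; only getName(), getMax(), getSum() and the "max"/"sum" attribute prefixes come from the snippet above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical accumulator for one initialization phase, aggregated over all subtasks. */
final class SumMaxDurationSketch {
    private final String name;
    private long sum;
    private long max; // starts at 0, so durations are assumed to be non-negative

    SumMaxDurationSketch(String name) {
        this.name = name;
    }

    /** Folds in the duration reported by a single subtask. */
    void addDuration(long durationMillis) {
        sum += durationMillis;
        max = Math.max(max, durationMillis);
    }

    String getName() {
        return name;
    }

    long getSum() {
        return sum;
    }

    long getMax() {
        return max;
    }
}

/** Mirrors setDurationSpanAttribute, but writes into a plain map instead of a SpanBuilder. */
final class RecoveryAttributesSketch {
    static Map<String, Object> toAttributes(SumMaxDurationSketch duration) {
        Map<String, Object> attributes = new LinkedHashMap<>();
        attributes.put("max" + duration.getName(), duration.getMax());
        attributes.put("sum" + duration.getName(), duration.getSum());
        return attributes;
    }
}
```

For three subtasks reporting 120 ms, 300 ms and 80 ms, this accumulator yields a sum of 500 and a max of 300. The max points at the slowest subtask, while the sum gives a sense of the total work spent in that phase.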

Checkpointing

Code Block
// Span is an interface, so the span is created through Span.builder(...) rather than a constructor.
// build() is assumed here to turn the SpanBuilder into the Span expected by addSpan(Span).
metricGroup.addSpan(
        Span.builder(CheckpointStatsTracker.class.getCanonicalName(), "Checkpoint")
                .setStartTsMillis(checkpointStats.getTriggerTimestamp())
                .setEndTsMillis(checkpointStats.getLatestAckTimestamp())
                .setAttribute("checkpointId", checkpointStats.getCheckpointId())
                .setAttribute("fullSize", checkpointStats.getStateSize())
                .setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize())
                .setAttribute("checkpointStatus", checkpointStats.getStatus().name())
                .build());

Immediate follow up work

  1. Add an OpenTelemetry-based TraceReporter (FLIP-385).

  2. Allow state backends to report more detailed recovery stats (FLIP-386).

    1. For example, for incremental RocksDB (RocksDBIncrementalRestoreOperation) we could easily measure separately the time to download files from remote storage and the time to load those files into the local RocksDB instance. However, this would require changing the state backend API.

Mid/long term follow up work

  1. Present aggregated subtask checkpoint metrics (Sync Duration, Async Duration, Alignment Duration, Start Delay).

    1. For now, this is omitted for checkpointing but included for recovery, both to minimize the scope of this FLIP and because users already have access to those checkpoint metrics elsewhere, for example in the Flink WebUI.

  2. For checkpointing and recovery traces, create separate spans for each subtask within the same single trace.

Compatibility, Deprecation, and Migration Plan

There will be no impact on existing users, and no migration is needed.

Test Plan

This feature is already used and tested inside Confluent. Before it is committed, the final accepted version will be tested inside Confluent again.

Rejected Alternatives

The idea of exposing the same values via the existing metric API has been rejected for the reasons explained in the Motivation section.

...