Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.

Page properties


Discussion thread

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

here (<- link to /list.html?dev@flink.apache.org)here (<- link to issues/jira/browse/FLINK-XXXX)
Discussion threadhere (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote threadhttps://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4
Vote threadJIRAhttps://lists.apache.org/thread/q1synh3886kk5nzczw3904zzb34gb424
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-33695

Release1.19Release<Flink Version>


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently Flink has a limited observability of checkpoint and recovery processes.

...

In this FLIP I’m proposing to add support for reporting traces/spans (example: Image RemovedTraces) and use this mechanism to report checkpointing and recovery traces.traces/spans (example: Image AddedTraces) and use this mechanism to report checkpointing and recovery traces. I hope in the future traces will also prove useful in other areas of Flink like job submission, job state changes, ... . Moreover as the API to report traces will be added to the MetricGroup , users will be also able to access this API. 

Related work:

[FLINK-23411] Expose Flink checkpoint details metrics - ASF JIRA
[FLINK-33071] Log checkpoint statistics - ASF Jira

[FLINK-7894] Improve metrics around fine-grained recovery and associated checkpointing behaviors - ASF JIRA

Public Interfaces

Code Block
/**
 * Span represents something that happened in Flink at certain point of time, that will be reported
 * to a {@link org.apache.flink.traces.reporter.TraceReporter}.
 *
 * <p>Currently we don't support traces with multiple spans. Each span is self-contained and
 * represents things like a checkpoint or recovery.
 */
@Experimental
public interface Span {

    public static SpanBuilder builder(StringClass<?> scopeclassScope, String name) {
        return new SpanBuilder(scopeclassScope, name);
    }

    public String getScope();
    
    public String getName();

    public long getStartTsMillis();

    public long getEndTsMillis();

    /**
     * Currently returned values can be of type String, Long or Double, however more types can
     * be added in the future.
     */
    public Map<String, Object> getAttributes();
}

...

Code Block
@Experimental
public class SpanBuilder {
    SpanSpanBuilder(StringClass<?> scopeclassScope, String name) {...}
    
    public Span build() {...}
    public SpanSpanBuilder setStartTsMillis(long startTsMillis) {...}
    public SpanBuilder setEndTsMillis(long endTsMillis) {...}
    public SpanBuilder setAttribute(String key, String value) {...}
    public SpanBuilder setAttribute(String key, long value) {...}
    public SpanBuilder setAttribute(String key, double value) {...}
}

...

Code Block
@Public
public interface MetricGroup {

    @Experimental
    default void addSpan(Span span) {}
    
    (...)
}


Proposed Changes

Reporting traces

TraceReporter would be handled and loaded the same way as MetricReporter is: via a separate plugin, with the same style of configuration in the flink-conf.yaml.

Implementations of TraceReporter

A simple Log4jTraceReporter will be added. This reporter would be logging reported traces to Flink’s Job Manager log.

OpenTelemetry based TraceReporter will be proposed in another FLIP

Reported traces

Recovery

Recovery trace would initially have just a single span per whole job, with sum/max aggregated values from each subtask:

...

Code Block
... {
    (...)
    SpanBuilder span =
            Span.builder(CheckpointStatsTracker.class.getCanonicalName(), "JobInitialization")
                    .setStartTsMillis(jobInitializationMetrics.getStartTs())
                    .setEndTsMillis(jobInitializationMetrics.getEndTs())
                    .setAttribute(
                            "initializationStatus",
                            jobInitializationMetrics.getStatus().name());
    setDurationSpanAttribute(span, jobInitializationMetrics.getMailboxStart());
    setDurationSpanAttribute(span, jobInitializationMetrics.getReadOutput());
    setDurationSpanAttribute(span, jobInitializationMetrics.getInitializeState());
    setDurationSpanAttribute(span, jobInitializationMetrics.getGateRestore());
    if (jobInitializationMetrics.getCheckpointId() != JobInitializationMetrics.UNSET) {
        span.setAttribute("checkpointId", jobInitializationMetrics.getCheckpointId());
    }
    if (jobInitializationMetrics.getStateSize() != JobInitializationMetrics.UNSET) {
        span.setAttribute("fullSize", jobInitializationMetrics.getStateSize());
    }
}

private void setDurationSpanAttribute(
        SpanBuilder span, JobInitializationMetrics.SumMaxDuration duration) {
    span.setAttribute("max" + duration.getName(), duration.getMax());
    span.setAttribute("sum" + duration.getName(), duration.getSum());
}

Checkpointing

Checkpoint is also reported as a single span trace. One span for whole job per each checkpoint. Currently without aggregated subtask metrics (I hope this will be added in the future).

Code Block
metricGroup.addSpan(
        new Span.builder(CheckpointStatsTracker.class.getCanonicalName(), "Checkpoint")
                .setStartTsMillis(checkpointStats.getTriggerTimestamp())
                .setEndTsMillis(checkpointStats.getLatestAckTimestamp())
                .setAttribute("checkpointId", checkpointStats.getCheckpointId())
                .setAttribute("fullSize", checkpointStats.getStateSize())
                .setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize())
                .setAttribute("checkpointStatus", checkpointStats.getStatus().name()));

Immediate follow up work

  1. Add OpenTelemetry based TraceReporter.   (FLIP-385).

  2. Allow state backends to report more detailed recovery stats (FLIP-386).

    1. For example for Incremental RocksDB (RocksDBIncrementalRestoreOperation) we could easily separately measure time to download files from remote storage and time to load those files into the local RocksDB instance. However this would require to change state backend API.

Mid/long term follow up work

  1. Present aggregated subtask checkpoint metrics (Sync Duration, Async Duration, Alignment Duration, Start Delay.

    1. For now, this is omitted for checkpointing and included for recovery, to minimize the scope of this FLIP and because user has already access to those metrics from the Flink WebUI for example after all.

  2. For checkpointing and recovery traces create separate spans for each subtask within the same single checkpoint trace

Compatibility, Deprecation, and Migration Plan

There will be no impact on existing users, there is no need for any migration.

Test Plan

This feature is already used and tested inside Confluent. Before committing the final accepted version would be tested inside Confluent again.

Rejected Alternatives

The idea of exposing the same values via existing metric API has been rejected due to the reasons explained in the motivation.

...