Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Flink offers only limited observability of the checkpoint and recovery processes.
...
Public Interfaces
```java
/**
 * Span represents something that happened in Flink at certain point of time, that will be reported
 * to a {@link org.apache.flink.traces.reporter.TraceReporter}.
 *
 * <p>Currently we don't support traces with multiple spans. Each span is self-contained and
 * represents things like a checkpoint or recovery.
 */
@Experimental
public interface Span {

    public static SpanBuilder builder(String scope, String name) {
        return new SpanBuilder(scope, name);
    }

    public String getScope();

    public String getName();

    public long getStartTsMillis();

    public long getEndTsMillis();

    /**
     * Currently returned values can be of type String, Long or Double, however more types can
     * be added in the future.
     */
    public Map<String, Object> getAttributes();
}
```
...
```java
@Public
public interface MetricGroup {

    @Experimental
    default void addSpan(Span span) {}

    (...)
}
```
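To illustrate how a builder, a span, and `addSpan` might fit together, here is a minimal, self-contained sketch. `BuiltSpan`, `SpanBuilderSketch`, and `CollectingGroup` are hypothetical stand-ins invented for this example; the setter names and `build()` are assumptions, not the proposed API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical immutable span snapshot; its fields mirror the getters of the proposed Span. */
final class BuiltSpan {
    final String scope;
    final String name;
    final long startTsMillis;
    final long endTsMillis;
    final Map<String, Object> attributes;

    BuiltSpan(String scope, String name, long startTsMillis, long endTsMillis,
            Map<String, Object> attributes) {
        this.scope = scope;
        this.name = name;
        this.startTsMillis = startTsMillis;
        this.endTsMillis = endTsMillis;
        this.attributes = attributes;
    }
}

/** Hypothetical builder; method names are assumed for illustration only. */
final class SpanBuilderSketch {
    private final String scope;
    private final String name;
    private final Map<String, Object> attributes = new HashMap<>();
    private long startTsMillis;
    private long endTsMillis;

    SpanBuilderSketch(String scope, String name) {
        this.scope = scope;
        this.name = name;
    }

    SpanBuilderSketch setStartTsMillis(long ts) { startTsMillis = ts; return this; }

    SpanBuilderSketch setEndTsMillis(long ts) { endTsMillis = ts; return this; }

    // One overload per attribute type listed for getAttributes(): String, Long, Double.
    SpanBuilderSketch setAttribute(String key, String value) { attributes.put(key, value); return this; }

    SpanBuilderSketch setAttribute(String key, long value) { attributes.put(key, value); return this; }

    SpanBuilderSketch setAttribute(String key, double value) { attributes.put(key, value); return this; }

    BuiltSpan build() {
        // Copy the map so later builder calls cannot change an already built span.
        return new BuiltSpan(scope, name, startTsMillis, endTsMillis,
                Collections.unmodifiableMap(new HashMap<String, Object>(attributes)));
    }
}

/** Hypothetical group that keeps spans in memory instead of dropping them. */
final class CollectingGroup {
    final List<BuiltSpan> spans = new ArrayList<>();

    void addSpan(BuiltSpan span) { spans.add(span); }
}
```

Because `addSpan` is declared as a `default` no-op in the interface above, existing `MetricGroup` implementations would keep compiling; only groups that forward to a reporter would need to override it.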
Proposed Changes
Reporting traces
TraceReporter would be handled and loaded the same way as MetricReporter: via a separate plugin, configured in flink-conf.yaml in the same style.
Implementations of TraceReporter
A simple Log4jTraceReporter will be added. This reporter would log reported traces to Flink's JobManager log.
An OpenTelemetry-based TraceReporter will be proposed in another FLIP.
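As a rough illustration of what a logging reporter could do with a span, the sketch below renders one span as a single log line. It uses `java.util.logging` rather than Log4j so that it runs without extra dependencies; the class name, method names, and the log line layout are all assumptions made for this example.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;

/** Hypothetical logging reporter; not the Log4jTraceReporter proposed in this FLIP. */
final class LoggingTraceReporterSketch {
    private static final Logger LOG =
            Logger.getLogger(LoggingTraceReporterSketch.class.getName());

    /** Renders one span as a single line; attributes are sorted by key for stable output. */
    static String format(
            String scope,
            String name,
            long startTsMillis,
            long endTsMillis,
            Map<String, Object> attributes) {
        return String.format(
                Locale.ROOT,
                "Reported span: scope=%s, name=%s, startTsMillis=%d, endTsMillis=%d, "
                        + "durationMillis=%d, attributes=%s",
                scope,
                name,
                startTsMillis,
                endTsMillis,
                endTsMillis - startTsMillis,
                new TreeMap<String, Object>(attributes));
    }

    /** Assumed callback name: writes the rendered span to the log at INFO level. */
    static void notifyOfAddedSpan(
            String scope,
            String name,
            long startTsMillis,
            long endTsMillis,
            Map<String, Object> attributes) {
        LOG.info(format(scope, name, startTsMillis, endTsMillis, attributes));
    }
}
```

Keeping the formatting separate from the logging call makes the output easy to check in a unit test without capturing log output.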
Reported traces
Recovery
The recovery trace would initially consist of a single span, with the values reported by the subtasks aggregated as sum and max:
...
```java
... {
    (...)
    SpanBuilder span =
            Span.builder(CheckpointStatsTracker.class.getCanonicalName(), "JobInitialization")
                    .setStartTsMillis(jobInitializationMetrics.getStartTs())
                    .setEndTsMillis(jobInitializationMetrics.getEndTs())
                    .setAttribute(
                            "initializationStatus", jobInitializationMetrics.getStatus().name());
    setDurationSpanAttribute(span, jobInitializationMetrics.getMailboxStart());
    setDurationSpanAttribute(span, jobInitializationMetrics.getReadOutput());
    setDurationSpanAttribute(span, jobInitializationMetrics.getInitializeState());
    setDurationSpanAttribute(span, jobInitializationMetrics.getGateRestore());
    if (jobInitializationMetrics.getCheckpointId() != JobInitializationMetrics.UNSET) {
        span.setAttribute("checkpointId", jobInitializationMetrics.getCheckpointId());
    }
    if (jobInitializationMetrics.getStateSize() != JobInitializationMetrics.UNSET) {
        span.setAttribute("fullSize", jobInitializationMetrics.getStateSize());
    }
}

private void setDurationSpanAttribute(
        SpanBuilder span, JobInitializationMetrics.SumMaxDuration duration) {
    span.setAttribute("max" + duration.getName(), duration.getMax());
    span.setAttribute("sum" + duration.getName(), duration.getSum());
}
```
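The sum/max aggregation that feeds `setDurationSpanAttribute` could look roughly like the sketch below. `SumMaxDurationSketch` is a hypothetical stand-in for `JobInitializationMetrics.SumMaxDuration`, whose implementation is not part of this page, and the duration name used in the usage is invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sum/max accumulator for one initialization phase across all subtasks. */
final class SumMaxDurationSketch {
    private final String name;
    private long sum;
    // Durations are assumed to be non-negative, so 0 is a safe starting maximum.
    private long max;

    SumMaxDurationSketch(String name) {
        this.name = name;
    }

    /** Folds in the duration reported by a single subtask. */
    void addDuration(long durationMillis) {
        sum += durationMillis;
        max = Math.max(max, durationMillis);
    }

    String getName() { return name; }

    long getSum() { return sum; }

    long getMax() { return max; }

    /** Produces the two attributes using the same "max"/"sum" + name keys as the code above. */
    Map<String, Long> toAttributes() {
        Map<String, Long> attributes = new LinkedHashMap<>();
        attributes.put("max" + name, max);
        attributes.put("sum" + name, sum);
        return attributes;
    }
}
```

One plausible reading of the two aggregates: the sum reflects the total work done across subtasks, while the max points at the slowest subtask, which is usually what delays the job when subtasks initialize in parallel.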
Checkpointing
```java
// Span is an interface, so it is created via Span.builder(...) rather than `new`;
// build() is assumed to return the finished Span expected by addSpan(Span).
metricGroup.addSpan(
        Span.builder(CheckpointStatsTracker.class.getCanonicalName(), "Checkpoint")
                .setStartTsMillis(checkpointStats.getTriggerTimestamp())
                .setEndTsMillis(checkpointStats.getLatestAckTimestamp())
                .setAttribute("checkpointId", checkpointStats.getCheckpointId())
                .setAttribute("fullSize", checkpointStats.getStateSize())
                .setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize())
                .setAttribute("checkpointStatus", checkpointStats.getStatus().name())
                .build());
```
Immediate follow up work
Add an OpenTelemetry-based TraceReporter (FLIP-385).
Allow state backends to report more detailed recovery stats (FLIP-386).
For example, for incremental RocksDB (RocksDBIncrementalRestoreOperation) we could easily measure separately the time to download files from remote storage and the time to load those files into the local RocksDB instance. However, this would require changing the state backend API.
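To make the idea of per-phase restore timing concrete, here is a small sketch of a timer that records how long each named phase takes. It is not tied to any state backend API; `RestorePhaseTimerSketch` and the phase names in the usage are hypothetical, and the injectable clock is a choice made here only to keep the example testable.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical helper that accumulates elapsed time per named restore phase. */
final class RestorePhaseTimerSketch {
    private final LongSupplier clockMillis;
    private final Map<String, Long> durationsMillis = new LinkedHashMap<>();

    RestorePhaseTimerSketch(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    /** Runs one phase and adds its elapsed time to the total recorded under that phase name. */
    void time(String phase, Runnable work) {
        long start = clockMillis.getAsLong();
        try {
            work.run();
        } finally {
            // Recorded even if the phase throws, so failed restores still report timings.
            durationsMillis.merge(phase, clockMillis.getAsLong() - start, Long::sum);
        }
    }

    Map<String, Long> getDurationsMillis() {
        return durationsMillis;
    }
}
```

A caller would wrap the download step and the load step in separate `time(...)` calls, for instance with `System::currentTimeMillis` as the clock, and then copy the resulting durations into span attributes.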
Mid/long term follow up work
Present aggregated subtask checkpoint metrics (Sync Duration, Async Duration, Alignment Duration, Start Delay).
For now, these are omitted for checkpointing but included for recovery, both to minimize the scope of this FLIP and because users can already access the checkpoint metrics elsewhere, for example in the Flink WebUI.
For checkpointing and recovery traces, create a separate span for each subtask within the same trace.
Compatibility, Deprecation, and Migration Plan
There will be no impact on existing users, and no migration is needed.
Test Plan
This feature is already used and tested inside Confluent. Before it is committed, the final accepted version would be tested inside Confluent again.
Rejected Alternatives
Exposing the same values via the existing metric API was rejected for the reasons explained in the Motivation section.
...