...

Page properties


Discussion thread: https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4
Vote thread: https://lists.apache.org/thread/q1synh3886kk5nzczw3904zzb34gb424
JIRA: FLINK-33695
Release: 1.19


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

In this FLIP I’m proposing to add support for reporting traces/spans (example: Traces) and to use this mechanism to report checkpointing and recovery traces. I hope that in the future traces will also prove useful in other areas of Flink, such as job submission and job state changes. Moreover, because the API to report traces will be added to the MetricGroup, users will also be able to access this API.

Related work:

[FLINK-23411] Expose Flink checkpoint details metrics - ASF JIRA
[FLINK-33071] Log checkpoint statistics - ASF Jira

...

Code Block
/**
 * Span represents something that happened in Flink at a certain point in time and that will be
 * reported to a {@link org.apache.flink.traces.reporter.TraceReporter}.
 *
 * <p>Currently we don't support traces with multiple spans. Each span is self-contained and
 * represents things like a checkpoint or recovery.
 */
@Experimental
public interface Span {

    public static SpanBuilder builder(Class<?> classScope, String name) {
        return new SpanBuilder(classScope, name);
    }

    public String getScope();
    
    public String getName();

    public long getStartTsMillis();

    public long getEndTsMillis();

    /**
     * Currently returned values can be of type String, Long or Double, however more types can
     * be added in the future.
     */
    public Map<String, Object> getAttributes();
}

...

Code Block
@Experimental
public class SpanBuilder {
    SpanBuilder(Class<?> classScope, String name) {...}
    
    public Span build() {...}
    public SpanBuilder setStartTsMillis(long startTsMillis) {...}
    public SpanBuilder setEndTsMillis(long endTsMillis) {...}
    public SpanBuilder setAttribute(String key, String value) {...}
    public SpanBuilder setAttribute(String key, long value) {...}
    public SpanBuilder setAttribute(String key, double value) {...}
}
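
To make the builder contract above more concrete, here is a minimal, self-contained sketch of how such a builder could behave. SketchSpan and SketchSpanBuilder are hypothetical stand-ins, not the Flink classes, and deriving the scope string from the class's canonical name is an assumption made only for this illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed Span interface; not the Flink class.
final class SketchSpan {
    private final String scope;
    private final String name;
    private final long startTsMillis;
    private final long endTsMillis;
    private final Map<String, Object> attributes;

    SketchSpan(
            String scope,
            String name,
            long startTsMillis,
            long endTsMillis,
            Map<String, Object> attributes) {
        this.scope = scope;
        this.name = name;
        this.startTsMillis = startTsMillis;
        this.endTsMillis = endTsMillis;
        // Defensive copy, so later builder calls cannot change an already built span.
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }

    String getScope() { return scope; }
    String getName() { return name; }
    long getStartTsMillis() { return startTsMillis; }
    long getEndTsMillis() { return endTsMillis; }
    Map<String, Object> getAttributes() { return attributes; }
}

// Hypothetical stand-in for the proposed SpanBuilder; not the Flink class.
final class SketchSpanBuilder {
    private final String scope;
    private final String name;
    private long startTsMillis;
    private long endTsMillis;
    private final Map<String, Object> attributes = new HashMap<>();

    SketchSpanBuilder(Class<?> classScope, String name) {
        // Assumption: the scope is the canonical name of the reporting class.
        this.scope = classScope.getCanonicalName();
        this.name = name;
    }

    SketchSpanBuilder setStartTsMillis(long startTsMillis) {
        this.startTsMillis = startTsMillis;
        return this;
    }

    SketchSpanBuilder setEndTsMillis(long endTsMillis) {
        this.endTsMillis = endTsMillis;
        return this;
    }

    // The three overloads match the attribute types listed for getAttributes():
    // String, Long and Double (the primitives are boxed when stored in the map).
    SketchSpanBuilder setAttribute(String key, String value) {
        attributes.put(key, value);
        return this;
    }

    SketchSpanBuilder setAttribute(String key, long value) {
        attributes.put(key, value);
        return this;
    }

    SketchSpanBuilder setAttribute(String key, double value) {
        attributes.put(key, value);
        return this;
    }

    SketchSpan build() {
        return new SketchSpan(scope, name, startTsMillis, endTsMillis, attributes);
    }
}
```

Returning the builder from every setter is what allows the chained call style used in the recovery and checkpointing snippets of this FLIP.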

...

The recovery trace would initially have just a single span for the whole job, with sum/max values aggregated across all subtasks:

...

Code Block
... {
    (...)
    SpanBuilder span =
            Span.builder(CheckpointStatsTracker.class, "JobInitialization")
                    .setStartTsMillis(jobInitializationMetrics.getStartTs())
                    .setEndTsMillis(jobInitializationMetrics.getEndTs())
                    .setAttribute(
                            "initializationStatus",
                            jobInitializationMetrics.getStatus().name());
    setDurationSpanAttribute(span, jobInitializationMetrics.getMailboxStart());
    setDurationSpanAttribute(span, jobInitializationMetrics.getReadOutput());
    setDurationSpanAttribute(span, jobInitializationMetrics.getInitializeState());
    setDurationSpanAttribute(span, jobInitializationMetrics.getGateRestore());
    if (jobInitializationMetrics.getCheckpointId() != JobInitializationMetrics.UNSET) {
        span.setAttribute("checkpointId", jobInitializationMetrics.getCheckpointId());
    }
    if (jobInitializationMetrics.getStateSize() != JobInitializationMetrics.UNSET) {
        span.setAttribute("fullSize", jobInitializationMetrics.getStateSize());
    }
}

private void setDurationSpanAttribute(
        SpanBuilder span, JobInitializationMetrics.SumMaxDuration duration) {
    span.setAttribute("max" + duration.getName(), duration.getMax());
    span.setAttribute("sum" + duration.getName(), duration.getSum());
}
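
The sum/max aggregation used in the snippet above could be sketched as follows. SumMaxSketch is a hypothetical stand-in for JobInitializationMetrics.SumMaxDuration, whose implementation is not shown in this FLIP. Only getName(), getSum() and getMax() appear above; the addDuration method, the putInto helper and the "InitializeState" name are assumptions made for illustration.

```java
import java.util.Map;

// Hypothetical stand-in for JobInitializationMetrics.SumMaxDuration.
final class SumMaxSketch {
    private final String name;
    private long sum;
    private long max;

    SumMaxSketch(String name) {
        this.name = name;
    }

    // Assumed entry point: called once per subtask with that subtask's duration.
    void addDuration(long durationMillis) {
        sum += durationMillis;
        max = Math.max(max, durationMillis);
    }

    String getName() { return name; }
    long getSum() { return sum; }
    long getMax() { return max; }

    // Mirrors setDurationSpanAttribute above, but writes into a plain map
    // instead of a SpanBuilder so that the sketch stays self-contained.
    static void putInto(Map<String, Object> attributes, SumMaxSketch duration) {
        attributes.put("max" + duration.getName(), duration.getMax());
        attributes.put("sum" + duration.getName(), duration.getSum());
    }
}
```

With three subtasks reporting 120 ms, 300 ms and 80 ms for "InitializeState", this would yield the attributes maxInitializeState = 300 and sumInitializeState = 500. The maximum points at the slowest subtask, while the sum gives a rough measure of the total work done across the job.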

Checkpointing

A checkpoint is also reported as a single-span trace: one span for the whole job per checkpoint. Currently the span carries no aggregated subtask metrics (I hope these will be added in the future).

Code Block
metricGroup.addSpan(
        Span.builder(CheckpointStatsTracker.class, "Checkpoint")
                .setStartTsMillis(checkpointStats.getTriggerTimestamp())
                .setEndTsMillis(checkpointStats.getLatestAckTimestamp())
                .setAttribute("checkpointId", checkpointStats.getCheckpointId())
                .setAttribute("fullSize", checkpointStats.getStateSize())
                .setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize())
                .setAttribute("checkpointStatus", checkpointStats.getStatus().name()));

...