...

The recovery trace would initially consist of a single span for the whole job, carrying sum/max values aggregated across all subtasks:

...

Code Block
... {
    (...)
    SpanBuilder span =
            Span.builder(CheckpointStatsTracker.class.getCanonicalName(), "JobInitialization")
                    .setStartTsMillis(jobInitializationMetrics.getStartTs())
                    .setEndTsMillis(jobInitializationMetrics.getEndTs())
                    .setAttribute(
                            "initializationStatus",
                            jobInitializationMetrics.getStatus().name());
    setDurationSpanAttribute(span, jobInitializationMetrics.getMailboxStart());
    setDurationSpanAttribute(span, jobInitializationMetrics.getReadOutput());
    setDurationSpanAttribute(span, jobInitializationMetrics.getInitializeState());
    setDurationSpanAttribute(span, jobInitializationMetrics.getGateRestore());
    if (jobInitializationMetrics.getCheckpointId() != JobInitializationMetrics.UNSET) {
        span.setAttribute("checkpointId", jobInitializationMetrics.getCheckpointId());
    }
    if (jobInitializationMetrics.getStateSize() != JobInitializationMetrics.UNSET) {
        span.setAttribute("fullSize", jobInitializationMetrics.getStateSize());
    }
}

private void setDurationSpanAttribute(
        SpanBuilder span, JobInitializationMetrics.SumMaxDuration duration) {
    span.setAttribute("max" + duration.getName(), duration.getMax());
    span.setAttribute("sum" + duration.getName(), duration.getSum());
}
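
To illustrate how the per-subtask values could collapse into the "max"/"sum" attributes above, here is a minimal, self-contained sketch. It does not use the real Flink classes: `SumMaxDuration` below is a hypothetical stand-in for `JobInitializationMetrics.SumMaxDuration`, `addDuration` and `aggregate` are made-up helpers, a plain map stands in for the `SpanBuilder`, and the metric name `ReadOutput` is only illustrative.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SumMaxAggregationSketch {

    /** Hypothetical stand-in for JobInitializationMetrics.SumMaxDuration. */
    static final class SumMaxDuration {
        private final String name;
        private long sum;
        private long max;

        SumMaxDuration(String name) {
            this.name = name;
        }

        /** Folds one subtask's duration (milliseconds) into the aggregate. */
        void addDuration(long durationMs) {
            sum += durationMs;
            max = Math.max(max, durationMs);
        }

        String getName() { return name; }
        long getSum() { return sum; }
        long getMax() { return max; }
    }

    /** Mirrors setDurationSpanAttribute, but writes into a plain map (assumption). */
    static void setDurationAttributes(Map<String, Object> attributes, SumMaxDuration duration) {
        attributes.put("max" + duration.getName(), duration.getMax());
        attributes.put("sum" + duration.getName(), duration.getSum());
    }

    /** Aggregates the durations reported by each subtask into span-style attributes. */
    static Map<String, Object> aggregate(String name, List<Long> perSubtaskMs) {
        SumMaxDuration duration = new SumMaxDuration(name);
        for (long ms : perSubtaskMs) {
            duration.addDuration(ms);
        }
        Map<String, Object> attributes = new LinkedHashMap<>();
        setDurationAttributes(attributes, duration);
        return attributes;
    }

    public static void main(String[] args) {
        // Three subtasks reporting 120 ms, 300 ms and 80 ms.
        System.out.println(aggregate("ReadOutput", List.of(120L, 300L, 80L)));
        // prints {maxReadOutput=300, sumReadOutput=500}
    }
}
```

The max attribute points at the slowest subtask, while the sum gives a rough idea of the total work spent across all subtasks.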

Checkpointing

Each checkpoint is also reported as a single-span trace: one span for the whole job per checkpoint. Currently this span carries no aggregated subtask metrics (I hope these will be added in the future).

Code Block
metricGroup.addSpan(
        new Span(CheckpointStatsTracker.class.getCanonicalName(), "Checkpoint")
                .setStartTsMillis(checkpointStats.getTriggerTimestamp())
                .setEndTsMillis(checkpointStats.getLatestAckTimestamp())
                .setAttribute("checkpointId", checkpointStats.getCheckpointId())
                .setAttribute("fullSize", checkpointStats.getStateSize())
                .setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize())
                .setAttribute("checkpointStatus", checkpointStats.getStatus().name()));
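
As a rough picture of what a reporter might receive for one checkpoint, the sketch below models a span as a scope, a name, start/end timestamps and a flat attribute map. `SimpleSpan` and `checkpointSpan` are hypothetical and only approximate the proposed `Span`; the attribute keys are the ones from the snippet above, and the numeric values are invented for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CheckpointSpanSketch {

    /** Hypothetical, simplified span; not the actual proposed Span class. */
    static final class SimpleSpan {
        final String scope;
        final String name;
        final long startTsMillis;
        final long endTsMillis;
        final Map<String, Object> attributes = new LinkedHashMap<>();

        SimpleSpan(String scope, String name, long startTsMillis, long endTsMillis) {
            this.scope = scope;
            this.name = name;
            this.startTsMillis = startTsMillis;
            this.endTsMillis = endTsMillis;
        }

        SimpleSpan setAttribute(String key, Object value) {
            attributes.put(key, value);
            return this;
        }

        /** The span length is derived from its timestamps rather than stored. */
        long durationMillis() {
            return endTsMillis - startTsMillis;
        }
    }

    /** Builds one span for the whole job for a single checkpoint. */
    static SimpleSpan checkpointSpan(
            long checkpointId,
            long triggerTs,
            long latestAckTs,
            long fullSize,
            long checkpointedSize,
            String status) {
        return new SimpleSpan("CheckpointStatsTracker", "Checkpoint", triggerTs, latestAckTs)
                .setAttribute("checkpointId", checkpointId)
                .setAttribute("fullSize", fullSize)
                .setAttribute("checkpointedSize", checkpointedSize)
                .setAttribute("checkpointStatus", status);
    }

    public static void main(String[] args) {
        // Invented values: checkpoint 42 triggered at t=1000 ms, last ack at t=3500 ms.
        SimpleSpan span = checkpointSpan(42L, 1_000L, 3_500L, 1_048_576L, 65_536L, "COMPLETED");
        System.out.println(span.name + " took " + span.durationMillis() + " ms " + span.attributes);
    }
}
```

With this shape, the span starts at the trigger timestamp and ends at the latest acknowledgement, so its duration covers the full end-to-end checkpoint time.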

...