...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block |
---|
/**
 * Span represents something that happened in Flink at a certain point in time, that will be reported
 * to a {@link org.apache.flink.traces.reporter.TraceReporter}.
 *
 * <p>Currently we don't support traces with multiple spans. Each span is self-contained and
 * represents things like a checkpoint or recovery.
 */
@Experimental
public interface Span {

    public static SpanBuilder builder(Class<?> classScope, String name) {
        return new SpanBuilder(classScope, name);
    }

    public String getScope();

    public String getName();

    public long getStartTsMillis();

    public long getEndTsMillis();

    /**
     * Currently returned values can be of type String, Long or Double, however more types can
     * be added in the future.
     */
    public Map<String, Object> getAttributes();
} |
...
Code Block |
---|
@Experimental
public class SpanBuilder {

    SpanBuilder(Class<?> classScope, String name) {...}

    public Span build() {...}

    public SpanBuilder setStartTsMillis(long startTsMillis) {...}

    public SpanBuilder setEndTsMillis(long endTsMillis) {...}

    public SpanBuilder setAttribute(String key, String value) {...}

    public SpanBuilder setAttribute(String key, long value) {...}

    public SpanBuilder setAttribute(String key, double value) {...}
} |
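To make the builder contract concrete, here is a minimal, self-contained sketch of how such a builder could assemble an immutable span. `SketchSpan` and `SketchSpanBuilder` are hypothetical names standing in for the proposed `Span` and `SpanBuilder`. Deriving the scope string from `Class#getCanonicalName()` and keeping attributes in a `HashMap` are assumptions made for illustration, not part of this proposal.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable value holder mirroring the proposed Span getters. */
final class SketchSpan {
    private final String scope;
    private final String name;
    private final long startTsMillis;
    private final long endTsMillis;
    private final Map<String, Object> attributes;

    SketchSpan(
            String scope,
            String name,
            long startTsMillis,
            long endTsMillis,
            Map<String, Object> attributes) {
        this.scope = scope;
        this.name = name;
        this.startTsMillis = startTsMillis;
        this.endTsMillis = endTsMillis;
        // Defensive copy so later builder calls cannot change an already built span.
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }

    String getScope() { return scope; }
    String getName() { return name; }
    long getStartTsMillis() { return startTsMillis; }
    long getEndTsMillis() { return endTsMillis; }
    Map<String, Object> getAttributes() { return attributes; }
}

/** Hypothetical builder; the three setAttribute overloads match the proposed value types. */
final class SketchSpanBuilder {
    private final String scope;
    private final String name;
    private final Map<String, Object> attributes = new HashMap<>();
    private long startTsMillis;
    private long endTsMillis;

    SketchSpanBuilder(Class<?> classScope, String name) {
        // Assumption: the scope string is the canonical name of the given class.
        this.scope = classScope.getCanonicalName();
        this.name = name;
    }

    SketchSpanBuilder setStartTsMillis(long startTsMillis) {
        this.startTsMillis = startTsMillis;
        return this;
    }

    SketchSpanBuilder setEndTsMillis(long endTsMillis) {
        this.endTsMillis = endTsMillis;
        return this;
    }

    // String values are stored as-is; long and double values are boxed to Long and Double.
    SketchSpanBuilder setAttribute(String key, String value) {
        attributes.put(key, value);
        return this;
    }

    SketchSpanBuilder setAttribute(String key, long value) {
        attributes.put(key, value);
        return this;
    }

    SketchSpanBuilder setAttribute(String key, double value) {
        attributes.put(key, value);
        return this;
    }

    SketchSpan build() {
        return new SketchSpan(scope, name, startTsMillis, endTsMillis, attributes);
    }
}
```

Returning `this` from every setter is what allows the chained call style used in the examples on this page. Copying the attribute map in the constructor keeps a built span stable even if the builder is reused afterwards.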
...
The recovery trace would initially have just a single span for the whole job, with sum/max values aggregated across all subtasks:
...
Code Block |
---|
... {
    (...)
    SpanBuilder span =
            Span.builder(CheckpointStatsTracker.class, "JobInitialization")
                    .setStartTsMillis(jobInitializationMetrics.getStartTs())
                    .setEndTsMillis(jobInitializationMetrics.getEndTs())
                    .setAttribute(
                            "initializationStatus",
                            jobInitializationMetrics.getStatus().name());
    setDurationSpanAttribute(span, jobInitializationMetrics.getMailboxStart());
    setDurationSpanAttribute(span, jobInitializationMetrics.getReadOutput());
    setDurationSpanAttribute(span, jobInitializationMetrics.getInitializeState());
    setDurationSpanAttribute(span, jobInitializationMetrics.getGateRestore());
    // Optional attributes are only set when the underlying metric was recorded.
    if (jobInitializationMetrics.getCheckpointId() != JobInitializationMetrics.UNSET) {
        span.setAttribute("checkpointId", jobInitializationMetrics.getCheckpointId());
    }
    if (jobInitializationMetrics.getStateSize() != JobInitializationMetrics.UNSET) {
        span.setAttribute("fullSize", jobInitializationMetrics.getStateSize());
    }
}

// Adds the max and the sum of one per-subtask duration as two span attributes.
private void setDurationSpanAttribute(
        SpanBuilder span, JobInitializationMetrics.SumMaxDuration duration) {
    span.setAttribute("max" + duration.getName(), duration.getMax());
    span.setAttribute("sum" + duration.getName(), duration.getSum());
} |
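The `SumMaxDuration` type used above is not defined in this section. The following is a minimal sketch of how per-subtask durations could be folded into the sum and max values that end up on the span, assuming each subtask reports one non-negative duration in milliseconds. `SketchSumMaxDuration`, `addDuration`, `toAttributes` and the duration name `InitializeStateMs` are hypothetical and only illustrate the aggregation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical accumulator that folds per-subtask durations into a sum and a max. */
final class SketchSumMaxDuration {
    private final String name;
    private long sum;
    private long max;

    SketchSumMaxDuration(String name) {
        this.name = name;
    }

    /** Records the duration reported by one subtask (assumed to be non-negative). */
    void addDuration(long durationMillis) {
        sum += durationMillis;
        max = Math.max(max, durationMillis);
    }

    String getName() { return name; }
    long getSum() { return sum; }
    long getMax() { return max; }

    /** Produces the "max" + name and "sum" + name pair in the same shape as the span attributes. */
    Map<String, Long> toAttributes() {
        Map<String, Long> attributes = new LinkedHashMap<>();
        attributes.put("max" + name, max);
        attributes.put("sum" + name, sum);
        return attributes;
    }
}
```

With three subtasks reporting 120 ms, 300 ms and 80 ms, this accumulator ends up with a sum of 500 and a max of 300. The max points at the slowest subtask, while the sum gives a rough measure of total work across the job.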
Checkpointing
A checkpoint is also reported as a single-span trace: one span for the whole job per checkpoint. Currently the span carries no aggregated subtask metrics (I hope these will be added in the future).
Code Block |
---|
metricGroup.addSpan(
        Span.builder(CheckpointStatsTracker.class, "Checkpoint")
                .setStartTsMillis(checkpointStats.getTriggerTimestamp())
                .setEndTsMillis(checkpointStats.getLatestAckTimestamp())
                .setAttribute("checkpointId", checkpointStats.getCheckpointId())
                .setAttribute("fullSize", checkpointStats.getStateSize())
                .setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize())
                .setAttribute("checkpointStatus", checkpointStats.getStatus().name())); |
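To illustrate what a consumer of such a span could do with it, here is a minimal sketch that renders the span fields as a single log line. `SketchSpanFormatter` and its output layout are hypothetical. The sketch takes the values returned by the proposed getters as plain arguments so that it stays self-contained, and it makes no claim about how an actual `TraceReporter` implementation would look.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that renders span fields as one log line. */
final class SketchSpanFormatter {

    private SketchSpanFormatter() {}

    static String format(
            String scope,
            String name,
            long startTsMillis,
            long endTsMillis,
            Map<String, Object> attributes) {
        StringBuilder line = new StringBuilder();
        line.append(scope).append('.').append(name);
        // The duration is derived from the two timestamps carried by the span.
        line.append(" durationMs=").append(endTsMillis - startTsMillis);
        // Sort attribute keys so that the output is stable across runs.
        for (Map.Entry<String, Object> entry : new TreeMap<>(attributes).entrySet()) {
            line.append(' ').append(entry.getKey()).append('=').append(entry.getValue());
        }
        return line.toString();
    }
}
```

Because attribute values are typed as `Object`, the formatter relies only on `toString()`. A backend that needs typed values would have to check for `String`, `Long` and `Double` explicitly and decide how to treat types added later.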
...