

Page properties

Discussion thread: https://lists.apache.org/thread/b65r92n8msgdrv6xjsnjwrlwgnxj81j3
Vote thread: tba
JIRA: FLINK-31709 (https://issues.apache.org/jira/browse/FLINK-31709)
Release: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

Storing metadata of completed jobs

...

Proposed Changes

Interfaces & Methods

(draw.io diagram: CompletedJobStore)
  • CompletedJobStore: Takes care of the artifact (the ExecutionGraphInfo) lifecycle.
    • EmbeddedCompletedJobStore: In-memory implementation that keeps both the JobDetails and the ExecutionGraphInfo in memory.
    • FileBasedCompletedJobStore: Writes the ExecutionGraphInfo to files but keeps the JobDetails in memory.
  • JobDetailsProvider: Accesses the JobDetails, which should always live in memory (in contrast to the ExecutionGraphInfo) so that the information is accessible in a reasonable amount of time.

Migration Proposal

The following mapping shows how the methods of the two existing interfaces migrate into the new components:

  • ExecutionGraphInfoStore#size() : int
    Calling code location: not used.
    Migration plan: Can be removed.

  • ExecutionGraphInfoStore#get(JobID) : ExecutionGraphInfo
    Calling code locations and migration plan:
      • The graph’s JobStatus² is used within Dispatcher#cancel¹ to decide whether an error should be thrown. Access the JobDetailsProvider to retrieve the JobStatus through the JobDetails of the corresponding job.
      • Dispatcher#requestExecutionGraphInfo¹ returns the ExecutionGraphInfo. The method can use the CompletedJobStore to retrieve the ExecutionGraphInfo.
      • Dispatcher#requestJobResult¹ derives the JobResult from the ExecutionGraphInfo. The method can use the CompletedJobStore to retrieve the JobResult through the ExecutionGraphInfo.

  • ExecutionGraphInfoStore#put(ExecutionGraphInfo) : void and JobResultStore#createDirtyResultAsync(JobResultEntry) : CompletableFuture<Void>
    Calling code location: Both methods are called from within the Dispatcher as soon as the job reaches a globally-terminal state.
    Migration plan: The JobResultEntry is just derived from the AccessExecutionGraph², so the two methods can be merged into addDirtyJobAsync(ExecutionGraphInfo) : CompletableFuture<Void>.

  • ExecutionGraphInfoStore#getStoredJobsOverview() : JobsOverview
    Calling code location: Dispatcher#requestClusterOverview.
    Migration plan: The data is available through the JobDetailsProvider via its get*Count() methods. The JobsOverview instantiation will happen within the Dispatcher instead.

  • ExecutionGraphInfoStore#getAvailableJobDetails() : Collection<JobDetails>
    Calling code location: Dispatcher#requestMultipleJobDetails.
    Migration plan: Moves into the JobDetailsProvider.

  • ExecutionGraphInfoStore#getAvailableJobDetails(JobID) : JobDetails
    Calling code location: Dispatcher#requestJobStatus.
    Migration plan: Moves into the JobDetailsProvider.

  • JobResultStore#markResultAsCleanAsync(JobID) : CompletableFuture<Void>
    Calling code location: At the end of the cleanup process within the Dispatcher.
    Migration plan: CompletedJobStore#markJobAsCleanAsync(JobID) : CompletableFuture<Void>.

  • JobResultStore#hasJobResultEntryAsync(JobID) : CompletableFuture<Boolean>
    Calling code location: The Dispatcher, to check whether a job is in a globally-terminal state, and the JobMasterServiceLeadershipRunner, to check whether the job is in a globally-terminal state when processing a failover.
    Migration plan: CompletedJobStore#isGloballyTerminated(JobID) : CompletableFuture<Boolean>.

  • JobResultStore#hasDirtyJobResultEntryAsync(JobID) : CompletableFuture<Boolean>
    Calling code location: At the end of the cleanup process within the Dispatcher.
    Migration plan: CompletedJobStore#isCleanedAsync(JobID) : CompletableFuture<Boolean>.

  • JobResultStore#hasCleanJobResultEntryAsync(JobID) : CompletableFuture<Boolean>
    Calling code location: At the end of the cleanup process within the Dispatcher.
    Migration plan: CompletedJobStore#isDirty(JobID) : CompletableFuture<Boolean>.

  • JobResultStore#getDirtyResults() : Set<JobResult>
    Calling code location: Before instantiating the Dispatcher, to filter the jobs that are meant to be recovered.
    Migration plan: CompletedJobStore#getDirtyResults() : Set<ExecutionGraphInfo>.

¹ if no JobManagerRunner is registered for the given JobID anymore
² can be derived from the ArchivedExecutionGraph
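
Taken together, the mapping above suggests a shape for the new interfaces. Below is a minimal sketch in Java, assuming the method names proposed in this table; none of this is final API:

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;

/** Owns the ExecutionGraphInfo artifacts and their lifecycle (sketch only). */
interface CompletedJobStore {
    CompletableFuture<Void> addDirtyJobAsync(ExecutionGraphInfo executionGraphInfo);
    CompletableFuture<Void> markJobAsCleanAsync(JobID jobId);
    CompletableFuture<Boolean> isGloballyTerminated(JobID jobId);
    CompletableFuture<Boolean> isCleanedAsync(JobID jobId);
    CompletableFuture<Boolean> isDirty(JobID jobId);
    ExecutionGraphInfo get(JobID jobId);
    Set<ExecutionGraphInfo> getDirtyResults();
}

/** Serves the JobDetails, which always stay in memory (sketch only). */
interface JobDetailsProvider {
    Collection<JobDetails> getAvailableJobDetails();
    JobDetails getAvailableJobDetails(JobID jobId);
}
```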

Serialization

The ExecutionGraphInfoStore utilizes Java’s object serialization. It dumps the ExecutionGraphInfo (which includes the ArchivedExecutionGraph and the exception history) into a file. In contrast, the JobResultEntry is currently serialized as JSON containing the JobResult and a version (i.e. the version of the schema). The JobResult is used in the cleanup process to generate a sparse ArchivedExecutionGraph to serve to the REST API while cleaning up after failover. The version was added to allow changes to the format.

Content-wise, we can merge the two representations because the content of the JobResultEntry can be extracted from the ArchivedExecutionGraph. Format-wise, we run into issues because the ArchivedExecutionGraph cannot be fully represented in a human-readable format like JSON without losing the ability to deserialize the graph. Going through the ExecutionGraphInfo structure, the following parts have been identified as problematic for providing a human-readable JSON representation:

  • Accumulators: The ArchivedExecutionGraph comes with the custom Accumulators that were attached to the JobExecutionResult. There is a “stringified” representation of Accumulators which would allow us to add “something” to the JSON representation that's human-readable. But that representation cannot be used for deserialization.
  • ErrorInfo is used in two places (exception history and ArchivedExecution). It stores the timestamp of when an error appeared and the error’s cause as a Throwable. The Throwable is wrapped in a SerializedThrowable to make it Serializable. The SerializedThrowable cannot be JSON-serialized and used for deserialization again.
  • IOMetrics cannot be serialized in JSON in a human-readable format because of the ResultPartitionBytes that come with it.

You can find a detailed list of the ExecutionGraphInfo data structure in the Appendix. But essentially, we could provide a JSON representation of the data structure by keeping the three items mentioned above as Base64-encoded Strings (and provide a human-readable version if possible).
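
To make the Base64 escape hatch concrete, here is a minimal sketch; the helper and the JSON field names are illustrative, not a proposed schema:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

final class Base64Fields {

    /**
     * Java-serializes a value (e.g. a SerializedThrowable or the serialized
     * accumulators) and wraps it in Base64 so it survives a JSON round trip.
     */
    static String toBase64(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }
}

// An entry could then pair a human-readable field with its Base64 twin, e.g.:
// {
//   "failureInfo": {
//     "timestamp": 1700000000000,
//     "exceptionAsString": "java.lang.RuntimeException: ...",  <- human-readable
//     "serializedException": "rO0ABXNyAC..."                   <- Base64, used for deserialization
//   }
// }
```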

JSON Format

Serializing the ExecutionGraphInfo as JSON allows users to access all the data without Flink binaries. The structure of the ExecutionGraphInfo effectively becomes public, though, which makes it harder to change in the future.

Binary Format

We could also move away from JSON as the container format and instead use JDK serialization (analogously to what the ExecutionGraphInfoStore does right now). The version can be encoded in the file name (no specified version would imply the pre-FLIP-360 format). The flaw of this approach is that we always need Flink code (from a specific Flink version) to parse the content. This can be achieved through the Flink client: the ClusterClient could provide the API to access those files and generate the JSON without starting a dedicated cluster.
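
A sketch of the version-in-filename idea; the naming scheme below is an assumption for illustration, not part of this FLIP:

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class CompletedJobFileVersions {

    // Hypothetical naming scheme: <jobId>.v<formatVersion>.bin; a file without
    // the version suffix would imply the pre-FLIP-360 format.
    private static final Pattern VERSIONED = Pattern.compile(".*\\.v(\\d+)\\.bin$");

    static OptionalInt parseFormatVersion(String fileName) {
        Matcher matcher = VERSIONED.matcher(fileName);
        return matcher.matches()
                ? OptionalInt.of(Integer.parseInt(matcher.group(1)))
                : OptionalInt.empty(); // pre-FLIP-360 file
    }
}
```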

Persisting the Checkpoint statistics only

As mentioned in the Motivation section of this FLIP, having the latest checkpoint’s external storage path persisted in the JobResultStore is good enough to cover scenarios where the JobManager disappears too early. One could argue that we could keep the two components and only expose the checkpoint statistics as part of the JobResultStore entry. That would allow us to keep the ArchivedExecutionGraph data structure internal.

Readability & Backwards Compatibility

Both approaches share the flaw that they are not backwards compatible: a file generated by an older version might not be parseable by a newer version of Flink if the ArchivedExecutionGraph data structure was modified. We have to provide a deserializer implementation for each format version. A fallback option would be to persist the JobResult individually as part of the container format. That way, we always keep the fallback behavior that is implemented today: we create a sparse ArchivedExecutionGraph based on the JobResult if no further information is given.
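
A sketch of the per-version dispatch with the JobResult fallback; all type and method names here are hypothetical:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;

final class VersionedCompletedJobReader {

    /** One deserializer per container-format version (hypothetical interface). */
    interface FormatVersionDeserializer {
        ExecutionGraphInfo deserialize(byte[] contents) throws IOException;
    }

    private final Map<Integer, FormatVersionDeserializer> deserializers = new HashMap<>();

    ExecutionGraphInfo read(int formatVersion, byte[] contents, JobResult persistedJobResult)
            throws IOException {
        FormatVersionDeserializer deserializer = deserializers.get(formatVersion);
        if (deserializer != null) {
            return deserializer.deserialize(contents);
        }
        // Fallback: the JobResult is persisted separately in the container, so a
        // sparse ArchivedExecutionGraph can always be reconstructed from it
        // (mirroring today's behavior).
        return toSparseExecutionGraphInfo(persistedJobResult);
    }

    private static ExecutionGraphInfo toSparseExecutionGraphInfo(JobResult jobResult) {
        // Stands in for the existing sparse-graph construction used after failover.
        throw new UnsupportedOperationException("sketch only");
    }
}
```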


CompletedJobStore Entry Lifecycle Management/Caching

The following overview illustrates the state of entry lifecycle management and caching in the current implementations of the ExecutionGraphInfoStore and the JobResultStore:

In-Memory
  • Caching: The ExecutionGraphInfoStore utilizes a cache to maintain the lifecycle of the entries. The JobResultStore involves no caching.
  • Lifecycle Mgmt.: In the ExecutionGraphInfoStore, TTL and cache capacity can be specified in Flink’s config. In the JobResultStore, data stays in memory forever (FLINK-32806).

File-based
  • Caching: The ExecutionGraphInfoStore utilizes a cache to reduce IO. The JobResultStore involves no caching.
  • Lifecycle Mgmt.: In the ExecutionGraphInfoStore, TTL and cache capacity can be specified in Flink’s config; a separate cache is maintained for the JobDetails. In the JobResultStore, data is removed either when the job is globally-terminated and its artifacts are properly cleaned up, or manually by the user.


The lifecycle of the entries should follow the current ExecutionGraphInfoStore implementations. The current JobResultStore behavior can be mimicked by setting the expiration time for an entry to 0.

The in-memory implementation should utilize caching (analogously to the in-memory implementation of the ExecutionGraphInfoStore) to reduce the footprint. But we should add log messages to inform the user when a completed job disappears because its expiration time passed.
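
The lifecycle handling could be shared along these lines; a sketch assuming a Guava cache like the one the existing file-based ExecutionGraphInfoStore uses (Flink ships a shaded Guava; plain coordinates are used here for readability):

```java
import java.time.Duration;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class CompletedJobCacheSketch {

    private static final Logger LOG = LoggerFactory.getLogger(CompletedJobCacheSketch.class);

    static Cache<JobID, ExecutionGraphInfo> createCache(
            Duration expirationTime, long maximumCapacity) {
        return CacheBuilder.newBuilder()
                // Duration.ZERO mimics today's JobResultStore delete-on-commit behavior.
                .expireAfterWrite(expirationTime)
                .maximumSize(maximumCapacity)
                // Surface expiration so users understand why a completed job disappeared.
                .<JobID, ExecutionGraphInfo>removalListener(
                        notification ->
                                LOG.info(
                                        "Completed job {} was removed from the store ({}).",
                                        notification.getKey(),
                                        notification.getCause()))
                .build();
    }
}
```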

Configuration parameters

  • job-result-store.delete-on-commit (default: true)
    Description: Determines when the JobResultStore entry should be deleted.
    Migration plan: Replaced by jobstore.expiration-time¹ (true=0, false=Integer.MAX_VALUE).

  • job-result-store.storage-path (default: ${haStorage}/job-result-store/${clusterId})
    Description: DFS location of the files.
    Migration plan: Deprecated and replaced by jobstore.storage-path (default: ${haStorage}/completed-job-store/${clusterId}).

  • jobstore.cache-size¹
    Description: Number of clean jobs for which the data shall be kept.
    Migration plan: Kept.

  • jobstore.expiration-time¹
    Description: TTL for clean job entries.
    Migration plan: Kept.

  • jobstore.max-capacity¹
    Description: Capacity for the ExecutionGraphInfo to be kept in memory.
    Migration plan: Kept.

  • jobstore.type¹
    Description: File-based (File) or memory-based (Memory).
    Migration plan: Kept.

¹ jobstore.* could also be renamed to completed-job-store.*, which is less generic. I’m not sure whether there is a desire to keep the current prefix for the sake of consistency.

Documentation

The following items will be updated:

  • JobResultStore documentation in the High Availability section of the documentation (link)
  • The configuration documentation will be updated according to the changes mentioned above.


Compatibility, Deprecation, and Migration Plan

Artifacts

There is no real concern in terms of backwards compatibility for the ExecutionGraphInfoStore because its data is lost when the Flink cluster shuts down. Old JobResultStore entries can still be served thanks to the versioning of the JobResultStore entry format.

Configuration

The JobResultStore-related configuration parameters will become deprecated and replaced by corresponding jobstore.* (or completed-job-store.*) configuration parameters as described above.

Changing the default value from job-result-store.storage-path to jobstore.storage-path/completed-job-store.storage-path would require us to first look for data under the old default location and only use the new default if the old one doesn’t exist. It has to be considered that checking for the non-existence of values in object storage can be expensive for large buckets; this check is only done once when initializing the cluster, though.
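
A sketch of that one-time fallback lookup during cluster initialization; method and parameter names are illustrative:

```java
import java.io.IOException;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

final class StoragePathResolver {

    /**
     * Prefers an explicitly configured path; otherwise falls back to the old
     * default location if it already contains data, so that existing clusters
     * keep finding their entries.
     */
    static Path resolveStoragePath(Path configuredPath, Path oldDefaultPath, Path newDefaultPath)
            throws IOException {
        if (configuredPath != null) {
            return configuredPath;
        }
        FileSystem fs = oldDefaultPath.getFileSystem();
        // Potentially expensive on object stores with large buckets, but only
        // executed once when the cluster is initialized.
        return fs.exists(oldDefaultPath) ? oldDefaultPath : newDefaultPath;
    }
}
```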

Test Plan

  • The existing tests should continue to work with the refactored approach.
  • Additional unit tests might be added to cover new functionality.
  • An e2e test might be added to verify that deleting the JobResultStore entry makes the system behave as expected.

Rejected Alternatives

Investigate merging the HistoryServer into the new component

Having permanent storage for the ArchivedExecutionGraph might enable us to serve all REST requests through the new component. This could allow us to remove the HistoryServer.

The HistoryServer gives us the capability to access jobs from older Flink versions without worrying about the backwards compatibility of the ArchivedExecutionGraph. Working on such a topic would require additional investigation, which is out of scope for this FLIP.

Splitting up the CompletedJobStore into two different components

Interfaces & Classes

(draw.io diagram: CompletedJobStore Split)

Summary of the changes:

  • CompletedJobStore: Takes care of the failover scenario and stores the actual data. The data might live in memory all the time. A file-based and an in-memory implementation will be provided.
  • JobDetailsStore: Takes care of the JobDetails, which live in memory. The store also triggers the final deletion of the CompletedJobStore entry if a TTL is set. The file-based CompletedJobStore will provide an in-memory version of the JobDetailsStore (i.e. the JobDetails will be created along with the ExecutionGraphInfo and live in memory as long as the CompletedJobStore entry isn't removed). In contrast, the in-memory CompletedJobStore will come with a JobDetailsStore implementation that accesses the in-memory ExecutionGraphInfo and creates the JobDetails on the fly (see the sketch below).
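
A sketch of the two JobDetailsStore strategies described above; the interface and names illustrate this rejected design and are not actual Flink API:

```java
import java.util.Optional;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;

/** Hypothetical interface of the rejected split design. */
interface JobDetailsStore {
    Optional<JobDetails> getJobDetails(JobID jobId);
}

// File-based CompletedJobStore: the JobDetails are created together with the
// ExecutionGraphInfo and kept in memory until the store entry is removed.
//
// In-memory CompletedJobStore: the JobDetails are derived on the fly from the
// ExecutionGraphInfo that already lives in memory, e.g. via
// JobDetails.createDetailsForJob(executionGraphInfo.getArchivedExecutionGraph()).
```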

The sequence diagram below visualizes the process of creating and removing the entries.

(draw.io sequence diagram: FLIP-360 - Job Completion)



Appendix

ExecutionGraphInfo Data Structure

Several data structures (e.g. StatsSummarySnapshot) mix up the mutable and the immutable version. This FLIP might also include coming up with a clear separation of immutable instances (included in the ArchivedExecutionGraph) and mutable instances (which are part of the ExecutionGraph) using the Archiveable interface.

Key to the annotations below: 🟢 can be represented in a human-readable, JSON-serializable form as-is; 🟡 can be derived from other fields and does not need to be serialized separately; 🔴 problematic for a human-readable representation (e.g. requires Base64 encoding) or contains members that are.

🔴 ExecutionGraphInfo

🟡 JobID getJobId()

From getArchivedExecutionGraph()

🔴 ArchivedExecutionGraph getArchivedExecutionGraph()


🔴 Iterable<RootExceptionHistoryEntry> getExceptionHistory()



🔴 ErrorInfo

🟢 long getTimestamp()


🔴 SerializedThrowable getException()

Requires Base64 encoding but allows human-readable representation


🔴 ExceptionHistoryEntry (extends ErrorInfo)

🟢 String getFailingTaskName()


🟢 ArchivedTaskManagerLocation getTaskManagerLocation()

TaskManagerLocation does contain InetAddress which is not needed in the ArchivedExecutionGraph (because no processing takes place)

🟡 Map<String, String> getFailureLabels()

From getFailureLabelsFuture() 

🟢 CompletableFuture<Map<String, String>> getFailureLabelsFuture()

We have to block and wait for the future to complete here

🟡 isGlobal()

From getFailingTaskName()


🔴 RootExceptionHistoryEntry (extends ExceptionHistoryEntry)

🔴 Iterable<ExceptionHistoryEntry> getConcurrentExceptions()



🟢 JobStatusProvider

🟢 JobStatus getState()

enum

🟢 long getStatusTimestamp(JobStatus status)

JobStatus to long dictionary


🔴 ArchivedExecutionGraph (extends JobStatusProvider)

🟢 String getJsonPlan()


🟢 JobID getJobID()

AbstractID

🟢 String getJobName()


🟢 JobStatus getState()

enum

🔴 ErrorInfo getFailureInfo()


🟢 long getStatusTimestamp(JobStatus status)

JobStatus to long dictionary

🟢 boolean isStoppable()


🟢 Optional<String> getStateBackendName()


🟢 Optional<String> getCheckpointStorageName()


🟢 TernaryBoolean isChangelogStateBackendEnabled()


🟢 Optional<String> getChangelogStorageName()


🟢 StringifiedAccumulatorResult[] getAccumulatorResultsStringified()

Could be used as a human-readable version of the accumulator

🔴 Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorsSerialized()

Base64 encoding necessary

🟢 CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration()

essentially immutable dictionary

🟢 CheckpointStatsSnapshot getCheckpointStatsSnapshot()


🟢 ArchivedExecutionConfig getArchivedExecutionConfig()


🟡 AccessExecutionJobVertex getJobVertex(JobVertexID id)

From getAllVertices()

🔴 Map<JobVertexID, ? extends AccessExecutionJobVertex> getAllVertices()


🟡 Iterable<? extends AccessExecutionJobVertex> getVerticesTopologically()

Order can be serialized through a JobVertexID list

🟡 Iterable<? extends AccessExecutionVertex> getAllExecutionVertices()

From getVerticesTopologically() and getAllVertices()


🟢 StringifiedAccumulatorResult

🟢 String getName()


🟢 String getType()


🟢 String getValue()



🟢 CheckpointStatsSnapshot

🟢 CheckpointStatsCounts getCounts()

mutable dictionary (which could be made immutable for the archived version) of int/long values

🟢 CompletedCheckpointStatsSummarySnapshot getSummaryStats()


🟢 CheckpointStatsHistory getHistory()


🟢 RestoredCheckpointStats getLatestRestoredCheckpoint()



🟢 CompletedCheckpointStatsSummarySnapshot

🟢 StatsSummarySnapshot getEndToEndDurationStats()


🟢 StatsSummarySnapshot getPersistedDataStats()


🟢 StatsSummarySnapshot getProcessedDataStats()


🟢 StatsSummarySnapshot getStateSizeStats()


🟢 StatsSummarySnapshot getCheckpointedSize()



🟢 StatsSummarySnapshot

🟢 long getMinimum()


🟢 long getMaximum()


🟢 long getSum()


🟢 long getCount()


🟢 long getAverage()


🟢 double getQuantile(double quantile)



🟢 CheckpointStatsHistory

🟢 List<AbstractCheckpointStats> getCheckpoints()


🟡 AbstractCheckpointStats getCheckpointById(long checkpointId)

From getCheckpoints()

🟢 CompletedCheckpointStats getLatestCompletedCheckpoint()


🟢 FailedCheckpointStats getLatestFailedCheckpoint()


🟢 CompletedCheckpointStats getLatestSavepoint()



🟢 AbstractCheckpointStats

🟢 CheckpointStatsStatus getStatus()

enum

🟢 int getNumberOfAcknowledgedSubtasks()


🟢 long getStateSize()


🟢 long getCheckpointedSize()


🟢 long getProcessedData()


🟢 long getPersistedData()


🟢 boolean isUnalignedCheckpoint()


🟢 SubtaskStateStats getLatestAcknowledgedSubtaskStats()

essentially immutable dictionary

🟢 long getCheckpointId()


🟢 long getTriggerTimestamp()


🟢 CheckpointProperties getProperties()


🟢 int getNumberOfSubtasks()


🟢 TaskStateStats getTaskStateStats(JobVertexID jobVertexId)


🟢 Collection<TaskStateStats> getAllTaskStateStats()


🟡 long getLatestAckTimestamp()

from getLatestAcknowledgedSubtaskStats()

🟡 long getEndToEndDuration()

from getLatestAcknowledgedSubtaskStats()


🟢 CompletedCheckpointStats (extends AbstractCheckpointStats)

🟢 boolean isDiscarded()


🟢 String getExternalPath()



🟢 PendingCheckpointStats (extends AbstractCheckpointStats)


🟢 FailedCheckpointStats (extends PendingCheckpointStats)

🟡 long getEndToEndDuration()

from getFailureTimestamp() and getTriggerTimestamp()

🟢 long getFailureTimestamp()


🟢 String getFailureMessage()



🟢 TaskStateStats

🟢 JobVertexID getJobVertexId()


🟡 int getNumberOfSubtasks()

from getSubtaskStats()

🟢 int getNumberOfAcknowledgedSubtasks()


🟢 SubtaskStateStats getLatestAcknowledgedSubtaskStats()


🟡 long getLatestAckTimestamp()

from getLatestAcknowledgedSubtaskStats()

🟡 long getCheckpointedSize()

from getSummaryStats()

🟡 long getStateSize()

from getSummaryStats()

🟡 long getProcessedDataStats()

from getSummaryStats()

🟡 long getPersistedDataStats()

from getSummaryStats()

🟡 long getEndToEndDuration(long triggerTimestamp)

from getLatestAcknowledgedSubtaskStats()

🟢 SubtaskStateStats[] getSubtaskStats()


🟢 TaskStateStatsSummary getSummaryStats()

dictionary of StatsSummary


🟢 StatsSummary

🟢 long getMinimum()


🟢 long getMaximum()


🟢 long getSum()


🟢 long getCount()



🟢 CheckpointProperties

dictionary of boolean values

🟢 SnapshotType getCheckpointType()



🟢 SnapshotType

🟢 boolean isSavepoint()


🟢 String getName()


🟢 SharingFilesStrategy getSharingFilesStrategy()

enum


🟢 RestoredCheckpointStats

🟢 long getCheckpointId()


🟢 CheckpointProperties getProperties()


🟢 long getRestoreTimestamp()


🟢 String getExternalPath()



🟢 ArchivedExecutionConfig

🟢 String getExecutionMode()


🟢 String getRestartStrategyDescription()


🟢 int getMaxParallelism()


🟢 int getParallelism()


🟢 boolean getObjectReuseEnabled()


🟢 long getPeriodicMaterializeIntervalMillis()


🟢 Map<String, String> getGlobalJobParameters()



🔴 ArchivedExecutionJobVertex

🟢 String getName()


🟢 int getParallelism()


🟢 int getMaxParallelism()


🟢 ResourceProfile getResourceProfile()


🟢 JobVertexID getJobVertexId()


🔴 ArchivedExecutionVertex[] getTaskVertices()


🟡 ExecutionState getAggregateState()

from getTaskVertices()

🟢 StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified()



🔴 ArchivedExecutionVertex

🟢 String getTaskNameWithSubtaskIndex()


🟢 int getParallelSubtaskIndex()


🟡 ArchivedExecution getCurrentExecutionAttempt()

From getCurrentExecutions()

🔴 Collection<AccessExecution> getCurrentExecutions()


🟢 ExecutionState getExecutionState()

enum

🟢 long getStateTimestamp(ExecutionState state)


🔴 Optional<ErrorInfo> getFailureInfo()


🟡 TaskManagerLocation getCurrentAssignedResourceLocation()


🟢 ExecutionHistory getExecutionHistory()



🔴 ArchivedExecution

🟢 ExecutionAttemptID getAttemptId()


🟡 int getAttemptNumber()

From getAttemptId()

🟢 long[] getStateTimestamps()


🟢 long[] getStateEndTimestamps()


🟢 ExecutionState getState()


🟡 TaskManagerLocation getAssignedResourceLocation()


🟢 AllocationID getAssignedAllocationID()


🔴 Optional<ErrorInfo> getFailureInfo()


🟡 long getStateTimestamp(ExecutionState state)

From getStateTimestamps()

🟡 long getStateEndTimestamp(ExecutionState state)

From getStateEndTimestamps()

🟢 StringifiedAccumulatorResult[] getUserAccumulatorsStringified() 


🟡 int getParallelSubtaskIndex()

From getAttemptId()

🔴 IOMetrics getIOMetrics()



🟢 ExecutionAttemptID

🟢 ExecutionVertexID getExecutionVertexId()


🟢 private executionGraphId


🟡 JobVertexID getJobVertexId()

From getExecutionVertexId()

🟡 int getSubtaskIndex()

From getExecutionVertexId()

🟢 int getAttemptNumber()



🟢 ExecutionVertexID

🟢 JobVertexID getJobVertexId()


🟢 int getSubtaskIndex()



🔴 IOMetrics

🟢 long getNumRecordsIn()


🟢 long getNumRecordsOut()


🟢 long getNumBytesIn()


🟢 long getNumBytesOut()


🟢 double getAccumulateBusyTime()


🟢 long getAccumulateBackPressuredTime()


🟢 long getAccumulateIdleTime()


🔴 Map<IntermediateResultPartitionID, ResultPartitionBytes> getResultPartitionBytes()

Base64 encoding necessary


🟢 ExecutionHistory

🟡 Optional<ArchivedExecution> getHistoricalExecution(int attemptNumber)

From getHistoricalExecutions()

🟢 Collection<ArchivedExecution> getHistoricalExecutions()


🟢 private maxAttemptNumber



🟢 TaskManagerLocation

(can be replaced by ExceptionHistoryEntry.ArchivedTaskManagerLocation)

🟢 ResourceID getResourceID()


🟢 int dataPort()


🟡 InetAddress address()

Not necessary in Read-Only version

🟢 String addressString()


🟢 String getFQDNHostname()


🟢 String getHostname()


🟢 String getNodeId()



🟢 ResourceID

🟢 private String resourceID

String representation of AbstractID

🟢 String getMetadata()