

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/b65r92n8msgdrv6xjsnjwrlwgnxj81j3
Vote thread: tba
JIRA:


Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Storing metadata of completed jobs

Apache Flink provides the JobResultStore and the ExecutionGraphInfoStore as two components to manage the state of globally-terminated jobs.

ExecutionGraphInfoStore 

The ExecutionGraphInfoStore stores an archived version of the ExecutionGraphInfo¹ to make it accessible via the Flink REST API. Flink provides two implementations of this interface:

  • The FileExecutionGraphInfoStore provides a file-based implementation. The data is stored in a temporary folder on the JobManager’s disk. Caches are used to reduce IO operations.
  • The MemoryExecutionGraphInfoStore stores the data in the JobManager’s memory. A cache enables the user to define the number of entries and the TTL of each entry.

In either case, the information about completed jobs is lost (the web UI won't list them anymore) in case of a JobManager failover or a Flink cluster shutdown.

¹ Note: The ExecutionGraphInfoStore saves the ExecutionGraphInfo, which currently consists of the ArchivedExecutionGraph and the exception history. The JobResult can be derived from the ArchivedExecutionGraph.
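For orientation, the current contract looks roughly as follows. This is a simplified sketch based on the method set referenced in the migration table further down, not the verbatim Flink source:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import javax.annotation.Nullable;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;

/** Simplified sketch of today's ExecutionGraphInfoStore contract. */
interface ExecutionGraphInfoStore extends Closeable {

    /** Number of stored entries. */
    int size();

    /** Returns the stored info for the given job, or null if unknown. */
    @Nullable
    ExecutionGraphInfo get(JobID jobId);

    /** Called by the Dispatcher once a job reaches a globally-terminal state. */
    void put(ExecutionGraphInfo executionGraphInfo) throws IOException;

    /** Aggregated overview of all stored jobs, served via the REST API. */
    JobsOverview getStoredJobsOverview();

    /** Job details of all stored jobs / of a single stored job. */
    Collection<JobDetails> getAvailableJobDetails();

    @Nullable
    JobDetails getAvailableJobDetails(JobID jobId);
}
```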

JobResultStore

The JobResultStore was introduced with FLIP-194 to mitigate the effects of a JobManager failover while cleaning up a globally-terminated job. The goal was to keep the information about the job’s terminal state available even after a failover to avoid rerunning the job (FLINK-11813).

Flink provides two implementations of the JobResultStore:

  • The EmbeddedJobResultStore stores the data in-memory. This covers scenarios where the failover of jobs isn’t desired/necessary (i.e. non-HA cases). The data is lost in case of a JobManager failover or Flink cluster shutdown.
  • The FileSystemBasedJobResultStore stores the data in a DFS (e.g. S3). The path can be configured through Flink’s configuration (see the example below).
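For example, pointing the file-based store at a custom DFS location. The parameter name and its default are listed in the configuration table further down; the bucket is a made-up example:

```
job-result-store.storage-path: s3://my-bucket/flink/job-result-store
```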

There are two ways to handle the lifecycle of the JobResultStore entries:

  • Default behavior: The entry is removed by Flink automatically as soon as the job’s metadata has been cleaned up after the job reached a globally-terminal state.
  • Optional for the FileSystemBasedJobResultStore: The entry is kept. In that case, Flink "hands over" the ownership of the corresponding file to the user, who has to take care of removing artifacts that are no longer needed. This only works with the file-based implementation.
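For orientation, the current contract looks roughly as follows. Again a simplified sketch based on the method set referenced in the migration table further down, not the verbatim Flink source:

```java
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.jobmaster.JobResult;

/** Simplified sketch of today's JobResultStore contract. */
interface JobResultStore {

    /**
     * Persists the result of a globally-terminated job whose artifacts still
     * need to be cleaned up ("dirty" entry).
     */
    CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry);

    /** Marks the entry as "clean" once all job artifacts have been removed. */
    CompletableFuture<Void> markResultAsCleanAsync(JobID jobId);

    /**
     * Existence checks used by the Dispatcher and the
     * JobMasterServiceLeadershipRunner during failover handling.
     */
    CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId);
    CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(JobID jobId);
    CompletableFuture<Boolean> hasCleanJobResultEntryAsync(JobID jobId);

    /**
     * Dirty results that still require cleanup; consulted before the
     * Dispatcher is instantiated to decide which jobs to recover.
     */
    Set<JobResult> getDirtyResults() throws IOException;
}
```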

The CompletedJobStore as a single store for completed jobs

The two components introduced in the previous section have similar use cases. The difference lies in the type of information that’s stored and the durability of this information. This FLIP proposes to unite both purposes in a single component, the CompletedJobStore.

The benefits would be:

  • The REST API (and the web UI) will be able to list completed jobs even after a JobManager failover.
  • The user has more control over what information should be kept. Flink could provide a TTL to allow automatic removal of entries after a certain grace period. Additionally, files can be (re)moved by the user to remove individual jobs from the cluster’s job overview.
  • Additional information (e.g. the latest checkpoint/state) is persisted in the ArchivedExecutionGraph and would survive a JobManager failover or Flink cluster restart.
  • The HistoryServer is no longer necessary to access completed jobs (at least for the same Flink version).

Persisting Latest State

There have been discussions about this in the past: the reference to the latest checkpoint of a globally-terminated job can be lost if the JobManager fails while cleaning up the job. The ArchivedExecutionGraph will be written to the file-based ExecutionGraphInfoStore, but the data is not accessible anymore. Persisting (at least) the checkpoint statistics alongside the JobResult in the JobResultStore would help avoid these scenarios. The Kubernetes Operator (and any other external system relying on the checkpoint metadata) would benefit from this (see the related Jira issue FLINK-27569).

Public Interfaces

No API has been annotated as @Public or @PublicEvolving yet. Both the ExecutionGraphInfoStore and the JobResultStore interfaces are considered @Internal.

The content of the JobResultStore files is accessible by the user, though. The file content is not documented (that needs to change) but should be considered public API (we cannot be sure that this data isn’t used by users in some way). The format of the file content already supports versioning, which would allow us to provide fallback behavior.

Proposed Changes

Interfaces & Methods

| ExecutionGraphInfoStore | JobResultStore | Calling Code Location | Migration Plan |
| --- | --- | --- | --- |
| size() : int | | not used | Can be removed. |
| get(JobID) : ExecutionGraphInfo | | The graph’s JobStatus² is used within Dispatcher#cancel¹ to decide whether an error should be thrown. | Access the JobDetailsStore to retrieve the JobStatus through the JobDetails of the corresponding job. |
| | | Dispatcher#requestExecutionGraphInfo¹ returns the ExecutionGraphInfo. | The method can use the CompletedJobStore to retrieve the ExecutionGraphInfo. |
| | | Dispatcher#requestJobResult¹ derives the JobResult from the ExecutionGraphInfo. | The method can use the CompletedJobStore to retrieve the JobResult through the ExecutionGraphInfo. |
| put(ExecutionGraphInfo) : void | createDirtyResultAsync(JobResultEntry) : CompletableFuture<Void> | Both methods are called from within the Dispatcher as soon as the job reaches a globally-terminal state. | Merge the two methods into addDirtyJobAsync(ExecutionGraphInfo) : CompletableFuture<Void> |
| getStoredJobsOverview() : JobsOverview | | Dispatcher#requestClusterOverview | The data is available through the JobDetailsStore via the get*Count() methods. The JobsOverview instantiation will happen within the Dispatcher instead. |
| getAvailableJobDetails() : Collection<JobDetails> | | Dispatcher#requestMultipleJobDetails | Moves into the JobDetailsStore. |
| getAvailableJobDetails(JobID) : JobDetails | | Dispatcher#requestJobStatus | Moves into the JobDetailsStore. |
| | markResultAsCleanAsync(JobID) : CompletableFuture<Void> | At the end of the cleanup process within the Dispatcher | CompletedJobStore#markJobAsCleanAsync(JobID) : CompletableFuture<Void> |
| | hasJobResultEntryAsync(JobID) : CompletableFuture<Boolean> | Dispatcher, for checking whether the job is in a globally-terminal state; JobMasterServiceLeadershipRunner, to check whether the job is in a globally-terminal state when processing a failover | CompletedJobStore#isGloballyTerminated(JobID) : CompletableFuture<Boolean> |
| | hasDirtyJobResultEntryAsync(JobID) : CompletableFuture<Boolean> | At the end of the cleanup process within the Dispatcher | CompletedJobStore#isCleanedAsync(JobID) : CompletableFuture<Boolean> |
| | hasCleanJobResultEntryAsync(JobID) : CompletableFuture<Boolean> | At the end of the cleanup process within the Dispatcher | CompletedJobStore#isDirty(JobID) : CompletableFuture<Boolean> |
| | getDirtyResults() : Set<JobResult> | Before instantiating the Dispatcher, to filter the jobs that are meant to be recovered | CompletedJobStore#getDirtyResults() : Set<ExecutionGraphInfo> |

¹ if no JobManagerRunner is registered for the given JobID anymore
² can be derived from the ArchivedExecutionGraph

Summary of the changes:

  • CompletedJobStore: Takes care of the failover scenario and stores the actual data. The data might live in memory the whole time. A file-based and an in-memory implementation will be provided.
  • JobDetailsStore: Takes care of the job details, which live in memory. The store will also take care of triggering the final deletion of the CompletedJobStore entry if a TTL is set. The file-based CompletedJobStore will provide an in-memory version of the JobDetailsStore (i.e. the JobDetails will be created along with the ExecutionGraphInfo and will live in memory as long as the CompletedJobStore entry isn’t removed). In contrast, the in-memory CompletedJobStore will come with a JobDetailsStore implementation that accesses the in-memory ExecutionGraphInfo and creates the JobDetails on the fly. A sketch of both contracts follows below.
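The following sketch illustrates how the merged contracts could look. All method names are taken from the migration table above, but the exact signatures are proposals, not final API:

```java
import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;

/** Illustrative sketch of the proposed CompletedJobStore contract. */
interface CompletedJobStore {

    /** Replaces ExecutionGraphInfoStore#put and JobResultStore#createDirtyResultAsync. */
    CompletableFuture<Void> addDirtyJobAsync(ExecutionGraphInfo executionGraphInfo);

    /** Replaces JobResultStore#markResultAsCleanAsync. */
    CompletableFuture<Void> markJobAsCleanAsync(JobID jobId);

    /** Replace the has*JobResultEntryAsync checks. */
    CompletableFuture<Boolean> isGloballyTerminated(JobID jobId);
    CompletableFuture<Boolean> isCleanedAsync(JobID jobId);
    CompletableFuture<Boolean> isDirty(JobID jobId);

    /** Replaces JobResultStore#getDirtyResults, now exposing the full info. */
    Set<ExecutionGraphInfo> getDirtyResults() throws IOException;

    /** Replaces ExecutionGraphInfoStore#get. */
    @Nullable
    ExecutionGraphInfo get(JobID jobId);
}

/** Illustrative sketch of the in-memory JobDetailsStore contract. */
interface JobDetailsStore {

    /** Backs Dispatcher#requestMultipleJobDetails and Dispatcher#requestJobStatus. */
    Collection<JobDetails> getAvailableJobDetails();

    @Nullable
    JobDetails getAvailableJobDetails(JobID jobId);

    // Plus the get*Count() methods backing Dispatcher#requestClusterOverview.
}
```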

The sequence diagram below visualizes the process of creating and removing the entries.

Serialization

The ExecutionGraphInfoStore utilizes Java’s object serialization. It dumps the ExecutionGraphInfo (which includes the ArchivedExecutionGraph and the exception history) into a file. In contrast, the JobResultEntry is currently serialized as JSON containing the JobResult and a version (i.e. the version of the schema). The JobResult is used in the cleanup process to generate a sparse ArchivedExecutionGraph that serves the REST API while cleaning up after a failover. The version was added to allow changes to the format.

Content-wise, we can merge the two representations because the content of the JobResultEntry can be extracted from the ArchivedExecutionGraph. Format-wise, we run into issues because the ArchivedExecutionGraph cannot be fully represented in a human-readable format like JSON without losing the ability to deserialize the graph. Going through the ExecutionGraphInfo structure, the following parts have been identified as problematic for providing a human-readable JSON representation:

  • Accumulators: The ArchivedExecutionGraph comes with the custom Accumulators that were attached to the JobExecutionResult. There is a “stringified” representation of Accumulators, which would allow us to add something human-readable to the JSON representation, but that representation cannot be used for deserialization.
  • ErrorInfo is used in two places (exception history and ArchivedExecution). It stores the timestamp of when an error appeared and the error’s cause as a Throwable. The Throwable is wrapped in a SerializedThrowable to make it Serializable. The SerializedThrowable cannot be JSON-serialized in a way that can be deserialized again.
  • IOMetrics cannot be serialized to JSON in a human-readable format because of the ResultPartitionBytes that come with it.

You can find a detailed list of the ExecutionGraphInfo data structure in the Appendix. But essentially, we could provide a JSON representation of the data structure by keeping the three items mentioned above as Base64-encoded Strings (and provide a human-readable version if possible).
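To illustrate the Base64 route, here is a minimal sketch using only JDK object serialization; the helper and its names are hypothetical, not part of the proposal:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

/**
 * Hypothetical helper: wraps a Serializable member (e.g. a SerializedThrowable
 * or the serialized accumulators) as a Base64 string for embedding in JSON.
 */
final class Base64Field {

    private Base64Field() {}

    static String encode(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    static Object decode(String encoded) throws IOException, ClassNotFoundException {
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        }
    }
}
```

The “stringified” variants could be stored next to the encoded fields for human readers and simply be ignored during deserialization.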

JSON Format

Serializing the ExecutionGraphInfo as JSON allows users to access all the data without Flink binaries. The structure of the ExecutionGraphInfo effectively becomes public, though, which makes it harder to change things in the future.
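A hypothetical entry could look like this; all field names are made up for illustration, the actual schema is not part of this FLIP:

```
{
  "version": 2,
  "jobResult": { "...": "human-readable, kept as a fallback" },
  "executionGraphInfo": {
    "jobId": "1b5f6b9af2f9d2b2e8c7e1a0c3d4e5f6",
    "jobName": "example-job",
    "status": "FINISHED",
    "checkpointStatsSnapshot": { "...": "..." },
    "accumulatorResultsStringified": [ "..." ],
    "accumulatorsSerialized": "<Base64>",
    "exceptionHistory": [
      { "timestamp": 1700000000000, "exception": "<Base64>", "exceptionStringified": "..." }
    ],
    "ioMetrics": "<Base64>"
  }
}
```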

Binary Format

We could also move away from JSON as the container format and use JDK serialization instead (analogously to what the ExecutionGraphInfoStore does right now). The version can be encoded in the file name (no specified version would imply the pre-FLIP-360 version). The flaw of this approach is that we always need Flink code (from a specific Flink version) to parse the content. This can be achieved through the Flink client: the ClusterClient could provide the API to access those files and to generate the JSON without starting a dedicated cluster.
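A sketch of the version-in-the-file-name idea, assuming a hypothetical naming scheme <jobId>-v<version>.bin (the scheme itself is not specified by this FLIP):

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: derives the format version from a store file name.
 * A missing version suffix implies the pre-FLIP-360 format.
 */
final class StoreFileVersion {

    // Assumed scheme: "<jobId>-v<version>.bin"; legacy files have no suffix.
    private static final Pattern VERSIONED = Pattern.compile(".*-v(\\d+)\\.bin");

    private StoreFileVersion() {}

    static OptionalInt parse(String fileName) {
        Matcher matcher = VERSIONED.matcher(fileName);
        return matcher.matches()
                ? OptionalInt.of(Integer.parseInt(matcher.group(1)))
                : OptionalInt.empty(); // empty means pre-FLIP-360 format
    }
}
```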

Persisting the Checkpoint statistics only

As mentioned in the Motivation section of this FLIP, persisting the latest checkpoint’s external storage path in the JobResultStore is good enough to cover scenarios where the JobManager disappears too early. One could argue that we could keep the two components and only expose the checkpoint statistics as part of the JobResultStore entry. That would allow us to keep the ArchivedExecutionGraph data structure internal.

Readability & Backwards Compatibility

Both approaches have the flaw that they are not backwards compatible: a file generated by an older version might not be parseable by a newer version of Flink if the ArchivedExecutionGraph data structure was modified. We have to provide a deserializer implementation for each format version. A fallback option would be to persist the JobResult individually as part of the container format. That way, we always have the fallback behavior that is currently implemented: today, we create a sparse ArchivedExecutionGraph based on the JobResult if no further information is given.
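A sketch of the per-version deserialization with the JobResult fallback. The names are illustrative; the sparse-graph helper stands in for what Flink does today after a failover:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;

/**
 * Illustrative sketch: dispatches on the format version and falls back to a
 * sparse representation derived from the separately persisted JobResult.
 */
abstract class VersionedEntryReader {

    interface Deserializer {
        ExecutionGraphInfo deserialize(byte[] content) throws IOException;
    }

    private final Map<Integer, Deserializer> deserializers = new HashMap<>();

    final void register(int version, Deserializer deserializer) {
        deserializers.put(version, deserializer);
    }

    final ExecutionGraphInfo read(int version, byte[] content, JobResult jobResult) {
        Deserializer deserializer = deserializers.get(version);
        if (deserializer != null) {
            try {
                return deserializer.deserialize(content);
            } catch (IOException e) {
                // Unreadable or incompatible content: fall through to the fallback.
            }
        }
        // Fallback mirrors the current behavior: build a sparse
        // ArchivedExecutionGraph-based info from the JobResult alone.
        return createSparseInfo(jobResult);
    }

    /** Hypothetical factory for the sparse representation. */
    protected abstract ExecutionGraphInfo createSparseInfo(JobResult jobResult);
}
```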

CompletedJobStore Entry Lifecycle Management/Caching

The following table illustrates the state of entry lifecycle management and caching in the current implementations of the ExecutionGraphInfoStore and the JobResultStore:



| | | ExecutionGraphInfoStore | JobResultStore |
| --- | --- | --- | --- |
| In-Memory | Caching | Utilizes a cache to maintain the lifecycle of the entries | No caching involved |
| | Lifecycle Mgmt. | TTL and cache capacity can be specified in Flink’s config | Data stays in memory forever (FLINK-32806) |
| File-based | Caching | Utilizes a cache to reduce IO | No caching involved |
| | Lifecycle Mgmt. | TTL and cache capacity can be specified in Flink’s config. A separate cache is maintained for JobDetails. | Data is either removed when the job is globally terminated and its artifacts are properly cleaned up, or by the user |


The lifecycle of the entries should follow the current ExecutionGraphInfoStore implementations. The current JobResultStore behavior can be mimicked by setting the expiration time for an entry to 0.

The in-memory implementation should utilize caching (analogously to the in-memory implementation of the ExecutionGraphInfoStore) to reduce the footprint. But we should add some log messages to inform the user that a completed job disappeared because of its expiration time.
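For example, the current JobResultStore semantics (delete as soon as the cleanup finished) would correspond to setting the expiration time to zero (parameter name from the table below):

```
jobstore.expiration-time: 0
```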

Configuration parameters

| Old Parameter | Description | Migration Plan |
| --- | --- | --- |
| job-result-store.delete-on-commit (default: true) | Used for determining when the JobResultStore entry should be deleted. | Replaced by jobstore.expiration-time¹ (true=0, false=Integer.MAX_VALUE) |
| job-result-store.storage-path (default: ${haStorage}/job-result-store/${clusterId}) | DFS location of the files | Deprecated and replaced by jobstore.storage-path (default: ${haStorage}/completed-job-store/${clusterId}) |
| jobstore.cache-size¹ | Number of clean jobs for which the data shall be kept | kept |
| jobstore.expiration-time¹ | TTL for clean job entries | kept |
| jobstore.max-capacity¹ | Capacity for the ExecutionGraphInfo to be kept in memory | kept |
| jobstore.type¹ | File-based (File) or memory-based (Memory) | kept |

¹ jobstore.* could also be renamed to completed-job-store.*, which is less generic. It is unclear whether there is a desire to keep the current name for the sake of consistency.

Documentation

The following items will be updated:

  • JobResultStore documentation in the High Availability section of the documentation (link)
  • The configuration documentation will be updated according to the changes mentioned above.

Compatibility, Deprecation, and Migration Plan

Artifacts

There is no real concern in terms of backwards compatibility for the ExecutionGraphInfoStore because its data is lost when shutting down the Flink cluster. Old JobResultStore entries can still be served thanks to the versioning of the JobResultStore entry format.

Configuration

The JobResultStore-related configuration parameters will become deprecated and replaced by corresponding jobstore.* (or completed-job-store.*) configuration parameters as described above.

Changing the default value of job-result-store.storage-path to the jobstore.storage-path/completed-job-store.storage-path default would require us to first look for the old default location and only use the new default if the old one doesn’t exist. It has to be considered that checking for the non-existence of values in object storage can be expensive for large buckets. This check is only done once, when initializing the cluster.
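A sketch of that one-time check, assuming a hypothetical file-system probe abstraction:

```java
import java.io.IOException;
import javax.annotation.Nullable;

/** Illustrative sketch of the one-time default-path migration check. */
final class StoragePathResolver {

    interface FileSystemProbe {
        /** May be expensive on object stores with large buckets. */
        boolean exists(String path) throws IOException;
    }

    private StoragePathResolver() {}

    /**
     * Prefers an explicitly configured path; otherwise falls back to the old
     * default location if it exists, and only then to the new default.
     * Invoked once during cluster initialization.
     */
    static String resolve(
            FileSystemProbe fs,
            @Nullable String configuredPath,
            String oldDefaultPath,   // e.g. ${haStorage}/job-result-store/${clusterId}
            String newDefaultPath)   // e.g. ${haStorage}/completed-job-store/${clusterId}
            throws IOException {
        if (configuredPath != null) {
            return configuredPath; // explicit configuration wins
        }
        // The existence check against the object store happens only here, once.
        return fs.exists(oldDefaultPath) ? oldDefaultPath : newDefaultPath;
    }
}
```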

Test Plan

  • The existing tests should continue to work with the refactored approach.
  • Additional unit tests might be added to cover new functionality.
  • An e2e test might be added to verify that deleting the JobResultStore entry makes the system behave as expected.

Rejected Alternatives

Investigate merging the HistoryServer into the new component

Having a permanent storage for the ArchivedExecutionGraph might enable us to retrieve all REST requests through the new component. This could allow us to remove the HistoryServer.

The HistoryServer gives us the capability to access jobs from older Flink versions without worrying about the backwards compatibility of the ArchivedExecutionGraph. Working on such a topic would require additional investigation, which is out of scope for this FLIP.

Appendix

ExecutionGraphInfo Data Structure

The following tables represent the components of the ExecutionGraphInfo structure. Here’s a summary of the color scheme:

  • 🟢 refers to public members that can be serialized and deserialized in JSON.
  • 🟡 refers to members that don’t need to be represented in JSON because they can be derived from other members.
  • 🔴 indicates that the type or some parts of the type are not JSON-deserializable and would require (at least partially) Base64 encoding.

Several data structures (e.g. StatsSummarySnapshot) mix mutable and immutable versions. This FLIP might also include coming up with a clear separation of immutable instances (included in the ArchivedExecutionGraph) and mutable instances (which are part of the ExecutionGraph) using the Archiveable interface.

🔴 ExecutionGraphInfo

| Member | Notes |
| --- | --- |
| 🟡 JobID getJobId() | From getArchivedExecutionGraph() |
| 🔴 ArchivedExecutionGraph getArchivedExecutionGraph() | |
| 🔴 Iterable<RootExceptionHistoryEntry> getExceptionHistory() | |

🔴 ErrorInfo

| Member | Notes |
| --- | --- |
| 🟢 long getTimestamp() | |
| 🔴 SerializedThrowable getException() | Requires Base64 encoding but allows a human-readable representation |

🔴 ExceptionHistoryEntry (extends ErrorInfo)

| Member | Notes |
| --- | --- |
| 🟢 String getFailingTaskName() | |
| 🟢 ArchivedTaskManagerLocation getTaskManagerLocation() | TaskManagerLocation contains an InetAddress, which is not needed in the ArchivedExecutionGraph (because no processing takes place) |
| 🟡 Map<String, String> getFailureLabels() | From getFailureLabelsFuture() |
| 🟢 CompletableFuture<Map<String, String>> getFailureLabelsFuture() | We have to block and wait for the future to complete here |
| 🟡 isGlobal() | From getFailingTaskName() |

🔴 RootExceptionHistoryEntry (extends ExceptionHistoryEntry)

| Member | Notes |
| --- | --- |
| 🔴 Iterable<ExceptionHistoryEntry> getConcurrentExceptions() | |

🟢 JobStatusProvider

| Member | Notes |
| --- | --- |
| 🟢 JobStatus getState() | enum |
| 🟢 long getStatusTimestamp(JobStatus status) | JobStatus-to-long dictionary |

🔴 ArchivedExecutionGraph (extends JobStatusProvider)

| Member | Notes |
| --- | --- |
| 🟢 String getJsonPlan() | |
| 🟢 JobID getJobID() | AbstractID |
| 🟢 String getJobName() | |
| 🟢 JobStatus getState() | enum |
| 🔴 ErrorInfo getFailureInfo() | |
| 🟢 long getStatusTimestamp(JobStatus status) | JobStatus-to-long dictionary |
| 🟢 boolean isStoppable() | |
| 🟢 Optional<String> getStateBackendName() | |
| 🟢 Optional<String> getCheckpointStorageName() | |
| 🟢 TernaryBoolean isChangelogStateBackendEnabled() | |
| 🟢 Optional<String> getChangelogStorageName() | |
| 🟢 StringifiedAccumulatorResult[] getAccumulatorResultsStringified() | Could be used as a human-readable version of the accumulators |
| 🔴 Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorsSerialized() | Base64 encoding necessary |
| 🟢 CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() | Essentially an immutable dictionary |
| 🟢 CheckpointStatsSnapshot getCheckpointStatsSnapshot() | |
| 🟢 ArchivedExecutionConfig getArchivedExecutionConfig() | |
| 🟡 AccessExecutionJobVertex getJobVertex(JobVertexID id) | From getAllVertices() |
| 🔴 Map<JobVertexID, ? extends AccessExecutionJobVertex> getAllVertices() | |
| 🟡 Iterable<? extends AccessExecutionJobVertex> getVerticesTopologically() | Order can be serialized through a JobVertexID list |
| 🟡 Iterable<? extends AccessExecutionVertex> getAllExecutionVertices() | From getVerticesTopologically() and getAllVertices() |

🟢 StringifiedAccumulatorResult

| Member | Notes |
| --- | --- |
| 🟢 String getName() | |
| 🟢 String getType() | |
| 🟢 String getValue() | |

🟢 CheckpointStatsSnapshot

| Member | Notes |
| --- | --- |
| 🟢 CheckpointStatsCounts getCounts() | Mutable dictionary of int/long values (which could be made immutable for the archived version) |
| 🟢 CompletedCheckpointStatsSummarySnapshot getSummaryStats() | |
| 🟢 CheckpointStatsHistory getHistory() | |
| 🟢 RestoredCheckpointStats getLatestRestoredCheckpoint() | |

🟢 CompletedCheckpointStatsSummarySnapshot

| Member | Notes |
| --- | --- |
| 🟢 StatsSummarySnapshot getEndToEndDurationStats() | |
| 🟢 StatsSummarySnapshot getPersistedDataStats() | |
| 🟢 StatsSummarySnapshot getProcessedDataStats() | |
| 🟢 StatsSummarySnapshot getStateSizeStats() | |
| 🟢 StatsSummarySnapshot getCheckpointedSize() | |

🟢 StatsSummarySnapshot

| Member | Notes |
| --- | --- |
| 🟢 long getMinimum() | |
| 🟢 long getMaximum() | |
| 🟢 long getSum() | |
| 🟢 long getCount() | |
| 🟢 long getAverage() | |
| 🟢 double getQuantile(double quantile) | |

🟢 CheckpointStatsHistory

| Member | Notes |
| --- | --- |
| 🟢 List<AbstractCheckpointStats> getCheckpoints() | |
| 🟡 AbstractCheckpointStats getCheckpointById(long checkpointId) | From getCheckpoints() |
| 🟢 CompletedCheckpointStats getLatestCompletedCheckpoint() | |
| 🟢 FailedCheckpointStats getLatestFailedCheckpoint() | |
| 🟢 CompletedCheckpointStats getLatestSavepoint() | |

🟢 AbstractCheckpointStats

| Member | Notes |
| --- | --- |
| 🟢 CheckpointStatsStatus getStatus() | enum |
| 🟢 int getNumberOfAcknowledgedSubtasks() | |
| 🟢 long getStateSize() | |
| 🟢 long getCheckpointedSize() | |
| 🟢 long getProcessedData() | |
| 🟢 long getPersistedData() | |
| 🟢 boolean isUnalignedCheckpoint() | |
| 🟢 SubtaskStateStats getLatestAcknowledgedSubtaskStats() | Essentially an immutable dictionary |
| 🟢 long getCheckpointId() | |
| 🟢 long getTriggerTimestamp() | |
| 🟢 CheckpointProperties getProperties() | |
| 🟢 int getNumberOfSubtasks() | |
| 🟢 TaskStateStats getTaskStateStats(JobVertexID jobVertexId) | |
| 🟢 Collection<TaskStateStats> getAllTaskStateStats() | |
| 🟡 long getLatestAckTimestamp() | From getLatestAcknowledgedSubtaskStats() |
| 🟡 long getEndToEndDuration() | From getLatestAcknowledgedSubtaskStats() |

🟢 CompletedCheckpointStats (extends AbstractCheckpointStats)

| Member | Notes |
| --- | --- |
| 🟢 boolean isDiscarded() | |
| 🟢 String getExternalPath() | |

🟢 PendingCheckpointStats (extends AbstractCheckpointStats)

🟢 FailedCheckpointStats (extends PendingCheckpointStats)

| Member | Notes |
| --- | --- |
| 🟡 long getEndToEndDuration() | From getFailureTimestamp() and getTriggerTimestamp() |
| 🟢 long getFailureTimestamp() | |
| 🟢 String getFailureMessage() | |

🟢 TaskStateStats

| Member | Notes |
| --- | --- |
| 🟢 JobVertexID getJobVertexId() | |
| 🟡 int getNumberOfSubtasks() | From getSubtaskStats() |
| 🟢 int getNumberOfAcknowledgedSubtasks() | |
| 🟢 SubtaskStateStats getLatestAcknowledgedSubtaskStats() | |
| 🟡 long getLatestAckTimestamp() | From getLatestAcknowledgedSubtaskStats() |
| 🟡 long getCheckpointedSize() | From getSummaryStats() |
| 🟡 long getStateSize() | From getSummaryStats() |
| 🟡 long getProcessedDataStats() | From getSummaryStats() |
| 🟡 long getPersistedDataStats() | From getSummaryStats() |
| 🟡 long getEndToEndDuration(long triggerTimestamp) | From getLatestAcknowledgedSubtaskStats() |
| 🟢 SubtaskStateStats[] getSubtaskStats() | |
| 🟢 TaskStateStatsSummary getSummaryStats() | Dictionary of StatsSummary |

🟢 StatsSummary

| Member | Notes |
| --- | --- |
| 🟢 long getMinimum() | |
| 🟢 long getMaximum() | |
| 🟢 long getSum() | |
| 🟢 long getCount() | |

🟢 CheckpointProperties

Dictionary of boolean values.

| Member | Notes |
| --- | --- |
| 🟢 SnapshotType getCheckpointType() | |

🟢 SnapshotType

| Member | Notes |
| --- | --- |
| 🟢 boolean isSavepoint() | |
| 🟢 String getName() | |
| 🟢 SharingFilesStrategy getSharingFilesStrategy() | enum |

🟢 RestoredCheckpointStats

| Member | Notes |
| --- | --- |
| 🟢 long getCheckpointId() | |
| 🟢 CheckpointProperties getProperties() | |
| 🟢 long getRestoreTimestamp() | |
| 🟢 String getExternalPath() | |

🟢 ArchivedExecutionConfig

| Member | Notes |
| --- | --- |
| 🟢 String getExecutionMode() | |
| 🟢 String getRestartStrategyDescription() | |
| 🟢 int getMaxParallelism() | |
| 🟢 int getParallelism() | |
| 🟢 boolean getObjectReuseEnabled() | |
| 🟢 long getPeriodicMaterializeIntervalMillis() | |
| 🟢 Map<String, String> getGlobalJobParameters() | |

🔴 ArchivedExecutionJobVertex

| Member | Notes |
| --- | --- |
| 🟢 String getName() | |
| 🟢 int getParallelism() | |
| 🟢 int getMaxParallelism() | |
| 🟢 ResourceProfile getResourceProfile() | |
| 🟢 JobVertexID getJobVertexId() | |
| 🔴 ArchivedExecutionVertex[] getTaskVertices() | |
| 🟡 ExecutionState getAggregateState() | From getTaskVertices() |
| 🟢 StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() | |

🔴 ArchivedExecutionVertex

| Member | Notes |
| --- | --- |
| 🟢 String getTaskNameWithSubtaskIndex() | |
| 🟢 int getParallelSubtaskIndex() | |
| 🟡 ArchivedExecution getCurrentExecutionAttempt() | From getCurrentExecutions() |
| 🔴 Collection<AccessExecution> getCurrentExecutions() | |
| 🟢 ExecutionState getExecutionState() | enum |
| 🟢 long getStateTimestamp(ExecutionState state) | |
| 🔴 Optional<ErrorInfo> getFailureInfo() | |
| 🟡 TaskManagerLocation getCurrentAssignedResourceLocation() | |
| 🟢 ExecutionHistory getExecutionHistory() | |

🔴 ArchivedExecution

| Member | Notes |
| --- | --- |
| 🟢 ExecutionAttemptID getAttemptId() | |
| 🟡 int getAttemptNumber() | From getAttemptId() |
| 🟢 long[] getStateTimestamps() | |
| 🟢 long[] getStateEndTimestamps() | |
| 🟢 ExecutionState getState() | |
| 🟡 TaskManagerLocation getAssignedResourceLocation() | |
| 🟢 AllocationID getAssignedAllocationID() | |
| 🔴 Optional<ErrorInfo> getFailureInfo() | |
| 🟡 long getStateTimestamp(ExecutionState state) | From getStateTimestamps() |
| 🟡 long getStateEndTimestamp(ExecutionState state) | From getStateEndTimestamps() |
| 🟢 StringifiedAccumulatorResult[] getUserAccumulatorsStringified() | |
| 🟡 int getParallelSubtaskIndex() | From getAttemptId() |
| 🔴 IOMetrics getIOMetrics() | |

🟢 ExecutionAttemptID

| Member | Notes |
| --- | --- |
| 🟢 ExecutionVertexID getExecutionVertexId() | |
| 🟢 private executionGraphId | |
| 🟡 JobVertexID getJobVertexId() | From getExecutionVertexId() |
| 🟡 int getSubtaskIndex() | From getExecutionVertexId() |
| 🟢 int getAttemptNumber() | |

🟢 ExecutionVertexID

| Member | Notes |
| --- | --- |
| 🟢 JobVertexID getJobVertexId() | |
| 🟢 int getSubtaskIndex() | |

🔴 IOMetrics

| Member | Notes |
| --- | --- |
| 🟢 long getNumRecordsIn() | |
| 🟢 long getNumRecordsOut() | |
| 🟢 long getNumBytesIn() | |
| 🟢 long getNumBytesOut() | |
| 🟢 double getAccumulateBusyTime() | |
| 🟢 long getAccumulateBackPressuredTime() | |
| 🟢 long getAccumulateIdleTime() | |
| 🔴 Map<IntermediateResultPartitionID, ResultPartitionBytes> getResultPartitionBytes() | Base64 |

🟢 ExecutionHistory

| Member | Notes |
| --- | --- |
| 🟡 Optional<ArchivedExecution> getHistoricalExecution(int attemptNumber) | From getHistoricalExecutions() |
| 🟢 Collection<ArchivedExecution> getHistoricalExecutions() | |
| 🟢 private maxAttemptNumber | |

🟢 TaskManagerLocation

(can be replaced by ExceptionHistoryEntry.ArchivedTaskManagerLocation)

| Member | Notes |
| --- | --- |
| 🟢 ResourceID getResourceID() | |
| 🟢 int dataPort() | |
| 🟡 InetAddress address() | Not necessary in the read-only version |
| 🟢 String addressString() | |
| 🟢 String getFQDNHostname() | |
| 🟢 String getHostname() | |
| 🟢 String getNodeId() | |

🟢 ResourceID

| Member | Notes |
| --- | --- |
| 🟢 private String resourceID | String representation of an AbstractID |
| 🟢 String getMetadata() | |