...

Proposed Changes

Interfaces & Methods

For each method of the ExecutionGraphInfoStore and the JobResultStore, the calling code location and the migration plan are listed below.

size() : int (ExecutionGraphInfoStore)
  • Calling code location: not used
  • Migration plan: can be removed.

get(JobID) : ExecutionGraphInfo (ExecutionGraphInfoStore)
  • Dispatcher#cancel ¹: the graph's JobStatus ² is used to decide whether an error should be thrown.
  • Dispatcher#requestExecutionGraphInfo ¹: can be replaced by the CompletedJobStore to retrieve the ExecutionGraphInfo.
  • Dispatcher#requestJobResult ¹: return type JobResult ².

put(ExecutionGraphInfo) : void (ExecutionGraphInfoStore) and createDirtyResult(JobResultEntry) : void (JobResultStore)
  • Calling code location: both methods are called from within the Dispatcher as soon as the job reaches a globally-terminal state.
  • Migration plan: the JobResultEntry that is used is also just derived from the AccessExecutionGraph ². Both methods can be merged into a single one that creates the entry with the relevant information.

getStoredJobsOverview() : JobsOverview (ExecutionGraphInfoStore)
  • Calling code location: Dispatcher#requestClusterOverview
  • Migration plan: the data can be stored in a file in the JobResultStore folder to persist the information.

getAvailableJobDetails() : Collection<JobDetails> (ExecutionGraphInfoStore)
  • Calling code location: Dispatcher#requestMultipleJobDetails
  • Migration plan: return type JobDetails ².

getAvailableJobDetails(JobID) : JobDetails (ExecutionGraphInfoStore)
  • Calling code location: Dispatcher#requestJobStatus
  • Migration plan: return type JobDetails ².

markResultAsClean(JobID) : void (JobResultStore)
  • Calling code location: at the end of the cleanup process within the Dispatcher
  • Migration plan: can be kept as is.

hasJobResultEntry(JobID) : boolean (JobResultStore)
  • Calling code location: the Dispatcher, to check whether the job is in a globally-terminal state; the JobMasterServiceLeadershipRunner, to check whether the job is in a globally-terminal state when processing a failover
  • Migration plan: can be kept as is.

hasDirtyJobResultEntry(JobID) : boolean (JobResultStore)
  • Calling code location: at the end of the cleanup process within the Dispatcher
  • Migration plan: can be kept as is.

hasCleanJobResultEntry(JobID) : boolean (JobResultStore)
  • Calling code location: at the end of the cleanup process within the Dispatcher
  • Migration plan: can be kept as is.

getDirtyResults() : Set<JobResult> (JobResultStore)
  • Calling code location: before the Dispatcher is instantiated, to filter the jobs that are meant to be recovered
  • Migration plan: can be kept as is.

¹ if no JobManagerRunner is registered for the given JobID anymore
² can be derived from the ArchivedExecutionGraph
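To make the migration plan more concrete, here is a minimal in-memory sketch of a merged store, assuming the name CompletedJobStore from the text. The method addDirtyEntry (the merged put/createDirtyResult), the CompletedJobEntry record, the String job IDs and the reduced JobStatus enum are hypothetical stand-ins, not Flink's API. The JobResultStore methods that are "kept as is" are answered from the same entries.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;

// Hypothetical, reduced stand-in for Flink's JobStatus.
enum JobStatus { FINISHED, FAILED, CANCELED }

// One entry carries what both stores previously held separately (heavily simplified).
record CompletedJobEntry(String jobId, String jobName, JobStatus status) {}

// Hypothetical merged interface: put(...) and createDirtyResult(...) become addDirtyEntry(...).
interface CompletedJobStore {
    void addDirtyEntry(CompletedJobEntry entry);
    void markResultAsClean(String jobId);
    boolean hasJobResultEntry(String jobId);
    boolean hasDirtyJobResultEntry(String jobId);
    boolean hasCleanJobResultEntry(String jobId);
    Set<CompletedJobEntry> getDirtyResults();
    Optional<CompletedJobEntry> get(String jobId);
}

class InMemoryCompletedJobStore implements CompletedJobStore {
    private final Map<String, CompletedJobEntry> entries = new HashMap<>();
    // IDs of entries whose cleanup has not finished yet.
    private final Set<String> dirty = new HashSet<>();

    @Override
    public void addDirtyEntry(CompletedJobEntry entry) {
        if (entries.containsKey(entry.jobId())) {
            throw new IllegalStateException("Entry already exists for job " + entry.jobId());
        }
        entries.put(entry.jobId(), entry);
        dirty.add(entry.jobId());
    }

    @Override
    public void markResultAsClean(String jobId) {
        if (!entries.containsKey(jobId)) {
            throw new NoSuchElementException("No entry for job " + jobId);
        }
        // Marking an already clean entry again is a no-op in this sketch.
        dirty.remove(jobId);
    }

    @Override
    public boolean hasJobResultEntry(String jobId) {
        return entries.containsKey(jobId);
    }

    @Override
    public boolean hasDirtyJobResultEntry(String jobId) {
        return dirty.contains(jobId);
    }

    @Override
    public boolean hasCleanJobResultEntry(String jobId) {
        return entries.containsKey(jobId) && !dirty.contains(jobId);
    }

    @Override
    public Set<CompletedJobEntry> getDirtyResults() {
        Set<CompletedJobEntry> result = new HashSet<>();
        for (String jobId : dirty) {
            result.add(entries.get(jobId));
        }
        return result;
    }

    @Override
    public Optional<CompletedJobEntry> get(String jobId) {
        return Optional.ofNullable(entries.get(jobId));
    }
}
```

Keeping the dirty/clean flag next to the entry avoids writing the same job twice, once per store.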

...

The following overview illustrates the state of entry lifecycle management and caching in the current implementations of the ExecutionGraphInfoStore and the JobResultStore:

In-memory implementations
  • Caching
      ExecutionGraphInfoStore: utilizes a cache to maintain the lifecycle of the entries
      JobResultStore: no caching involved
  • Lifecycle management
      ExecutionGraphInfoStore: TTL and cache capacity can be specified in Flink's config
      JobResultStore: data stays in memory forever (FLINK-32806)

File-based implementations
  • Caching
      ExecutionGraphInfoStore: utilizes a cache to reduce IO
      JobResultStore: no caching involved
  • Lifecycle management
      ExecutionGraphInfoStore: TTL and cache capacity can be specified in Flink's config; a separate cache is maintained for JobDetails
      JobResultStore: data is removed either when the job is globally terminated and its artifacts are properly cleaned up, or by the user

The lifecycle of the entries should follow the current ExecutionGraphInfoStore implementations. The current JobResultStore behavior (with the default job-result-store.delete-on-commit=true) can be mimicked by setting the expiration time for an entry to 0.

The in-memory implementation should utilize caching (analogously to the in-memory implementation of the ExecutionGraphInfoStore) to reduce its memory footprint. In addition, log messages should inform the user when a completed job disappears because its expiration time has passed.
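A minimal sketch of that lifecycle, under a few assumptions: the TTL only applies to clean entries (dirty entries are kept until their cleanup finishes), a capacity bound evicts the oldest clean entry, and the clock is injectable so expiration can be tested. The class ExpiringCompletedJobCache and its methods are hypothetical and do not reflect Flink's actual caching code. With an expiration time of 0, an entry disappears as soon as it is marked clean, which mimics job-result-store.delete-on-commit=true.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;

// Hypothetical lifecycle manager: dirty entries are kept until cleaned up,
// clean entries expire after a TTL or when the capacity bound is exceeded.
class ExpiringCompletedJobCache {
    private final long expirationMillis;
    private final int maxCleanEntries;
    private final LongSupplier clock;
    private final Set<String> dirty = new HashSet<>();
    // Clean job IDs ordered by the time they were marked clean (oldest first).
    private final LinkedHashMap<String, Long> cleanSince = new LinkedHashMap<>();
    // Collected messages; a real implementation would write them to its logger.
    final List<String> log = new ArrayList<>();

    ExpiringCompletedJobCache(long expirationMillis, int maxCleanEntries, LongSupplier clock) {
        this.expirationMillis = expirationMillis;
        this.maxCleanEntries = maxCleanEntries;
        this.clock = clock;
    }

    void addDirty(String jobId) {
        dirty.add(jobId);
    }

    void markClean(String jobId) {
        if (dirty.remove(jobId)) {
            cleanSince.put(jobId, clock.getAsLong());
            evict();
        }
    }

    boolean contains(String jobId) {
        evict();
        return dirty.contains(jobId) || cleanSince.containsKey(jobId);
    }

    private void evict() {
        long now = clock.getAsLong();
        Iterator<Map.Entry<String, Long>> it = cleanSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            boolean expired = now - entry.getValue() >= expirationMillis;
            boolean overCapacity = cleanSince.size() > maxCleanEntries;
            if (!expired && !overCapacity) {
                break; // remaining entries were marked clean more recently
            }
            String jobId = entry.getKey();
            it.remove();
            // Tell the user why the completed job is gone.
            log.add("Removed completed job " + jobId
                    + (expired ? ": expiration time reached" : ": capacity exceeded"));
        }
    }
}
```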

Configuration parameters

job-result-store.delete-on-commit (default: true)
  • Description: determines when the JobResultStore entry should be deleted
  • New: replaced by jobstore.expiration-time¹ (true = 0, false = Integer.MAX_VALUE)

job-result-store.storage-path (default: ${haStorage}/job-result-store/${clusterId})
  • Description: DFS location of the files
  • New: deprecated and replaced by jobstore.storage-path (default: ${haStorage}/completed-job-store/${clusterId})

jobstore.cache-size¹
  • Description: size (in bytes) of the cache used to keep completed jobs in memory
  • New: kept

jobstore.expiration-time¹
  • Description: TTL for clean job entries
  • New: kept

jobstore.max-capacity¹
  • Description: maximum number of completed jobs (ExecutionGraphInfo entries) that can be kept
  • New: kept

jobstore.type¹
  • Description: file-based (File) or memory-based (Memory) store
  • New: kept

¹ jobstore.* could also be renamed to completed-job-store.*, since the latter is less generic. I'm not sure whether there is a desire to keep the current names for the sake of consistency.
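The option mapping above can be sketched as a translation step that runs before the new options are read. The helper LegacyJobStoreOptions and the plain Map<String, String> standing in for Flink's Configuration are hypothetical; the key names and the true = 0 / false = Integer.MAX_VALUE mapping come from the list above. Letting an explicitly set new option win over the deprecated one is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that rewrites deprecated JobResultStore keys into jobstore.* keys.
final class LegacyJobStoreOptions {
    static final String DELETE_ON_COMMIT = "job-result-store.delete-on-commit";
    static final String OLD_STORAGE_PATH = "job-result-store.storage-path";
    static final String EXPIRATION_TIME = "jobstore.expiration-time";
    static final String STORAGE_PATH = "jobstore.storage-path";

    private LegacyJobStoreOptions() {}

    /** Returns a copy of the config in which deprecated keys are translated to the new ones. */
    static Map<String, String> migrate(Map<String, String> config) {
        Map<String, String> result = new HashMap<>(config);

        // delete-on-commit=true means "remove right after cleanup", i.e. an expiration time of 0.
        String deleteOnCommit = result.remove(DELETE_ON_COMMIT);
        if (deleteOnCommit != null && !result.containsKey(EXPIRATION_TIME)) {
            boolean delete = Boolean.parseBoolean(deleteOnCommit.trim());
            result.put(EXPIRATION_TIME, delete ? "0" : String.valueOf(Integer.MAX_VALUE));
        }

        // The deprecated storage path is only used if the new key is not set.
        String oldPath = result.remove(OLD_STORAGE_PATH);
        if (oldPath != null && !result.containsKey(STORAGE_PATH)) {
            result.put(STORAGE_PATH, oldPath);
        }
        return result;
    }
}
```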

...

Several data structures (e.g. StatsSummarySnapshot) mix up the mutable and immutable versions. This FLIP might also include establishing a clear separation between immutable instances (included in the ArchivedExecutionGraph) and mutable instances (part of the ExecutionGraph) using the Archiveable interface.
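A minimal sketch of that separation, modeled on the shape of Flink's Archiveable interface (a single archive() method returning a Serializable type). The local Archiveable declaration mirrors that shape rather than importing it, and MutableStatsSummary / ArchivedStatsSummary are hypothetical stand-ins for pairs such as StatsSummary and StatsSummarySnapshot.

```java
import java.io.Serializable;

// Local mirror of the Archiveable shape (not an import of Flink's interface).
interface Archiveable<T extends Serializable> {
    T archive();
}

// Immutable snapshot, suitable for the ArchivedExecutionGraph.
final class ArchivedStatsSummary implements Serializable {
    private static final long serialVersionUID = 1L;
    private final long minimum;
    private final long maximum;
    private final long sum;
    private final long count;

    ArchivedStatsSummary(long minimum, long maximum, long sum, long count) {
        this.minimum = minimum;
        this.maximum = maximum;
        this.sum = sum;
        this.count = count;
    }

    long getMinimum() { return minimum; }
    long getMaximum() { return maximum; }
    long getSum() { return sum; }
    long getCount() { return count; }
    long getAverage() { return count == 0 ? 0 : sum / count; }
}

// Mutable counterpart that lives in the ExecutionGraph while the job runs.
final class MutableStatsSummary implements Archiveable<ArchivedStatsSummary> {
    private long minimum = Long.MAX_VALUE;
    private long maximum = Long.MIN_VALUE;
    private long sum;
    private long count;

    void add(long value) {
        minimum = Math.min(minimum, value);
        maximum = Math.max(maximum, value);
        sum += value;
        count++;
    }

    @Override
    public ArchivedStatsSummary archive() {
        // Copies the current values; later add() calls do not affect the snapshot.
        return count == 0
                ? new ArchivedStatsSummary(0, 0, 0, 0)
                : new ArchivedStatsSummary(minimum, maximum, sum, count);
    }
}
```

Only the archived type implements Serializable, so only it can end up in the persisted graph.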

🔴 ExecutionGraphInfo

🟡 JobID getJobId()

From getArchivedExecutionGraph()

🔴 ArchivedExecutionGraph getArchivedExecutionGraph()


🔴 Iterable<RootExceptionHistoryEntry> getExceptionHistory()



🔴 ErrorInfo

🟢 long getTimestamp()


🔴 SerializedThrowable getException()

Requires Base64 encoding but allows human-readable representation
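Base64 turns the serialized exception bytes into text that fits a human-readable file, while a readable summary can be stored next to it. A sketch using plain Java serialization of a regular exception as a stand-in for SerializedThrowable; the ThrowableCodec helper and its methods are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Base64;

// Hypothetical helper: converts a serializable exception into Base64 text and back.
final class ThrowableCodec {
    private ThrowableCodec() {}

    static String encode(Throwable throwable) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(throwable);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    static Throwable decode(String base64) {
        byte[] bytes = Base64.getDecoder().decode(base64);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Throwable) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Exception class is not on the classpath", e);
        }
    }

    // Human-readable part that can be stored next to the Base64 payload.
    static String summary(Throwable throwable) {
        return throwable.toString();
    }
}
```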


🔴 ExceptionHistoryEntry (extends ErrorInfo)

🟢 String getFailingTaskName()


🟢 ArchivedTaskManagerLocation getTaskManagerLocation()

TaskManagerLocation contains an InetAddress, which is not needed in the ArchivedExecutionGraph because no processing takes place

🟡 Map<String, String> getFailureLabels()

From getFailureLabelsFuture() 

🟢 CompletableFuture<Map<String, String>> getFailureLabelsFuture()

We have to block and wait for the future to complete here

🟡 isGlobal()

From getFailingTaskName()
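Because the failure labels are only available as a future, the archiving step has to resolve it before the entry is written. A sketch with a bounded wait; the FailureLabelsResolver helper, the timeout parameter and the fallback to an empty map on timeout or failure are assumptions, not part of the proposal.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: blocks (for a bounded time) until the failure labels are available.
final class FailureLabelsResolver {
    private FailureLabelsResolver() {}

    static Map<String, String> resolve(CompletableFuture<Map<String, String>> labels, long timeoutMillis) {
        try {
            return labels.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and archive without labels.
            Thread.currentThread().interrupt();
            return Collections.emptyMap();
        } catch (ExecutionException | TimeoutException e) {
            // Labeling failed or took too long: archive without labels.
            return Collections.emptyMap();
        }
    }
}
```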



🔴 RootExceptionHistoryEntry (extends ExceptionHistoryEntry)

🔴 Iterable<ExceptionHistoryEntry> getConcurrentExceptions()



🟢 JobStatusProvider

🟢 JobStatus getState()

enum

🟢 long getStatusTimestamp(JobStatus status)

JobStatus to long dictionary


ArchivedExecutionGraph (extends JobStatusProvider)

🟢 String getJsonPlan()


🟢 JobID getJobID()

AbstractID

🟢 String getJobName()


🟢 JobStatus getState()

enum

🔴 ErrorInfo getFailureInfo()


🟢 long getStatusTimestamp(JobStatus status)

JobStatus to long dictionary

🟢 boolean isStoppable()


🟢 Optional<String> getStateBackendName()


🟢 Optional<String> getCheckpointStorageName()


🟢 TernaryBoolean isChangelogStateBackendEnabled()


🟢 Optional<String> getChangelogStorageName()


🟢 StringifiedAccumulatorResult[] getAccumulatorResultsStringified()

Could be used as a human-readable version of the accumulator

🔴 Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorsSerialized()

Base64 encoding necessary

🟢 CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration()

essentially immutable dictionary

🟢 CheckpointStatsSnapshot getCheckpointStatsSnapshot()


🟢 ArchivedExecutionConfig getArchivedExecutionConfig()


🟡 AccessExecutionJobVertex getJobVertex(JobVertexID id)

From getAllVertices()

🔴 Map<JobVertexID, ? extends AccessExecutionJobVertex> getAllVertices()


🟡 Iterable<? extends AccessExecutionJobVertex> getVerticesTopologically()

Order can be serialized through a JobVertexID list

🟡 Iterable<? extends AccessExecutionVertex> getAllExecutionVertices()

From getVerticesTopologically() and getAllVertices()
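Storing each vertex once in a map and the topological order as a list of IDs avoids duplicating vertex data; getJobVertex, getVerticesTopologically() and getAllExecutionVertices() can then be rebuilt on read. A sketch with String IDs; the ArchivedVertex record and the ArchivedVertexIndex class are hypothetical stand-ins for JobVertexID, ArchivedExecutionJobVertex and the archived graph.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for an archived job vertex and the names of its subtasks.
record ArchivedVertex(String id, String name, List<String> subtasks) {}

final class ArchivedVertexIndex {
    private final Map<String, ArchivedVertex> verticesById;
    // Persisted order: only the IDs, not a second copy of the vertices.
    private final List<String> topologicalOrder;

    ArchivedVertexIndex(Map<String, ArchivedVertex> verticesById, List<String> topologicalOrder) {
        // The order must mention every stored vertex exactly once.
        if (topologicalOrder.size() != verticesById.size()
                || !new HashSet<>(topologicalOrder).equals(verticesById.keySet())) {
            throw new IllegalArgumentException("Order does not match the stored vertices");
        }
        this.verticesById = Map.copyOf(verticesById);
        this.topologicalOrder = List.copyOf(topologicalOrder);
    }

    // Derived from the vertex map.
    Optional<ArchivedVertex> getJobVertex(String id) {
        return Optional.ofNullable(verticesById.get(id));
    }

    // Derived from the ID list and the vertex map.
    List<ArchivedVertex> getVerticesTopologically() {
        List<ArchivedVertex> result = new ArrayList<>();
        for (String id : topologicalOrder) {
            result.add(verticesById.get(id));
        }
        return result;
    }

    // Derived by flattening the subtasks in topological order.
    List<String> getAllExecutionVertices() {
        List<String> result = new ArrayList<>();
        for (ArchivedVertex vertex : getVerticesTopologically()) {
            result.addAll(vertex.subtasks());
        }
        return result;
    }
}
```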


🟢 StringifiedAccumulatorResult

🟢 String getName()


🟢 String getType()


🟢 String getValue()



🟢 CheckpointStatsSnapshot

🟢 CheckpointStatsCounts getCounts()

mutable dictionary of int/long values (which could be made immutable for the archived version)

🟢 CompletedCheckpointStatsSummarySnapshot getSummaryStats()


🟢 CheckpointStatsHistory getHistory()


🟢 RestoredCheckpointStats getLatestRestoredCheckpoint()



🟢 CompletedCheckpointStatsSummarySnapshot

🟢 StatsSummarySnapshot getEndToEndDurationStats()


🟢 StatsSummarySnapshot getPersistedDataStats()


🟢 StatsSummarySnapshot getProcessedDataStats()


🟢 StatsSummarySnapshot getStateSizeStats()


🟢 StatsSummarySnapshot getCheckpointedSize()



🟢 StatsSummarySnapshot

🟢 long getMinimum()


🟢 long getMaximum()


🟢 long getSum()


🟢 long getCount()


🟢 long getAverage()


🟢 double getQuantile(double quantile)



🟢 CheckpointStatsHistory

🟢 List<AbstractCheckpointStats> getCheckpoints()


🟡 AbstractCheckpointStats getCheckpointById(long checkpointId)

From getCheckpoints()

🟢 CompletedCheckpointStats getLatestCompletedCheckpoint()


🟢 FailedCheckpointStats getLatestFailedCheckpoint()


🟢 CompletedCheckpointStats getLatestSavepoint()



🟢 AbstractCheckpointStats

🟢 CheckpointStatsStatus getStatus()

enum

🟢 int getNumberOfAcknowledgedSubtasks()


🟢 long getStateSize()


🟢 long getCheckpointedSize()


🟢 long getProcessedData()


🟢 long getPersistedData()


🟢 boolean isUnalignedCheckpoint()


🟢 SubtaskStateStats getLatestAcknowledgedSubtaskStats()

essentially immutable dictionary

🟢 long getCheckpointId()


🟢 long getTriggerTimestamp()


🟢 CheckpointProperties getProperties()


🟢 int getNumberOfSubtasks()


🟢 TaskStateStats getTaskStateStats(JobVertexID jobVertexId)


🟢 Collection<TaskStateStats> getAllTaskStateStats()


🟡 long getLatestAckTimestamp()

from getLatestAcknowledgedSubtaskStats()

🟡 long getEndToEndDuration()

from getLatestAcknowledgedSubtaskStats()



🟢 CompletedCheckpointStats (extends AbstractCheckpointStats)

🟢 boolean isDiscarded()


🟢 String getExternalPath()



🟢 PendingCheckpointStats (extends AbstractCheckpointStats)


🟢 FailedCheckpointStats (extends PendingCheckpointStats)

🟡 long getEndToEndDuration()

from getFailureTimestamp() and getTriggerTimestamp()

🟢 long getFailureTimestamp()


🟢 String getFailureMessage()
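Members whose note reads "From …" can be computed from persisted members instead of being stored themselves. For FailedCheckpointStats, only the trigger and failure timestamps need to be written; the end-to-end duration follows from them. A sketch with a hypothetical ArchivedFailedCheckpoint record; clamping negative values to 0 (e.g. on clock skew) is an assumption.

```java
// Hypothetical archived form of FailedCheckpointStats: only the persisted fields.
record ArchivedFailedCheckpoint(
        long checkpointId, long triggerTimestamp, long failureTimestamp, String failureMessage) {

    // Derived on read from getFailureTimestamp() and getTriggerTimestamp(), so it is not persisted.
    long getEndToEndDuration() {
        return Math.max(0L, failureTimestamp - triggerTimestamp);
    }
}
```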



🟢 TaskStateStats

🟢 JobVertexID getJobVertexId()


🟡 int getNumberOfSubtasks()

from getSubtaskStats()

🟢 int getNumberOfAcknowledgedSubtasks()


🟢 SubtaskStateStats getLatestAcknowledgedSubtaskStats()


🟡 long getLatestAckTimestamp()

from getLatestAcknowledgedSubtaskStats()

🟡 long getCheckpointedSize()

from getSummaryStats()

🟡 long getStateSize()

from getSummaryStats()

🟡 long getProcessedDataStats()

from getSummaryStats()

🟡 long getPersistedDataStats()

from getSummaryStats()

🟡 long getEndToEndDuration(long triggerTimestamp)

from getLatestAcknowledgedSubtaskStats()

🟢 SubtaskStateStats[] getSubtaskStats()


🟢 TaskStateStatsSummary getSummaryStats()

dictionary of StatsSummary


🟢 StatsSummary

🟢 long getMinimum()


🟢 long getMaximum()


🟢 long getSum()


🟢 long getCount()



🟢 CheckpointProperties

dictionary of boolean values

🟢 SnapshotType getCheckpointType()



🟢 SnapshotType

🟢 boolean isSavepoint()


🟢 String getName()


🟢 SharingFilesStrategy getSharingFilesStrategy()

enum


🟢 RestoredCheckpointStats

🟢 long getCheckpointId()


🟢 CheckpointProperties getProperties()


🟢 long getRestoreTimestamp()


🟢 String getExternalPath()



🟢 ArchivedExecutionConfig

🟢 String getExecutionMode()


🟢 String getRestartStrategyDescription()


🟢 int getMaxParallelism()


🟢 int getParallelism()


🟢 boolean getObjectReuseEnabled()


🟢 long getPeriodicMaterializeIntervalMillis()


🟢 Map<String, String> getGlobalJobParameters()



🔴 ArchivedExecutionJobVertex

🟢 String getName()


🟢 int getParallelism()


🟢 int getMaxParallelism()


🟢 ResourceProfile getResourceProfile()


🟢 JobVertexID getJobVertexId()


🔴 ArchivedExecutionVertex[] getTaskVertices()


🟡 ExecutionState getAggregateState()

from getTaskVertices()

🟢 StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified()



🔴 ArchivedExecutionVertex

🟢 String getTaskNameWithSubtaskIndex()


🟢 int getParallelSubtaskIndex()


🟡 ArchivedExecution getCurrentExecutionAttempt()

From getCurrentExecutions()

🔴 Collection<AccessExecution> getCurrentExecutions()


🟢 ExecutionState getExecutionState()

enum

🟢 long getStateTimestamp(ExecutionState state)


🔴 Optional<ErrorInfo> getFailureInfo()


🟡 TaskManagerLocation getCurrentAssignedResourceLocation()


🟢 ExecutionHistory getExecutionHistory()



🔴 ArchivedExecution

🟢 ExecutionAttemptID getAttemptId()


🟡 int getAttemptNumber()

From getAttemptId()

🟢 long[] getStateTimestamps()


🟢 long[] getStateEndTimestamps()


🟢 ExecutionState getState()


🟡 TaskManagerLocation getAssignedResourceLocation()


🟢 AllocationID getAssignedAllocationID()


🔴 Optional<ErrorInfo> getFailureInfo()


🟡 long getStateTimestamp(ExecutionState state)

From getStateTimestamps()

🟡 long getStateEndTimestamp(ExecutionState state)

From getStateEndTimestamps()

🟢 StringifiedAccumulatorResult[] getUserAccumulatorsStringified() 


🟡 int getParallelSubtaskIndex()

From getAttemptId()

🔴 IOMetrics getIOMetrics()
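The per-state getters getStateTimestamp(...) and getStateEndTimestamp(...) can be served from the persisted arrays, assuming the arrays are indexed by the enum ordinal (an assumption about the layout). For a human-readable file, the array can be written as a map keyed by state name, which also survives a reordering of the enum constants; the same idea applies to getStatusTimestamp(JobStatus). The ExecutionState enum below is a reduced stand-in for Flink's, and StateTimestamps is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Reduced stand-in for Flink's ExecutionState (the real enum has more constants).
enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, FINISHED, FAILED }

final class StateTimestamps {
    // One slot per state, indexed by ordinal; 0 means "state never reached".
    private final long[] timestamps = new long[ExecutionState.values().length];

    void set(ExecutionState state, long timestamp) {
        timestamps[state.ordinal()] = timestamp;
    }

    // Derived getter: a plain array lookup.
    long getStateTimestamp(ExecutionState state) {
        return timestamps[state.ordinal()];
    }

    // Readable form keyed by state name, skipping states that were never reached.
    Map<String, Long> toReadableMap() {
        Map<String, Long> result = new LinkedHashMap<>();
        for (ExecutionState state : ExecutionState.values()) {
            if (timestamps[state.ordinal()] != 0) {
                result.put(state.name(), timestamps[state.ordinal()]);
            }
        }
        return result;
    }

    static StateTimestamps fromReadableMap(Map<String, Long> readable) {
        StateTimestamps result = new StateTimestamps();
        readable.forEach((name, timestamp) -> result.set(ExecutionState.valueOf(name), timestamp));
        return result;
    }
}
```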



🟢 ExecutionAttemptID

🟢 ExecutionVertexID getExecutionVertexId()


🟢 private executionGraphId


🟡 JobVertexID getJobVertexId()

From getExecutionVertexId()

🟡 int getSubtaskIndex()

From getExecutionVertexId()

🟢 int getAttemptNumber()



🟢 ExecutionVertexID

🟢 JobVertexID getJobVertexId()


🟢 int getSubtaskIndex()



🔴 IOMetrics

🟢 long getNumRecordsIn()


🟢 long getNumRecordsOut()


🟢 long getNumBytesIn()


🟢 long getNumBytesOut()


🟢 double getAccumulateBusyTime()


🟢 long getAccumulateBackPressuredTime()


🟢 long getAccumulateIdleTime()


🔴 Map<IntermediateResultPartitionID, ResultPartitionBytes> getResultPartitionBytes()

Base64 encoding necessary


🟢 ExecutionHistory

🟡 Optional<ArchivedExecution> getHistoricalExecution(int attemptNumber)

From getHistoricalExecutions()

🟢 Collection<ArchivedExecution> getHistoricalExecutions()


🟢 private maxAttemptNumber



🟢 TaskManagerLocation

(can be replaced by ExceptionHistoryEntry.ArchivedTaskManagerLocation)

🟢 ResourceID getResourceID()


🟢 int dataPort()


🟡 InetAddress address()

Not necessary in the read-only version

🟢 String addressString()


🟢 String getFQDNHostname()


🟢 String getHostname()


🟢 String getNodeId()
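For the read-only version, the InetAddress can be reduced to its textual form when the graph is archived, so no address object is needed when the archive is read. A sketch with a hypothetical ReadOnlyTaskManagerLocation record whose fields follow the getters listed above, except address().

```java
import java.net.InetAddress;

// Hypothetical read-only location: every field is a plain value, no InetAddress is kept.
record ReadOnlyTaskManagerLocation(
        String resourceId,
        int dataPort,
        String addressString,
        String fqdnHostname,
        String hostname,
        String nodeId) {

    // Converts the address to its textual IP form once, at archiving time.
    static ReadOnlyTaskManagerLocation of(
            String resourceId, InetAddress address, int dataPort,
            String fqdnHostname, String hostname, String nodeId) {
        return new ReadOnlyTaskManagerLocation(
                resourceId, dataPort, address.getHostAddress(), fqdnHostname, hostname, nodeId);
    }
}
```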



🟢 ResourceID

🟢 private String resourceID

String representation of AbstractID

🟢 String getMetadata()