...

Proposed Changes

Interfaces & Methods

[draw.io diagram: CompletedJobStore]

Each entry below lists a method of the ExecutionGraphInfoStore or the JobResultStore, its calling code location, and the migration plan.

size() : int (ExecutionGraphInfoStore)
  • Calling code location: not used
  • Migration plan: Can be removed.

get(JobID) : ExecutionGraphInfo (ExecutionGraphInfoStore)
  • Calling code location: Dispatcher#cancel¹, Dispatcher#requestExecutionGraphInfo¹, and Dispatcher#requestJobResult¹ (return type JobResult²)
  • Migration plan: The CompletedJobStore can replace this method to retrieve the ExecutionGraphInfo. Per caller:
    ◦ Dispatcher#cancel¹ uses the graph's JobStatus² to decide whether an error should be thrown. It can access the JobDetailsStore to retrieve the JobStatus through the JobDetails of the corresponding job.
    ◦ Dispatcher#requestExecutionGraphInfo¹ returns the ExecutionGraphInfo. It can use the CompletedJobStore to retrieve the ExecutionGraphInfo.
    ◦ Dispatcher#requestJobResult¹ derives the JobResult from the ExecutionGraphInfo. It can use the CompletedJobStore to retrieve the JobResult through the ExecutionGraphInfo.
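The Dispatcher#cancel¹ migration can be sketched as follows. This is a minimal sketch under assumptions: JobID, JobStatus and JobDetails are simplified stand-ins rather than Flink's classes, the Optional-returning JobDetailsStore#getAvailableJobDetails(JobID) signature and the CancelFallback class are hypothetical, and the decision rule (an already cancelled job is acknowledged, while any other terminal state or an unknown job fails) is an assumed illustration of deciding whether an error should be thrown, with placeholder exception types.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-ins (hypothetical); Flink's JobID, JobStatus and JobDetails are richer.
enum JobStatus { RUNNING, FINISHED, CANCELED, FAILED }

record JobID(String value) {}

record JobDetails(JobID jobId, JobStatus status) {}

// Hypothetical JobDetailsStore reduced to the lookup that Dispatcher#cancel needs.
interface JobDetailsStore {
    Optional<JobDetails> getAvailableJobDetails(JobID jobId);
}

final class InMemoryJobDetailsStore implements JobDetailsStore {
    private final Map<JobID, JobDetails> details = new ConcurrentHashMap<>();

    void add(JobDetails jobDetails) {
        details.put(jobDetails.jobId(), jobDetails);
    }

    @Override
    public Optional<JobDetails> getAvailableJobDetails(JobID jobId) {
        return Optional.ofNullable(details.get(jobId));
    }
}

final class CancelFallback {
    // Only reached if no JobManagerRunner is registered for the job anymore.
    static CompletableFuture<Void> cancelCompletedJob(JobDetailsStore store, JobID jobId) {
        return store.getAvailableJobDetails(jobId)
                .map(details -> details.status() == JobStatus.CANCELED
                        // Assumption: cancelling an already cancelled job is acknowledged.
                        ? CompletableFuture.<Void>completedFuture(null)
                        // Any other terminal state is reported as an error.
                        : CompletableFuture.<Void>failedFuture(new IllegalStateException(
                                "Job " + jobId.value() + " already terminated as " + details.status())))
                .orElseGet(() -> CompletableFuture.<Void>failedFuture(
                        new IllegalArgumentException("Job " + jobId.value() + " not found")));
    }
}
```

Reading the JobStatus from the JobDetailsStore can avoid loading the full ExecutionGraphInfo just to inspect a single field.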

put(ExecutionGraphInfo) : void (ExecutionGraphInfoStore)
createDirtyResult(JobResultEntry) : void / createDirtyResultAsync(JobResultEntry) : CompletableFuture<Void> (JobResultStore)
  • Calling code location: Both methods are called from within the Dispatcher as soon as the job reaches a globally-terminal state.
  • Migration plan: The JobResultEntry used here is also just derived from the AccessExecutionGraph². Both methods can therefore be merged into a single method, addDirtyJobAsync(ExecutionGraphInfo) : CompletableFuture<Void>, that creates the entry with the relevant information.
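A minimal in-memory sketch of the merged call, assuming simplified stand-in records for JobID, ExecutionGraphInfo and JobResult. The class InMemoryCompletedJobStore and its getJobResult and isDirty helpers are hypothetical; the sketch only shows that a single addDirtyJobAsync(ExecutionGraphInfo) call stores the graph information and flags the job as dirty, while the JobResult is derived on read instead of being written as a separate JobResultEntry.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-ins (hypothetical); the real ExecutionGraphInfo wraps an
// ArchivedExecutionGraph and the exception history.
enum JobStatus { FINISHED, CANCELED, FAILED }

record JobID(String value) {}

record ExecutionGraphInfo(JobID jobId, JobStatus finalStatus) {}

// The JobResult is only a derived view of the ExecutionGraphInfo.
record JobResult(JobID jobId, JobStatus finalStatus) {
    static JobResult from(ExecutionGraphInfo info) {
        return new JobResult(info.jobId(), info.finalStatus());
    }
}

final class InMemoryCompletedJobStore {
    private final Map<JobID, ExecutionGraphInfo> graphs = new ConcurrentHashMap<>();
    private final Set<JobID> dirtyJobs = ConcurrentHashMap.newKeySet();

    // One call replaces put(ExecutionGraphInfo) and createDirtyResultAsync(JobResultEntry).
    CompletableFuture<Void> addDirtyJobAsync(ExecutionGraphInfo info) {
        return CompletableFuture.runAsync(() -> {
            graphs.put(info.jobId(), info);
            dirtyJobs.add(info.jobId()); // cleanup is still pending
        });
    }

    // Derived on read instead of being persisted as a separate JobResultEntry.
    JobResult getJobResult(JobID jobId) {
        ExecutionGraphInfo info = graphs.get(jobId);
        return info == null ? null : JobResult.from(info);
    }

    boolean isDirty(JobID jobId) {
        return dirtyJobs.contains(jobId);
    }
}
```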

getStoredJobsOverview() : JobsOverview (ExecutionGraphInfoStore)
  • Calling code location: Dispatcher#requestClusterOverview
  • Migration plan: The data can be stored in a file in the JobResultStore folder to persist the information. The data is available through the get*Count() methods of the JobDetailsStore; the JobsOverview is instantiated within the Dispatcher instead.
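How the Dispatcher could assemble the JobsOverview itself is sketched below. The counter names getFinishedCount(), getCanceledCount() and getFailedCount() are hypothetical placeholders for the get*Count() methods, and JobsOverview, JobDetailsStore and OverviewSketch are simplified stand-ins, not Flink's classes.

```java
import java.util.EnumMap;
import java.util.Map;

enum JobStatus { FINISHED, CANCELED, FAILED }

// Simplified stand-in for Flink's JobsOverview (hypothetical field names).
record JobsOverview(int runningOrPending, int finished, int cancelled, int failed) {}

// Hypothetical JobDetailsStore that keeps one counter per terminal status.
final class JobDetailsStore {
    private final Map<JobStatus, Integer> counts = new EnumMap<>(JobStatus.class);

    void add(JobStatus terminalStatus) {
        counts.merge(terminalStatus, 1, Integer::sum);
    }

    int getFinishedCount() { return counts.getOrDefault(JobStatus.FINISHED, 0); }

    int getCanceledCount() { return counts.getOrDefault(JobStatus.CANCELED, 0); }

    int getFailedCount() { return counts.getOrDefault(JobStatus.FAILED, 0); }
}

final class OverviewSketch {
    // The Dispatcher assembles the overview: running jobs come from the registered
    // JobManagerRunners, terminal jobs from the JobDetailsStore counters.
    static JobsOverview requestJobsOverview(int runningOrPendingJobs, JobDetailsStore store) {
        return new JobsOverview(
                runningOrPendingJobs,
                store.getFinishedCount(),
                store.getCanceledCount(),
                store.getFailedCount());
    }
}
```

Keeping the instantiation in the Dispatcher means the store only exposes plain counters and does not need to know about running jobs.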

getAvailableJobDetails() : Collection<JobDetails> (ExecutionGraphInfoStore)
  • Calling code location: Dispatcher#requestMultipleJobDetails (return type JobDetails²)
  • Migration plan: Moves into the JobDetailsStore.

getAvailableJobDetails(JobID) : JobDetails (ExecutionGraphInfoStore)
  • Calling code location: Dispatcher#requestJobStatus (return type JobDetails²)
  • Migration plan: Moves into the JobDetailsStore.
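A sketch of a JobDetailsStore carrying both moved lookups, and of how Dispatcher#requestMultipleJobDetails could combine the stored details with those of still-running jobs. The stand-in types, the Optional return type, and the MapJobDetailsStore and MultipleJobDetails classes are hypothetical; the rule that a running job's details take precedence over a stored entry with the same JobID is an assumption.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

enum JobStatus { RUNNING, FINISHED, CANCELED, FAILED }

record JobID(String value) {}

record JobDetails(JobID jobId, JobStatus status) {}

// Hypothetical JobDetailsStore holding both lookups moved out of the ExecutionGraphInfoStore.
interface JobDetailsStore {
    Collection<JobDetails> getAvailableJobDetails();

    Optional<JobDetails> getAvailableJobDetails(JobID jobId);
}

final class MapJobDetailsStore implements JobDetailsStore {
    private final Map<JobID, JobDetails> details = new LinkedHashMap<>();

    void add(JobDetails jobDetails) {
        details.put(jobDetails.jobId(), jobDetails);
    }

    @Override
    public Collection<JobDetails> getAvailableJobDetails() {
        return List.copyOf(details.values());
    }

    @Override
    public Optional<JobDetails> getAvailableJobDetails(JobID jobId) {
        return Optional.ofNullable(details.get(jobId));
    }
}

final class MultipleJobDetails {
    // Assumption: details of a running job take precedence over a stored entry with the same JobID.
    static Collection<JobDetails> requestMultipleJobDetails(
            Collection<JobDetails> runningJobs, JobDetailsStore store) {
        Map<JobID, JobDetails> deduplicated = new LinkedHashMap<>();
        store.getAvailableJobDetails().forEach(job -> deduplicated.put(job.jobId(), job));
        runningJobs.forEach(job -> deduplicated.put(job.jobId(), job));
        return new ArrayList<>(deduplicated.values());
    }
}
```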


markResultAsClean(JobID) : void / markResultAsCleanAsync(JobID) : CompletableFuture<Void> (JobResultStore)
  • Calling code location: At the end of the cleanup process within the Dispatcher
  • Migration plan: Can be kept as is and becomes CompletedJobStore#markJobAsCleanAsync(JobID) : CompletableFuture<Void>

hasJobResultEntry(JobID) : boolean / hasJobResultEntryAsync(JobID) : CompletableFuture<Boolean> (JobResultStore)
  • Calling code location: The Dispatcher, to check whether the job is in a globally-terminal state; the JobMasterServiceLeadershipRunner, to check whether the job is in a globally-terminal state when processing a failover
  • Migration plan: Can be kept as is and becomes CompletedJobStore#isGloballyTerminated(JobID) : CompletableFuture<Boolean>

hasDirtyJobResultEntry(JobID) : boolean / hasDirtyJobResultEntryAsync(JobID) : CompletableFuture<Boolean> (JobResultStore)
  • Calling code location: At the end of the cleanup process within the Dispatcher
  • Migration plan: Can be kept as is and becomes CompletedJobStore#isDirty(JobID) : CompletableFuture<Boolean>

hasCleanJobResultEntry(JobID) : boolean / hasCleanJobResultEntryAsync(JobID) : CompletableFuture<Boolean> (JobResultStore)
  • Calling code location: At the end of the cleanup process within the Dispatcher
  • Migration plan: Can be kept as is and becomes CompletedJobStore#isCleanedAsync(JobID) : CompletableFuture<Boolean>

getDirtyResults() : Set<JobResult> (JobResultStore)
  • Calling code location: Before instantiating the Dispatcher, to filter the jobs that are meant to be recovered
  • Migration plan: Can be kept as is and becomes CompletedJobStoreWithDirtyJobRetrieval#getDirtyResults() : Set<ExecutionGraphInfo>

¹ if no JobManagerRunner is registered for the given JobID anymore
² can be derived from ArchivedExecutionGraph
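The JobResultStore methods listed above boil down to a dirty-to-clean lifecycle per job. A minimal in-memory sketch of that lifecycle using the proposed method names follows; the State enum, the stand-in records, and the choice to fail markJobAsCleanAsync for unknown jobs are assumptions, and all futures complete synchronously for brevity.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

record JobID(String value) {}

record ExecutionGraphInfo(JobID jobId, String finalStatus) {}

// Hypothetical in-memory store; each entry is either DIRTY (cleanup pending) or CLEAN.
final class InMemoryCompletedJobStore {
    private enum State { DIRTY, CLEAN }

    private record Entry(ExecutionGraphInfo info, State state) {}

    private final Map<JobID, Entry> entries = new ConcurrentHashMap<>();

    CompletableFuture<Void> addDirtyJobAsync(ExecutionGraphInfo info) {
        entries.put(info.jobId(), new Entry(info, State.DIRTY));
        return CompletableFuture.completedFuture(null);
    }

    // Replaces markResultAsClean(Async); unknown jobs fail (assumed behavior).
    CompletableFuture<Void> markJobAsCleanAsync(JobID jobId) {
        Entry updated = entries.computeIfPresent(
                jobId, (id, entry) -> new Entry(entry.info(), State.CLEAN));
        return updated == null
                ? CompletableFuture.<Void>failedFuture(new NoSuchElementException(jobId.value()))
                : CompletableFuture.<Void>completedFuture(null);
    }

    // Replaces hasJobResultEntry(Async): a dirty or a clean entry means globally terminated.
    CompletableFuture<Boolean> isGloballyTerminated(JobID jobId) {
        return CompletableFuture.completedFuture(entries.containsKey(jobId));
    }

    // Replaces hasDirtyJobResultEntry(Async).
    CompletableFuture<Boolean> isDirty(JobID jobId) {
        return CompletableFuture.completedFuture(hasState(jobId, State.DIRTY));
    }

    // Replaces hasCleanJobResultEntry(Async).
    CompletableFuture<Boolean> isCleanedAsync(JobID jobId) {
        return CompletableFuture.completedFuture(hasState(jobId, State.CLEAN));
    }

    // Replaces getDirtyResults(): the jobs whose cleanup has to be resumed after a failover.
    Set<ExecutionGraphInfo> getDirtyResults() {
        return entries.values().stream()
                .filter(entry -> entry.state() == State.DIRTY)
                .map(Entry::info)
                .collect(Collectors.toSet());
    }

    private boolean hasState(JobID jobId, State state) {
        Entry entry = entries.get(jobId);
        return entry != null && entry.state() == state;
    }
}
```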

Summary of the changes:

  • The ExecutionGraphInfoStore will be renamed to CompletedJobStore to reflect its new purpose.
  • Optional: The ExecutionGraphInfoStore methods will be made asynchronous, analogous to what was done for the JobResultStore in FLINK-27204.
  • All methods of the JobResultStore will be integrated into the new CompletedJobStore interface.
  • ExecutionGraphInfoStore#size() will be removed because it is not used anywhere.
  • JobResultStore#getDirtyResults() moves into its own interface, CompletedJobStoreWithDirtyJobRetrieval, which extends CompletedJobStore. It is the only method that is used outside of the Dispatcher.
  • CompletedJobStore: Takes care of the failover scenario and stores the actual data. Depending on the implementation, the data might live in memory the whole time. Both a file-based and an in-memory implementation will be provided.
  • JobDetailsStore: Takes care of the job details, which live in memory. The store also triggers the final deletion of the CompletedJobStore entry if a TTL is set. The file-based CompletedJobStore will provide an in-memory version of the JobDetailsStore (i.e. the JobDetails are created alongside the ExecutionGraphInfo and live in memory as long as the CompletedJobStore entry isn't removed). In contrast, the in-memory CompletedJobStore will come with a JobDetailsStore implementation that accesses the in-memory ExecutionGraphInfo and creates the JobDetails on the fly.
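The two JobDetailsStore flavors described in the last bullet differ only in when the JobDetails are created. A sketch under assumptions: the stand-in records, the onJobAdded/onJobRemoved hooks, and the class names CachedJobDetailsStore and OnTheFlyJobDetailsStore are hypothetical, and TTL-based removal is left out.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

record JobID(String value) {}

record ExecutionGraphInfo(JobID jobId, String name, String finalStatus) {}

record JobDetails(JobID jobId, String name, String status) {
    static JobDetails from(ExecutionGraphInfo info) {
        return new JobDetails(info.jobId(), info.name(), info.finalStatus());
    }
}

interface JobDetailsStore {
    Optional<JobDetails> getAvailableJobDetails(JobID jobId);
}

// Paired with a file-based CompletedJobStore: JobDetails are created once, when the
// entry is written, and kept in memory until the entry is removed.
final class CachedJobDetailsStore implements JobDetailsStore {
    private final Map<JobID, JobDetails> cache = new ConcurrentHashMap<>();

    void onJobAdded(ExecutionGraphInfo info) {
        cache.put(info.jobId(), JobDetails.from(info));
    }

    void onJobRemoved(JobID jobId) {
        cache.remove(jobId);
    }

    @Override
    public Optional<JobDetails> getAvailableJobDetails(JobID jobId) {
        return Optional.ofNullable(cache.get(jobId));
    }
}

// Paired with an in-memory CompletedJobStore: nothing is cached; the details are
// derived from the ExecutionGraphInfo that is already held in memory.
final class OnTheFlyJobDetailsStore implements JobDetailsStore {
    private final Function<JobID, ExecutionGraphInfo> graphLookup;

    OnTheFlyJobDetailsStore(Function<JobID, ExecutionGraphInfo> graphLookup) {
        this.graphLookup = graphLookup;
    }

    @Override
    public Optional<JobDetails> getAvailableJobDetails(JobID jobId) {
        return Optional.ofNullable(graphLookup.apply(jobId)).map(JobDetails::from);
    }
}
```

The cached variant trades a small amount of memory for not having to read the file-based entry on every REST request, while the on-the-fly variant avoids keeping a second copy of data that is already in memory.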

The sequence diagram below visualizes the process of creating and removing the entries.

[draw.io diagram: FLIP-360 - Job Completion]

Both the file-based and the in-memory implementation will implement CompletedJobStoreWithDirtyJobRetrieval, but only the CompletedJobStore interface is passed on to the Dispatcher.
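The interface split can be sketched as follows. Only two write methods are shown on CompletedJobStore; the implementation, the Dispatcher stand-in, and ClusterBootstrap are hypothetical and greatly simplified (the implementation drops clean entries instead of keeping them).

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

record JobID(String value) {}

record ExecutionGraphInfo(JobID jobId) {}

// Narrow view passed on to the Dispatcher (method set abbreviated for the sketch).
interface CompletedJobStore {
    CompletableFuture<Void> addDirtyJobAsync(ExecutionGraphInfo info);

    CompletableFuture<Void> markJobAsCleanAsync(JobID jobId);
}

// Extended view, only needed before the Dispatcher is instantiated.
interface CompletedJobStoreWithDirtyJobRetrieval extends CompletedJobStore {
    Set<ExecutionGraphInfo> getDirtyResults();
}

// Hypothetical implementation; clean entries are simply dropped for brevity.
final class InMemoryCompletedJobStore implements CompletedJobStoreWithDirtyJobRetrieval {
    private final Map<JobID, ExecutionGraphInfo> dirtyJobs = new ConcurrentHashMap<>();

    @Override
    public CompletableFuture<Void> addDirtyJobAsync(ExecutionGraphInfo info) {
        dirtyJobs.put(info.jobId(), info);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> markJobAsCleanAsync(JobID jobId) {
        dirtyJobs.remove(jobId);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public Set<ExecutionGraphInfo> getDirtyResults() {
        return Set.copyOf(dirtyJobs.values());
    }
}

// Stand-in Dispatcher: it only holds the narrow CompletedJobStore type.
final class Dispatcher {
    private final Set<ExecutionGraphInfo> recoveredDirtyJobs;
    private final CompletedJobStore completedJobStore;

    Dispatcher(Set<ExecutionGraphInfo> recoveredDirtyJobs, CompletedJobStore completedJobStore) {
        this.recoveredDirtyJobs = recoveredDirtyJobs;
        this.completedJobStore = completedJobStore;
    }

    Set<ExecutionGraphInfo> recoveredDirtyJobs() {
        return recoveredDirtyJobs;
    }

    CompletableFuture<Void> finishCleanup(JobID jobId) {
        return completedJobStore.markJobAsCleanAsync(jobId);
    }
}

final class ClusterBootstrap {
    static Dispatcher startDispatcher(CompletedJobStoreWithDirtyJobRetrieval store) {
        // The dirty jobs are read once, before the Dispatcher exists ...
        Set<ExecutionGraphInfo> dirtyJobs = store.getDirtyResults();
        // ... and afterwards only the narrower CompletedJobStore view is handed over.
        return new Dispatcher(dirtyJobs, store);
    }
}
```

Because the constructor parameter is typed as CompletedJobStore, the compiler prevents the Dispatcher from calling getDirtyResults() even though the same object implements it.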

Serialization

The ExecutionGraphInfoStore uses Java object serialization: it dumps the ExecutionGraphInfo (which includes the ArchivedExecutionGraph and the exception history) into a file. In contrast, the JobResultEntry is currently serialized as JSON containing the JobResult and a version (i.e. the version of the schema). The JobResult is used in the cleanup process to generate a sparse ArchivedExecutionGraph that is served to the REST API while cleaning up after a failover. The version was added to allow changes to the format.
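To make the contrast concrete, the following sketch round-trips a stand-in ExecutionGraphInfo through Java object serialization into a temporary file. Instead of JSON, it prefixes the payload with an explicit format version to mimic the purpose of the JobResultEntry's version field without extra dependencies; the record's fields, the FORMAT_VERSION constant, and the ExecutionGraphInfoFiles helper are hypothetical.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical stand-in: the real ExecutionGraphInfo holds an ArchivedExecutionGraph
// and the exception history.
record ExecutionGraphInfo(String jobId, String finalStatus, List<String> exceptionHistory)
        implements Serializable {}

final class ExecutionGraphInfoFiles {
    static final int FORMAT_VERSION = 1;

    static void write(Path file, ExecutionGraphInfo info) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeInt(FORMAT_VERSION); // explicit version header, read back before the payload
            out.writeObject(info);        // Java object serialization of the whole record
        }
    }

    static ExecutionGraphInfo read(Path file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            int version = in.readInt();
            if (version != FORMAT_VERSION) {
                throw new IOException("Unsupported format version " + version);
            }
            return (ExecutionGraphInfo) in.readObject();
        }
    }
}
```

Without such a header, plain Java serialization ties the file format to the class layout, which is one reason a schema version is useful when the stored format is expected to evolve.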

...