Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.

Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion threadhere (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote threadhere (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRAhere (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release<Flink Version>
thread/b65r92n8msgdrv6xjsnjwrlwgnxj81j3
Vote threadtba
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-31709

Release<Flink Version>


Please keep Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

Storing metadata of completed jobs

...

Proposed Changes

Interfaces & Methods

draw.io Diagram

...

bordertrue

...

diagramName

...

Calling Code Location

...

Migration Plan

...

size() : int

...

not used

...

Can be removed.

...

get(JobID) : ExecutionGraphInfo

...

Dispatcher#cancel ¹

...

The graph’s JobStatus ² is used to decide whether an error should be thrown

...

Dispatcher#requestExecutionGraphInfo ¹

...

It can be replaced by the CompletedJobStore to retrieve the ExecutionGraphInfo

...

Dispatcher#requestJobResult ¹

...

Return type JobResult ²

...

put(ExecutionGraphInfo) : void

...

createDirtyResult(JobResultEntry) : void

...

Both methods are called from within the Dispatcher as soon as the job reaches a globally-terminal state.

...

Used JobResultEntry is also just derived from AccessExecutionGraph². Both methods can be merged into a single one to create the entry with the relevant information.

...

getStoredJobsOverview() : JobsOverview

...

Dispatcher#requestClusterOverview

...

Data can be stored in a file in the JobResultStore folder to persist the information.

...

getAvailableJobDetails() : Collection<JobDetails>

...

Dispatcher#requestMultipleJobDetails

...

Return type JobDetails ²

...

getAvailableJobDetails(JobID) : JobDetails

...

Dispatcher#requestJobStatus

...

Return type JobDetails  ²

...

markResultAsClean(JobID) : void

...

At the end of the cleanup process within the Dispatcher

...

Can be kept as is

...

hasJobResultEntry(JobID) : boolean

...

Dispatcher for checking whether the job is in globally-terminal state

JobMasterServiceLeadershipRunner to check whether the job is in globally-terminal state when processing a failover

...

Can be kept as is

CompletedJobStore
simpleViewerfalse
width
linksauto
tbstyletop
lboxtrue
diagramWidth1161
revision7

  • CompletedJobStore: Takes care of the artifact (the ExecutionGraphInfo) lifecycle.
    • EmbeddedCompletedJobStore: In-memory implementation of both, the job details and the ExecutionGraphInfo
    • FileBasedCompletedJobStore: Writes the ExecutionGraphInfo in files but keeps the JobDetails in emory
  • JobDetailsProvider: Accesses the jobs details which should always live in memory (in contrast to the ExecutionGraphInfo) to have the information accessible in a reasonable amount of time. 

Migration Proposal

ExecutionGraphInfoStore

JobResultStore

Calling Code Location

Migration Plan

size() : int


not used

Can be removed.

get(JobID) : ExecutionGraphInfo


The graph’s JobStatus² is used within Dispatcher#cancel¹ to decide whether an error should be thrown. 

Access JobDetailsProvder to retrieve the JobStatus through the JobDetails of the corresponding job.

Dispatcher#requestExecutionGraphInfo¹ returns the ExecutionGraphInfo.

The method can use CompletedJobStore to retrieve the ExecutionGraphInfo.

Dispatcher#requestJobResult¹ derives the JobResult from the ExecutionGraphInfo.

The method can use CompletedJobStore to retrieve the JobResult through the ExecutionGraphInfo.

put(ExecutionGraphInfo) : void

createDirtyResultAsync(JobResultEntry) : CompletableFuture<Void>

Both methods are called from within the Dispatcher as soon as the job reaches a globally-terminal state.

Merging of the two methods into addDirtyJobAsync(ExecutionGraphInfo) : CompletableFuture<Void> 

getStoredJobsOverview() : JobsOverview


Dispatcher#requestClusterOverview

The data is available through the JobDetailsProvider through the get*Count() methods. The JobsOverview instantiation will happen within the Dispatcher instead.

getAvailableJobDetails() : Collection<JobDetails>


Dispatcher#requestMultipleJobDetails

Moves into JobDetailsProvider.

getAvailableJobDetails(JobID) : JobDetails


Dispatcher#requestJobStatus

Moves into JobDetailsProvider.


markResultAsCleanAsync(JobID) : CompletableFuture<Void>

At the end of the cleanup process within the Dispatcher

CompletableJobStore#markJobAsCleanAsync(JobID) : CompletableFuture<Void>


hasJobResultEntryAsync(JobID) : CompletableFuture<Boolean>

Dispatcher for checking whether the job is in globally-terminal state

JobMasterServiceLeadershipRunner to check whether the job is in globally-terminal state when processing a failover

CompletableJobStore#isGloballyTerminated(JobID) : CompletableFuture<Boolean>


hasDirtyJobResultEntryAsync(JobID) : CompletableFuture<Boolean>

At the end of the cleanup process within the Dispatcher

CompletableJobStore#isCleanedAsync(JobID) : CompletableFuture<Boolean>


hasCleanJobResultEntryAsync(JobID) : CompletableFuture<Boolean>

At the end of the cleanup process within the Dispatcher

CompletableJobStore#isDirty(JobID) : CompletableFuture<Boolean>


getDirtyResults() : Set<JobResult>

Before instantiating the Dispatcher to filter the jobs that are meant to be recovered

CompletableJobStore#getDirtyResults() : Set<ExecutionGraphInfo>

¹ if no JobManagerRunner is registered for the given JobID anymore
² can be derived from ArchivedExecutionGraph

Serialization

The ExecutionGraphInfoStore utilizes Java’s object serialization. It dumps the ExecutionGraphInfo (which includes the ArchivedExecutionGraph and the exception history) into a file. In contrast, the JobResultEntry is currently serialized as JSON containing the JobResult and a version (i.e. the version of the schema). The JobResult is used in the cleanup process to generate a sparse ArchivedExecutionGraph to serve to the REST API while cleaning up after failover. The version was added to allow changes to the format.

Content-wise, we can merge the two representations because the content of the JobResultEntry can be extracted from the ArchivedExecutionGraphFormat-wise, we run into issues because the ArchivedExecutionGraph cannot be fully represented in a human-readable format like JSON without losing the ability to deserialize the graph. Going through the ExecutionGraphInfo structure, the following parts have been identified as problematic for providing a human-readable JSON representation:

  • Accumulators: The ArchivedExecutionGraph comes with the custom Accumulators that were attached to the JobExecutionResult. There is a “stringified” representation of Accumulators which would allow us to add “something” to the JSON representation that's human-readable. But that representation cannot be used for deserialization.
  • ErrorInfo is used in two places (exception history and ArchivedExecution). It stores the timestamp of when an error appeared and the error’s cause as a Throwable. The Throwable is wrapped in a SerializedThrowable to make it Serializable. The SerializedThrowable cannot be JSON-serialized and used for deserialization again.
  • IOMetrics cannot be serialized in JSON in a human-readable format because of the ResultPartitionBytes that come with it.

You can find a detailed list of the ExecutionGraphInfo data structure in the Appendix. But essentially, we could provide a JSON representation of the data structure by keeping the three items mentioned above as Base64-encoded Strings (and provide a human-readable version if possible).

JSON Format

Serializing the ExecutionGraphInfo in JSON allows users to access all the data without Flink binaries. The structure of the ExecutionGraphInfo becomes kind of public, though, which makes it harder to change things in the future.

Binary Format

We could also move away from JSON as the container format. Instead, we use the JDK serialization (analogously to what the ExecutionGraphInfoStore does right now). The version can be encoded in the file name (no specified version would imply the pre-FLIP-360 version). The flaw of this approach is that we always have to use Flink code (from a specific Flink version) to parse the content. This can be achieved through the Flink client. The ClusterClient could provide the API to access those files and to generate the JSON without starting a dedicated cluster.

Persisting the Checkpoint statistics only

As mentioned in the Motivation section of this FLIP, having the latest checkpoint’s external storage path being persisted in the JobResultStore is good enough to cover scenarios where the JobManager disappears too early. One could argue that we could keep the two components and only expose the checkpoint statistics as part of the JobResultStore entry. That would allow us to keep the ArchivedExecutionGraph data structure internal.

Readability & Backwards Compatibility

Both approaches have the flaw that they are not backwards compatible. A file generated by an older version might not be parseable by a new version of Flink if the ArchivedExecutionGraph data structure was modified. We have to provide an deserializer implementation for each format version. A fallback option would be to persist the JobResult individually as part of the container format. That way, we can always have the fallback behavior that is currently implemented. Currently, we create a sparse ArchivedExecutionGraph based on the JobResult if no further information is given.


CompletedJobStore Entry Lifecycle Management/Caching

The following table illustrates the state of entry lifecycle management and caching in the current implementations of the ExecutionGraphInfoStore and the JobResultStore:



ExecutionGraphInfoStore

JobResultStore

In-Memory

Caching

Utilizes cache to maintain the lifecycle of the entries

No caching involved

Lifecycle Mgmt.

TTL and cache capacity can be specified in Flink’s config

Data stays forever in memory (FLINK-32806)

File-based

Caching

Utilizes cache to reduce IO

No caching involved

Lifecycle Mgmt.

TTL and cache capacity can be specified in Flink’s config. 

A separate cache is maintained for JobDetails.

Data is either removed when the job is globally-terminated and its artifacts properly cleaned up or by the user


The lifecycle of the entries should follow the current ExecutionGraphInfoStore implementations. The current JobResultStore behavior can be mimicked by setting the expiration time for an entry to 0.

The in-memory implementation should utilize caching (analogously to the in-memory implementation of the ExecutionGraphInfoStore) to reduce the footprint. But we should add some log messages to inform the user that a completed job disappeared because of its expiration time.

Configuration parameters

Old Parameter

Description

Migration Plan

job-result-store.delete-on-commit (default: true)

Used for determining when the JobResultStore entry should be deleted.

Replaced by jobstore.expiration-time¹ (true=0, false=Integer.MAX_VALUE)

job-result-store.storage-path (default: ${haStorage}/job-result-store/${clusterId})

DFS location of the files

Deprecated and replaced by: jobstore.storage-path (default: ${haStorage}/completed-job-store/{$clusterId})

jobstore.cache-size¹

Number of clean jobs for which the data shall be kept

kept

jobstore.expiration-time¹

TTL for clean job entries

kept

jobstore.max-capacity¹

Capacity for the ExecutionGraphInfo to be kept in-memory

kept

jobstore.type¹

File-based (File) or memory based (Memory)

kept

¹ jobstore.* could be also renamed into completed-job-store.* since it’s less generic. I’m not sure whether there is some desire to keep it like that for the sake of consistency.

Documentation

The following items will be updated:

  • JobResultStore documentation in the High Availability section of the documentation (link)
  • The configuration documentation will be updated according to the changes mentioned above.


Compatibility, Deprecation, and Migration Plan

Artifacts

There is no real concern in terms of backwards compatibility for the ExecutionGraphInfoStore, because its data is lost when shutting down the Flink cluster. Old JobResultStore entries can be still served due to the versioning of the JobResultStore entry format.

Configuration

The JobResultStore-related configuration parameters will become deprecated and replaced by corresponding jobstore.* (or completed-job-store.*) configuration parameters as described above.

Changing the default value for job-result-store.storage-path to jobstore.storage-path/completed-job-store.storage-path would require us to first look for the old default value and only use the new default value if the old one doesn’t exist. It has to be considered that checking for the non-existence of values in object storage can be expensive for large buckets. This check is only done once when initializing the cluster.

Test Plan

  • The existing tests should continue to work with the refactored approach.
  • Additional unit tests might be added to cover new functionality.
  • An e2e test might be added to verify that deleting the JobResultStore entry makes the system behave as expected.

Rejected Alternatives

Investigate merging the HistoryServer into the new component

Having a permanent storage for the ArchivedExecutionGraph might enable us to retrieve all REST requests through the new component. This could allow us to remove the HistoryServer.

The HistoryServer gives us the capability to access jobs from older Flink versions without worrying about the backwards compatibility of the ArchivedExecutionGraph. Working on such a topic would require additional investigations which is out-of-scope for this FLIP.

Splitting up the CompetedJobStore into two different components

Interfaces & Classes

draw.io Diagram
bordertrue
diagramNameCompletedJobStore Split
simpleViewerfalse
width
linksauto
tbstyletop
lboxtrue
diagramWidth1246
revision2

Summary of the changes:

  • CompletedJobStore: Takes care of the failover scenario and stores the actual data. The data might live in-memory all the time. A file-based and an in-memory implementation will be provided.
  • JobDetailsStore: Takes care of the job details which live in memory. The store will also take care of triggering the final deletion of the CompletedJobStore entry if a TTL is set. The file-based CompletedJobStore will provide a in-memory version of the JobDetailsStore (i.e. the JobDetails will be created along the ExecutionGraphInfo and will live in memory as long as the CompletedJobStore entry isn't removed). In contrast, the in-memory CompletedJobStore will come with a JobDetailsStore implementation that accesses the in-memory ExecutionGraphInfo and creates the JobDetails on the fly.

The sequence diagram below visualizes the process of creating and removing the entries.

draw.io Diagram
bordertrue
diagramNameFLIP-360 - Job Completion
simpleViewerfalse
width
linksauto
tbstyletop
lboxtrue
diagramWidth466
revision2

...

hasDirtyJobResultEntry(JobID) : boolean

...

At the end of the cleanup process within the Dispatcher

...

Can be kept as is

...

hasCleanJobResultEntry(JobID) : boolean

...

At the end of the cleanup process within the Dispatcher

...

Can be kept as is

...

getDirtyResults() : Set<JobResult>

...

Before instantiating the Dispatcher to filter the jobs that are meant to be recovered

...

Can be kept as is

¹ if no JobManagerRunner is registered for the given JobID anymore
² can be derived from ArchivedExecutionGraph

Summary of the changes:

  • ExecutionInfoStore will be renamed into CompletedJobStore to reflect the new purpose
  • Optional: The ExecutionGraphInfoStore methods will be made asynchronous (analogously to what is/was done for the JobResultStore in FLINK-27204)
  • All methods from the JobResultStore will be integrated into new interface CompletedJobStore
  • ExecutionGraphInfoStore#size() will be removed: This method is not used anywhere.
  • JobResultStore#getDirtyResults() is moved into its own interface CompletedJobStoreWithDirtyJobRetrieval (that’s the only method which is used outside of the Dispatcher). It derives from CompletedJobStore.

The file-based and in-memory implementations will implement CompletedJobStoreWithDirtyJobRetrieval but only CompletedJobStore is going to be passed on into the Dispatcher.

Serialization

The ExecutionGraphInfoStore utilizes Java’s object serialization. It dumps the ExecutionGraphInfo (which includes the ArchivedExecutionGraph and the exception history) into a file. In contrast, the JobResultEntry is currently serialized as JSON containing the JobResult and a version (i.e. the version of the schema). The JobResult is used in the cleanup process to generate a sparse ArchivedExecutionGraph to serve to the REST API while cleaning up after failover. The version was added to allow changes to the format.

Content-wise, we can merge the two representations because the content of the JobResultEntry can be extracted from the ArchivedExecutionGraphFormat-wise, we run into issues because the ArchivedExecutionGraph cannot be fully represented in a human-readable format like JSON without losing the ability to deserialize the graph. Going through the ExecutionGraphInfo structure, the following parts have been identified as problematic for providing a human-readable JSON representation:

  • Accumulators: The ArchivedExecutionGraph comes with the custom Accumulators that were attached to the JobExecutionResult. There is a “stringified” representation of Accumulators which would allow us to add “something” to the JSON representation that's human-readable. But that representation cannot be used for deserialization.
  • ErrorInfo is used in two places (exception history and ArchivedExecution). It stores the timestamp of when an error appeared and the error’s cause as a Throwable. The Throwable is wrapped in a SerializedThrowable to make it Serializable. The SerializedThrowable cannot be JSON-serialized and used for deserialization again.
  • IOMetrics cannot be serialized in JSON in a human-readable format because of the ResultPartitionBytes that come with it.

You can find a detailed list of the ExecutionGraphInfo data structure in the Appendix. But essentially, we could provide a JSON representation of the data structure by keeping the three items mentioned above as Base64-encoded Strings (and provide a human-readable version if possible).

JSON Format

Serializing the ExecutionGraphInfo in JSON allows users to access all the data without Flink binaries. The structure of the ExecutionGraphInfo becomes kind of public, though, which makes it harder to change things in the future.

Binary Format

We could also move away from JSON as the container format. Instead, we use the JDK serialization (analogously to what the ExecutionGraphInfoStore does right now). The version can be encoded in the file name (no specified version would imply the pre-FLIP-360 version). The flaw of this approach is that we always have to use Flink code (from a specific Flink version) to parse the content. This can be achieved through the Flink client. The ClusterClient could provide the API to access those files and to generate the JSON without starting a dedicated cluster.

Persisting the Checkpoint statistics only

As mentioned in the Motivation section of this FLIP, having the latest checkpoint’s external storage path being persisted in the JobResultStore is good enough to cover scenarios where the JobManager disappears too early. One could argue that we could keep the two components and only expose the checkpoint statistics as part of the JobResultStore entry. That would allow us to keep the ArchivedExecutionGraph data structure internal.

Readability & Backwards Compatibility

Both approaches have the flaw that they are not backwards compatible. A file generated by an older version might not be parseable by a new version of Flink if the ArchivedExecutionGraph data structure was modified. We have to provide an deserializer implementation for each format version. A fallback option would be to persist the JobResult individually as part of the container format. That way, we can always have the fallback behavior that is currently implemented. Currently, we create a sparse ArchivedExecutionGraph based on the JobResult if no further information is given.

CompletedJobStore Entry Lifecycle Management/Caching

The following table illustrates the state of entry lifecycle management and caching in the current implementations of the ExecutionGraphInfoStore and the JobResultStore:

...

ExecutionGraphInfoStore

...

JobResultStore

...

In-Memory

...

Caching

...

Utilizes cache to maintain the lifecycle of the entries

...

No caching involved

...

Lifecycle Mgmt.

...

TTL and cache capacity can be specified in Flink’s config

...

Data stays forever in memory (FLINK-32806)

...

File-based

...

Caching

...

Utilizes cache to reduce IO

...

No caching involved

...

Lifecycle Mgmt.

...

TTL and cache capacity can be specified in Flink’s config. 

A separate cache is maintained for JobDetails.

...

Data is either removed when the job is globally-terminated and its artifacts properly cleaned up or by the user

The lifecycle of the entries should follow the current ExecutionGraphInfoStore implementations. The current JobResultStore behavior can be mimicked by setting the expiration time for an entry to 0.

The in-memory implementation should utilize caching (analogously to the in-memory implementation of the ExecutionGraphInfoStore) to reduce the footprint. But we should add some log messages to inform the user that a completed job disappeared because of its expiration time.

Configuration parameters

Old

Description

New

job-result-store.delete-on-commit (default: true)

Used for determining when the JobResultStore entry should be deleted.

Replaced by jobstore.expiration-time¹ (true=0, false=Integer.MAX_VALUE)

job-result-store.storage-path (default: ${haStorage}/job-result-store/${clusterId})

DFS location of the files

Deprecated and replaced by: jobstore.storage-path (default: ${haStorage}/completed-job-store/{$clusterId})

jobstore.cache-size¹

Number of clean jobs for which the data shall be kept

kept

jobstore.expiration-time¹

TTL for clean job entries

kept

jobstore.max-capacity¹

Capacity for the ExecutionGraphInfo to be kept in-memory

kept

jobstore.type¹

File-based (File) or memory based (Memory)

kept

¹ jobstore.* could be also renamed into completed-job-store.* since it’s less generic. I’m not sure whether there is some desire to keep it like that for the sake of consistency.

Documentation

The following items will be updated:

  • JobResultStore documentation in the High Availability section of the documentation (link)
  • The configuration documentation will be updated according to the changes mentioned above.

Compatibility, Deprecation, and Migration Plan

Artifacts

There is no real concern in terms of backwards compatibility for the ExecutionGraphInfoStore, because its data is lost when shutting down the Flink cluster. Old JobResultStore entries can be still served due to the versioning of the JobResultStore entry format.

Configuration

The JobResultStore-related configuration parameters will become deprecated and replaced by corresponding jobstore.* (or completed-job-store.*) configuration parameters as described above.

Changing the default value for job-result-store.storage-path to jobstore.storage-path/completed-job-store.storage-path would require us to first look for the old default value and only use the new default value if the old one doesn’t exist. It has to be considered that checking for the non-existence of values in object storage can be expensive for large buckets. This check is only done once when initializing the cluster.

Test Plan

  • The existing tests should continue to work with the refactored approach.
  • Additional unit tests might be added to cover new functionality.
  • An e2e test might be added to verify that deleting the JobResultStore entry makes the system behave as expected.

Rejected Alternatives

Investigate merging the HistoryServer into the new component

Having a permanent storage for the ArchivedExecutionGraph might enable us to retrieve all REST requests through the new component. This could allow us to remove the HistoryServer.

The HistoryServer gives us the capability to access jobs from older Flink versions without worrying about the backwards compatibility of the ArchivedExecutionGraph. Working on such a topic would require additional investigations which is out-of-scope for this FLIP.

Appendix
Anchor
appendix
appendix

...