Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Storing metadata of completed jobs
...
Proposed Changes
Interfaces & Methods
...
Migration Proposal

| Method | Calling Code Location | Migration Plan |
|---|---|---|
| size() : int | not used | Can be removed. |
| get(JobID) : ExecutionGraphInfo | Dispatcher#cancel¹ (the graph's JobStatus² is used to decide whether an error should be thrown); Dispatcher#requestExecutionGraphInfo¹; Dispatcher#requestJobResult¹ (return type JobResult²) | It can be replaced by the CompletedJobStore to retrieve the ExecutionGraphInfo. |
| put(ExecutionGraphInfo) : void, createDirtyResult(JobResultEntry) : void | Both methods are called from within the Dispatcher as soon as the job reaches a globally-terminal state. | The JobResultEntry that is used is also just derived from AccessExecutionGraph². Both methods can be merged into a single one that creates the entry with the relevant information. |
| getStoredJobsOverview() : JobsOverview | Dispatcher#requestClusterOverview | Data can be stored in a file in the JobResultStore folder to persist the information. |
| getAvailableJobDetails() : Collection<JobDetails> | Dispatcher#requestMultipleJobDetails (return type JobDetails²) | Moves into JobDetailsProvider. |
| getAvailableJobDetails(JobID) : JobDetails | Dispatcher#requestJobStatus (return type JobDetails²) | Moves into JobDetailsProvider. |
| markResultAsClean(JobID) : void | At the end of the cleanup process within the Dispatcher | Can be kept as is. |
| hasJobResultEntry(JobID) : boolean | Dispatcher, for checking whether the job is in a globally-terminal state; JobMasterServiceLeadershipRunner, to check whether the job is in a globally-terminal state when processing a failover | Can be kept as is. |
| hasDirtyJobResultEntry(JobID) : boolean | At the end of the cleanup process within the Dispatcher | Can be kept as is. |
| hasCleanJobResultEntry(JobID) : boolean | At the end of the cleanup process within the Dispatcher | Can be kept as is. |
| getDirtyResults() : Set<JobResult> | Before instantiating the Dispatcher, to filter the jobs that are meant to be recovered | Can be kept as is. |

¹ if no JobManagerRunner is registered for the given JobID anymore
² can be derived from ArchivedExecutionGraph
draw.io Diagram: CompletedJobStore
- CompletedJobStore: Takes care of the artifact (the ExecutionGraphInfo) lifecycle.
- EmbeddedCompletedJobStore: In-memory implementation of both the job details and the ExecutionGraphInfo.
- FileBasedCompletedJobStore: Writes the ExecutionGraphInfo to files but keeps the JobDetails in memory.
- JobDetailsProvider: Accesses the job details, which should always live in memory (in contrast to the ExecutionGraphInfo) so that the information is accessible in a reasonable amount of time (see the sketch below).
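To make the proposed structure more tangible, here is a minimal, non-authoritative sketch of the interfaces in Java. The signatures are condensed from the migration table above; the name of the merged creation method is illustrative, not part of the FLIP:

```java
import java.util.Collection;
import java.util.Set;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;

/** Serves the job details, which should always live in memory. */
interface JobDetailsProvider {
    Collection<JobDetails> getAvailableJobDetails();
    JobDetails getAvailableJobDetails(JobID jobId);
}

/** Merged replacement for ExecutionGraphInfoStore and JobResultStore. */
interface CompletedJobStore extends JobDetailsProvider {
    // Illustrative name for the merged put(...)/createDirtyResult(...) call.
    void addCompletedJob(ExecutionGraphInfo executionGraphInfo);

    ExecutionGraphInfo get(JobID jobId);
    JobsOverview getStoredJobsOverview();
    void markResultAsClean(JobID jobId);
    boolean hasJobResultEntry(JobID jobId);
    boolean hasDirtyJobResultEntry(JobID jobId);
    boolean hasCleanJobResultEntry(JobID jobId);

    // Might move into a dedicated sub-interface (see "Summary of the changes" further below).
    Set<JobResult> getDirtyResults();
}
```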
Serialization
The ExecutionGraphInfoStore utilizes Java's object serialization. It dumps the ExecutionGraphInfo (which includes the ArchivedExecutionGraph and the exception history) into a file. In contrast, the JobResultEntry is currently serialized as JSON containing the JobResult and a version (i.e. the version of the schema). The JobResult is used in the cleanup process to generate a sparse ArchivedExecutionGraph to serve to the REST API while cleaning up after a failover. The version was added to allow changes to the format.
Content-wise, we can merge the two representations because the content of the JobResultEntry can be extracted from the ArchivedExecutionGraph. Format-wise, we run into issues because the ArchivedExecutionGraph cannot be fully represented in a human-readable format like JSON without losing the ability to deserialize the graph. Going through the ExecutionGraphInfo structure, the following parts have been identified as problematic for providing a human-readable JSON representation:
- Accumulators: The ArchivedExecutionGraph comes with the custom Accumulators that were attached to the JobExecutionResult. There is a "stringified" representation of Accumulators which would allow us to add "something" human-readable to the JSON representation, but that representation cannot be used for deserialization.
- ErrorInfo is used in two places (the exception history and ArchivedExecution). It stores the timestamp of when an error appeared and the error's cause as a Throwable. The Throwable is wrapped in a SerializedThrowable to make it serializable. The SerializedThrowable cannot be JSON-serialized and used for deserialization again.
- IOMetrics cannot be serialized in JSON in a human-readable format because of the ResultPartitionBytes that come with it.
You can find a detailed list of the ExecutionGraphInfo data structure in the Appendix. Essentially, we could provide a JSON representation of the data structure by keeping the three items mentioned above as Base64-encoded Strings (and providing a human-readable version where possible).
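As an illustration (not part of the proposed interfaces), round-tripping one of the problematic Serializable parts through a Base64 string could look like this:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

final class Base64Fields {
    /** JDK-serializes a value (e.g. a SerializedThrowable) and Base64-encodes the bytes for embedding in JSON. */
    static String toBase64(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    /** Decodes and deserializes the value again; this only works with matching Flink classes on the classpath. */
    static Object fromBase64(String encoded) throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }
}
```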
JSON Format
Serializing the ExecutionGraphInfo in JSON allows users to access all the data without Flink binaries. The structure of the ExecutionGraphInfo effectively becomes public, though, which makes it harder to change in the future.
Binary Format
We could also move away from JSON as the container format. Instead, we use JDK serialization (analogously to what the ExecutionGraphInfoStore does right now). The version can be encoded in the file name (no specified version would imply the pre-FLIP-360 format). The flaw of this approach is that we always have to use Flink code (from a specific Flink version) to parse the content. This can be achieved through the Flink client: the ClusterClient could provide the API to access those files and to generate the JSON without starting a dedicated cluster.
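For illustration, deriving the format version from the file name could look like the sketch below; the naming scheme (a 32-character hex JobID, optionally followed by -v<N>) is an assumption, not something the FLIP specifies:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class StoreFileVersion {
    // Hypothetical scheme: "<jobId>.bin" (pre-FLIP-360) vs. "<jobId>-v<N>.bin".
    private static final Pattern FILE_NAME =
            Pattern.compile("^[0-9a-f]{32}(?:-v(?<version>\\d+))?\\.bin$");

    /** Returns the encoded format version; 0 stands for the pre-FLIP-360 format. */
    static Optional<Integer> versionOf(String fileName) {
        Matcher matcher = FILE_NAME.matcher(fileName);
        if (!matcher.matches()) {
            return Optional.empty();
        }
        String version = matcher.group("version");
        return Optional.of(version == null ? 0 : Integer.parseInt(version));
    }
}
```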
Persisting the Checkpoint statistics only
As mentioned in the Motivation section of this FLIP, persisting the latest checkpoint's external storage path in the JobResultStore is good enough to cover scenarios where the JobManager disappears too early. One could argue that we could keep the two components and only expose the checkpoint statistics as part of the JobResultStore entry. That would allow us to keep the ArchivedExecutionGraph data structure internal.
Readability & Backwards Compatibility
Both approaches have the flaw that they are not backwards compatible. A file generated by an older version might not be parseable by a newer version of Flink if the ArchivedExecutionGraph data structure was modified. We would have to provide a deserializer implementation for each format version. A fallback option would be to persist the JobResult individually as part of the container format. That way, we can always fall back to the behavior that is currently implemented: we create a sparse ArchivedExecutionGraph based on the JobResult if no further information is given.
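For illustration, such a container format could wrap the always-parseable JobResult next to the version-specific payload; the field names and version number in this Jackson-based sketch are assumptions:

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

final class CompletedJobStoreEnvelope {
    /** Wraps the human-readable JobResult JSON and the opaque graph payload in one versioned document. */
    static String write(ObjectMapper mapper, ObjectNode jobResultJson, String graphInfoBase64)
            throws JsonProcessingException {
        ObjectNode root = mapper.createObjectNode();
        root.put("version", 2); // schema version of the envelope
        root.set("jobResult", jobResultJson); // fallback that any Flink version can parse
        root.put("executionGraphInfo", graphInfoBase64); // JDK-serialized, Base64-encoded payload
        return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(root);
    }
}
```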
CompletedJobStore Entry Lifecycle Management/Caching
The following table illustrates the state of entry lifecycle management and caching in the current implementations of the ExecutionGraphInfoStore and the JobResultStore:

| | | ExecutionGraphInfoStore | JobResultStore |
|---|---|---|---|
| In-Memory | Caching | Utilizes cache to maintain the lifecycle of the entries | No caching involved |
| In-Memory | Lifecycle Mgmt. | TTL and cache capacity can be specified in Flink's config | Data stays in memory forever (FLINK-32806) |
| File-based | Caching | Utilizes cache to reduce IO | No caching involved |
| File-based | Lifecycle Mgmt. | TTL and cache capacity can be specified in Flink's config. A separate cache is maintained for JobDetails. | Data is removed either when the job is globally terminated and its artifacts are properly cleaned up, or manually by the user |
The lifecycle of the entries should follow the current ExecutionGraphInfoStore implementations. The current JobResultStore behavior can be mimicked by setting the expiration time for an entry to 0.
The in-memory implementation should utilize caching (analogously to the in-memory implementation of the ExecutionGraphInfoStore) to reduce the footprint. Additionally, we should add log messages to inform the user when a completed job disappears because its expiration time has passed.
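A sketch of how an implementation could combine TTL, capacity, and an expiration log message using a Guava cache (as the current ExecutionGraphInfoStore implementations do); the key/value types, names, and defaults here are illustrative only:

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalListener;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ExpiringJobDetailsCache {
    private static final Logger LOG = LoggerFactory.getLogger(ExpiringJobDetailsCache.class);

    // String stand-ins for JobID and JobDetails keep the sketch self-contained.
    private final RemovalListener<String, String> onRemoval =
            notification -> {
                if (notification.getCause() == RemovalCause.EXPIRED) {
                    LOG.info(
                            "Details of completed job {} are no longer served because its expiration time passed.",
                            notification.getKey());
                }
            };

    // TTL and capacity would come from jobstore.expiration-time and jobstore.max-capacity.
    private final Cache<String, String> details =
            CacheBuilder.newBuilder()
                    .expireAfterWrite(Duration.ofHours(1))
                    .maximumSize(50)
                    .removalListener(onRemoval)
                    .build();
}
```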
Configuration parameters
| Old Parameter | Description | Migration Plan |
|---|---|---|
| job-result-store.delete-on-commit (default: true) | Used for determining when the entry shall be deleted: with true, the entry is removed as soon as it is marked as clean; with false, a clean entry is kept | Replaced by jobstore.expiration-time¹ (true = 0, false = no expiration) |
| job-result-store.storage-path | DFS location of the files | Deprecated and replaced by jobstore.storage-path¹ |
| jobstore.max-capacity | Number of clean jobs for which the data shall be kept | kept |
| jobstore.expiration-time | TTL for clean job entries | kept |
| jobstore.cache-size | Capacity for the in-memory cache | kept |
| jobstore.type | File-based (File) or memory-based (Memory) | kept |

¹ jobstore.* could also be renamed into completed-job-store.* since it's less generic. I'm not sure whether there is some desire to keep the jobstore.* prefix for the sake of consistency.
Documentation
The following items will be updated:
- JobResultStore documentation in the High Availability section of the documentation (link)
- The configuration documentation will be updated according to the changes mentioned above.
Compatibility, Deprecation, and Migration Plan
Artifacts
There is no real concern in terms of backwards compatibility for the ExecutionGraphInfoStore, because its data is lost when shutting down the Flink cluster. Old JobResultStore entries can still be served due to the versioning of the JobResultStore entry format.
Configuration
The JobResultStore-related configuration parameters will become deprecated and replaced by corresponding jobstore.* (or completed-job-store.*) configuration parameters as described above.
Changing the default value for job-result-store.storage-path to jobstore.storage-path/completed-job-store.storage-path would require us to first look for the old default location and only use the new default if the old one doesn't exist. It has to be considered that checking for the non-existence of paths in object storage can be expensive for large buckets; however, this check is only done once when initializing the cluster.
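A sketch of that one-time lookup at cluster initialization; the option name is illustrative (the FLIP leaves jobstore.* vs. completed-job-store.* open), and Flink's FileSystem abstraction is used for the existence check:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

final class StoragePathResolution {

    /** Falls back to the old job-result-store location only if data already exists there. */
    static Path resolveStoragePath(Configuration conf, Path oldDefault, Path newDefault)
            throws java.io.IOException {
        String configured = conf.getString("completed-job-store.storage-path", null); // illustrative key
        if (configured != null) {
            return new Path(configured);
        }
        // Potentially expensive on large object-store buckets, hence executed only
        // once when the cluster is initialized.
        FileSystem fs = oldDefault.getFileSystem();
        return fs.exists(oldDefault) ? oldDefault : newDefault;
    }
}
```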
Test Plan
- The existing tests should continue to work with the refactored approach.
- Additional unit tests might be added to cover new functionality.
- An e2e test might be added to verify that deleting the JobResultStore entry makes the system behave as expected.
Rejected Alternatives
Investigate merging the HistoryServer into the new component
Having permanent storage for the ArchivedExecutionGraph might enable us to serve all related REST requests through the new component. This could allow us to remove the HistoryServer.
The HistoryServer gives us the capability to access jobs from older Flink versions without worrying about the backwards compatibility of the ArchivedExecutionGraph. Working on such a topic would require additional investigation, which is out of scope for this FLIP.
Splitting up the CompletedJobStore into two different components
Interfaces & Classes
draw.io Diagram: CompletedJobStore Split
Summary of the changes:
- CompletedJobStore: Takes care of the failover scenario and stores the actual data. The data might live in memory all the time. A file-based and an in-memory implementation will be provided.
- JobDetailsStore: Takes care of the job details, which live in memory. The store will also take care of triggering the final deletion of the CompletedJobStore entry if a TTL is set. The file-based CompletedJobStore will provide an in-memory version of the JobDetailsStore (i.e. the JobDetails will be created alongside the ExecutionGraphInfo and will live in memory as long as the CompletedJobStore entry isn't removed). In contrast, the in-memory CompletedJobStore will come with a JobDetailsStore implementation that accesses the in-memory ExecutionGraphInfo and creates the JobDetails on the fly.
The sequence diagram below visualizes the process of creating and removing the entries.
draw.io Diagram (sequence)
Summary of the changes:
- ExecutionGraphInfoStore will be renamed into CompletedJobStore to reflect the new purpose.
- Optional: The ExecutionGraphInfoStore methods will be made asynchronous (analogously to what is/was done for the JobResultStore in FLINK-27204).
- All methods from the JobResultStore will be integrated into the new interface CompletedJobStore.
- ExecutionGraphInfoStore#size() will be removed: this method is not used anywhere.
- JobResultStore#getDirtyResults() is moved into its own interface CompletedJobStoreWithDirtyJobRetrieval, which derives from CompletedJobStore (that's the only method which is used outside of the Dispatcher).

The file-based and in-memory implementations will implement CompletedJobStoreWithDirtyJobRetrieval, but only CompletedJobStore is going to be passed on into the Dispatcher.
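A minimal sketch of that split; the method set of CompletedJobStore is abbreviated here (see the migration table in the main proposal), and only the interface names follow the summary above:

```java
import java.util.Set;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmaster.JobResult;

/** Everything the Dispatcher needs; dirty-job retrieval is deliberately not part of this interface. */
interface CompletedJobStore {
    boolean hasJobResultEntry(JobID jobId);
    void markResultAsClean(JobID jobId);
    // ... remaining merged ExecutionGraphInfoStore/JobResultStore methods
}

/** Implemented by the file-based and in-memory stores; used only for job recovery before the Dispatcher exists. */
interface CompletedJobStoreWithDirtyJobRetrieval extends CompletedJobStore {
    Set<JobResult> getDirtyResults();
}
```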
Appendix
...