Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Storing metadata of completed jobs
Apache Flink provides the JobResultStore and the ExecutionGraphInfoStore as two components to manage the state of globally-terminated jobs.
ExecutionGraphInfoStore
The ExecutionGraphInfoStore stores an archived version of the ExecutionGraphInfo¹ to make it accessible via the Flink REST API. Flink provides two implementations of this interface:
- The FileExecutionGraphInfoStore provides a file-based implementation. The data is stored in a temporary folder on the JobManager’s disk. Caches are used to reduce IO operations.
- The MemoryExecutionGraphInfoStore stores the data in the JobManager’s memory. A cache enables the user to define the number of entries and the TTL of each entry.
In either case, the information about completed jobs will be lost (the web UI won’t list those anymore) in case of a JobManager failover or a Flink cluster shutdown.
¹ Note: The ExecutionGraphInfoStore saves the ExecutionGraphInfo which consists of the ArchivedExecutionGraph and the exception history for now. The JobResult can be derived from the ArchivedExecutionGraph.
JobResultStore
The JobResultStore was introduced with FLIP-194 to mitigate effects of a JobManager failover while cleaning up a globally-terminated job. The goal was to have the information about the job’s terminal state available even after a failover to avoid rerunning the job (FLINK-11813).
Flink provides two implementations of the JobResultStore:
- The EmbeddedJobResultStore stores the data in-memory. This covers scenarios where the failover of jobs isn’t desired/necessary (i.e. non-HA cases). The data is lost in case of a JobManager failover or Flink cluster shutdown.
- The FileSystemBasedJobResultStore stores the data in a DFS (e.g. S3). The path can be configured through Flink’s configuration.
There are two ways to handle the lifecycle of the JobResultStore entries:
- Default behavior: The entry is removed by Flink automatically as soon as the job’s metadata has been cleaned up after reaching a globally-terminal state.
- Optional for the FileSystemBasedJobResultStore: The entry is kept. In that case, Flink "hands over" the ownership of the corresponding file to the user. The user has to take care of removing artifacts that are no longer needed. This only works with the file-based implementation.
The CompletedJobStore as a single store for completed jobs
The two components introduced in the previous section have similar use-cases. The difference lies in the type of information that’s stored and the durability of this information. This FLIP proposes to unite both purposes in a single component CompletedJobStore.
The benefits would be:
- The REST API (and the web UI) will be able to list completed jobs even after a JobManager failover.
- The user has more control over what information should be kept. Flink could provide a TTL to allow automatic removal of entries after a certain grace period. In addition, users can (re)move files to remove individual jobs from the cluster’s job overview.
- Additional information (e.g. the latest checkpoint/state) is persisted as part of the ArchivedExecutionGraph and would survive a JobManager failover or Flink cluster restart.
- The HistoryServer isn’t necessary anymore to have access to completed jobs (at least for the same Flink version).
Persisting Latest State
There have been discussions about this in the past: The reference to the latest checkpoint of a globally-terminated job can be lost if the JobManager fails while cleaning up the job. The ArchivedExecutionGraph will be written to the file-based ExecutionGraphInfoStore but the data is not accessible anymore after the failover. Persisting (at least) the checkpoint statistics along with the JobResult in the JobResultStore would help avoid these scenarios. The Kubernetes Operator (and any other external system relying on the checkpoint metadata) would benefit from this (see the related Jira issue FLINK-27569).
Public Interfaces
No API has been annotated as @Public or @PublicEvolving yet. Both the ExecutionGraphInfoStore and the JobResultStore interfaces are considered @Internal.
The content of the JobResultStore files is accessible by the user, though. The file content is not documented (that needs to change) but should be considered public API (we cannot be sure that this data isn’t used by users in some way). The format of the file content already supports versioning, which would allow us to provide fallback behavior.
Proposed Changes
Interfaces & Methods
|
| Calling Code Location | Migration Plan |
| not used | Can be removed. | |
|
| The graph’s | |
| It can be replaced by the | ||
| Return type | ||
|
| Both methods are called from within the | Used |
|
| Data can be stored in a file in the | |
|
| Return type | |
|
| Return type | |
| At the end of the cleanup process within the | Can be kept as is | |
|
| Can be kept as is | |
| At the end of the cleanup process within the | Can be kept as is | |
| At the end of the cleanup process within the | Can be kept as is | |
| Before instantiating the | Can be kept as is |
¹ if no JobManagerRunner is registered for the given JobID anymore
² can be derived from ArchivedExecutionGraph
Summary of the changes:
- ExecutionGraphInfoStore will be renamed into CompletedJobStore to reflect the new purpose
- Optional: The ExecutionGraphInfoStore methods will be made asynchronous (analogously to what is/was done for the JobResultStore in FLINK-27204)
- All methods from the JobResultStore will be integrated into the new interface CompletedJobStore
- ExecutionGraphInfoStore#size() will be removed: This method is not used anywhere.
- JobResultStore#getDirtyResults() is moved into its own interface CompletedJobStoreWithDirtyJobRetrieval (that’s the only method which is used outside of the Dispatcher). It derives from CompletedJobStore.
The file-based and in-memory implementations will implement CompletedJobStoreWithDirtyJobRetrieval but only CompletedJobStore is going to be passed on into the Dispatcher.
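The split into a narrow and an extended interface could be sketched roughly as follows. This is only an illustration of the idea: apart from getDirtyResults() and the two interface names, every type and method name here (Entry, put, get, markAsClean, InMemoryStore) is a placeholder and not Flink's actual signature.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Sketch only: placeholder types and method names, not Flink's real API. */
final class CompletedJobStoreSketch {

    /** Hypothetical stand-in for the stored information about a completed job. */
    static final class Entry {
        final String jobId;
        final String archivedGraph;
        final boolean dirty;

        Entry(String jobId, String archivedGraph, boolean dirty) {
            this.jobId = jobId;
            this.archivedGraph = archivedGraph;
            this.dirty = dirty;
        }
    }

    /** The narrow view that would be handed to the Dispatcher. */
    interface CompletedJobStore {
        void put(Entry entry);

        Optional<Entry> get(String jobId);

        void markAsClean(String jobId);
    }

    /** Extended view for the code outside the Dispatcher that recovers dirty jobs. */
    interface CompletedJobStoreWithDirtyJobRetrieval extends CompletedJobStore {
        List<Entry> getDirtyResults();
    }

    /** An in-memory implementation of the extended interface. */
    static final class InMemoryStore implements CompletedJobStoreWithDirtyJobRetrieval {
        private final Map<String, Entry> entries = new LinkedHashMap<>();

        @Override
        public void put(Entry entry) {
            entries.put(entry.jobId, entry);
        }

        @Override
        public Optional<Entry> get(String jobId) {
            return Optional.ofNullable(entries.get(jobId));
        }

        @Override
        public void markAsClean(String jobId) {
            // Replace the entry with a copy whose dirty flag is cleared.
            entries.computeIfPresent(jobId, (id, e) -> new Entry(id, e.archivedGraph, false));
        }

        @Override
        public List<Entry> getDirtyResults() {
            List<Entry> dirty = new ArrayList<>();
            for (Entry e : entries.values()) {
                if (e.dirty) {
                    dirty.add(e);
                }
            }
            return dirty;
        }
    }
}
```

Code that only needs the Dispatcher's view would declare its dependency as CompletedJobStore, so getDirtyResults() is not reachable from there even though the same instance implements it.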
Serialization
The ExecutionGraphInfoStore utilizes Java’s object serialization. It dumps the ExecutionGraphInfo (which includes the ArchivedExecutionGraph and the exception history) into a file. In contrast, the JobResultEntry is currently serialized as JSON containing the JobResult and a version (i.e. the version of the schema). The JobResult is used in the cleanup process to generate a sparse ArchivedExecutionGraph to serve to the REST API while cleaning up after failover. The version was added to allow changes to the format.
Content-wise, we can merge the two representations because the content of the JobResultEntry can be extracted from the ArchivedExecutionGraph. Format-wise, we run into issues because the ArchivedExecutionGraph cannot be fully represented in a human-readable format like JSON without losing the ability to deserialize the graph. Going through the ExecutionGraphInfo structure, the following parts have been identified as problematic for providing a human-readable JSON representation:
- Accumulators: The ArchivedExecutionGraph comes with the custom Accumulators that were attached to the JobExecutionResult. There is a “stringified” representation of Accumulators which would allow us to add “something” to the JSON representation that's human-readable. But that representation cannot be used for deserialization.
- ErrorInfo is used in two places (exception history and ArchivedExecution). It stores the timestamp of when an error appeared and the error’s cause as a Throwable. The Throwable is wrapped in a SerializedThrowable to make it Serializable. The SerializedThrowable cannot be JSON-serialized and used for deserialization again.
- IOMetrics cannot be serialized in JSON in a human-readable format because of the ResultPartitionBytes that come with it.
You can find a detailed list of the ExecutionGraphInfo data structure in the Appendix. But essentially, we could provide a JSON representation of the data structure by keeping the three items mentioned above as Base64-encoded Strings (and provide a human-readable version if possible).
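To illustrate the Base64 idea, here is a minimal sketch that uses only the JDK. It stores a Java-serialized value as a Base64 string next to a human-readable rendering. The class name, the method names and the JSON field names ("readable", "serialized") are assumptions made for this example and are not part of any existing format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

/** Sketch only: hypothetical helper, not an existing Flink class. */
final class Base64FieldSketch {

    /** Serializes the value with JDK serialization and encodes the bytes as Base64. */
    static String encode(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    /** Reverses encode(): Base64-decodes the string and deserializes the object. */
    static Object decode(String base64) {
        byte[] raw = Base64.getDecoder().decode(base64);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of the serialized value is not available", e);
        }
    }

    /** Builds a JSON member that carries both the readable and the deserializable form. */
    static String toJsonField(String name, Serializable value) {
        // Minimal escaping for the sketch; a real implementation would use a JSON library.
        String readable = String.valueOf(value).replace("\\", "\\\\").replace("\"", "\\\"");
        return "\"" + name + "\": {\"readable\": \"" + readable
                + "\", \"serialized\": \"" + encode(value) + "\"}";
    }
}
```

Only the "serialized" part would be used when reading the entry back; the "readable" part exists purely for people and tools that inspect the file without Flink binaries.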
JSON Format
Serializing the ExecutionGraphInfo in JSON allows users to access all the data without Flink binaries. The structure of the ExecutionGraphInfo effectively becomes public, though, which makes it harder to change things in the future.
Binary Format
We could also move away from JSON as the container format. Instead, we could use the JDK serialization (analogously to what the ExecutionGraphInfoStore does right now). The version can be encoded in the file name (no specified version would imply the pre-FLIP-360 version). The flaw of this approach is that we always have to use Flink code (from a specific Flink version) to parse the content. This can be achieved through the Flink client. The ClusterClient could provide the API to access those files and to generate the JSON without starting a dedicated cluster.
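Encoding the version in the file name could look like the following sketch. The naming scheme (a ".v<number>" suffix) and the constant LEGACY_VERSION are assumptions made for illustration; the FLIP does not define the actual file name layout.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch only: the ".v<number>" suffix is a hypothetical naming scheme. */
final class FileNameVersionSketch {

    /** Version assumed for files without a version suffix (the pre-FLIP-360 format). */
    static final int LEGACY_VERSION = 0;

    // Matches "<name>.v<digits>", e.g. "a1b2c3.v2".
    private static final Pattern VERSIONED = Pattern.compile("^(.+)\\.v(\\d{1,9})$");

    /** Returns the version encoded in the file name, or LEGACY_VERSION if there is none. */
    static int versionOf(String fileName) {
        Matcher matcher = VERSIONED.matcher(fileName);
        return matcher.matches() ? Integer.parseInt(matcher.group(2)) : LEGACY_VERSION;
    }

    /** Returns the file name without the version suffix. */
    static String baseNameOf(String fileName) {
        Matcher matcher = VERSIONED.matcher(fileName);
        return matcher.matches() ? matcher.group(1) : fileName;
    }
}
```

A reader would call versionOf() first and then pick the matching deserializer, so the binary payload itself never has to be opened just to find out which format it uses.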
Persisting the Checkpoint statistics only
As mentioned in the Motivation section of this FLIP, persisting the latest checkpoint’s external storage path in the JobResultStore is good enough to cover scenarios where the JobManager disappears too early. One could argue that we could keep the two components and only expose the checkpoint statistics as part of the JobResultStore entry. That would allow us to keep the ArchivedExecutionGraph data structure internal.
Readability & Backwards Compatibility
Both approaches have the flaw that they are not backwards compatible. A file generated by an older version might not be parseable by a new version of Flink if the ArchivedExecutionGraph data structure was modified. We have to provide a deserializer implementation for each format version. A fallback option would be to persist the JobResult individually as part of the container format. That way, we can always have the fallback behavior that is currently implemented. Currently, we create a sparse ArchivedExecutionGraph based on the JobResult if no further information is given.
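The combination of per-version deserializers and the JobResult fallback could be sketched as follows. All names are hypothetical, and plain strings stand in for the JobResult and the ArchivedExecutionGraph to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Sketch only: the container layout and all names are hypothetical. */
final class VersionedEntryReaderSketch {

    /** Hypothetical container: format version, separately stored JobResult, graph payload. */
    static final class Container {
        final int version;
        final String jobResult;
        final String graphPayload;

        Container(int version, String jobResult, String graphPayload) {
            this.version = version;
            this.jobResult = jobResult;
            this.graphPayload = graphPayload;
        }
    }

    // One graph deserializer per known format version.
    private final Map<Integer, Function<String, String>> deserializers = new HashMap<>();

    void register(int version, Function<String, String> deserializer) {
        deserializers.put(version, deserializer);
    }

    /** Returns the full graph if the version is readable, else a sparse graph from the JobResult. */
    String read(Container container) {
        Function<String, String> deserializer = deserializers.get(container.version);
        if (deserializer != null) {
            try {
                return deserializer.apply(container.graphPayload);
            } catch (RuntimeException e) {
                // The payload no longer matches the data structure: use the fallback below.
            }
        }
        return "sparse graph derived from " + container.jobResult;
    }
}
```

Because the JobResult is stored outside the graph payload, the fallback keeps working even when no deserializer for the payload's version is available.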
CompletedJobStore Entry Lifecycle Management/Caching
The following table illustrates the state of entry lifecycle management and caching in the current implementations of the ExecutionGraphInfoStore and the JobResultStore:
| | | ExecutionGraphInfoStore | JobResultStore |
|---|---|---|---|
| In-Memory | Caching | Utilizes cache to maintain the lifecycle of the entries | No caching involved |
| | Lifecycle Mgmt. | TTL and cache capacity can be specified in Flink’s config | Data stays forever in memory (FLINK-32806) |
| File-based | Caching | Utilizes cache to reduce IO | No caching involved |
| | Lifecycle Mgmt. | TTL and cache capacity can be specified in Flink’s config. A separate cache is maintained for | Data is either removed when the job is globally-terminated and its artifacts properly cleaned up or by the user |
The lifecycle of the entries should follow the current ExecutionGraphInfoStore implementations. The current JobResultStore behavior can be mimicked by setting the expiration time for an entry to 0.
The in-memory implementation should utilize caching (analogously to the in-memory implementation of the ExecutionGraphInfoStore) to reduce the footprint. But we should add some log messages to inform the user that a completed job disappeared because of its expiration time.
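The intended behavior (TTL, capacity limit, an expiration time of 0 to mimic the current JobResultStore, and a log message on removal) can be sketched with plain JDK classes. This is not how Flink implements its caches; the class, the injected clock and the logger callback are assumptions that keep the example small and testable.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Sketch only: a hand-rolled expiring cache, not Flink's actual implementation. */
final class ExpiringCompletedJobCacheSketch {

    private static final class Timed {
        final String archivedJob;
        final long expiresAtMillis;

        Timed(String archivedJob, long expiresAtMillis) {
            this.archivedJob = archivedJob;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    // Insertion-ordered so that the oldest entry is evicted first when capacity is exceeded.
    private final Map<String, Timed> entries = new LinkedHashMap<>();
    private final long expirationMillis;
    private final int capacity;
    private final LongSupplier clock;
    private final Consumer<String> logger;

    ExpiringCompletedJobCacheSketch(
            long expirationMillis, int capacity, LongSupplier clock, Consumer<String> logger) {
        this.expirationMillis = expirationMillis;
        this.capacity = capacity;
        this.clock = clock;
        this.logger = logger;
    }

    void put(String jobId, String archivedJob) {
        evictExpired();
        entries.put(jobId, new Timed(archivedJob, clock.getAsLong() + expirationMillis));
        Iterator<String> eldest = entries.keySet().iterator();
        while (entries.size() > capacity && eldest.hasNext()) {
            String evicted = eldest.next();
            eldest.remove();
            logger.accept("Completed job " + evicted + " was removed: capacity exceeded.");
        }
    }

    Optional<String> get(String jobId) {
        evictExpired();
        Timed timed = entries.get(jobId);
        return timed == null ? Optional.empty() : Optional.of(timed.archivedJob);
    }

    /** Drops every entry whose expiration time has been reached and logs the removal. */
    private void evictExpired() {
        long now = clock.getAsLong();
        Iterator<Map.Entry<String, Timed>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Timed> entry = it.next();
            if (entry.getValue().expiresAtMillis <= now) {
                it.remove();
                logger.accept("Completed job " + entry.getKey() + " was removed: expired.");
            }
        }
    }
}
```

With an expiration time of 0, an entry is already expired on the next access, which corresponds to the current default of removing the JobResultStore entry right after cleanup.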
Configuration parameters
Old | Description | New |
job-result-store.delete-on-commit (default: true) | Used for determining when the | Replaced by jobstore.expiration-time¹ (true=0, false= |
job-result-store.storage-path (default: | DFS location of the files | Deprecated and replaced by: |
Number of clean jobs for which the data shall be kept | kept | |
TTL for clean job entries | kept | |
Capacity for the | kept | |
File-based (File) or memory based (Memory) | kept |
¹ jobstore.* could also be renamed to completed-job-store.* since it’s less generic. I’m not sure whether there is some desire to keep it like that for the sake of consistency.
Documentation
The following items will be updated:
- JobResultStore documentation in the High Availability section of the documentation (link)
- The configuration documentation will be updated according to the changes mentioned above.
Compatibility, Deprecation, and Migration Plan
Artifacts
There is no real concern in terms of backwards compatibility for the ExecutionGraphInfoStore, because its data is lost when shutting down the Flink cluster. Old JobResultStore entries can still be served due to the versioning of the JobResultStore entry format.
Configuration
The JobResultStore-related configuration parameters will become deprecated and replaced by corresponding jobstore.* (or completed-job-store.*) configuration parameters as described above.
Changing the default value for job-result-store.storage-path to jobstore.storage-path/completed-job-store.storage-path would require us to first look for the old default value and only use the new default value if the old one doesn’t exist. It has to be considered that checking for the non-existence of values in object storage can be expensive for large buckets. This check is only done once when initializing the cluster.
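The lookup order described above could be expressed as in the sketch below. The method, its parameters and the example paths used with it are hypothetical. The existence check is passed in as a function so that the potentially expensive object-storage call stays in one place and is only executed when no path was configured explicitly.

```java
import java.util.function.Predicate;

/** Sketch only: the method, parameter names and example paths are hypothetical. */
final class StoragePathResolutionSketch {

    /**
     * Resolves the storage path once during cluster initialization.
     *
     * @param configuredPath explicitly configured path, or null if none was set
     * @param oldDefault default location derived from the deprecated option
     * @param newDefault default location derived from the new option
     * @param exists check whether a location already exists (may be expensive)
     */
    static String resolve(
            String configuredPath, String oldDefault, String newDefault, Predicate<String> exists) {
        if (configuredPath != null) {
            // An explicit setting wins and avoids the existence check entirely.
            return configuredPath;
        }
        return exists.test(oldDefault) ? oldDefault : newDefault;
    }
}
```

A caller would invoke resolve() a single time at startup and keep the result, for example `resolve(null, "/ha/old-store", "/ha/new-store", path -> java.nio.file.Files.isDirectory(java.nio.file.Paths.get(path)))` for a local file system.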
Test Plan
- The existing tests should continue to work with the refactored approach.
- Additional unit tests might be added to cover new functionality.
- An e2e test might be added to verify that deleting the JobResultStore entry makes the system behave as expected.
Rejected Alternatives
Investigate merging the HistoryServer into the new component
Having permanent storage for the ArchivedExecutionGraph might enable us to serve all REST requests through the new component. This could allow us to remove the HistoryServer.
The HistoryServer gives us the capability to access jobs from older Flink versions without worrying about the backwards compatibility of the ArchivedExecutionGraph. Working on such a topic would require additional investigations, which are out of scope for this FLIP.
Appendix
ExecutionGraphInfo Data Structure
The following tables represent the components of the ExecutionGraphInfo structure. Here’s a summary of the color scheme:
- 🟢 refers to public members that can be serialized and deserialized in JSON.
- 🟡 members don’t need to be represented in JSON because they can be derived from other members.
- 🔴 indicates that the type or some parts of the type are not JSON-deserializable and would require (at least partially) Base64 encoding.
Several data structures (e.g. StatsSummarySnapshot) mix up the mutable and immutable version. This FLIP might also include coming up with a clear separation of immutable instances (included in the ArchivedExecutionGraph) and mutable instances (which are part of the ExecutionGraph) using the Archiveable interface.
🔴 | |
| From |
| |
|
🔴 | |
| |
| Requires Base64 encoding but allows human-readable representation |
🔴 | |
| |
|
|
| From |
| We have to block and wait for the future to complete here |
| From |
🔴 | |
|
🟢 | |
| enum |
|
|
| |
| |
|
|
| |
| enum |
| |
|
|
| |
| |
| |
| |
| |
| Could be used as a human-readable version of the accumulator |
| Base64 encoding necessary |
| essentially immutable dictionary |
| |
| |
| From |
| |
| Order can be serialized through a |
| From |
🟢 | |
| |
| |
|
🟢 | |
| mutable dictionary (which could be made immutable for the archived version) of int/long values |
| |
| |
|
🟢 | |
| |
| |
| |
| |
|
| |
---|---|
| |
| |
| |
| |
| |
|
🟢 | |
| |
| From |
| |
| |
|
🟢 | |
| enum |
| |
| |
| |
| |
| |
| |
| essentially immutable dictionary |
| |
| |
| |
| |
| |
| |
| from |
| from |
🟢 | |
| |
|
🟢 |
🟢 | |
| from |
| |
|
🟢 | |
| |
| from |
| |
| |
| from |
| from |
| from |
| from |
| from |
| from |
| |
| dictionary of |
| |
| |
| |
| |
|
🟢 dictionary of boolean values | |
|
🟢 | |
| |
| |
| enum |
| |
| |
| |
| |
|
| |
| |
| |
| |
| |
| |
| |
|
🔴 | |
| |
| |
| |
| |
| |
| |
| from |
|
🔴 | |
| |
| |
| From |
| |
| enum |
| |
| |
| |
|
🔴 | |
| |
| From |
| |
| |
| |
| |
| |
| |
| From |
| From |
| |
| From |
|
🟢 | |
| |
| |
| From |
| From |
|
| |
| |
|
🔴 | |
| |
| |
| |
| |
| |
| |
| |
| Base64 |
🟢 | |
| From |
| |
|
🟢 (can be replaced by ExceptionHistoryEntry.ArchivedTaskManagerLocation) | |
| |
| |
| Not necessary in Read-Only version |
| |
| |
| |
|
🟢 | |
| String representation of |
|