Status

Current state: "Under discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: None created yet

Released: Target 1.15

Motivation

Currently, Flink processes (JobManager and TaskManager) can only persist information via Flink's HA services. The stored information is usually cluster-scoped, so there is currently no easy way to store process-specific information. Moreover, to store large blobs Flink currently has to use a remote object store, from which it must download these blobs again when restarting. If the Flink processes were restarted on the same node, or on a node where the same volume is mounted, then we could simply use the local disk/mounted volume to store process-specific information. For example, we could use this volume to store local state, which would allow us to use local state when recovering from a process failure. Flink could also store the cached blobs on this volume, which would avoid having to download them again from the BlobServer.

Since we cannot be sure that a used volume can be remounted or that Flink gets redeployed on the same node, we can use this storage only as a cache. This means that there must always be another place from which we can retrieve the required information in case it is no longer cached locally.

Proposed Changes

We propose to introduce a working directory for Flink processes. Flink processes can use this working directory to store instance-specific information that might be reusable after a process failover. By default, the working directory will be created in the temporary directory of the node Flink is running on. The path will include the ResourceID of the process to make it unique. The user can specify the working directory root in order to place the working directory on a specific volume.

By assigning a deterministic id to a Flink process (stable across restarts), a restarted Flink process will be started with the same working directory.

Given <WORKING_DIRECTORY_BASE> and <RESOURCE_ID>, the working directory would be <WORKING_DIRECTORY_BASE>/<RESOURCE_ID>.
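
The path rule above can be illustrated with a minimal Java sketch. The class and method names (WorkingDirectorySketch, workingDirectory) are hypothetical and not part of Flink; the ResourceID is represented as a plain String for illustration only.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not Flink code: derives the working directory
// as <WORKING_DIRECTORY_BASE>/<RESOURCE_ID>.
final class WorkingDirectorySketch {

    private WorkingDirectorySketch() {}

    // Assumption: the ResourceID is available as a String that is a valid
    // single path segment and stays the same across restarts.
    static Path workingDirectory(String workingDirectoryBase, String resourceId) {
        return Paths.get(workingDirectoryBase).resolve(resourceId);
    }
}
```

Because the result depends only on the base and the id, a process restarted with the same ResourceID resolves to the same directory.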

Public Interfaces

In order to configure the working directory base, we propose to introduce the process.working-dir, process.jobmanager.working-dir and process.taskmanager.working-dir configuration options. If not configured, process.working-dir will default to System.getProperty("java.io.tmpdir"), and the process-specific configuration options will default to the value of process.working-dir.
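
The fallback order described above could be sketched as follows. This is only an illustration: the configuration is modelled as a plain Map<String, String> rather than Flink's configuration classes, and the class and method names (WorkingDirOptionsSketch, resolveBase) are hypothetical.

```java
import java.util.Map;

// Hypothetical sketch, not Flink code: resolves the working directory base
// using the proposed fallback chain.
final class WorkingDirOptionsSketch {

    static final String PROCESS_WORKING_DIR = "process.working-dir";
    static final String JOBMANAGER_WORKING_DIR = "process.jobmanager.working-dir";
    static final String TASKMANAGER_WORKING_DIR = "process.taskmanager.working-dir";

    private WorkingDirOptionsSketch() {}

    // Order: process-specific option, then process.working-dir,
    // then the JVM's java.io.tmpdir system property.
    static String resolveBase(Map<String, String> config, String processSpecificKey) {
        String specific = config.get(processSpecificKey);
        if (specific != null) {
            return specific;
        }
        String general = config.get(PROCESS_WORKING_DIR);
        if (general != null) {
            return general;
        }
        return System.getProperty("java.io.tmpdir");
    }
}
```

For example, with only process.working-dir set, both the JobManager and the TaskManager lookups resolve to that value; with nothing set, both resolve to the JVM's temporary directory.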

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives
