
...

Current state: "Under Discussion"

Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/

JIRA: https://issues.apache.org/jira/browse/FLINK-XXXX

...

This is in contrast to the majority of the internal partition lifecycle management, which operates on ResultPartitionIDs.
The reasoning is that there is likely no use case for partially deleting or reading results, as this would be inherently difficult to model in our APIs (which usually hide the concept of sub-tasks).

GET /partitions
Response Code: 200 OK
Response:
{ "partitions": [
    { "id": "12345ABC",
      "job-name": "ProducingJob",
      "job-id": "123",
      "created-at": "2018-08-03" }]}

GET /partitions/:partitionid
Response Code: 200 OK
Response:
{ "partitions": [
    { "id": "12345ABC",
      "job-name": "ProducingJob",
      "job-id": "123",
      "created-at": "2018-08-03",
      "descriptors": [ <Base64 encoded byte array> ]}]}

DELETE /partitions/:partitionid
Response Code: 202 ACCEPTED
Response:
{ "request-id": "54321CBA" }

GET /partitions/delete-requests/:request-id
Response Code: 200 OK
Response:
{ "id": "IN_PROGRESS"/"COMPLETED" }
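
For illustration, a client could drive the delete endpoint and poll the resulting request as sketched below, using Java's built-in HttpClient; the base URL, partition ID and request ID are placeholders, and the JSON handling is deliberately naive.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PartitionDeleteClient {

    // Assumed REST address of the cluster; adjust as needed.
    private static final String BASE_URL = "http://localhost:8081";

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Issue the delete request for a partition; the endpoint answers with 202 ACCEPTED
        // and a request-id that can be polled.
        HttpRequest delete = HttpRequest.newBuilder()
                .uri(URI.create(BASE_URL + "/partitions/12345ABC"))
                .DELETE()
                .build();
        HttpResponse<String> accepted = client.send(delete, HttpResponse.BodyHandlers.ofString());
        System.out.println(accepted.statusCode() + " " + accepted.body());

        // Poll the delete request until it is reported as COMPLETED.
        // (Naive string check instead of proper JSON parsing, for brevity; the request-id
        // would normally be parsed from the response body above.)
        String requestId = "54321CBA";
        HttpRequest poll = HttpRequest.newBuilder()
                .uri(URI.create(BASE_URL + "/partitions/delete-requests/" + requestId))
                .GET()
                .build();
        while (!client.send(poll, HttpResponse.BodyHandlers.ofString()).body().contains("COMPLETED")) {
            Thread.sleep(1000);
        }
    }
}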

Proposed Changes

On a high level, the partition life-cycle must be modified so that

...

All changes are only relevant for cases where jobs finish successfully.

Details:

JobMaster

Currently, at job termination, the JM iterates over all connected TEs and calls PartitionTracker#stopTrackingAndReleasePartitionsFor, causing the tracker to remove all partitions hosted on that TE and issue a release call to said TE (and shuffle service).

This behavior must be modified so that, if a job finishes successfully, global partitions are not released but promoted. This logic is encapsulated in the PartitionTracker.

The PartitionTracker must be extended to distinguish between successful and failed/canceled jobs, so that it can make the appropriate release call to the TE.

The PartitionTracker interface shall be extended with an additional method for the express purpose of handling the case of a successful job, which will be called from JobMaster#jobStatusChanged:

/**
* Releases all local partitions and promotes all global partitions for the given task executor ID, and stops the tracking of partitions that were released/promoted.
*/
void stopTrackingAndReleaseOrPromotePartitionsFor(ResourceID producingTaskExecutorId)

stopTrackingAndReleaseOrPromotePartitionsFor issues the release/promote call to the TE.
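
A rough sketch of what this could look like inside the tracker implementation is shown below; TrackedPartition, isGlobal(), getTaskExecutorGateway() and the jobId field are illustrative stand-ins for the tracker's internal bookkeeping, not existing API.

// Illustrative sketch only; helper types and methods are hypothetical.
public void stopTrackingAndReleaseOrPromotePartitionsFor(ResourceID producingTaskExecutorId) {
    Collection<TrackedPartition> stopped = stopTrackingPartitionsFor(producingTaskExecutorId);

    // Split the partitions hosted on this TE into those that outlive the job (global -> promote)
    // and those that can be released right away (local -> release).
    Collection<ResultPartitionID> toRelease = new ArrayList<>();
    Collection<ResultPartitionID> toPromote = new ArrayList<>();
    for (TrackedPartition partition : stopped) {
        if (partition.isGlobal()) {
            toPromote.add(partition.getResultPartitionId());
        } else {
            toRelease.add(partition.getResultPartitionId());
        }
    }

    // Single RPC to the producing TE covering both sets (see the TaskExecutor section below).
    getTaskExecutorGateway(producingTaskExecutorId).releasePartitions(jobId, toRelease, toPromote);
}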

TaskExecutor

Currently, on job termination the JM sends a releasePartitions RPC call to all TEs, releasing all blocking partitions they have stored for the respective job.

This RPC call must be extended to additionally pass a set of partitions to promote.

void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionsToRelease, Collection<ResultPartitionID> partitionsToPromote)

The sets may only refer to local partitions. This effectively means that the JM cannot issue release calls for global partitions.

While it is technically feasible to do this with 2 RPC calls (one for releasing partitions that shouldn't be promoted, another for promoting the remaining ones), this would potentially double the number of RPC invocations at the end of a job.
This can be especially problematic given the possibility of many short-lived jobs being executed in rapid succession.
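
On the TE side the combined call could then be handled roughly as sketched below; the partitionTable and globalPartitions fields are illustrative, and where promoted partitions are actually stored is discussed next.

// Sketch of the TaskExecutor side; field and method names are illustrative.
public void releasePartitions(
        JobID jobId,
        Collection<ResultPartitionID> partitionsToRelease,
        Collection<ResultPartitionID> partitionsToPromote) {

    // Local partitions are no longer needed once the job has finished; release them right away.
    partitionTable.stopTrackingPartitions(jobId, partitionsToRelease);
    shuffleEnvironment.releasePartitionsLocally(partitionsToRelease);

    // Promoted partitions outlive the job: move them from the job-scoped bookkeeping into
    // the global bookkeeping that is reported to the RM via heartbeats (see below).
    partitionTable.stopTrackingPartitions(jobId, partitionsToPromote);
    globalPartitions.addAll(partitionsToPromote);
}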

To store these partitions we can either generalize the PartitionTable in the TE to PartitionTable<AbstractID> and store global partitions under the ResourceManagerId of the RM, or introduce another Set<ResultPartitionID> into the TE or PartitionTable.
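
A sketch of the first option follows, assuming a simple map-based table; it is meant to illustrate the keying change only, not to mirror the existing PartitionTable implementation.

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Keyed by an arbitrary AbstractID: the JobID for job-scoped (local) partitions,
// the ResourceManagerId for promoted (global) partitions.
public class PartitionTable<K extends AbstractID> {

    private final Map<K, Set<ResultPartitionID>> trackedPartitions = new ConcurrentHashMap<>();

    public void startTrackingPartitions(K key, Collection<ResultPartitionID> partitions) {
        trackedPartitions
                .computeIfAbsent(key, ignored -> ConcurrentHashMap.newKeySet())
                .addAll(partitions);
    }

    public Collection<ResultPartitionID> stopTrackingPartitions(K key) {
        Set<ResultPartitionID> removed = trackedPartitions.remove(key);
        return removed != null ? removed : Collections.emptySet();
    }

    public boolean hasTrackedPartitions(K key) {
        return trackedPartitions.containsKey(key);
    }
}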


The RM is regularly informed about promoted partitions, piggybacking on the heartbeats that are already being exchanged between the two. This makes it easy to keep the state of the RM and TE in sync.
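
For example, the heartbeat payload sent from the TE to the RM could simply carry the set of global partitions the TE currently hosts; the report class shown here is a hypothetical sketch.

import java.io.Serializable;
import java.util.Collection;

// Hypothetical report attached to the TE's heartbeat payload to the RM.
public class GlobalPartitionReport implements Serializable {

    private final Collection<ResultPartitionID> hostedGlobalPartitions;

    public GlobalPartitionReport(Collection<ResultPartitionID> hostedGlobalPartitions) {
        this.hostedGlobalPartitions = hostedGlobalPartitions;
    }

    public Collection<ResultPartitionID> getHostedGlobalPartitions() {
        return hostedGlobalPartitions;
    }
}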


Global partitions will be cleaned up in these 3 cases:

  1. The TaskExecutor shuts down for whatever reason.
  2. An explicit release call was issued via RPC from the RM.
  3. A heartbeat to the RM times out, since release calls that the RM issued in the meantime may have been lost.

An alternative for case 3) would be to acknowledge the release of partitions, and track on the RM which delete requests have been fully processed. This is a more complex solution, however, and could also be added as a follow-up.
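
Cases 1) and 3) then boil down to the same local clean-up on the TE, sketched below; the field names are illustrative.

// Sketch: drop all global partitions hosted by this TE, e.g. during shutdown or when the
// heartbeat to the RM times out and release calls issued by the RM may have been lost.
private void releaseAllGlobalPartitions() {
    Collection<ResultPartitionID> toRelease = new ArrayList<>(globalPartitions);
    globalPartitions.clear();
    shuffleEnvironment.releasePartitionsLocally(toRelease);
}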

For case 2) a new RPC method is required, which will be exclusively used by the RM. This method could be placed into a dedicated GlobalPartitionHolder interface that the TE implements.

void releaseGlobalPartitions(Collection<ResultPartitionID> partitionsToRelease)
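
A sketch of what such an interface could look like is given below; the exact shape is of course open.

import java.util.Collection;

// Implemented by the TaskExecutor, used exclusively by the ResourceManager (case 2 above).
public interface GlobalPartitionHolder {

    /**
     * Releases the given global partitions hosted by this task executor.
     */
    void releaseGlobalPartitions(Collection<ResultPartitionID> partitionsToRelease);
}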

ResourceManager

The RM is informed about global partitions via heartbeats from TEs.

stopTrackingAndReleaseOrPromotePartitionsFor issues the release/promote call to the TE, while getPersistedBlockingPartitions is required for a safe hand-off of the partition location data from JM to RM.

It must be ensured that at all times during the hand-off either the JM or RM is capable of cleaning up partitions on the TE should an error occur.

For example, releasing and promoting partitions on the TE must not be done before the partitions have been handed over to the RM; if the JM crashes after the promotion but before the hand-off to the RM, the TE would neither release the partitions due to the loss of the JM, nor would the RM ever issue release calls. This is problematic since the RM relies on the TE to determine whether it still hosts partitions (TaskExecutor#canBeReleased); with some partitions never being released, the cluster could never shut down. One could set things up so that the RM takes over this responsibility in the context of global partitions, but that would introduce a weird inconsistency, where for local partitions we care about the actual state on the TE, but for global partitions about the supposed state stored on the RM.

As such we must first retrieve the set of to-be-promoted partitions from the tracker, pass them to the RM and afterwards issue the release/promote call. This does imply that there is a window where the partition state is not in sync within the cluster: The TE still treats all partitions as local, whereas to the RM the TE is already hosting global partitions. This shouldn't be a problem, so long as a DELETE call for a global partition is executed correctly on the TE regardless of whether the partition is global or not according to the local state.

Since the job has already terminated we do not have to worry that the partition set changes between calls.
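
In code, the ordering on job termination could look roughly as follows; the gateway method name and the exact shape of getPersistedBlockingPartitions are assumptions made for illustration.

// Sketch of the hand-off ordering. If the JM crashes before step 1), the partitions are still
// local and will be released by the TE once it notices the loss of the JM; if it crashes after
// step 1), the RM knows about the partitions and can issue release calls itself.
for (ResourceID taskExecutorId : connectedTaskExecutors) {
    // 1) Hand the metadata of the to-be-promoted partitions to the RM.
    Collection<ResultPartitionDeploymentDescriptor> toPromote =
            partitionTracker.getPersistedBlockingPartitions(taskExecutorId);
    resourceManagerGateway.reportGlobalPartitions(taskExecutorId, toPromote);

    // 2) Only afterwards release local and promote global partitions on the TE.
    partitionTracker.stopTrackingAndReleaseOrPromotePartitionsFor(taskExecutorId);
}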

To support handing over partitions, the ResourceManagerGateway must be extended to accept a set of PartitionInfos (a Tuple2 containing the ResourceID of the producing TE and the ResultPartitionDeploymentDescriptor).
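
Following the style of the other signatures in this document, the addition could look like the sketch below; the method name is illustrative, and PartitionInfo stands for the Tuple2 described above.

/**
 * Hands the given promoted (global) partitions over to the ResourceManager, which tracks them
 * until they are released.
 */
CompletableFuture<Acknowledge> reportGlobalPartitions(Collection<PartitionInfo> partitions)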

...

The given partitions should be stored in something akin to the PartitionTracker used by the JM; the core requirements are as follows:

...

The RM will require access to the ShuffleMaster for calling ShuffleMaster#releasePartitionsExternally when releasing global partitions.
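
Putting the pieces together, the release path on the RM could look roughly as sketched below; the GlobalPartitionEntry bookkeeping type and the helper methods are illustrative.

// Sketch: releasing a single global partition from the RM.
private void releaseGlobalPartition(ResultPartitionID partitionId) {
    // Hypothetical RM-side bookkeeping: producing TE plus deployment descriptor per partition.
    GlobalPartitionEntry entry = globalPartitionTracker.remove(partitionId);
    if (entry == null) {
        return; // unknown or already released
    }

    // Release on the hosting TE (case 2 of the TE clean-up cases above) ...
    getTaskExecutorGateway(entry.getProducingTaskExecutor())
            .releaseGlobalPartitions(Collections.singleton(partitionId));

    // ... and release any external resources via the shuffle master.
    shuffleMaster.releasePartitionsExternally(
            Collections.singleton(entry.getDescriptor().getShuffleDescriptor()));
}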

ShuffleMaster

Currently, the shuffle master lives within the JM and is thus bound to the lifetime of a job.

Since the RM now requires access to the shuffle master in order to release global partitions, one option would be to move this component out of the JM.

However, if the separation of JM and RM into separate processes, as outlined in FLIP-6, is ever fully realized, it necessarily implies that multiple shuffle master instances may exist for a given shuffle service.

As such, to introduce this requirement for shuffle masters as early as possible, we could instead opt for creating a separate shuffle master instance for the RM.

REST API

The extensions to the REST API do not require any changes to the existing REST code, as the WebMonitorEndpoint already contains a retriever for the ResourceManagerGateway.

We only require appropriate extensions to the ResourceManagerGateway for retrieving ResultPartitionDeploymentDescriptors or deleting partitions.

/**
 * Returns the {@link ResultPartitionDeploymentDescriptor} of each partition belonging to the {@link IntermediateResult}, identified by the given {@link IntermediateDatasetID}.
 */
CompletableFuture<Collection<ResultPartitionDeploymentDescriptor>> getResultPartitionDeploymentDescriptors(IntermediateDatasetID intermediateResultId)


/**
 * Release all partitions belonging to the {@link IntermediateResult}, identified by the given {@link IntermediateDatasetID}.
 */
void releaseGlobalPartition(IntermediateDatasetID intermediateResultId)
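
For example, a handler backing the partition listing essentially just forwards to the gateway obtained from that retriever, as sketched below; request parsing and response marshalling are omitted.

// Sketch: the handler only needs the ResourceManagerGateway that the WebMonitorEndpoint
// already retrieves; everything else is plain forwarding.
CompletableFuture<Collection<ResultPartitionDeploymentDescriptor>> handleListRequest(
        IntermediateDatasetID intermediateResultId,
        GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {

    return resourceManagerGatewayRetriever
            .getFuture()
            .thenCompose(gateway -> gateway.getResultPartitionDeploymentDescriptors(intermediateResultId));
}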

Compatibility, Deprecation, and Migration Plan

...

Will be tested via unit/IT cases. IT cases can generate job graphs with vertices that create global partitions, which should not be cleaned up on job shutdown. The RM state can be verified through the REST API, whereas for the TE state we will have to resort to checking local storage.

Rejected Alternatives

Single method for releasing partitions on the JM

Releasing partitions could be facilitated via a single method on the TE, but this needlessly gives the JM/RM more capabilities than required.

Pass partition info from JM to RM instead of via heartbeat from TM to RM

Instead of having the TE inform the RM about hosted partitions via heartbeat messages it would also be possible for the JM to inform the RM about partitions that have been promoted.

This approach requires careful handling of the handoff to ensure that either the JM or RM are at all times in a position to release partitions on the TE in case of errors, which implies interface constraints on the side of the TE since the JM/RM may issue release calls irrespective of whether partitions have been already promoted or not.

Crucially, however, this approach requires the JM to connect to the RM at the end of the job, whereas currently this is only required at the very start. Only needing the RM at start-up keeps the JM fairly independent of it, which is a nice property.

Persist dataset to external FS

Global partitions are only truly relevant for jobs that want to re-use partitions from previous jobs. From a pure functionality standpoint, all one really needs to do is persist the dataset to durable storage, like writing it to HDFS, and in subsequent jobs declare a source that reads that partition again.
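
For comparison, this is roughly what that looks like with the existing APIs; a sketch using the DataSet API, with the HDFS path as a placeholder.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PersistToExternalFsExample {

    public static void main(String[] args) throws Exception {
        // Producing job: write the intermediate result to durable storage.
        ExecutionEnvironment producingEnv = ExecutionEnvironment.getExecutionEnvironment();
        producingEnv
                .fromElements("a", "b", "c")
                .writeAsText("hdfs:///tmp/shared-dataset");
        producingEnv.execute("ProducingJob");

        // Consuming job (possibly run much later): declare a source that reads the data back.
        ExecutionEnvironment consumingEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> shared = consumingEnv.readTextFile("hdfs:///tmp/shared-dataset");
        shared.print();
    }
}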

...