
...

JIRA: FLINK-14474

Released: 1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Terminology/Abbreviations

"Cluster datasets" denote collections of cluster partitions belonging to the same IntermediateDataSet.

"Cluster partitions" denote partitions that exist beyond the lifetime of a job.

...

This is in contrast to the majority of the internal partition life-cycle management, which operates on ResultPartitionIDs,
the reasoning being that there is likely no use-case for partially deleting/reading results, as this would be inherently difficult to model in our APIs (which usually hide the concept of sub-tasks).

Method: GET
URL: /datasets
Response:
{ "datasets": [
    { "id": "12345ABC",
      "jobName": "ProducingJob",
      "jobId": "123",
      "createdAt": "2018-08-03",
      "isComplete": "true" }]}
Response Code: 200 OK

Method: GET
URL: /datasets/:datasetid
Response:
{ "partitions": [
    { "id": "12345ABC",
      "jobName": "ProducingJob",
      "jobId": "123",
      "createdAt": "2018-08-03",
      "descriptors": [
        <Base64 encoded byte array> ]}]}
Response Code: 200 OK

Method: DELETE
URL: /datasets/:datasetid
Response:
{ "request-id": "54321CBA" }
Response Code: 202 ACCEPTED

Method: GET
URL: /datasets/delete-requests/:request-id
Response:
{ "status":
    { "id": "IN_PROGRESS"/"COMPLETED" }}
Response Code: 200 OK
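
For illustration, a client could interact with these endpoints as sketched below. This is a sketch only; the endpoint paths follow the table above, while the REST address http://localhost:8081 and the dataset/request ids are placeholders.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Usage sketch for the REST API above; base address and ids are placeholders. */
public class DatasetRestClientSketch {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://localhost:8081";

        // List all cluster datasets (GET /datasets).
        System.out.println(send(client, HttpRequest.newBuilder(URI.create(base + "/datasets")).GET().build()));

        // Request deletion of a dataset (DELETE /datasets/:datasetid); returns a request-id.
        System.out.println(send(client, HttpRequest.newBuilder(URI.create(base + "/datasets/12345ABC")).DELETE().build()));

        // Poll the deletion status (GET /datasets/delete-requests/:request-id).
        System.out.println(send(client, HttpRequest.newBuilder(URI.create(base + "/datasets/delete-requests/54321CBA")).GET().build()));
    }

    private static String send(HttpClient client, HttpRequest request) throws Exception {
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + ": " + response.body();
    }
}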

Proposed Changes

At a high level, the partition life-cycle must be modified so that

...

This RPC call must be extended to additionally pass a set of partitions to promote.

void releaseOrPromotePartitions(JobID jobId, Collection<ResultPartitionID> partitionsToRelease, Collection<ResultPartitionID> partitionsToPromote)

...

While it is technically feasible to do this with 2 RPC calls (one for releasing partitions that should not be promoted, another for promoting the remaining ones), this would potentially double the number of RPC invocations at the end of a job.
This can be especially problematic given the possibility of many short-lived jobs being executed in rapid succession.

The decision on whether to release or promote a partition cannot be deferred to the TE, as it would not be able to make the correct choice in the case of a job restart, where all partitions should be released.
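
The following sketch shows how the JM could assemble both sets for the single RPC call. It is illustrative only; the gateway stub and helper names below are hypothetical and merely mirror the signature proposed above.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

/**
 * Illustrative sketch only (not the actual JobMaster code): on job termination the JM
 * splits the partitions hosted by a TaskExecutor into a release set and a promote set
 * and ships both in a single RPC.
 */
class PartitionReleaseSketch {

    /** Hypothetical stand-in mirroring the TaskExecutorGateway method proposed above. */
    interface TaskExecutorGatewayStub {
        void releaseOrPromotePartitions(
                JobID jobId,
                Collection<ResultPartitionID> partitionsToRelease,
                Collection<ResultPartitionID> partitionsToPromote);
    }

    static void onJobTermination(
            TaskExecutorGatewayStub gateway,
            JobID jobId,
            Collection<ResultPartitionID> hostedPartitions,
            Predicate<ResultPartitionID> belongsToClusterDataset,
            boolean jobFinishedSuccessfully) {

        List<ResultPartitionID> toRelease = new ArrayList<>();
        List<ResultPartitionID> toPromote = new ArrayList<>();

        for (ResultPartitionID partition : hostedPartitions) {
            // Only a successfully finished job promotes its cluster partitions; on a
            // restart or failure everything is released, which is why the JM decides.
            if (jobFinishedSuccessfully && belongsToClusterDataset.test(partition)) {
                toPromote.add(partition);
            } else {
                toRelease.add(partition);
            }
        }

        // A single RPC instead of two keeps the per-job RPC overhead low.
        gateway.releaseOrPromotePartitions(jobId, toRelease, toPromote);
    }
}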

To store these partitions we can introduce another Set<ResultPartitionID> into the TE or PartitionTable.
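
A minimal sketch of such bookkeeping on the TE side (illustrative only, not the actual TaskExecutor/PartitionTable code): promoted partitions leave the per-job tracking and are kept in a separate set that survives the job, until they are released through the RM-facing RPC described below.

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

/**
 * Illustrative sketch only (not the actual TaskExecutor/PartitionTable code): cluster
 * partitions are tracked in a dedicated set that is not cleared on job shutdown.
 */
class ClusterPartitionBookkeepingSketch {

    private final Set<ResultPartitionID> clusterPartitions = new HashSet<>();

    /** Called when the JM promotes partitions at the end of a successfully finished job. */
    void promote(Collection<ResultPartitionID> partitionsToPromote) {
        clusterPartitions.addAll(partitionsToPromote);
    }

    /** Called via the RM-facing RPC; the real code would also release the partitions in the shuffle environment. */
    void releaseClusterPartitions(Collection<ResultPartitionID> partitionsToRelease) {
        clusterPartitions.removeAll(partitionsToRelease);
    }
}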

...

For case 2) a new RPC method is required, which will be exclusively used by the RM. This method could be placed into a dedicated ClusterPartitionHolder interface that the TE implements.

void releaseClusterPartitions(Collection<ResultPartitionID> partitionsToRelease)
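
A minimal sketch of such an interface; the ClusterPartitionHolder name is taken from the paragraph above, while its exact shape and placement are still open.

import java.util.Collection;

import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

/** Sketch of the dedicated RM-facing interface that the TaskExecutor would implement. */
interface ClusterPartitionHolder {

    /** Releases the given cluster partitions, regardless of which job produced them. */
    void releaseClusterPartitions(Collection<ResultPartitionID> partitionsToRelease);
}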

...

We only require appropriate extensions to the ResourceManagerGateway for retrieving the ResultPartitionDeploymentDescriptors and listing or deleting cluster datasets.

/**
 * Returns the {@link ResultPartitionDeploymentDescriptor} of each partition belonging to the {@link IntermediateResult}, identified by the given {@link IntermediateDatasetID}.
 */
CompletableFuture<Collection<ResultPartitionDeploymentDescriptor>> getResultPartitionDeploymentDescriptors(IntermediateDatasetID intermediateResultId)


/**
 * Releases all partitions belonging to the {@link IntermediateResult}, identified by the given {@link IntermediateDatasetID}.
 */
CompletableFuture<Void> releaseClusterPartition(IntermediateDatasetID intermediateResultId)

/**
 * Returns all datasets for which partitions are being tracked.
 */
CompletableFuture<Map<IntermediateDataSetID, DataSetMetaInfo>> listDataSets();
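
For illustration, a REST handler for GET /datasets could be backed by these gateway extensions roughly as follows. This is a sketch only; the handler and the DataSetMetaInfo placeholder are hypothetical.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;

/**
 * Illustrative sketch only (not the actual REST handler classes): the list endpoint
 * simply forwards to the ResourceManager and renders the returned meta info.
 */
class ListDatasetsHandlerSketch {

    /** Hypothetical stand-in for the ResourceManagerGateway extension above. */
    interface DatasetGateway {
        CompletableFuture<Map<IntermediateDataSetID, DataSetMetaInfo>> listDataSets();
    }

    /** Hypothetical placeholder for the meta-info type referenced above. */
    interface DataSetMetaInfo {}

    /** Real handlers would map the meta info to the JSON structure shown in the REST table. */
    CompletableFuture<Set<IntermediateDataSetID>> handleListRequest(DatasetGateway resourceManagerGateway) {
        return resourceManagerGateway.listDataSets().thenApply(Map::keySet);
    }
}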

Compatibility, Deprecation, and Migration Plan

...

The feature will be tested via unit/IT cases. IT cases can generate job graphs with vertices that create cluster partitions, which should not be cleaned up on job shutdown. The RM state can be verified through the REST API, whereas for the TE state we will have to resort to checking local storage.

Implementation notes

The core functionality required by FLIP-36 was implemented in FLINK-14474.

Some parts of the FLIP have been deferred to follow-ups:

  • extensive meta-data about producing job / creation dates
  • interactions of the ResourceManager with the ShuffleService
  • retrieval of partition descriptors through the REST API, as this may significantly warp the resource requirements, which may render the heartbeat approach infeasible

Rejected Alternatives

Single method for releasing partitions on the JM

...