...
...
...
...
...
...
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Terminology/Abbreviations
"Cluster datasets" denote collections of cluster partitions belonging to the same IntermediateDataSet.
"Cluster partitions" denote partitions that exist beyond the lifetime of a job.
...
This is in contrast to the majority of the internal partition lifecycle management, which operates on ResultPartitionIDs, the reasoning being that there is likely no use case for partially deleting/reading results, as this would be inherently difficult to model in our APIs (which usually hide the concept of sub-tasks).
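To make the two levels of addressing concrete, here is a minimal sketch (illustrative names only, not the actual Flink classes) of an index that tracks which partitions belong to which cluster dataset, with all-or-nothing release per dataset:

```java
import java.util.*;

// Hypothetical sketch: cluster datasets are addressed by their
// IntermediateDataSetID, while internal bookkeeping tracks the individual
// ResultPartitionIDs. Strings stand in for the real ID types.
class ClusterDatasetIndex {
    // datasetId -> all partition IDs belonging to that dataset
    private final Map<String, Set<String>> partitionsByDataset = new HashMap<>();

    void registerPartition(String datasetId, String partitionId) {
        partitionsByDataset
                .computeIfAbsent(datasetId, k -> new HashSet<>())
                .add(partitionId);
    }

    // Deletion is all-or-nothing per dataset: callers cannot release
    // individual partitions of a cluster dataset.
    Set<String> releaseDataset(String datasetId) {
        Set<String> released = partitionsByDataset.remove(datasetId);
        return released == null ? Collections.emptySet() : released;
    }
}
```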
METHOD | URL | Response | Response Code
---|---|---|---
GET | /datasets | { "partitions ... | 200 OK
GET | /datasets/:datasetid | { "partitions": [ ... | 200 OK
DELETE | /datasets/:datasetid | { "request-id": "54321CBA" } | 202 ACCEPTED
GET | /datasets/delete-requests/:request-id | { "status": ... | 200 OK
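As a usage sketch, a client could address these endpoints as follows. The base URL is an assumption; only the paths follow the table above, and the requests are merely constructed, not sent:

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch of how a client might build requests against the proposed REST
// endpoints. BASE is an assumed REST address, not part of the proposal.
class DatasetRestCalls {
    static final String BASE = "http://localhost:8081";

    static HttpRequest listDatasets() {
        return HttpRequest.newBuilder(URI.create(BASE + "/datasets")).GET().build();
    }

    // Triggers asynchronous deletion; the response carries a request-id.
    static HttpRequest deleteDataset(String datasetId) {
        return HttpRequest.newBuilder(URI.create(BASE + "/datasets/" + datasetId))
                .DELETE().build();
    }

    // Polls the status of a previously triggered delete request.
    static HttpRequest pollDeleteRequest(String requestId) {
        return HttpRequest.newBuilder(
                URI.create(BASE + "/datasets/delete-requests/" + requestId))
                .GET().build();
    }
}
```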
Proposed Changes
On a high level, the partition life-cycle must be modified so that
...
This RPC call must be extended to additionally pass a set of partitions to promote.
void releaseOrPromotePartitions(JobID jobId, Collection<ResultPartitionID> partitionsToRelease, Collection<ResultPartitionID> partitionsToPromote)
...
While it is technically feasible to do this with 2 RPC calls (one for releasing partitions that should not be promoted, another for promoting the remaining ones), this would potentially double the number of RPC invocations at the end of a job.
This can be especially problematic given the possibility of many short-lived jobs being executed in rapid succession.
The decision on whether to release or promote a partition cannot be deferred to the TE, as it would not be able to make the correct choice in case of a job restart, where all partitions should be released.
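The JM-side decision described above can be sketched as follows (illustrative names only; Strings stand in for the real ID types). On a successful finish, partitions of cluster datasets go into the promote set; on a restart, everything goes into the release set:

```java
import java.util.*;

// Hypothetical sketch of the JobMaster splitting tracked partitions into the
// two arguments of the single releaseOrPromotePartitions RPC.
class PartitionReleaseDecision {
    static void splitOnJobTermination(
            Collection<String> trackedPartitions,
            Set<String> clusterPartitions,      // partitions belonging to cluster datasets
            boolean jobFinishedSuccessfully,    // false on restarts
            Collection<String> toRelease,
            Collection<String> toPromote) {
        for (String partition : trackedPartitions) {
            if (jobFinishedSuccessfully && clusterPartitions.contains(partition)) {
                toPromote.add(partition); // survives the job
            } else {
                toRelease.add(partition); // includes everything on restarts
            }
        }
    }
}
```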
To store these partitions we can introduce another Set<ResultPartitionID> into the TE or PartitionTable.
...
For case 2) a new RPC method is required, which will be used exclusively by the RM. This method could be placed in a dedicated ClusterPartitionHolder interface that the TE implements.
void releaseClusterPartitions(Collection<ResultPartitionID> partitionsToRelease)
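Putting both RPCs together, the TE-side bookkeeping could look roughly like this (illustrative names only, not the actual Flink classes; Strings stand in for the real ID types): promoted partitions move out of the per-job table into a job-independent set, which the RM-driven releaseClusterPartitions later drains:

```java
import java.util.*;

// Hypothetical sketch of TaskExecutor-side partition state.
// releaseOrPromotePartitions is driven by the JM at job termination;
// releaseClusterPartitions is driven by the RM, e.g. after a REST delete.
class TaskExecutorPartitionState {
    private final Map<String, Set<String>> partitionsPerJob = new HashMap<>();
    private final Set<String> clusterPartitions = new HashSet<>();

    void startTrackingPartition(String jobId, String partitionId) {
        partitionsPerJob.computeIfAbsent(jobId, k -> new HashSet<>()).add(partitionId);
    }

    void releaseOrPromotePartitions(
            String jobId, Collection<String> toRelease, Collection<String> toPromote) {
        Set<String> jobPartitions = partitionsPerJob.getOrDefault(jobId, new HashSet<>());
        jobPartitions.removeAll(toRelease);  // released partitions are simply dropped
        jobPartitions.removeAll(toPromote);
        clusterPartitions.addAll(toPromote); // promoted partitions outlive the job
        if (jobPartitions.isEmpty()) {
            partitionsPerJob.remove(jobId);
        }
    }

    void releaseClusterPartitions(Collection<String> toRelease) {
        clusterPartitions.removeAll(toRelease);
    }

    Set<String> getClusterPartitions() {
        return Collections.unmodifiableSet(clusterPartitions);
    }
}
```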
...
ClusterPartitionShuffleClient<SD extends ShuffleDescriptor>:
ShuffleServiceFactory:
...
We only require appropriate extensions to the ResourceManagerGateway for retrieving the ResultPartitionDeploymentDescriptors and listing or deleting cluster datasets.
CompletableFuture<Void> releaseClusterPartition(IntermediateDatasetID intermediateResultId)
Compatibility, Deprecation, and Migration Plan
...
Will be tested via unit/IT cases. IT cases can generate job graphs with vertices that create cluster partitions, which should not be cleaned up on job shutdown. The RM state can be verified through the REST API, whereas for the TE state we will have to resort to checking local storage.
Implementation notes
The core functionality required by FLIP-36 was implemented in FLINK-14474.
Some parts of the FLIP have been deferred to follow-ups:
- extensive metadata about the producing job / creation dates
- interactions of the ResourceManager with the ShuffleService
- retrieval of partition descriptors through the REST API, as this may significantly inflate the resource requirements, which may render the heartbeat approach infeasible
Rejected Alternatives
Single method for releasing partitions on the JM
...