...
...
...
...
...
...
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Terminology/Abbreviations
"Cluster datasets" denote collections of cluster partitions belonging to the same IntermediateDataSet
.
"Cluster partitions" denote partitions that exist beyond the lifetime of a job.
...
This is in contrast to the majority of the internal partition life-cycle management, which operates on ResultPartitionIDs. The reasoning is that there is likely no use case for partially deleting or reading results, as this would be inherently difficult to model in our APIs (which usually hide the concept of sub-tasks).
METHOD | URL | Response | Response Code |
---|---|---|---|
GET | /datasets | { "partitions | 200 OK |
GET | /datasets/:datasetid | { "partitions": [ | 200 OK |
DELETE | /datasets/:datasetid | { "request-id": "54321CBA" } | 202 ACCEPTED |
GET | /partitionsdatasets/delete-requests/:request-id | { "status": | 200 OK |
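The DELETE endpoint answers with 202 ACCEPTED and a request-id instead of blocking until the partitions are gone, and the outcome is then queried through the delete-requests endpoint. The following Java sketch models that trigger-then-poll flow in memory. The class DatasetDeleteRequests, the string-typed dataset IDs, the releaseDataset hook and the status values IN_PROGRESS, COMPLETED, FAILED and UNKNOWN are all hypothetical and not part of the proposal.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical in-memory model of DELETE /datasets/:datasetid (202 + request-id)
// and GET /datasets/delete-requests/:request-id (status polling).
public class DatasetDeleteRequests {

    // Starts releasing a dataset asynchronously (assumed hook, e.g. into the RM).
    private final Function<String, CompletableFuture<Void>> releaseDataset;

    // request-id -> future of the corresponding release operation.
    private final Map<String, CompletableFuture<Void>> requests = new ConcurrentHashMap<>();

    public DatasetDeleteRequests(Function<String, CompletableFuture<Void>> releaseDataset) {
        this.releaseDataset = releaseDataset;
    }

    // DELETE /datasets/:datasetid: trigger the release and return a request-id right away.
    public String triggerDelete(String datasetId) {
        String requestId = UUID.randomUUID().toString();
        requests.put(requestId, releaseDataset.apply(datasetId));
        return requestId;
    }

    // GET /datasets/delete-requests/:request-id: report the state of the release.
    public String status(String requestId) {
        CompletableFuture<Void> release = requests.get(requestId);
        if (release == null) {
            return "UNKNOWN";
        }
        if (!release.isDone()) {
            return "IN_PROGRESS";
        }
        return release.isCompletedExceptionally() ? "FAILED" : "COMPLETED";
    }
}
```

Backing each request-id with a future keeps the DELETE handler non-blocking, even though releasing a dataset may require RPCs to several TEs.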
Proposed Changes
On a high level, the partition life-cycle must be modified so that
...
The PartitionTracker interface shall be extended by an additional method, dedicated to handling the case of a successful job, that will be called from JobMaster#jobStatusChanged:
...
stopTrackingAndReleaseOrPromotePartitionsFor issues the release/promote call to the TE.
...
This RPC call must be extended to additionally pass a set of partitions to promote.
void releaseOrPromotePartitions(JobID jobId, Collection<ResultPartitionID> partitionsToRelease, Collection<ResultPartitionID> partitionsToPromote);
...
While it is technically feasible to do this with two RPC calls (one releasing the partitions that should not be promoted, another promoting the remaining ones), this would potentially double the number of RPC invocations at the end of a job.
This can be especially problematic given that many short-lived jobs may be executed in rapid succession.
The decision on whether to release or promote a partition cannot be deferred to the TE, as it would not be able to make the correct choice in case of a job restart, where all partitions should be released.
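The reasoning above amounts to a split that only the JM can make, because only it knows whether the job finished successfully. A minimal Java sketch of that split follows, assuming a simplified TrackedPartition type whose boolean flag marks cluster partitions; the names ReleaseOrPromoteDecision, TrackedPartition, Decision and decide are hypothetical. It produces exactly the two sets that would be passed to a single releaseOrPromotePartitions call.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical JM-side split of tracked partitions into "release" and "promote" sets.
// Types are simplified stand-ins, not Flink classes.
public class ReleaseOrPromoteDecision {

    // A tracked partition: its ID and whether it was declared as a cluster partition.
    public static final class TrackedPartition {
        final String partitionId;
        final boolean clusterPartition;

        public TrackedPartition(String partitionId, boolean clusterPartition) {
            this.partitionId = partitionId;
            this.clusterPartition = clusterPartition;
        }
    }

    // The two sets sent together in one releaseOrPromotePartitions RPC.
    public static final class Decision {
        public final Set<String> toRelease = new HashSet<>();
        public final Set<String> toPromote = new HashSet<>();
    }

    // Only a successfully finished job promotes its cluster partitions; on failure,
    // cancellation or restart every partition is released.
    public static Decision decide(Collection<TrackedPartition> tracked, boolean jobFinishedSuccessfully) {
        Decision decision = new Decision();
        for (TrackedPartition partition : tracked) {
            if (jobFinishedSuccessfully && partition.clusterPartition) {
                decision.toPromote.add(partition.partitionId);
            } else {
                decision.toRelease.add(partition.partitionId);
            }
        }
        return decision;
    }
}
```

Because the job outcome is an input to decide, the same tracked partitions yield different sets for a finished job and a restarting one, which is the information the TE lacks.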
To store these partitions, we can introduce another Set<ResultPartitionID> into the TE or the PartitionTable.
...
For case 2), a new RPC method is required, which will be used exclusively by the RM. This method could be placed into a dedicated ClusterPartitionHolder interface that the TE implements.
void releaseClusterPartitions(Collection<ResultPartitionID> partitionsToRelease);
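On the TE side, promotion amounts to moving a partition out of its job's bookkeeping into a set that outlives the job, which only releaseClusterPartitions clears again. The sketch below is a simplified, hypothetical model of that bookkeeping: TaskExecutorPartitions, the string IDs and the releaseLocally callback (standing in for freeing files or memory) are assumptions, and it is not Flink's PartitionTable.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical TE-side bookkeeping for job partitions and promoted cluster partitions.
public class TaskExecutorPartitions {

    // jobId -> partitions produced by that job that are still bound to its lifetime.
    private final Map<String, Set<String>> jobPartitions = new HashMap<>();
    // Promoted partitions: they outlive the job and are only released by the RM.
    private final Set<String> clusterPartitions = new HashSet<>();
    // Frees the local resources of a partition (assumed hook, e.g. deleting files).
    private final Consumer<String> releaseLocally;

    public TaskExecutorPartitions(Consumer<String> releaseLocally) {
        this.releaseLocally = releaseLocally;
    }

    public void registerJobPartition(String jobId, String partitionId) {
        jobPartitions.computeIfAbsent(jobId, k -> new HashSet<>()).add(partitionId);
    }

    // Models releaseOrPromotePartitions(jobId, partitionsToRelease, partitionsToPromote).
    public void releaseOrPromotePartitions(String jobId, Collection<String> toRelease, Collection<String> toPromote) {
        Set<String> owned = jobPartitions.get(jobId);
        if (owned == null) {
            return; // nothing tracked for this job
        }
        for (String id : toRelease) {
            if (owned.remove(id)) {
                releaseLocally.accept(id);
            }
        }
        for (String id : toPromote) {
            if (owned.remove(id)) {
                clusterPartitions.add(id); // now independent of the job's lifetime
            }
        }
        if (owned.isEmpty()) {
            jobPartitions.remove(jobId);
        }
    }

    // Models releaseClusterPartitions(partitionsToRelease), called only by the RM.
    public void releaseClusterPartitions(Collection<String> toRelease) {
        for (String id : toRelease) {
            if (clusterPartitions.remove(id)) {
                releaseLocally.accept(id);
            }
        }
    }

    public Set<String> getClusterPartitions() {
        return Collections.unmodifiableSet(clusterPartitions);
    }
}
```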
ResourceManager
The RM is informed about cluster partitions via heartbeats from TEs.
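Since the RM learns about cluster partitions only through heartbeats, it has to aggregate the per-TE reports into cluster datasets. The following sketch assumes that each report is the TE's complete current view and that it carries the total number of partitions of each dataset, so the RM can tell whether a dataset is still complete when a TE no longer reports some of its partitions. ClusterPartitionAggregator, DatasetEntry and the string IDs are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical RM-side aggregation of cluster partition reports piggybacked on TE heartbeats.
public class ClusterPartitionAggregator {

    // What one TE reports for one dataset (the total count is an assumption of this sketch).
    public static final class DatasetEntry {
        final Set<String> hostedPartitions;
        final int numTotalPartitions;

        public DatasetEntry(Set<String> hostedPartitions, int numTotalPartitions) {
            this.hostedPartitions = hostedPartitions;
            this.numTotalPartitions = numTotalPartitions;
        }
    }

    // datasetId -> (taskExecutorId -> partitions of that dataset hosted on the TE).
    private final Map<String, Map<String, Set<String>>> hosted = new HashMap<>();
    // datasetId -> total number of partitions the dataset consists of.
    private final Map<String, Integer> totalPartitions = new HashMap<>();

    // Each report is treated as the TE's complete view and replaces its previous report.
    public void processHeartbeatReport(String taskExecutorId, Map<String, DatasetEntry> report) {
        for (Map<String, Set<String>> byExecutor : hosted.values()) {
            byExecutor.remove(taskExecutorId);
        }
        for (Map.Entry<String, DatasetEntry> entry : report.entrySet()) {
            hosted.computeIfAbsent(entry.getKey(), k -> new HashMap<>())
                    .put(taskExecutorId, new HashSet<>(entry.getValue().hostedPartitions));
            totalPartitions.put(entry.getKey(), entry.getValue().numTotalPartitions);
        }
        // Forget datasets that no TE hosts anymore.
        hosted.values().removeIf(Map::isEmpty);
        totalPartitions.keySet().retainAll(hosted.keySet());
    }

    // A dataset is only usable if every one of its partitions is still hosted somewhere.
    public boolean isComplete(String datasetId) {
        Map<String, Set<String>> byExecutor = hosted.get(datasetId);
        Integer total = totalPartitions.get(datasetId);
        if (byExecutor == null || total == null) {
            return false;
        }
        Set<String> all = new HashSet<>();
        byExecutor.values().forEach(all::addAll);
        return all.size() == total;
    }

    public Set<String> listDatasets() {
        return Collections.unmodifiableSet(hosted.keySet());
    }
}
```

Grouping by dataset rather than by partition mirrors the REST API above, which only exposes whole cluster datasets.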
...
For the new shuffle service component living in the RM (for now called ClusterPartitionShuffleClient), we require a new interface and have to extend the ShuffleServiceFactory to create this new component:
ClusterPartitionShuffleClient<SD extends ShuffleDescriptor>:
...
ShuffleServiceFactory:
...
With these changes there is a slight difference in how Flink interacts with the ShuffleMaster, as certain partitions may never be released on the ShuffleMaster after being registered.
...
We only require appropriate extensions to the ResourceManagerGateway for retrieving the ResultPartitionDeploymentDescriptors and for listing or deleting cluster datasets.
CompletableFuture<Void> releaseClusterPartition(IntermediateDataSetID intermediateResultId);
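Releasing a cluster dataset through the ResourceManagerGateway means fanning the request out to every TE that hosts one of its partitions. The following is a hedged sketch in which ClusterDatasetReleaser, the hosted-partition map and the releaseOnTaskExecutor callback (standing in for the TE's releaseClusterPartitions RPC) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical RM-side handling of releaseClusterPartition(datasetId): one
// releaseClusterPartitions call per TE that hosts part of the dataset.
public class ClusterDatasetReleaser {

    // datasetId -> (taskExecutorId -> hosted partition IDs), e.g. built from heartbeats.
    private final Map<String, Map<String, Set<String>>> hosted;
    // Stand-in for the TE RPC releaseClusterPartitions, addressed by TE ID.
    private final BiFunction<String, Collection<String>, CompletableFuture<Void>> releaseOnTaskExecutor;

    public ClusterDatasetReleaser(
            Map<String, Map<String, Set<String>>> hosted,
            BiFunction<String, Collection<String>, CompletableFuture<Void>> releaseOnTaskExecutor) {
        this.hosted = hosted;
        this.releaseOnTaskExecutor = releaseOnTaskExecutor;
    }

    public CompletableFuture<Void> releaseClusterPartition(String datasetId) {
        // Stop tracking the dataset first so it is no longer listed as available.
        Map<String, Set<String>> byExecutor = hosted.remove(datasetId);
        if (byExecutor == null) {
            return CompletableFuture.completedFuture(null); // unknown dataset: nothing to release
        }
        List<CompletableFuture<Void>> calls = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : byExecutor.entrySet()) {
            calls.add(releaseOnTaskExecutor.apply(entry.getKey(), entry.getValue()));
        }
        // Completes once every contacted TE has completed its release call.
        return CompletableFuture.allOf(calls.toArray(new CompletableFuture<?>[0]));
    }
}
```

Removing the dataset from the RM's view before the TEs acknowledge means it is no longer listed while the release is in flight; whether that is the desired semantics is a design choice this sketch does not settle.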
Compatibility, Deprecation, and Migration Plan
...
This will be tested via unit and IT cases. IT cases can generate job graphs with vertices that create cluster partitions, which should not be cleaned up on job shutdown. The RM state can be verified through the REST API, whereas for the TE state we will have to resort to checking local storage.
Implementation notes
The core functionality required by FLIP-36 was implemented in FLINK-14474.
Some parts of the FLIP have been deferred to follow-ups:
- extensive metadata about the producing job and creation dates
- interactions of the ResourceManager with the ShuffleService
- retrieval of partition descriptors through the REST API, as this may significantly increase the resource requirements, which could render the heartbeat approach infeasible
Rejected Alternatives
Single method for releasing partitions on the JM
...