The PartitionTracker  interface shall be extended by an additional method for the express purpose of handling the case of a successful job, which will be called from JobMaster#jobStatusChanged :

* Releases all job partitions and promotes all cluster partitions for the given task executor ID, and stops the tracking of partitions that were released/promoted.
void stopTrackingAndReleaseOrPromotePartitionsFor(ResourceID producingTaskExecutorId)

stopTrackingAndReleaseOrPromotePartitionsFor issues the release/promote call to the TE.


For case 2) a new RPC method is required, which will be exclusively used by the RM. This method could be placed into a dedicated ClusterPartitionHolder interface that the TE implements.

void releaseClusterPartitions(Collection<ResultPartitionID> partitionsToRelease)


The RM is informed about cluster partitions via heartbeats from TEs.


For the new shuffle service component (for now called ThinShuffleMaster ClusterPartitionShuffleClient) living in the RM we require a new interface and have to extend the ShuffleServiceFactory to create this new component:


ClusterPartitionShuffleClient<SD extends ShuffleDescriptor>:

Collection<SD> getClusterPartitions()

void releasePartitionExternally(SD shuffleDescriptor)




createClusterPartitionShuffleClient(Configuration configuration)

With these changes there is a slight difference in how Flink interacts with the ShuffleMaster, as certain partitions may never be released on the ShuffleMaster after being registered.
