...

Page properties

...


Discussion

...

thread

...

...

...

...

c44pbzbl4c5qscmqj8k8m3h3123mggkp
Vote thread
JIRA: FLINK-14474

Release: 1.11


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Terminology/Abbreviations

"Cluster datasets" denote collections of cluster partitions belonging to the same IntermediateDataSet.

"Cluster partitions" denote partitions that exist beyond the lifetime of a job.

...

This is in contrast to the majority of the internal partition life-cycle management, which operates on ResultPartitionIDs.
The reasoning is that there is likely no use case for partially deleting or reading results, as this would be inherently difficult to model in our APIs (which usually hide the concept of sub-tasks).

METHOD | URL | Response | Response Code
GET | /datasets | { "datasets": [ { "id": "12345ABC", "jobName": "ProducingJob", "jobId": "123", "createdAt": "2018-08-03", "isComplete": "true" } ] } | 200 OK
GET | /datasets/:datasetid | { "partitions": [ { "id": "12345ABC", "jobName": "ProducingJob", "jobId": "123", "createdAt": "2018-08-03", "descriptors": [ <Base64 encoded byte array> ] } ] } | 200 OK
DELETE | /datasets/:datasetid | { "request-id": "54321CBA" } | 202 ACCEPTED
GET | /datasets/delete-requests/:request-id | { "status": { "id": "IN_PROGRESS"/"COMPLETED" } } | 200 OK
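
Deletion is asynchronous: the DELETE call only hands back a request ID, and the outcome has to be polled via the delete-requests endpoint. A minimal client-side sketch of that polling loop follows. The class DeleteRequestPoller, its parameters and the statusFetcher callback are hypothetical and not part of this proposal; the callback is assumed to issue the GET request and extract the "id" field of the "status" object.

```java
import java.util.function.Supplier;

/** Hypothetical client-side helper; not part of the proposed REST API. */
final class DeleteRequestPoller {

    private DeleteRequestPoller() {}

    /**
     * Polls the status of a delete request until it reports COMPLETED.
     *
     * @param statusFetcher assumed to wrap GET /datasets/delete-requests/:request-id
     *                      and return the status id ("IN_PROGRESS" or "COMPLETED")
     * @param maxAttempts   upper bound on the number of status queries
     * @param pauseMillis   pause between two queries
     * @return true if COMPLETED was observed within maxAttempts queries
     */
    static boolean awaitCompleted(Supplier<String> statusFetcher, int maxAttempts, long pauseMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if ("COMPLETED".equals(statusFetcher.get())) {
                return true;
            }
            // Only pause if another attempt will follow.
            if (attempt + 1 < maxAttempts && pauseMillis > 0) {
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

Bounding the number of attempts keeps a client from waiting forever if the request never leaves the IN_PROGRESS state.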

Proposed Changes

On a high level, the partition life-cycle must be modified so that

...

The PartitionTracker interface shall be extended by an additional method for the express purpose of handling the case of a successful job, which will be called from JobMaster#jobStatusChanged:

/**
* Releases all job partitions and promotes all cluster partitions for the given task executor ID, and stops the tracking of partitions that were released/promoted.
*/
void stopTrackingAndReleaseOrPromotePartitionsFor(ResourceID producingTaskExecutorId)

stopTrackingAndReleaseOrPromotePartitionsFor issues the release/promote call to the TE.
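
The following is a simplified sketch of how such a method could split the tracked partitions of one task executor into a release set and a promote set and hand both over in a single call. All types are hypothetical stand-ins: IDs are plain strings, and the TaskExecutorCalls callback takes the place of the actual TE gateway.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified stand-in for the partition tracker on the JobMaster. */
final class SketchPartitionTracker {

    /** Hypothetical callback standing in for the RPC call to the task executor. */
    interface TaskExecutorCalls {
        void releaseOrPromotePartitions(String taskExecutorId, Set<String> toRelease, Set<String> toPromote);
    }

    // task executor ID -> (partition ID -> whether it is a cluster partition)
    private final Map<String, Map<String, Boolean>> tracked = new HashMap<>();
    private final TaskExecutorCalls calls;

    SketchPartitionTracker(TaskExecutorCalls calls) {
        this.calls = calls;
    }

    void startTracking(String taskExecutorId, String partitionId, boolean clusterPartition) {
        tracked.computeIfAbsent(taskExecutorId, k -> new HashMap<>()).put(partitionId, clusterPartition);
    }

    /** Stops tracking all partitions of the task executor and issues one combined release/promote call. */
    void stopTrackingAndReleaseOrPromotePartitionsFor(String taskExecutorId) {
        Map<String, Boolean> partitions = tracked.remove(taskExecutorId);
        if (partitions == null) {
            return; // nothing tracked for this task executor
        }
        Set<String> toRelease = new HashSet<>();
        Set<String> toPromote = new HashSet<>();
        for (Map.Entry<String, Boolean> entry : partitions.entrySet()) {
            if (entry.getValue()) {
                toPromote.add(entry.getKey());
            } else {
                toRelease.add(entry.getKey());
            }
        }
        calls.releaseOrPromotePartitions(taskExecutorId, toRelease, toPromote);
    }

    boolean isTrackingPartitionsFor(String taskExecutorId) {
        return tracked.containsKey(taskExecutorId);
    }
}
```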

...

This RPC call must be extended to additionally pass a set of partitions to promote.

void releaseOrPromotePartitions(JobID jobId, Collection<ResultPartitionID> partitionsToRelease, Collection<ResultPartitionID> partitionsToPromote)

...

While it is technically feasible to do this as 2 RPC calls (one for releasing partitions that shouldn't be promoted, another for promoting the remaining ones), this would potentially double the number of RPC invocations at the end of a job.
This can be especially problematic given the possibility of many short-lived jobs being executed in rapid succession.

The decision on whether to release or promote a partition cannot be deferred to the TE, as it would not be able to make the correct choice in case of a job restart, where all partitions should be released.

To store these partitions we can introduce another Set<ResultPartitionID> into the TE or PartitionTable.

...

For case 2) a new RPC method is required, which will be exclusively used by the RM. This method could be placed into a dedicated ClusterPartitionHolder interface that the TE implements.

void releaseClusterPartitions(Collection<ResultPartitionID> partitionsToRelease)
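
To illustrate how the two TE-side calls could interact, here is a simplified bookkeeping sketch. It is not the actual TaskExecutor or PartitionTable code: the class name is hypothetical, job and partition IDs are plain strings, and the release of the underlying data is omitted.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified TE-side bookkeeping of job partitions and cluster partitions. */
final class SketchTaskExecutorPartitions {

    // job ID -> partitions still owned by that job
    private final Map<String, Set<String>> jobPartitions = new HashMap<>();
    // partitions that were promoted and now outlive their job
    private final Set<String> clusterPartitions = new HashSet<>();

    void startTracking(String jobId, String partitionId) {
        jobPartitions.computeIfAbsent(jobId, k -> new HashSet<>()).add(partitionId);
    }

    /** Combined call issued by the JM: drops some partitions and promotes others. */
    void releaseOrPromotePartitions(String jobId, Collection<String> toRelease, Collection<String> toPromote) {
        Set<String> owned = jobPartitions.get(jobId);
        if (owned == null) {
            return; // the job has no partitions on this TE
        }
        owned.removeAll(toRelease);
        for (String partitionId : toPromote) {
            // Only promote partitions that this job actually owns.
            if (owned.remove(partitionId)) {
                clusterPartitions.add(partitionId);
            }
        }
        if (owned.isEmpty()) {
            jobPartitions.remove(jobId);
        }
    }

    /** Call used exclusively by the RM: drops promoted partitions. */
    void releaseClusterPartitions(Collection<String> toRelease) {
        clusterPartitions.removeAll(toRelease);
    }

    Set<String> getClusterPartitions() {
        return Collections.unmodifiableSet(new HashSet<>(clusterPartitions));
    }

    boolean hasJobPartitions(String jobId) {
        return jobPartitions.containsKey(jobId);
    }
}
```

Keeping promoted partitions in a separate set means a later release request for the job cannot touch them; only releaseClusterPartitions can remove them.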

ResourceManager

The RM is informed about cluster partitions via heartbeats from TEs.
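
As a rough illustration of heartbeat-based tracking, the sketch below lets each heartbeat report replace the previously known state of the reporting TE. The report format (a map from dataset ID to hosted partition IDs), the class name and the string IDs are assumptions made for this example only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical, simplified RM-side view of cluster partitions built from TE heartbeats. */
final class SketchClusterPartitionTracker {

    // task executor ID -> (dataset ID -> partition IDs hosted on that task executor)
    private final Map<String, Map<String, Set<String>>> reports = new HashMap<>();

    /** Replaces everything known about the given task executor with its latest report. */
    void processHeartbeatReport(String taskExecutorId, Map<String, Set<String>> hostedPartitionsByDataSet) {
        if (hostedPartitionsByDataSet.isEmpty()) {
            reports.remove(taskExecutorId);
            return;
        }
        // Defensive copy so that later changes to the report do not leak into the tracker.
        Map<String, Set<String>> copy = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : hostedPartitionsByDataSet.entrySet()) {
            copy.put(entry.getKey(), new HashSet<>(entry.getValue()));
        }
        reports.put(taskExecutorId, copy);
    }

    /** Forgets all partitions of a task executor, e.g. after it was lost. */
    void processTaskExecutorShutdown(String taskExecutorId) {
        reports.remove(taskExecutorId);
    }

    /** Returns every known dataset ID together with the number of partitions reported for it. */
    Map<String, Integer> listDataSets() {
        Map<String, Integer> result = new TreeMap<>();
        for (Map<String, Set<String>> report : reports.values()) {
            for (Map.Entry<String, Set<String>> entry : report.entrySet()) {
                result.merge(entry.getKey(), entry.getValue().size(), Integer::sum);
            }
        }
        return result;
    }
}
```

Because each report overwrites the previous one, the RM needs no separate notification when a TE drops a partition; the partition simply disappears from the next report.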

...

For the new shuffle service component (for now called ClusterPartitionShuffleClient) living in the RM we require a new interface and have to extend the ShuffleServiceFactory to create this new component:

ClusterPartitionShuffleClient<SD extends ShuffleDescriptor>:

Collection<SD> getClusterPartitions()

void releasePartitionExternally(SD shuffleDescriptor)

ShuffleServiceFactory:

ClusterPartitionShuffleClient<SD> createClusterPartitionShuffleClient(Configuration configuration)

With these changes there is a slight difference in how Flink interacts with the ShuffleMaster, as certain partitions may never be released on the ShuffleMaster after being registered.

...

We only require appropriate extensions to the ResourceManagerGateway for retrieving the ResultPartitionDeploymentDescriptors and listing or deleting cluster datasets.

/**
 * Returns the {@link ResultPartitionDeploymentDescriptor} of each partition belonging to the {@link IntermediateResult}, identified by the given {@link IntermediateDataSetID}.
 */
CompletableFuture<Collection<ResultPartitionDeploymentDescriptor>> getResultPartitionDeploymentDescriptors(IntermediateDataSetID intermediateResultId)

/**
 * Releases all partitions belonging to the {@link IntermediateResult}, identified by the given {@link IntermediateDataSetID}.
 */
CompletableFuture<Void> releaseClusterPartition(IntermediateDataSetID intermediateResultId)

/**
 * Returns all datasets for which partitions are being tracked.
 */
CompletableFuture<Map<IntermediateDataSetID, DataSetMetaInfo>> listDataSets();

Compatibility, Deprecation, and Migration Plan

...

Will be tested via unit/IT cases. IT cases can generate job graphs with vertices that create cluster partitions, which should not be cleaned up on job shutdown. The RM state can be verified through the REST API, whereas for the TE state we will have to resort to checking local storage.

Implementation notes

The core functionality required by FLIP-36 was implemented in FLINK-14474.

Some parts of the FLIP have been deferred to follow-ups:

  • extensive meta-data about producing job / creation dates
  • interactions of the ResourceManager with the ShuffleService
  • retrieval of partition descriptors through the REST API, as this may significantly warp the resource requirements which may render the heartbeat approach unfeasible

Rejected Alternatives

Single method for releasing partitions on the JM

...