You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

Status

Current state: "Accepted"

Discussion threadhere

JIRAFLINK-14474

Released: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-36 proposes a new programming paradigm where jobs are built incrementally by the user.

To support this in an efficient manner the partition life-cycle should be extended to support the notion of cluster partitions, which are partitions that can exist beyond the life-time of a job.

These partitions could then be re-used by subsequent jobs in a fairly efficient manner, as they don't have to persisted to an external storage first and consuming tasks could be scheduled to exploit data-locality.

Note that this FLIP does not concern itself with the usage of cluster partitions, including client-side APIs, job-submission, scheduling and reading said partitions.

Terminology/Abbreviations

"Cluster partitions" denote partitions that exist beyond the lifetime of a job.

"Job partitions" denote partitions that only exist within the lifetime of a job.

"Promotion" denotes the process of marking a job partition as a cluster partition.

"TE": TaskExecutor

"RM": ResourceManager

"JM": JobMaster

Public Interfaces

The REST API shall be extended to provide a listing of existing cluster partitions, and expose functionality for retrieving or deleting such a partition.

These calls operate on IntermediateDataSetIDs, identifying an IntermediateResult which is a collection of all ResultPartitions produced by a vertex.

This is in contrast to the majority of the internal partition lifecycle management, which operates on ResultPartitionIDs,
the reasoning being that there is likely no use-case for partially deleting/reading results, as this would be inherently difficult to model in our APIs (which usually hide the concept of sub-tasks).

METHODURLResponseResponse Code
GET/partitions
{ "partitions": [
    { "id": "12345ABC",
      "job-name": "ProducingJob",
      "job-id": "123",
      "created-at": "2018-08-03" }]}
200 OK
GET/partitions/:partitionid
{ "partitions": [
    { "id": "12345ABC",
      "job-name": "ProducingJob",
      "job-id": "123",
      "created-at": "2018-08-03",
"descriptors": [
<Based64 encoded byte array> ]}]}
200 OK
DELETE/partitions/:partitionid
{ "request-id": "54321CBA" }
202 ACCEPTED
GET/partitions/delete-requests/:request-id
{ "id": "IN_PROGRESS"/"COMPLETED" }
200 OK

Proposed Changes

On a high level, the partition life-cycle must be modified so that

  1. TEs retain cluster partitions for successful jobs
  2. TEs inform the ResourceManager about available cluster partitions via heartbeats
  3. the REST API needs to be extended to allow interactions with the ResourceManager

All changes are only relevant for cases where jobs finish successfully.

Details:

JobMaster

Currently, at job termination, the JM iterates over all connected TEs and calls PartitionTracker#stopTrackingAndReleasePartitionsFor, causing the tracker to remove all partitions hosted on that TE and issue a release call to said TE (and shuffle service).

This behavior must be modified so that, if a job finishes successfully, cluster partitions are not released but promoted. This logic is encapsulated in the PartitionTracker.

The PartitionTracker must be extended to be able to distinguish between successful/failed(,canceled) jobs, so that it can make the appropriate release call to the TE.

The PartitionTracker  interface shall be extended by an additional method for the express purpose of handling the case of a successful job, which will be called from JobMaster#jobStatusChanged :

/**
* Releases all job partitions and promotes all cluster partitions for the given task executor ID, and stops the tracking of partitions that were released/promoted.
*/
void stopTrackingAndReleaseOrPromotePartitionsFor(ResourceID producingTaskExecutorId)

stopTrackingAndReleaseOrPromotePartitionsFor issues the release/promote call to the TE.

TaskExecutor

Currently, on job termination the JM sends a releasePartitions RPC call to all TEs, releasing all blocking partitions they have stored for the respective job.

This RPC call must be extended to additionally pass a set of partitions to promote.

void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionsToRelease, Collection<ResultPartitionID> partitionsToPromote)

The partitions sets may only refer to job partitions. This effectively means that the JM cannot issue release calls for cluster partitions.

While it is technically feasible to do this as 2 RPC calls (one for releasing partitions that shouldn't be promoted, another RPC for promoting the remaining ones) this would potentially double the number of RPC invocations at the end of job.
This can be especially problematic given the possibility of many short lived jobs being executed in rapid succession.

To store these partitions we can introduce another Set<ResultPartitionID> into the TE or PartitionTable.


The RM is regularly informed about promoted partitions, piggybacking on the heartbeats that are already being exchanged between the two. This makes it easy to keep the state of the RM and TE in sync.


Cluster partitions will be cleaned up in in these 3 cases:

  1. The TaskExecutor shuts down for whatever reason.
  2. An explicit release call was issued via RPC from the RM.
  3. A heartbeat to the RM times out. This is because the RM could've issued release calls for partitions which may have been lost.

An alternative for case 3) would be to acknowledge the release of partitions, and track on the RM which delete requests have been fully processed. This is a more complex solution however, and could also be added as a follow-up.

For case 2) a new RPC method is required, which will be exclusively used by the RM. This method could be placed into a dedicated ClusterPartitionHolder interface that the TE implements.

void releaseClusterPartitions(Collection<ResultPartitionID> partitionsToRelease)

ResourceManager

The RM is informed about cluster partitions via heartbeats from TEs.

The given partitions should be stored in something akin to the PartitionTracker used by the JM; the core requirements are as follows:

  • support fast look-ups for the set of partitions for a given TE (identified by a ResourceID), for easier book-keeping updates in case a TE goes down
  • support fast look-ups for the set of partitions for a given IntermediateResult, to facilitate performant deletions of descriptor look-ups

To fully support external shuffle services the RM will require access to trimmed down ShuffleMaster-like component to issue release calls and list available partitions. A first version may omit this part, limiting support to the NettyShuffleService.

ShuffleService

For the new shuffle service component (for now called ClusterPartitionShuffleClient) living in the RM we require a new interface and have to extend the ShuffleServiceFactory to create this new component:

ClusterPartitionShuffleClient<SD extends ShuffleDescriptor>:

Collection<SD> getClusterPartitions()

void releasePartitionExternally(SD shuffleDescriptor)

ShuffleServiceFactory:

ClusterPartitionShuffleClient<SD> createClusterPartitionShuffleClient(Configuration configuration)

With these changes there is a slight difference in how Flink interacts with the ShuffleMaster, as certain partitions may never be released on the ShuffleMaster after being registered.

Additionally, in contrast to the ShuffleMaster the new component is not informed pre-emptively about a partition to be released (in contrast to the ShuffleMaster, on which all partitions are first registered), the implication being that the release of partitions must be possible without relying on local state.

REST API

The extensions to the REST API do not require any changes to the existing REST code, as the WebMonitorEndpoint already contains a retriever for the ResourceManagerGateway.

We only require similar appropriate extensions to the ResourceManagerGateway for retrieving the ResultPartitionDeploymentDescriptors or deleting partitions.

/**
 * Returns the {@link ResultPartitionDeploymentDescriptor} of each partition belonging to the {@link IntermediateResult}, identified by the given {@link IntermediateDatasetID}.
 */
CompletableFuture<Collection<ResultPartitionDeploymentDescriptor>> getResultPartitionDeploymentDescriptors(IntermediateDatasetID intermediateResultId)


/**
 * Release all partitions belonging to the {@link IntermediateResult}, identified by the given {@link IntermediateDatasetID}.
 */
void releaseClusterPartition(IntermediateDatasetID intermediateResultId)

Compatibility, Deprecation, and Migration Plan

This is an extension of the existing partition life-cycle and does not impact existing functionality.

Test Plan

Will be tested via unit/IT cases. IT cases can generate job graphs with vertices that create cluster partitions, which should not be cleaned up on job shutdown. The RM state can be verified through the REST API, whereas for the TE state we will have to resort to checking local storage.

Rejected Alternatives

Single method for releasing partitions on the JM

Releasing partitions could be facilitated via a single method on the TE, but this needlessly gives the JM/RM more capabilities than required.

Pass partition info from JM to RM instead of via heartbeat from TM to RM

Instead of having the TE inform the RM about hosted partitions via heartbeat messages it would also be possible for the JM to inform the RM about partitions that have been promoted.

This approach requires careful handling of the handoff to ensure that either the JM or RM are at all times in a position to release partitions on the TE in case of errors, which implies interface constraints on the side of the TE since the JM/RM may issue release calls irrespective of whether partitions have been already promoted or not.

Crucially however this approach requires the JM to connect to the RM at the end of the job, which currently is only required at the very start. This makes the JM fairly independent of the RM, which is a nice property.

Persist dataset to external FS

Cluster partitions are only truly relevant for jobs that want to re-use partitions from previous jobs. From a pure functionality standpoint, all one really needs to do is persist the dataset to durable storage, like writing it to HDFS, and in subsequent jobs declare a source that reads that partition again.

The downsides to this approach are that an additional medium for persisting data is required (which will require a fair amount of maintenance overhead since these partitions should be relatively short-lived) and added latency due to writing/reading the data. In contrast, cluster partitions are just written into local storage, and, with tasks potentially being deployed onto the same machine as the partition, the process of reading the data can be as efficient as reading from any blocking partition on the same machine. Additionally, API's can trigger the cleanup of partitions through the REST API, thus not requiring any additional dependencies or permissions.

  • No labels