Status
Current state: "Under Discussion"
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-36 proposes a new programming paradigm where jobs are built incrementally by the user.
To support this in an efficient manner the partition life-cycle should be extended to support the notion of global partitions, which are partitions that can exist beyond the life-time of a job.
These partitions could then be re-used by subsequent jobs in a fairly efficient manner, as they do not have to be persisted to external storage first, and consuming tasks can be scheduled to exploit data-locality.
Note that this FLIP does not concern itself with the usage of global partitions, including client-side APIs, job-submission, scheduling and reading said partitions.
Terminology/Abbreviations
- "Global partitions" denote partitions that exist beyond the lifetime of a job.
- "Local partitions" denote partitions that only exist within the lifetime of a job.
- "Promotion" denotes the process of marking a local partition as global.
- "TE": TaskExecutor
- "RM": ResourceManager
- "JM": JobMaster
Public Interfaces
The REST API shall be extended to provide a listing of existing global partitions, and expose functionality for retrieving or deleting such a partition.
These calls operate on IntermediateDataSetIDs, identifying an IntermediateResult, which is the collection of all ResultPartitions produced by a vertex.
This is in contrast to the majority of the internal partition life-cycle management, which operates on ResultPartitionIDs. The reasoning is that there is likely no use-case for partially deleting/reading results, as this would be inherently difficult to model in our APIs (which usually hide the concept of sub-tasks).
METHOD | URL | Response | Response Code |
---|---|---|---|
GET | /partitions | { "partitions": [ … ] } | 200 OK |
GET | /partitions/:partitionid | { "partitions": [ … ] } | 200 OK |
DELETE | /partitions/:partitionid | { "request-id": "54321CBA" } | 202 ACCEPTED |
GET | /partitions/delete-requests/:request-id | { "id": "IN_PROGRESS"/"COMPLETED" } | 200 OK |
Proposed Changes
On a high level, the partition life-cycle must be modified so that
- TEs retain global partitions for successful jobs,
- JMs hand the tracking information for global partitions over to the RM,
- the REST API is extended to allow interactions with the RM.
All changes are only relevant for cases where jobs finish successfully.
Details:
TaskExecutor
Currently, on job termination the JM sends a releasePartitions RPC call to all TEs, releasing all blocking partitions they have stored for the respective job.
This RPC call must be extended to additionally pass a set of partitions to promote.
void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionsToRelease, Collection<ResultPartitionID> partitionsToPromote)
The set of partitions to release may contain local and/or global partitions; the promotion set must only refer to local partitions.
While it is technically feasible to do this as 2 RPC calls (one releasing the partitions that should not be promoted, another promoting the remaining ones), this would potentially double the number of RPC invocations at the end of a job.
This can be especially problematic given the possibility of many short-lived jobs being executed in rapid succession.
To store these partitions we can generalize the PartitionTable in the TE to PartitionTable<AbstractID> and store global partitions under the ResourceManagerId of the RM.
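As a rough sketch of this generalization (all class and method names here are simplified stand-ins, not Flink's actual signatures, with a plain type parameter and strings in place of IDs), the table might look like:

```java
import java.util.*;

// Sketch of a generalized partition table, keyed by an arbitrary ID type so the
// same structure can track job-local partitions (keyed by JobID) and promoted
// global partitions (keyed by ResourceManagerId). All names are hypothetical
// simplifications of Flink's internal classes.
class PartitionTable<K> {
    private final Map<K, Set<String>> trackedPartitions = new HashMap<>();

    void startTrackingPartitions(K key, Collection<String> partitions) {
        trackedPartitions.computeIfAbsent(key, k -> new HashSet<>()).addAll(partitions);
    }

    // Removes and returns all partitions tracked under the given key.
    Collection<String> stopTrackingPartitions(K key) {
        Set<String> removed = trackedPartitions.remove(key);
        return removed != null ? removed : Collections.emptySet();
    }

    // Promotion: move the given partitions from a job-scoped key to the
    // RM-scoped key, without touching the partition data itself.
    void promotePartitions(K jobKey, K resourceManagerKey, Collection<String> toPromote) {
        Set<String> jobPartitions = trackedPartitions.get(jobKey);
        if (jobPartitions != null) {
            jobPartitions.removeAll(toPromote);
            if (jobPartitions.isEmpty()) {
                trackedPartitions.remove(jobKey);
            }
        }
        startTrackingPartitions(resourceManagerKey, toPromote);
    }
}
```

Promotion is then just a re-keying operation in the TE's book-keeping; the data on disk stays where it is.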
Global partitions will be cleaned up in these 3 cases:
- The TaskExecutor shuts down for whatever reason.
- An explicit release call was issued via RPC.
- A heartbeat to the RM times out. This is because the RM could have issued release calls for some of the partitions, and those calls may have been lost.
An alternative for case 3) would be to acknowledge the release of partitions, and track on the RM which delete requests have been fully processed. This is a more complex solution, however, and could also be added as a follow-up.
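The three cleanup triggers could be wired up roughly as follows (hypothetical names; the actual TE code is considerably more involved):

```java
import java.util.*;

// Sketch (hypothetical names) of the three cleanup triggers for global
// partitions on the TE side. All three cases funnel into dropping entries
// from the RM-scoped tracking state.
class GlobalPartitionCleanup {
    private final Map<String, Set<String>> globalPartitionsPerRm = new HashMap<>();

    void track(String rmId, String partition) {
        globalPartitionsPerRm.computeIfAbsent(rmId, k -> new HashSet<>()).add(partition);
    }

    // Case 2: an explicit release call was issued by the RM via RPC.
    void releaseGlobalPartitions(String rmId, Collection<String> partitions) {
        Set<String> tracked = globalPartitionsPerRm.get(rmId);
        if (tracked != null) {
            tracked.removeAll(partitions);
        }
    }

    // Case 3: the heartbeat to the RM timed out; release calls may have been
    // lost, so conservatively drop everything tracked for that RM.
    void onResourceManagerHeartbeatTimeout(String rmId) {
        globalPartitionsPerRm.remove(rmId);
    }

    // Case 1: TE shutdown releases everything unconditionally.
    void onShutdown() {
        globalPartitionsPerRm.clear();
    }

    int numTrackedPartitions(String rmId) {
        return globalPartitionsPerRm.getOrDefault(rmId, Collections.emptySet()).size();
    }
}
```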
JobMaster
The release logic on the JM that is executed on job termination must be adjusted accordingly. At job termination, the JM iterates over all connected TEs and calls PartitionTracker#stopTrackingAndReleasePartitionsFor, causing the tracker to remove all partitions hosted on that TE and issue a release call to said TE (and the shuffle service).
The PartitionTracker must be extended to be able to distinguish between successful and failed (or canceled) jobs, so that it can make the appropriate release call to the TE. Fortunately, there is only a single case where this is relevant: if the job finishes successfully.
The PartitionTracker interface shall be extended by additional methods for the express purpose of handling the case of a successful job, which will be called from JobMaster#jobStatusChanged:
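A hedged sketch of the two additions (only the method names are stated in this FLIP; parameter lists, return types and javadoc are assumptions):

```java
import java.util.Collection;

// Hypothetical stand-ins for Flink's JobID / ResultPartitionDeploymentDescriptor.
class JobID {}
class ResultPartitionDeploymentDescriptor {}

// Sketch of the proposed PartitionTracker extensions. Only the two method
// names appear in the FLIP; the signatures shown here are assumptions.
interface PartitionTracker {
    /**
     * Called for a successfully finished job: releases all tracked partitions
     * except those marked for promotion, which are promoted on the TE instead.
     */
    void stopTrackingAndReleaseOrPromotePartitionsFor(JobID jobId);

    /**
     * Returns the descriptors of all blocking partitions that are to be
     * promoted, so the JM can hand them to the RM before issuing the
     * release/promote call.
     */
    Collection<ResultPartitionDeploymentDescriptor> getPersistedBlockingPartitions(JobID jobId);
}
```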
stopTrackingAndReleaseOrPromotePartitionsFor issues the release/promote call to the TE, while getPersistedBlockingPartitions is required for a safe hand-off of the partition location data from the JM to the RM.
It must be ensured that at all times during the hand-off either the JM or RM is capable of cleaning up partitions on the TE should an error occur.
For example, releasing and promoting partitions on the TE must not be done before the partitions were handed to the RM: if the JM crashes after the promotion but before the hand-off to the RM, the TE would neither release the partitions (due to the loss of the JM), nor would the RM ever issue release calls. This is problematic since the RM relies on the TE to determine whether it still hosts partitions (TaskExecutor#canBeReleased); with some partitions never being released, the cluster could never shut down. One could set things up so that the RM takes over this responsibility for global partitions, but it would introduce an odd inconsistency: for local partitions we would care about the actual state on the TE, but for global partitions about the supposed state stored on the RM.
As such we must first retrieve the set of to-be-promoted partitions from the tracker, pass them to the RM and afterwards issue the release/promote call. This does imply that there is a window where the partition state is not in sync within the cluster: The TE still treats all partitions as local, whereas to the RM the TE is already hosting global partitions. This shouldn't be a problem, so long as a DELETE call for a global partition is executed correctly on the TE regardless of whether the partition is global or not according to the local state.
Since the job has already terminated we do not have to worry that the partition set changes between calls.
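The resulting ordering constraint can be sketched as follows (all types and names are hypothetical simplifications):

```java
import java.util.Collection;
import java.util.List;

// Sketch of the JM-side hand-off on successful job termination. The essential
// point is the ordering: the RM learns about the global partitions BEFORE the
// release/promote call is sent to the TEs, so at every point either the JM or
// the RM can still clean up the partitions should an error occur.
class JobTerminationHandler {
    interface Tracker {
        List<String> getPersistedBlockingPartitions();
        void stopTrackingAndReleaseOrPromotePartitions();
    }
    interface ResourceManagerGateway {
        void registerGlobalPartitions(Collection<String> partitionInfos);
    }

    static void onJobFinished(Tracker tracker, ResourceManagerGateway rm) {
        // 1) Collect the set of to-be-promoted partitions from the tracker.
        Collection<String> toPromote = tracker.getPersistedBlockingPartitions();
        // 2) Hand them to the RM first; if the JM crashes afterwards, the RM
        //    can still issue release calls for them.
        rm.registerGlobalPartitions(toPromote);
        // 3) Only now issue the combined release/promote call to the TEs.
        tracker.stopTrackingAndReleaseOrPromotePartitions();
    }
}
```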
ResourceManager
To support handing over partitions, the ResourceManagerGateway must be extended to accept a set of PartitionInfos (a Tuple2 containing the ResourceID of the producing TE and the ResultPartitionDeploymentDescriptor):
void registerGlobalPartitions(Collection<PartitionInfo>)
The given partitions should be stored in something akin to the PartitionTracker used by the JM; the core requirements are as follows:
- support fast look-ups for the set of partitions for a given TE (identified by a ResourceID), for easier book-keeping updates in case a TE goes down
- support fast look-ups for the set of partitions for a given IntermediateResult, to facilitate performant deletions and descriptor look-ups
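A minimal sketch of a store meeting both requirements (hypothetical names, with strings standing in for ResourceID, IntermediateDataSetID and partition IDs):

```java
import java.util.*;

// Sketch of an RM-side store for global partitions, indexed both by the
// hosting TE's ResourceID and by the IntermediateDataSetID they belong to,
// so both look-up patterns are O(1) map accesses. All names are hypothetical.
class GlobalPartitionStore {
    // ResourceID of the hosting TE -> partition IDs
    private final Map<String, Set<String>> byTaskExecutor = new HashMap<>();
    // IntermediateDataSetID -> partition IDs
    private final Map<String, Set<String>> byDataSet = new HashMap<>();

    void register(String taskExecutorId, String dataSetId, String partitionId) {
        byTaskExecutor.computeIfAbsent(taskExecutorId, k -> new HashSet<>()).add(partitionId);
        byDataSet.computeIfAbsent(dataSetId, k -> new HashSet<>()).add(partitionId);
    }

    // Book-keeping update when a TE goes down: drop everything it hosted.
    Set<String> removeTaskExecutor(String taskExecutorId) {
        Set<String> lost = byTaskExecutor.remove(taskExecutorId);
        if (lost == null) {
            return Collections.emptySet();
        }
        byDataSet.values().forEach(partitions -> partitions.removeAll(lost));
        return lost;
    }

    // Deletion / descriptor look-up for a whole IntermediateResult.
    Set<String> getPartitionsForDataSet(String dataSetId) {
        return byDataSet.getOrDefault(dataSetId, Collections.emptySet());
    }
}
```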
The RM will require access to the ShuffleMaster to call ShuffleMaster#releasePartitionsExternally when releasing global partitions.
REST API
The extensions to the REST API do not require any changes to the existing REST code, as the WebMonitorEndpoint already contains a retriever for the ResourceManagerGateway.
We only require appropriate extensions to the ResourceManagerGateway for retrieving the ResultPartitionDeploymentDescriptors or deleting partitions.
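A hedged sketch of what such gateway extensions could look like (method names and signatures are assumptions, not part of this FLIP):

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for Flink types.
class IntermediateDataSetID {}
class ResultPartitionDeploymentDescriptor {}

// Sketch of ResourceManagerGateway extensions backing the new REST calls.
// The FLIP only states that the gateway must support retrieving descriptors
// and deleting partitions; everything below is an assumed shape.
interface ResourceManagerGatewayExtensions {
    /** Backs GET /partitions: lists all global partitions known to the RM. */
    CompletableFuture<Collection<IntermediateDataSetID>> listGlobalPartitions();

    /** Backs GET /partitions/:partitionid: retrieves the descriptors of one result. */
    CompletableFuture<Collection<ResultPartitionDeploymentDescriptor>> getGlobalPartition(IntermediateDataSetID id);

    /** Backs DELETE /partitions/:partitionid: asynchronously releases the result. */
    CompletableFuture<Void> releaseGlobalPartition(IntermediateDataSetID id);
}
```

The asynchronous return types mirror the DELETE call's 202 ACCEPTED semantics, where completion is polled via /partitions/delete-requests/:request-id.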
Compatibility, Deprecation, and Migration Plan
This is an extension of the existing partition life-cycle and does not impact existing functionality.
Test Plan
Will be tested via unit/IT cases. IT cases can generate job graphs with vertices that create global partitions, which should not be cleaned up on job shutdown. The RM state can be verified through the REST API, whereas for the TE state we will have to resort to checking local storage.
Rejected Alternatives
Global partitions are only truly relevant for jobs that want to re-use partitions from previous jobs. From a pure functionality standpoint, all one really needs to do is persist the dataset to durable storage, like writing it to HDFS, and in subsequent jobs declare a source that reads that partition again.
The downsides to this approach are that an additional medium for persisting data is required (which adds a fair amount of maintenance overhead, since these partitions should be relatively short-lived) and added latency due to writing/reading the data. In contrast, global partitions are just written into local storage, and, with tasks potentially being deployed onto the same machine as the partition, the process of reading the data can be as efficient as reading from any blocking partition on the same machine. Additionally, the cleanup of partitions can be triggered through the REST API, thus not requiring any additional dependencies or permissions.