Discussion thread | https://lists.apache.org/thread/ocssfxglpc8z7cto3k8p44mrjxwr67r9 |
---|---|
Vote thread | |
JIRA | |
Release |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
We introduced FineGrainedSlotManager in FLIP-56 to allow users to set different resources for slot requests. The FineGrainedSlotManager is the same as DeclarativeSlotManager if the user does not configure the resource profiles of SlotSharingGroup.
Now these two SlotManager have some duplicated codes(eg, the delay check of checkResourceRequirements ) and some similar component names(eg, TaskManagerTracker, TaskExecutorManager). This causes duplicate development of features, such as Make ResourceActions declarative.
This FLIP aims to unify the implementation of SlotManager. Since FineGrainedSlotManager has almost all the capability of DeclarativeSlotManager, we will implement all the lacks of FineGrainedSlotManager and enable it by default.
Public Interfaces
For now, we will set ‘cluster.fine-grained-resource-management.enabled’ to true by default. This configuration option will be preserved for user fallback if there are some issues with FineGrainedSlotManager.
For long term plans, the DeclarativeSlotManager will be completely removed in the next release after the default value is changed.
Proposed Changes
Overview of FineGrainedSlotManager and DeclarativeSlotManager
Functionality supports
functionality | FineGrainedSlotManager | DeclarativeSlotManager |
---|---|---|
Allocate new task managers when resource not enough | YES | YES |
Release idle task managers if there are no tasks/resultPartitions | YES | YES |
Keep some redundant task managers | NO | YES |
Max limitations of slots number | YES | YES |
Filter out blocked resources | YES | YES |
Track requirements of multiple jobs | YES | YES |
Fulfill requirements by evenly strategy | NO | YES |
Reclaim inactive slots when job finished | NO | YES |
Different slot resources in the same task manager | YES | NO |
sub-components
DeclarativeSlotManager
There are tree subcomponents in DeclarativeSlotManager
- ResourceTracker: Track all resource requirements of multiple jobs
- SlotTracker: Track slot status(free/allocated). It's used for fulfill resource requirements with free slots.
- TaskExecutorManager: Manage the lifecycle of task managers(allocated/release/keep redundant).
The main process flow of DeclarativeSlotManager is:
- When new/updated resource requirements arrived, DeclarativeSlotManager will send it to ResourceTracker.
- When dealing with all resource requirements tracked by ResourceTracker. DeclarativeSlotManager get all free slots from SlotTracker to fulfill requirements.
- If there are also some unfulfilled requirements, DeclarativeSlotManager get all pending task manager slots from TaskExecutorManager to fulfill it.
- And the DeclarativeSlotManager will try to allocate new task managers when pending task manager slot is not enough.
FineGrainedSlotManager
There are four subcomponents in FineGrainedSlotManager
- ResourceTracker: The same as DeclarativeSlotManager#ResourceTracker.
- TaskManagerTracker: Track task managers and their resources.
- SlotStatusSyncer: Sync the slot status with task manager, and communicate with task manager to allocate/free slots.
- ResourceAllocationStrategy: Try fulfill the resource requirements with available/pending resources.
The main process flow of FineGrainedSlotManager is:
- When new/updated resource requirements arrived, FineGrainedSlotManager will send it to ResourceTracker.
- When dealing with all resource requirements tracked by ResourceTracker. FineGrainedSlotManager invoke ResourceAllocationStrategy with TaskManagerTracker.
- The ResourceAllocationStrategy will try to fulfill the resource requirements by available/pending resources orderly. It will create new pending resources if not enough.
- Then FineGrainedSlotManager will allocate new task managers according to the result of ResourceAllocationStrategy.
Add the missing capability of FineGrainedSlotManager
Use different slot matching strategy to spread out slots
SlotMatchingStrategy was introduced by FLINK-12122. It’s used for spread out slots across all registered TaskManagers. In FineGrainedSlotManager, this logic should be:
- Introduce SlotMatchingStrategy to DefaultResourceAllocationStrategy
Introduce new Interface to SlotMatchingStrategy to find the expected instance
Optional<InstanceID> findMatchingSlot( Predicate<InstanceID> isResourceMatching, Collection<InstanceID> availableTaskManagers, Function<InstanceID, Number> instanceScoreLookup);
- Add totalProfile to DefaultResourceAllocationStrategy#InternalResourceInfo to calculate the score(totalProfile.subtract(availableProfile)) of Instance.
- DefaultResourceAllocationStrategy#tryFulfillRequirementsForJobWithResources invoke SlotMatchingStrategy to find the best TaskManager to allocate resources.
Keep some redundant task managers to speed up failover
The redundant task managers are used to speed up failover.
FineGrainedSlotManager has reserved the interface of heterogeneous task managers, but there are only one implementation which will requests task managers in same resources currently. Therefore, the current redundant task managers will not consider heterogeneity. This could be considered in detail when we decide to support heterogeneous task managers.
The logic in FineGrainedSlotManager should be:
Introduce redundantTaskManagerNum to DefaultResourceAllocationStrategy
Invoke tryFulFillRedundantResourceProfiles at the end of tryFulfillRequirements. it should use the remaining registeredResources and pendingFreeResources to fulfill the redundant slot requirements(defaultSlotResourceProfile * redundantTaskManagerNum * numSlotsPerWorker) and try to add new PendingTaskManagers to resultBuilder if the resource is not enough.
tryFulFillRedundantResourceProfiles( Collection<InternalResourceInfo> registeredResources, List<InternalResourceInfo> pendingFreeResources, ResourceAllocationResult.Builder resultBuilder){}
Try reclaim inactive slots when job terminated
As described in FLINK-21751, the task manager may report free slots to RM earlier than JM when a job finishes, which causes RM to reassign slots to the finished job. It’s hard to keep a strict order for TM/JM, so we need to try to reclaim inactive slots when the job is terminated.
- Introduce freeInactiveSlots to SlotStatusSyncer
- Try to reclaim inactive slots in FineGrainedSlotManager#clearResourceRequirements
Use FineGrainedSlotManager as default SlotManager
Therefore, The FineGrainedSlotManager has the full capability of DeclarativeSlotManager. We can change the default value of cluster.fine-grained-resource-management.enabled from false to true. This option will be preserved in case some corner cases.
The DeclarativeSlotManager and related configs will be completely removed in the next release after the default value is changed.
Test Plan
Our current CI pipeline includes a stage dedicated to fine-grained resource management. To avoid adding too much overhead, this stage only runs tests from the 'flink-runtime' and 'flink-test' modules.
Before we deprecated the declarative slot manager. The FineGrainedSlotManager should be the default SlotManager, and it should pass all the tests in ci.
In addition, we have specific unit tests for the DeclarativeSlotManager, to ensure that removing it did not negatively impact the codebase. We will review all unit tests to ensure that there are no gaps in the coverage provided by the FineGrainedSlotManager.