Status

Discussion thread: https://lists.apache.org/thread/fngkk52kjbc6b6v9nn0lkfq6hhsbgb1h
Vote thread
JIRA

Release: 1.16

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In order to support speculative execution for batch jobs (FLIP-168), we need a mechanism to block resources on the nodes where slow tasks are located. We propose to introduce a blocklist mechanism as follows: once a node is marked as blocked, future slots should not be allocated from that node, but slots that are already allocated will not be affected.

Proposed Changes

We introduce a new data structure, BlockedNode, to record information about a blocked node. This information is recorded in a dedicated component and affects resource allocation in Flink clusters. Blocking a node is not permanent, however: each block has a timeout, after which the node becomes available again. The overall structure of the blocklist mechanism is shown in the figure below.

In the JM, there will be a component (JobMasterBlocklistHandler) responsible for managing all BlockedNode(s) and applying them to the SlotPool.

In RM, there will also be a component (ResourceManagerBlocklistHandler) responsible for managing the cluster-level blocklist. SlotManager will be aware of the blocklist information and filter out resources in the blocklist when allocating slots from registered task managers. Similarly, when the ResourceManagerDriver requests new resources from an external resource manager (Yarn or Kubernetes), it also needs to filter out the blocked nodes.

BlockedNode

A BlockedNode corresponds to the blocked information of a specific node, including the following 4 fields:

  1. id: The identifier of the node.
  2. startTimestamp: The start time of the blocking.
  3. endTimestamp: The end time of the blocking (at which time the node will become available again). 
  4. cause: The cause for blocking this node.
BlockedNode
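/**
 * This class represents a blocked node.
 */
public class BlockedNode {

    private final String id;
    private final String cause;
    private final long startTimestamp;
    private final long endTimestamp;

    public BlockedNode(String id, String cause, long startTimestamp, long endTimestamp) {
        this.id = id;
        this.cause = cause;
        this.startTimestamp = startTimestamp;
        this.endTimestamp = endTimestamp;
    }

    public long getStartTimestamp() { return startTimestamp; }

    public long getEndTimestamp() { return endTimestamp; }

    public String getCause() { return cause; }

    public String getId() { return id; }
}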

Blocklist on JobMaster

JobMasterBlocklistHandler

JobMasterBlocklistHandler is the component in the JM responsible for managing all blocked node information and applying it to the SlotPool. It consists of two sub-components: BlocklistTracker and BlocklistContext. When it receives new blocked node information, the JobMasterBlocklistHandler handles it as follows:

  1. Add the new blocked nodes to the BlocklistTracker. 
  2. Synchronize the new blocked nodes to RM.
  3. Block the resources on newly added nodes via the BlocklistContext.

BlocklistTracker

BlocklistTracker is the component responsible for tracking blocked nodes. The tracker periodically removes blocked nodes whose blocking has timed out.

BlocklistTracker
public interface BlocklistTracker {

    /**
     * Add or update blocked nodes.
     *
     * @param nodes The nodes to add or update
     * @return Newly added or updated nodes.
     */
    Collection<BlockedNode> addNewBlockedNodes(Collection<BlockedNode> nodes);

    /** Returns whether the given task manager is located on blocked nodes. */
    boolean isBlockedTaskManager(ResourceID taskManagerId);

    /** Get all blocked nodes. */
    Set<String> getBlockedNodes();

    /** Remove the timeout nodes. */
    void removeTimeoutNodes();
}
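To make the add-or-update and timeout semantics concrete, here is a minimal in-memory sketch of a tracker. It rests on several assumptions that do not come from the FLIP. A re-reported node counts as "updated" only when its new endTimestamp is later than the tracked one. IDs are plain Strings instead of Flink's ResourceID. The task-manager-to-node lookup and the injectable clock are hypothetical constructor parameters.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Minimal stand-in for BlockedNode so the sketch has no Flink dependency. */
final class SimpleBlockedNode {
    final String id;
    final String cause;
    final long startTimestamp;
    final long endTimestamp;

    SimpleBlockedNode(String id, String cause, long startTimestamp, long endTimestamp) {
        this.id = id;
        this.cause = cause;
        this.startTimestamp = startTimestamp;
        this.endTimestamp = endTimestamp;
    }
}

/** In-memory tracker sketch; task manager IDs are Strings instead of ResourceID. */
final class InMemoryBlocklistTracker {
    private final Map<String, SimpleBlockedNode> blockedNodes = new HashMap<>();
    // Hypothetical lookup from task manager ID to node ID (null if unknown).
    private final Function<String, String> nodeIdOfTaskManager;
    // Injectable clock so timeouts can be tested deterministically.
    private final LongSupplier clock;

    InMemoryBlocklistTracker(Function<String, String> nodeIdOfTaskManager, LongSupplier clock) {
        this.nodeIdOfTaskManager = nodeIdOfTaskManager;
        this.clock = clock;
    }

    /** Adds unknown nodes and extends known ones; returns only what actually changed. */
    Collection<SimpleBlockedNode> addNewBlockedNodes(Collection<SimpleBlockedNode> nodes) {
        List<SimpleBlockedNode> changed = new ArrayList<>();
        for (SimpleBlockedNode node : nodes) {
            SimpleBlockedNode existing = blockedNodes.get(node.id);
            // Assumption: a re-reported node is an update only if it stays blocked longer.
            if (existing == null || node.endTimestamp > existing.endTimestamp) {
                blockedNodes.put(node.id, node);
                changed.add(node);
            }
        }
        return changed;
    }

    boolean isBlockedTaskManager(String taskManagerId) {
        String nodeId = nodeIdOfTaskManager.apply(taskManagerId);
        return nodeId != null && blockedNodes.containsKey(nodeId);
    }

    Set<String> getBlockedNodes() {
        return new HashSet<>(blockedNodes.keySet());
    }

    /** Drops every node whose blocking has reached its end timestamp. */
    void removeTimeoutNodes() {
        long now = clock.getAsLong();
        blockedNodes.values().removeIf(node -> node.endTimestamp <= now);
    }
}
```

A handler could call removeTimeoutNodes from a periodically scheduled task. Returning only the changed nodes from addNewBlockedNodes lets callers skip notifications when nothing new was learned.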
     

BlocklistContext

BlocklistContext is the component responsible for blocking slots in the SlotPool. The details are described in the SlotPool section.

BlocklistContext
public interface BlocklistContext {
    /** Block resources on the newly added nodes. */
    void blockResources(Collection<BlockedNode> newlyAddedOrUpdatedNodes);
}
   
BlocklistHandler & JobMasterBlocklistHandler
public interface BlocklistHandler {

    /** Add a new blocked node. */
    void blockNode(String nodeId, String cause, long startTimestamp, long endTimestamp);

    /** Returns whether the given task manager is located on blocked nodes. */
    boolean isBlockedTaskManager(ResourceID taskManagerId); 

    /** Get all blocked nodes. */
    Set<String> getBlockedNodes();    
}

public interface JobMasterBlocklistHandler extends BlocklistHandler {
}
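The following sketch shows how JobMasterBlocklistHandler#blockNode could chain the three steps listed for the JM handler: track the node, synchronize with the RM, and block local resources. The two callbacks are hypothetical stand-ins for ResourceManagerGateway#notifyNewBlockedNodes and BlocklistContext#blockResources. The FLIP does not specify skipping both calls when the tracker reports no change; that behavior is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Minimal node record used only by this sketch. */
final class NodeBlock {
    final String id;
    final String cause;
    final long startTimestamp;
    final long endTimestamp;

    NodeBlock(String id, String cause, long startTimestamp, long endTimestamp) {
        this.id = id;
        this.cause = cause;
        this.startTimestamp = startTimestamp;
        this.endTimestamp = endTimestamp;
    }
}

/** Sketch of the JM-side handler: track, sync to RM, then block local resources. */
final class JobMasterBlocklistHandlerSketch {
    private final Map<String, NodeBlock> tracked = new HashMap<>();
    // Stand-in for ResourceManagerGateway#notifyNewBlockedNodes (an RPC in Flink).
    private final Consumer<Collection<NodeBlock>> notifyResourceManager;
    // Stand-in for BlocklistContext#blockResources.
    private final Consumer<Collection<NodeBlock>> blockResources;

    JobMasterBlocklistHandlerSketch(
            Consumer<Collection<NodeBlock>> notifyResourceManager,
            Consumer<Collection<NodeBlock>> blockResources) {
        this.notifyResourceManager = notifyResourceManager;
        this.blockResources = blockResources;
    }

    void blockNode(String nodeId, String cause, long startTimestamp, long endTimestamp) {
        // Step 1: add to the tracker and keep only nodes that were added or updated.
        List<NodeBlock> changed = addToTracker(
                Collections.singletonList(new NodeBlock(nodeId, cause, startTimestamp, endTimestamp)));
        if (changed.isEmpty()) {
            return; // Assumption: nothing new, so no RPC and no slot release.
        }
        // Step 2: synchronize the change to the RM.
        notifyResourceManager.accept(changed);
        // Step 3: block resources on the affected nodes via the BlocklistContext stand-in.
        blockResources.accept(changed);
    }

    private List<NodeBlock> addToTracker(Collection<NodeBlock> nodes) {
        List<NodeBlock> changed = new ArrayList<>();
        for (NodeBlock node : nodes) {
            NodeBlock existing = tracked.get(node.id);
            if (existing == null || node.endTimestamp > existing.endTimestamp) {
                tracked.put(node.id, node);
                changed.add(node);
            }
        }
        return changed;
    }
}
```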

SlotPool

The SlotPool should avoid allocating slots located on blocked nodes. To do that, our core idea is to keep the SlotPool in the following state: no slot in the SlotPool is both free (no task assigned) and located on a blocked node. Details are as follows:

  1. When receiving slot offers from task managers located on blocked nodes, all offers should be rejected.
  2. When a node is newly blocked, we should release all free (no task assigned) slots on it. To do so, we find all task managers on the blocked node and release their free slots via SlotPoolService#releaseFreeSlotsOnTaskManager.
  3. When a slot changes from reserved (task assigned) to free (no task assigned), the SlotPool checks whether the corresponding task manager is blocked. If so, it releases the slot.

We will introduce a new slot pool implementation, BlocklistSlotPool, which extends DefaultDeclarativeSlotPool and overrides some of its methods to implement the blocklist-related behavior described above. The blocklist information will be passed in via the constructor. This implementation will only be used when the blocklist is enabled.

SlotPoolService
public interface SlotPoolService {
    /**
     * Releases all free slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    //...
}
BlocklistSlotPool
public class BlocklistSlotPool extends DefaultDeclarativeSlotPool {
    // ...
}
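The three SlotPool rules can be illustrated with a toy pool that maintains the invariant "no free slot on a blocked node". This is not the DeclarativeSlotPool API. Slots and task managers are plain strings, and the blocked check is an assumed Predicate. onNodesBlocked also assumes the caller has already resolved which task managers are located on the newly blocked nodes, for example from their TaskManagerLocation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/** Toy pool keeping the invariant: no free slot lives on a blocked task manager. */
final class ToyBlocklistSlotPool {
    // slot ID -> ID of the task manager that offered it
    private final Map<String, String> slotOwner = new LinkedHashMap<>();
    // slots with a task assigned; every other known slot is free
    private final Set<String> reserved = new HashSet<>();
    private final Predicate<String> isBlockedTaskManager;

    ToyBlocklistSlotPool(Predicate<String> isBlockedTaskManager) {
        this.isBlockedTaskManager = isBlockedTaskManager;
    }

    /** Rule 1: reject all offers from blocked task managers; returns accepted slot IDs. */
    Collection<String> offerSlots(String taskManagerId, Collection<String> slotIds) {
        if (isBlockedTaskManager.test(taskManagerId)) {
            return new ArrayList<>();
        }
        for (String slotId : slotIds) {
            slotOwner.put(slotId, taskManagerId);
        }
        return new ArrayList<>(slotIds);
    }

    /** Marks a known slot as reserved (a task has been assigned to it). */
    void reserve(String slotId) {
        if (slotOwner.containsKey(slotId)) {
            reserved.add(slotId);
        }
    }

    /** Rule 2: the caller passes the task managers located on newly blocked nodes. */
    void onNodesBlocked(Collection<String> taskManagersOnBlockedNodes) {
        for (String taskManagerId : taskManagersOnBlockedNodes) {
            releaseFreeSlotsOnTaskManager(taskManagerId);
        }
    }

    /** Rule 3: a slot that becomes free is released at once if its owner is blocked. */
    void freeSlot(String slotId) {
        reserved.remove(slotId);
        String owner = slotOwner.get(slotId);
        if (owner != null && isBlockedTaskManager.test(owner)) {
            slotOwner.remove(slotId);
        }
    }

    /** Releases free slots of one task manager; reserved slots are untouched. */
    void releaseFreeSlotsOnTaskManager(String taskManagerId) {
        slotOwner.entrySet().removeIf(
                e -> e.getValue().equals(taskManagerId) && !reserved.contains(e.getKey()));
    }

    Set<String> freeSlots() {
        Set<String> free = new HashSet<>(slotOwner.keySet());
        free.removeAll(reserved);
        return free;
    }
}
```

Reserved slots on a blocked node are deliberately left alone, which matches the motivation that already allocated slots are not affected. They are released by rule 3 once their task finishes.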

Blocklist on ResourceManager

ResourceManagerBlocklistHandler

ResourceManagerBlocklistHandler is a new component introduced in RM for the blocklist mechanism. It has only one sub-component: BlocklistTracker, which is responsible for managing cluster-level blocklist.

ResourceManagerBlocklistHandler
public interface ResourceManagerBlocklistHandler extends BlocklistHandler {
}

SlotManager

SlotManager should filter out blocked resources when allocating registered resources. To do that, we need the following changes:

  1. When starting SlotManager, the BlockedTaskManagerChecker will be passed in to check whether a registered task manager is located on blocked nodes.
  2. When trying to fulfill the slot requirements by registered task managers, the task managers located on blocked nodes will be filtered out.
  3. SlotManager will request new task managers from external resource managers if the registered resources cannot fulfill the requirements. The blocklist also takes effect when requesting new task managers; the details are described in the ResourceManagerDriver section.
BlockedTaskManagerChecker
/** This checker helps to query whether a given task manager is located on blocked nodes. */
public interface BlockedTaskManagerChecker {

    /** Returns whether the given task manager is located on blocked nodes. */
    boolean isBlockedTaskManager(ResourceID taskManagerId);
}
SlotManager
public interface SlotManager {

    /** Starts the slot manager with the given leader id and resource manager actions. */
    void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceActions newResourceActions,
            BlockedTaskManagerChecker newBlockedTaskManagerChecker);

    //...
}
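The following sketch illustrates step 2 of the SlotManager changes. The matching loop skips registered task managers that the checker reports as blocked. Several names here are assumptions for illustration: RegisteredTaskManager, TaskManagerBlockChecker (a String-based mirror of BlockedTaskManagerChecker), and the first-fit strategy. The matching strategy in Flink's actual SlotManager implementations may differ.

```java
import java.util.List;
import java.util.Optional;

/** String-based mirror of the FLIP's BlockedTaskManagerChecker. */
@FunctionalInterface
interface TaskManagerBlockChecker {
    boolean isBlockedTaskManager(String taskManagerId);
}

/** Hypothetical view of a registered task manager and its free slot count. */
final class RegisteredTaskManager {
    final String id;
    final int freeSlots;

    RegisteredTaskManager(String id, int freeSlots) {
        this.id = id;
        this.freeSlots = freeSlots;
    }
}

final class SlotMatcherSketch {
    /**
     * First-fit search over registered task managers that skips blocked ones.
     * An empty result means a new task manager would have to be requested.
     */
    static Optional<RegisteredTaskManager> findTaskManagerWithFreeSlot(
            List<RegisteredTaskManager> registered, TaskManagerBlockChecker checker) {
        return registered.stream()
                .filter(tm -> tm.freeSlots > 0)
                .filter(tm -> !checker.isBlockedTaskManager(tm.id))
                .findFirst();
    }
}
```

Because the checker is passed in at start(), the SlotManager does not need to own any blocklist state itself. It only asks the question at matching time.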

ResourceManagerDriver

When deploying Flink on Yarn or Kubernetes, the ResourceManagerDriver is responsible for requesting new task managers from the external resource manager. To avoid allocating task managers from blocked nodes, we need to pass the information of blocked nodes to ResourceManagerDriver at its initialization. 

ResourceManagerDriver uses the following APIs to tell external resource managers about the blocked nodes:

  1. Yarn: AMRMClient#updateBlacklist 
  2. Kubernetes: NodeAffinity
ResourceManagerDriver
public interface BlockedNodeRetriever {

    /** Retrieve blocked nodes. */
    Set<String> getBlockedNodes();
}

public interface ResourceManagerDriver<WorkerType extends ResourceIDRetrievable> {

    /** Initialize the deployment specific components. */
    void initialize(
            ResourceEventHandler<WorkerType> resourceEventHandler,
            ScheduledExecutor mainThreadExecutor,
            Executor ioExecutor,
            BlockedNodeRetriever blockedNodeRetriever)
            throws Exception;

    //...
}
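Yarn's AMRMClient#updateBlacklist takes a list of additions and a list of removals rather than the full set. A Yarn driver therefore needs to diff the currently blocked nodes, for example the result of BlockedNodeRetriever#getBlockedNodes, against what it last reported. The helper below is a hypothetical sketch of that diff only. It does not call Yarn, and its names are illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Additions/removals to hand to AMRMClient#updateBlacklist; this sketch does not call Yarn. */
final class YarnBlacklistDiff {
    final List<String> additions;
    final List<String> removals;

    private YarnBlacklistDiff(List<String> additions, List<String> removals) {
        this.additions = additions;
        this.removals = removals;
    }

    /** Diffs the currently blocked nodes against the set last reported to Yarn. */
    static YarnBlacklistDiff compute(Set<String> lastReported, Set<String> currentlyBlocked) {
        List<String> additions = new ArrayList<>();
        for (String node : currentlyBlocked) {
            if (!lastReported.contains(node)) {
                additions.add(node);
            }
        }
        List<String> removals = new ArrayList<>();
        for (String node : lastReported) {
            if (!currentlyBlocked.contains(node)) {
                removals.add(node);
            }
        }
        // Sorted only to make the output deterministic.
        Collections.sort(additions);
        Collections.sort(removals);
        return new YarnBlacklistDiff(additions, removals);
    }
}
```

A driver could compute this diff right before requesting containers, pass the two lists to updateBlacklist, and then remember the current set as the last reported one. Nodes whose blocking has timed out then appear in the removals.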

Synchronize Blocklist between JM & RM

The newly added/updated blocked nodes will be synchronized between JM and RM via RPC: 

  1. Once blocked nodes are newly added to or updated in the JobMasterBlocklistHandler, the RM will be notified of these nodes via ResourceManagerGateway#notifyNewBlockedNodes.
  2. When the RM receives blocked nodes notified by a JM, it will add them to the ResourceManagerBlocklistHandler and notify all JMs of the successfully added/updated nodes through JobMasterGateway#notifyNewBlockedNodes.
  3. Similarly, when a JM receives blocked nodes notified by the RM, it will also add them to its JobMasterBlocklistHandler.
BlocklistListener
public interface BlocklistListener {

    /** Notify newly added/updated blocked nodes. */
    void notifyNewBlockedNodes(Collection<BlockedNode> newlyAddedOrUpdatedNodes);
}

public interface JobMasterGateway extends BlocklistListener {
    //...
}

public interface ResourceManagerGateway extends BlocklistListener {     
    //...
}  
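The synchronization steps can be simulated in-process. In this sketch, all class names are hypothetical and direct method calls stand in for RPC. Both sides propagate only nodes that were newly added or extended, and that property is what keeps the RM's broadcast from bouncing back and forth. Two further assumptions apply. An entry counts as updated only when its end timestamp moves later, and a JM does not re-notify the RM about nodes pushed to it.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Minimal blocked-node record for this simulation. */
final class SyncedNode {
    final String id;
    final long endTimestamp;

    SyncedNode(String id, long endTimestamp) {
        this.id = id;
        this.endTimestamp = endTimestamp;
    }
}

/** Add-or-extend table used by both sides; returns only entries that changed. */
final class NodeTable {
    private final Map<String, Long> endById = new HashMap<>();

    List<SyncedNode> addOrUpdate(Collection<SyncedNode> nodes) {
        List<SyncedNode> changed = new ArrayList<>();
        for (SyncedNode node : nodes) {
            Long currentEnd = endById.get(node.id);
            if (currentEnd == null || node.endTimestamp > currentEnd) {
                endById.put(node.id, node.endTimestamp);
                changed.add(node);
            }
        }
        return changed;
    }

    boolean contains(String nodeId) {
        return endById.containsKey(nodeId);
    }
}

final class ResourceManagerSketch {
    final NodeTable table = new NodeTable();
    private final List<JobMasterSketch> jobMasters = new ArrayList<>();

    void register(JobMasterSketch jobMaster) {
        jobMasters.add(jobMaster);
    }

    /** Step 2: record at cluster level, then push only the changed nodes to every JM. */
    void notifyNewBlockedNodes(Collection<SyncedNode> nodes) {
        List<SyncedNode> changed = table.addOrUpdate(nodes);
        if (changed.isEmpty()) {
            return;
        }
        for (JobMasterSketch jobMaster : jobMasters) {
            jobMaster.notifyNewBlockedNodes(changed);
        }
    }
}

final class JobMasterSketch {
    final NodeTable table = new NodeTable();
    private final ResourceManagerSketch resourceManager;

    JobMasterSketch(ResourceManagerSketch resourceManager) {
        this.resourceManager = resourceManager;
    }

    /** Step 1: a node blocked by this JM is tracked and reported to the RM. */
    void blockLocally(SyncedNode node) {
        List<SyncedNode> changed = table.addOrUpdate(Collections.singletonList(node));
        if (!changed.isEmpty()) {
            resourceManager.notifyNewBlockedNodes(changed);
        }
    }

    /** Step 3: nodes pushed by the RM are tracked here but not sent back to the RM. */
    void notifyNewBlockedNodes(Collection<SyncedNode> nodes) {
        table.addOrUpdate(nodes);
    }
}
```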

Enrich TaskManagerLocation with node information

In order to support blocked nodes, it is necessary to know the node on which each task manager is located. To do that, we need to add a node identifier to TaskManagerLocation. This node identifier should be set by the resource manager when requesting new task managers.

TaskManagerLocation
public class TaskManagerLocation {

    public String getNodeId();

    //...
}


Compatibility, Deprecation, and Migration Plan

The blocklist mechanism is only used when speculative execution is enabled, so Flink's default behavior does not change.

Limitations

No integration with Flink's web UI

This FLIP does not modify the web UI, but UI integration is in our plan. Currently, we have a preliminary idea for it, including improving how slot information is displayed and adding a page that shows the blocklist information.

No support for JM failover

Currently, the blocklist information is lost after a JM failover. In the future, we may persist the blocklist information in HA to support JM failover.


Test Plan

  1. The changes will be covered by unit and IT cases.
  2. Test the functionality in a real Standalone/Yarn/Kubernetes cluster.

Rejected Alternatives

No rejected alternatives yet.