This design only supports manually specifying blocked resources via the REST API; auto-detection may be introduced in the future.

Public Interfaces

Configurations

We propose to introduce the following configuration options for the blocklist:

  • cluster.resource-blocklist.enabled: Whether to enable the blocklist mechanism.
  • cluster.resource-blocklist.item.timeout: The timeout after which a blocked item is removed from the blocklist.
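
As an illustration of how the item timeout could relate to the timestamp and endTimestamp fields of a blocked item, the following Java sketch derives the removal time from the creation time and the configured timeout. The class BlockedItemTimeouts and its methods are hypothetical helpers, not part of this proposal, and millisecond timestamps are an assumption.

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not part of the proposed interfaces. */
final class BlockedItemTimeouts {
    private BlockedItemTimeouts() {}

    /** Derives the end timestamp (ms) of an item created at timestampMillis. */
    static long endTimestamp(long timestampMillis, Duration itemTimeout) {
        // addExact fails loudly on overflow instead of wrapping around.
        return Math.addExact(timestampMillis, itemTimeout.toMillis());
    }

    /** An item is due for removal once the current time reaches its end timestamp. */
    static boolean isTimedOut(long endTimestampMillis, long nowMillis) {
        return nowMillis >= endTimestampMillis;
    }
}
```

With a one-minute timeout, an item created at time 1000 would get an end timestamp of 61000 and would be treated as timed out from that instant on.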

Proposed Changes

In this design, two granularities of blocked resources are supported: task managers and nodes. A record of blocklist information is called a blocked item, which is generally generated by the scheduler based on the exceptions of tasks. Blocked items are recorded in a dedicated component and affect the resource allocation of Flink clusters. However, blocked items are not permanent: each item has a timeout. Once an item times out, it is removed and the resource becomes available again. The overall structure of the blocklist mechanism is shown in the figure below.

[Figure: overall structure of the blocklist mechanism]

In JM, there will be a component (JobMasterBlocklistHandler) responsible for managing all blocked items and applying them to SlotPool.

In RM, there will also be a component (ResourceManagerBlocklistHandler) responsible for managing the cluster-level blocklist. SlotManager will be aware of the blocklist information and filter out resources in the blocklist when allocating slots from registered task managers. Similarly, when the ResourceManagerDriver requests new resources from an external resource manager (Yarn or Kubernetes), it also needs to filter out the blocked nodes.

Blocked Item

A blocked item corresponds to the blocklist information of a specific task manager or node, including the following 6 fields:

  1. type: The blocked item type, TASK_MANAGER or NODE
  2. timestamp: The timestamp of creating this item.
  3. endTimestamp: The timestamp at which the item should be removed.
  4. cause: The cause for creating this item.
  5. identifier: The identifier of the blocked task manager or node.
  6. action: The action when a task manager/node is marked as blocked.

Code Block: BlocklistedItem
/**
 * This class represents a blocked item.
 *
 * @param <ID> Identifier of the blocked item.
 */
public abstract class BlockedItem<ID> {
    public BlockedItemType getType();

    public long getTimestamp();

    public long getEndTimestamp();
    
    public BlockAction getAction();

    public Throwable getCause();

    public abstract ID getIdentifier();
}

/** This class represents a blocked node. */
public class BlockedNode extends BlockedItem<String> {
}

/** This class represents a blocked task manager. */
public class BlockedTaskManager extends BlockedItem<ResourceID> {
}

Blocklist on JobMaster

JobMasterBlocklistHandler

JobMasterBlocklistHandler is the component in JM responsible for managing all blocked items and applying them to SlotPool. It consists of two sub-components: BlocklistTracker and BlocklistContext. When receiving new blocked items, the JobMasterBlocklistHandler will handle them as follows:

  1. Add the new blocked items to the BlocklistTracker. 
  2. Synchronize the new blocked items to RM.
  3. Perform block actions on the resources via the BlocklistContext.
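
The three steps above can be sketched in Java as follows. This is a minimal sketch under stated assumptions: HandlerFlowSketch and its three collaborators are hypothetical stand-ins for the tracker, the RPC notification to RM, and the BlocklistContext, and skipping steps 2 and 3 when nothing changed is an assumption rather than something this proposal specifies.

```java
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Illustrative stand-in; the proposed handler works on collections of BlockedItem. */
final class HandlerFlowSketch<T> {
    private final UnaryOperator<Collection<T>> tracker;      // returns newly added or updated items
    private final Consumer<Collection<T>> rmNotifier;        // hypothetical hook that syncs items to RM
    private final Consumer<Collection<T>> blocklistContext;  // performs block actions on resources

    HandlerFlowSketch(
            UnaryOperator<Collection<T>> tracker,
            Consumer<Collection<T>> rmNotifier,
            Consumer<Collection<T>> blocklistContext) {
        this.tracker = tracker;
        this.rmNotifier = rmNotifier;
        this.blocklistContext = blocklistContext;
    }

    void onNewBlockedItems(Collection<T> items) {
        // Step 1: record the items; only the ones that changed state are returned.
        Collection<T> changed = tracker.apply(items);
        if (changed.isEmpty()) {
            return; // Assumption: nothing to sync or act on if nothing changed.
        }
        // Step 2: synchronize the changed items to RM.
        rmNotifier.accept(changed);
        // Step 3: perform the block actions on the affected resources.
        blocklistContext.accept(changed);
    }
}
```

Passing only the changed items to the later steps mirrors the return value of BlocklistTracker#addNewBlockedItems, which returns the newly added or updated items.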

BlocklistTracker

BlocklistTracker is the component responsible for tracking blocked items. The tracker will regularly remove blocked items that have timed out.

Code Block: BlocklistTracker
public interface BlocklistTracker {
    /** Starts the blocklist tracker. */
    void start(ComponentMainThreadExecutor mainThreadExecutor);

    /**
     * Add new blocked items or update existing items.
     *
     * @param items The items to add or update
     * @return Newly added or updated items.
     */
    Collection<BlockedItem<?>> addNewBlockedItems(Collection<BlockedItem<?>> items);

    /** Returns whether the given task manager is blocked. */
    boolean isBlockedTaskManager(ResourceID resourceID);

    /** Get all blocked nodes. */
    Set<String> getBlockedNodes();

    /** Close the blocklist tracker. */
    void close();
}
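
To make the timeout handling concrete, here is a small in-memory Java sketch of a tracker that adds or updates items and periodically drops the ones that have timed out. It is not the proposed implementation: ExpiringBlocklistSketch is hypothetical, items are reduced to an identifier plus an end timestamp, and the update rule (an update only takes effect if it extends the end timestamp) is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class ExpiringBlocklistSketch {
    /** Identifier of the blocked resource mapped to its end timestamp in milliseconds. */
    private final Map<String, Long> endTimestamps = new HashMap<>();

    /** Adds or updates an item; returns true if the stored state changed. */
    boolean addOrUpdate(String identifier, long endTimestamp) {
        Long previous = endTimestamps.get(identifier);
        if (previous != null && previous >= endTimestamp) {
            return false; // Assumption: an update never shortens an existing block.
        }
        endTimestamps.put(identifier, endTimestamp);
        return true;
    }

    boolean isBlocked(String identifier) {
        return endTimestamps.containsKey(identifier);
    }

    /** Removes and returns all items whose end timestamp has been reached. */
    List<String> removeTimeoutItems(long now) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = endTimestamps.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= now) {
                removed.add(entry.getKey());
                it.remove();
            }
        }
        return removed;
    }
}
```

In a real tracker, removeTimeoutItems would presumably be scheduled on the main thread executor passed to start, which keeps all state changes single-threaded.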
     

BlocklistContext

BlocklistContext is the component responsible for performing block actions on SlotPool. The details are described in the SlotPool section.

Code Block: BlocklistContext
public interface BlocklistContext {
    /** Perform the newly added or updated blocked items on resources. */
    void blocklistResources(Collection<BlockedItem<?>> newlyAddedOrUpdatedItems);
}
   
Code Block: BlocklistHandler & JobMasterBlocklistHandler
public interface BlocklistHandler extends BlocklistTracker {
    /** Add a new blocked node. */
    void blockNode(String nodeId, BlockAction action, Throwable cause, long timestamp, long endTimestamp);

    /** Add a new blocked task manager. */
    void blockTaskManager(ResourceID taskManagerId, BlockAction action, Throwable cause, long timestamp, long endTimestamp);
}

public interface JobMasterBlocklistHandler extends BlocklistHandler {
}

SlotPool

SlotPool should avoid allocating slots from blocked task managers. Blocked task managers include those directly blocked and those located on blocked nodes. To do that, our core idea is to keep the SlotPool in a state where no slot is both free (no task assigned) and blocked. Details are as follows:

  1. When SlotPool starts, a BlockedTaskManagerChecker will be passed in to check whether a task manager is blocked.
  2. When receiving slot offers from blocked task managers (including task managers on blocked nodes), all offers should be rejected.
  3. When a task manager is newly blocked, BlocklistContext will perform the following on SlotPool:
    1. If the action is MARK_BLOCKED, release all free (no task assigned) slots on the blocked task manager.
    2. If the action is MARK_BLOCKED_AND_EVACUATE_TASKS, release all slots on the blocked task manager.
  4. When a slot state changes from reserved (task assigned) to free (no task assigned), SlotPool will check whether the corresponding task manager is blocked. If so, the slot is released.
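
The four rules above can be sketched with a toy slot pool in Java. This is an illustration only: SlotPoolSketch, its string identifiers and its method names are hypothetical and far simpler than Flink's actual SlotPool, and the blocked check is modelled as a plain predicate.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

final class SlotPoolSketch {
    enum BlockAction { MARK_BLOCKED, MARK_BLOCKED_AND_EVACUATE_TASKS }

    private static final class Slot {
        final String taskManagerId;
        boolean reserved; // true while a task is assigned

        Slot(String taskManagerId) {
            this.taskManagerId = taskManagerId;
        }
    }

    private final Map<String, Slot> slots = new HashMap<>();
    private final Predicate<String> isBlockedTaskManager;

    /** Rule 1: the blocked-task-manager check is passed in at start. */
    SlotPoolSketch(Predicate<String> isBlockedTaskManager) {
        this.isBlockedTaskManager = isBlockedTaskManager;
    }

    /** Rule 2: offers from blocked task managers are rejected. */
    boolean offerSlot(String slotId, String taskManagerId) {
        if (isBlockedTaskManager.test(taskManagerId)) {
            return false;
        }
        slots.put(slotId, new Slot(taskManagerId));
        return true;
    }

    void reserve(String slotId) {
        Slot slot = slots.get(slotId);
        if (slot != null) {
            slot.reserved = true;
        }
    }

    /** Rule 3: release free slots, or all slots when tasks are to be evacuated. */
    void onTaskManagerBlocked(String taskManagerId, BlockAction action) {
        boolean evacuate = action == BlockAction.MARK_BLOCKED_AND_EVACUATE_TASKS;
        slots.values().removeIf(
                slot -> slot.taskManagerId.equals(taskManagerId) && (evacuate || !slot.reserved));
    }

    /** Rule 4: a slot that becomes free on a blocked task manager is released. */
    void free(String slotId) {
        Slot slot = slots.get(slotId);
        if (slot == null) {
            return;
        }
        if (isBlockedTaskManager.test(slot.taskManagerId)) {
            slots.remove(slotId);
        } else {
            slot.reserved = false;
        }
    }

    boolean contains(String slotId) {
        return slots.containsKey(slotId);
    }
}
```

Together these rules maintain the stated invariant: after every operation, no slot in the pool is both free and located on a blocked task manager.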

Code Block: SlotPoolService
public interface BlockedTaskManagerChecker {
    /**
     * Returns whether the given task manager is blocked.
     */
    boolean isBlockedTaskManager(ResourceID resourceID);
}


public interface SlotPoolService {

    /** Start the encapsulated slot pool implementation. */
    void start(
            JobMasterId jobMasterId,
            String address,
            ComponentMainThreadExecutor mainThreadExecutor,
            BlockedTaskManagerChecker blockedTaskManagerChecker)
            throws Exception;

    /**
     * Releases all slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    /**
     * Releases all free slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    //...
}

Blocklist on ResourceManager

ResourceManagerBlocklistHandler

ResourceManagerBlocklistHandler is a new component introduced in RM for the blocklist mechanism. It has only one sub-component: BlocklistTracker, which is responsible for managing cluster-level blocked items.

Code Block: ResourceManagerBlocklistHandler
public interface ResourceManagerBlocklistHandler extends BlocklistHandler {
}

SlotManager

SlotManager should filter out blocked resources when allocating registered resources. To do that, the following changes are needed:

  1. When starting SlotManager, a BlockedTaskManagerChecker will be passed in to check whether a registered task manager is blocked.
  2. When trying to fulfill the slot requirements with registered task managers, the blocked ones will be filtered out.
  3. SlotManager will request new task managers from external resource managers if the registered resources cannot fulfill the requirements. The blocklist also takes effect when requesting new task managers. The details are described in the ResourceManagerDriver section.
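
The filtering in step 2 can be sketched in Java as follows. This is a simplified illustration: BlockedResourceFilterSketch is hypothetical, task managers and nodes are plain strings, and the mapping from task manager to node is assumed to be known from registration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class BlockedResourceFilterSketch {
    private final Set<String> blockedTaskManagers;
    private final Set<String> blockedNodes;

    BlockedResourceFilterSketch(Set<String> blockedTaskManagers, Set<String> blockedNodes) {
        this.blockedTaskManagers = blockedTaskManagers;
        this.blockedNodes = blockedNodes;
    }

    /** A task manager counts as blocked if it is blocked directly or runs on a blocked node. */
    boolean isBlockedTaskManager(String taskManagerId, String nodeId) {
        return blockedTaskManagers.contains(taskManagerId) || blockedNodes.contains(nodeId);
    }

    /** Keeps only the registered task managers (id mapped to node id) that are not blocked. */
    List<String> usableTaskManagers(Map<String, String> registeredTaskManagerToNode) {
        List<String> usable = new ArrayList<>();
        for (Map.Entry<String, String> entry : registeredTaskManagerToNode.entrySet()) {
            if (!isBlockedTaskManager(entry.getKey(), entry.getValue())) {
                usable.add(entry.getKey());
            }
        }
        return usable;
    }
}
```

Only the task managers returned by usableTaskManagers would be considered when fulfilling slot requirements; if they are insufficient, new task managers are requested as described in step 3.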

Code Block: SlotManager
public interface SlotManager {

    /** Starts the slot manager with the given leader id and resource manager actions. */
    void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceActions newResourceActions,
            BlockedTaskManagerChecker newBlockedTaskManagerChecker);

    //...
}

ResourceManagerDriver

When deploying Flink on Yarn or Kubernetes, the ResourceManagerDriver is responsible for requesting new task managers from the external resource manager. To avoid allocating task managers from blocked nodes, we need to pass the information of blocked nodes to ResourceManagerDriver at its initialization. 

ResourceManagerDriver uses the following APIs to tell external resource managers about the blocked nodes:

  1. Yarn: AMRMClient#updateBlacklist 
  2. Kubernetes: NodeAffinity
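
Yarn's AMRMClient#updateBlacklist takes a list of additions and a list of removals rather than the full set, so a driver would likely need to compare the currently blocked nodes with what it last reported. The following Java sketch shows that comparison only. BlockedNodeDelta is a hypothetical helper, and how and when the driver would call the Yarn client with the result is not specified by this proposal.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: the change between two snapshots of blocked nodes. */
final class BlockedNodeDelta {
    final Set<String> additions;
    final Set<String> removals;

    private BlockedNodeDelta(Set<String> additions, Set<String> removals) {
        this.additions = additions;
        this.removals = removals;
    }

    static BlockedNodeDelta between(Set<String> previouslyReported, Set<String> currentlyBlocked) {
        // Nodes blocked now but not yet reported must be added.
        Set<String> additions = new HashSet<>(currentlyBlocked);
        additions.removeAll(previouslyReported);
        // Nodes reported earlier but no longer blocked (for example, timed out) must be removed.
        Set<String> removals = new HashSet<>(previouslyReported);
        removals.removeAll(currentlyBlocked);
        return new BlockedNodeDelta(additions, removals);
    }
}
```

The two sets correspond to the additions and removals arguments of the Yarn call. For Kubernetes, the full set of currently blocked nodes would presumably be used directly when building the node affinity of newly requested pods.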

Code Block: ResourceManagerDriver
public interface BlockedNodeRetriever {

    /** Retrieve blocked nodes. */
    Set<String> getBlockedNodes();
}

public interface ResourceManagerDriver {

    /** Initialize the deployment specific components. */
    void initialize(
            ResourceEventHandler<WorkerType> resourceEventHandler,
            ScheduledExecutor mainThreadExecutor,
            Executor ioExecutor,
            BlockedNodeRetriever blockedNodeRetriever)
            throws Exception;

    //...
}

Synchronize Blocklist between JM & RM

The newly added or updated blocked items will be synchronized between JM and RM via RPC: 

  1. Once blocked items are newly added (or updated) in the JobMasterBlocklistHandler, RM will be notified of these items via ResourceManagerGateway#notifyNewBlockedItems.
  2. When RM receives the blocked items notified by a JM, it will add them into ResourceManagerBlocklistHandler, and notify all JMs of the successfully added (or updated) items through JobMasterGateway#notifyNewBlockedItems. 
  3. Similarly, when JM receives the blocked items notified by RM, it will also add them to JobMasterBlocklistHandler.
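
The synchronization steps above can be simulated in plain Java without RPC. This is a sketch under stated assumptions: BlocklistSyncSketch and its nested classes are hypothetical, gateway calls are modelled as direct method calls, items are reduced to an identifier plus an end timestamp, and a JM that receives items from RM only stores them without notifying RM again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BlocklistSyncSketch {

    /** Minimal per-component store: identifier mapped to end timestamp. */
    static final class Store {
        final Map<String, Long> items = new HashMap<>();

        /** Adds or updates items and returns only those that changed the stored state. */
        Map<String, Long> addNew(Map<String, Long> incoming) {
            Map<String, Long> changed = new HashMap<>();
            incoming.forEach((id, end) -> {
                Long old = items.get(id);
                if (old == null || old < end) {
                    items.put(id, end);
                    changed.put(id, end);
                }
            });
            return changed;
        }
    }

    static final class JobMaster {
        final Store store = new Store();
        ResourceManager rm;

        /** Step 1: items added locally are reported to RM. */
        void block(Map<String, Long> items) {
            Map<String, Long> changed = store.addNew(items);
            if (!changed.isEmpty()) {
                rm.notifyNewBlockedItems(changed);
            }
        }

        /** Step 3: items received from RM are stored locally. */
        void notifyNewBlockedItems(Map<String, Long> items) {
            store.addNew(items);
        }
    }

    static final class ResourceManager {
        final Store store = new Store();
        final List<JobMaster> jobMasters = new ArrayList<>();

        /** Step 2: store the items, then notify all JMs of what was added or updated. */
        void notifyNewBlockedItems(Map<String, Long> items) {
            Map<String, Long> changed = store.addNew(items);
            if (changed.isEmpty()) {
                return;
            }
            for (JobMaster jobMaster : jobMasters) {
                jobMaster.notifyNewBlockedItems(changed);
            }
        }
    }
}
```

Because every store reports only the items that actually changed, the originating JM sees its own items come back from RM as a no-op, and the notification chain ends there.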

Code Block: BlocklistListener
public interface BlocklistListener {

    /** Notify new blocked items. */
    void notifyNewBlockedItems(Collection<BlockedItem<?>> newItems);
}

public interface JobMasterGateway extends BlocklistListener {
    //...
}

public interface ResourceManagerGateway extends BlocklistListener {
    //...
}

Enrich TaskManagerLocation with node information

In order to support blocked nodes, it is necessary to know the node where the task is located. To do that, we need to add a node identifier into TaskManagerLocation. This node identifier should be set by the resource manager when requesting new task managers.

Code Block: TaskManagerLocation
public class TaskManagerLocation {

    public String getNodeId();

    //...
}

Metrics

We propose to introduce the following Gauge metrics to ResourceManagerMetricGroup:

  1. numBlockedTaskManagers: The number of currently blocked task managers (including task managers on blocked nodes).
  2. numBlockedNodes: The number of currently blocked nodes.

REST API

We will introduce the following REST APIs for the blocklist mechanism:

  1. REST API for querying blocklist information.
  2. REST API for adding new blocked items.
  3. REST API for removing existing blocked items.

Query

Add a REST API to obtain blocklist information. Each request will return all current blocked items, which are obtained from ResourceManagerBlocklistHandler.

GET: http://{jm_rest_address:port}/blocklist

Request: {}

Response:

Code Block: Response
{
  /** This group only contains directly blocked task managers */
  "blockedTaskManagers": [
      {
          "id" : "container_XXX_000002",
          "timestamp" : "XXX",
          "action" : "MARK_BLOCKED"
      },
      {
          "id" : "container_XXX_000003",
          "timestamp" : "XXX",
          "action" : "MARK_BLOCKED"
      },
      ...
  ],
  "blockedNodes": [
      {
          "id" : "node1",
          "timestamp" : "XXX",
          "action" : "MARK_BLOCKED",
          "taskManagers" : ["container_XXX_000004", "container_XXX_000005", …]
      },
      ...
  ]
}

To obtain the blocklist information, we need to add a method to ResourceManagerGateway:

Code Block: ResourceManagerGateway
public interface ResourceManagerGateway {
   CompletableFuture<BlocklistInfo> requestBlocklist(@RpcTimeout Time timeout);
   // ...
}
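
For illustration, requests against the query endpoint above and the removal endpoints listed under Remove could be built with the JDK's java.net.http API (Java 11 or later). This is only a sketch: the class BlocklistRestRequests is hypothetical, the restAddress argument stands for {jm_rest_address:port}, and ids are assumed to be URL-safe.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical client-side helper; it only builds requests and does not send them. */
final class BlocklistRestRequests {
    private BlocklistRestRequests() {}

    /** GET /blocklist: query all currently blocked items. */
    static HttpRequest query(String restAddress) {
        return HttpRequest.newBuilder(URI.create("http://" + restAddress + "/blocklist"))
                .GET()
                .build();
    }

    /** DELETE /blocklist/node/<id>/<action>: remove a blocked node. */
    static HttpRequest removeNode(String restAddress, String nodeId, String action) {
        URI uri = URI.create("http://" + restAddress + "/blocklist/node/" + nodeId + "/" + action);
        return HttpRequest.newBuilder(uri).DELETE().build();
    }
}
```

A request built this way could be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). The response to the query is expected to have the JSON shape shown above.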

Add

POST: http://{jm_rest_address:port}/blocklist/nodes

POST: http://{jm_rest_address:port}/blocklist/taskmanagers

Request:

Code Block: Request
{
  [
      {
          "id" : "nodeXXX/container_XXX",
          "action" : "MARK_BLOCKED"
      },
      {           
          "id" : "nodeXXX/container_XXX",
          "action" : "MARK_BLOCKED"
      }, 
      ...
  ]
}

Response: {}

Remove

DELETE: http://{jm_rest_address:port}/blocklist/node/<id>/<action>

DELETE: http://{jm_rest_address:port}/blocklist/taskmanager/<id>/<action>

Request: {}

Response: {}

Compatibility, Deprecation, and Migration Plan

...