...

To solve this problem, we propose to introduce a blocklist mechanism for Flink to filter out problematic resources. The following two ways of blocklisting resources will be introduced:

  1. Manually specify the blocked resources through the REST API. When users find abnormal nodes/TMs, they can manually blocklist them in this way.
  2. Automatically detect abnormal resources and blocklist them. Users can specify a blocklist strategy which identifies abnormal resources. In the first version, we only introduce the blocklist strategy interface and a no-op implementation. In the future, we will introduce a configurable blocklist strategy and plugin mechanism to load user-defined blocklist strategy implementations.

...

  • cluster.resource-blocklist.enabled: Whether to enable the blocklist mechanism.
  • cluster.resource-blocklist.item.timeout: Timeout after which blocked items are removed from the blocklist.

...

In this design, two granularities of blocked resources are supported: task managers and nodes. A record of blocklist information is called a blocked item, which is generally generated by the scheduler according to the exceptions of tasks. These blocked items will be recorded in a special component and affect the resource allocation of Flink clusters. However, blocked items are not permanent: each item has a timeout. Once an item times out, it will be removed, and the resource will become available again. The overall structure of the blocklist mechanism is shown in the figure below.

...

In JM, there will be a component (JobMasterBlocklistHandler) responsible for generating blocked items according to the exceptions notified by Scheduler, and managing them. SlotPool will be aware of the blocklist information and filter out blocked resources when allocating slots.

In RM, there will also be a component (ResourceManagerBlocklistHandler) responsible for managing the cluster-level blocklist. SlotManager will be aware of the blocklist information and filter out resources in the blocklist when allocating slots from registered task managers. Similarly, when the ResourceManagerDriver requests new resources from an external resource manager (Yarn or Kubernetes), it also needs to filter out the blocked nodes.

...

Blocked Item

A blocked item corresponds to the blocklist information of a specific task manager or node, including the following 5 fields:

  1. type: The blocked item type, TASK_MANAGER or NODE.
  2. timestamp: The timestamp at which this item was created; it is used to check for timeout.
  3. cause: The cause for creating this item.
  4. identifier: The identifier of the blocked task manager or node.
  5. action: The action taken when a task manager/node is marked as blocked, one of:
    1. MARK_BLOCKED: Just mark the task manager or node as blocked. Future slots should not be allocated from the blocked task manager or node, but slots that are already allocated will not be affected.
    2. MARK_BLOCKED_AND_EVACUATE_TASKS: Mark the task manager or node as blocked, and evacuate all tasks on it. Evacuated tasks will be restarted on non-blocked task managers.
Code Block
titleBlocklistedItem
/**
 * This class represents a blocked item.
 *
 * @param <ID> Identifier of the blocked item.
 */
public abstract class BlockedItem<ID> {
    public BlockedItemType getType();

    public long getTimestamp();

    public BlocklistAction getAction();

    public Throwable getCause();

    public abstract ID getIdentifier();
}

/** This class represents a blocked node. */
public class BlockedNode extends BlockedItem<String> {
}

/** This class represents a blocked task manager. */
public class BlockedTaskManager extends BlockedItem<ResourceID> {
}
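
To make the five fields concrete, here is a minimal, self-contained sketch of how these classes could be filled in. The enum constants follow the field descriptions above; the constructor shapes, the use of a plain String in place of ResourceID, and the isTimedOut helper are assumptions made for illustration and are not part of the proposed interface.

```java
/** Item types and actions as described in the field list above. */
enum BlockedItemType { TASK_MANAGER, NODE }

enum BlocklistAction { MARK_BLOCKED, MARK_BLOCKED_AND_EVACUATE_TASKS }

abstract class BlockedItem<ID> {
    private final BlockedItemType type;
    private final long timestamp;
    private final BlocklistAction action;
    private final Throwable cause;

    // Assumption: all fields are supplied at construction time and never change.
    protected BlockedItem(
            BlockedItemType type, long timestamp, BlocklistAction action, Throwable cause) {
        this.type = type;
        this.timestamp = timestamp;
        this.action = action;
        this.cause = cause;
    }

    public BlockedItemType getType() { return type; }

    public long getTimestamp() { return timestamp; }

    public BlocklistAction getAction() { return action; }

    public Throwable getCause() { return cause; }

    public abstract ID getIdentifier();

    /** Hypothetical helper: true once the item's age has reached the timeout. */
    public boolean isTimedOut(long nowMillis, long timeoutMillis) {
        return nowMillis - timestamp >= timeoutMillis;
    }
}

class BlockedNode extends BlockedItem<String> {
    private final String nodeId;

    BlockedNode(String nodeId, long timestamp, BlocklistAction action, Throwable cause) {
        super(BlockedItemType.NODE, timestamp, action, cause);
        this.nodeId = nodeId;
    }

    @Override
    public String getIdentifier() { return nodeId; }
}

/** In Flink the identifier would be a ResourceID; a String stands in for it here. */
class BlockedTaskManager extends BlockedItem<String> {
    private final String resourceId;

    BlockedTaskManager(
            String resourceId, long timestamp, BlocklistAction action, Throwable cause) {
        super(BlockedItemType.TASK_MANAGER, timestamp, action, cause);
        this.resourceId = resourceId;
    }

    @Override
    public String getIdentifier() { return resourceId; }
}
```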

Blocklist on JobMaster

...

JobMasterBlocklistHandler is the component in JM responsible for generating and managing blocked items. It consists of three sub-components: BlocklistStrategy, BlocklistTracker and BlocklistContext. When receiving a new exception from Scheduler, the JobMasterBlocklistHandler will handle it as follows:

  1. Generate new blocked items by notifying the exception to the BlocklistStrategy.
  2. Add the new blocked items to the BlocklistTracker.
  3. Synchronize the new blocked items to RM.
  4. Perform blocklist actions on the resources via the BlocklistContext.
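
The four steps above can be sketched as a single method. This is only an illustration under simplifying assumptions: blocked items are reduced to plain strings, the three sub-components and the RM are modelled as java.util.function callbacks, and the class name ExceptionHandlingFlow is hypothetical. The early return when the tracker reports no change is also an assumption.

```java
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Function;

/** Sketch of the four handling steps; blocked items are reduced to plain strings. */
class ExceptionHandlingFlow {
    private final Function<Throwable, Collection<String>> strategy;
    private final Function<Collection<String>, Collection<String>> tracker;
    private final Consumer<Collection<String>> resourceManager;
    private final Consumer<Collection<String>> context;

    ExceptionHandlingFlow(
            Function<Throwable, Collection<String>> strategy,
            Function<Collection<String>, Collection<String>> tracker,
            Consumer<Collection<String>> resourceManager,
            Consumer<Collection<String>> context) {
        this.strategy = strategy;
        this.tracker = tracker;
        this.resourceManager = resourceManager;
        this.context = context;
    }

    void notifyException(Throwable cause) {
        // Step 1: let the strategy turn the exception into blocked items.
        Collection<String> generated = strategy.apply(cause);
        // Step 2: the tracker returns only the items that were added or updated.
        Collection<String> changed = tracker.apply(generated);
        if (changed.isEmpty()) {
            return; // assumption: nothing changed, so nothing to synchronize or act on
        }
        // Step 3: synchronize the changed items to the RM.
        resourceManager.accept(changed);
        // Step 4: perform the blocklist actions on the affected resources.
        context.accept(changed);
    }
}
```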

...

Code Block
titleBlocklistStrategy
public interface BlocklistStrategy {
    /**
     * Generate blocked items according to the abnormal tasks' locations and cause.
     *
     * @param locations the abnormal tasks' locations.
     * @param cause the cause of blocked items.
     * @param timestamp the creation timestamp of blocked items.
     * @return the generated blocked items.
     */
    Collection<BlockedItem<?>> generateBlockedItems(Collection<TaskManagerLocation> locations, Throwable cause, long timestamp);
}
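
Since the first version only ships a no-op implementation, a sketch of what that could look like may help. TaskManagerLocation and BlockedItem below are bare stand-ins for the real types, and the class name NoOpBlocklistStrategy is an assumption.

```java
import java.util.Collection;
import java.util.Collections;

/** Stand-in for Flink's TaskManagerLocation (assumption: only the fields needed here). */
class TaskManagerLocation {
    final String resourceId;
    final String nodeId;

    TaskManagerLocation(String resourceId, String nodeId) {
        this.resourceId = resourceId;
        this.nodeId = nodeId;
    }
}

/** Stand-in for the BlockedItem class described earlier. */
abstract class BlockedItem<ID> {
    public abstract ID getIdentifier();
}

interface BlocklistStrategy {
    Collection<BlockedItem<?>> generateBlockedItems(
            Collection<TaskManagerLocation> locations, Throwable cause, long timestamp);
}

/** No-op strategy: never blocks anything, whatever exception is reported. */
class NoOpBlocklistStrategy implements BlocklistStrategy {
    @Override
    public Collection<BlockedItem<?>> generateBlockedItems(
            Collection<TaskManagerLocation> locations, Throwable cause, long timestamp) {
        return Collections.emptyList();
    }
}
```

With such a strategy in place, the rest of the mechanism only reacts to items added manually through the REST API.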

...

BlocklistTracker is the component responsible for tracking blocked items. The tracker will periodically remove blocked items that have timed out.

Code Block
titleBlocklistTracker
public interface BlocklistTracker {
    /** Starts the blocklist tracker. */
    void start(ComponentMainThreadExecutor mainThreadExecutor);

    /**
     * Add new blocked items or update existing items.
     *
     * @param items The items to add or update
     * @return Newly added or updated items.
     */
    Collection<BlockedItem<?>> addNewBlockedItems(Collection<BlockedItem<?>> items);

    /** Returns whether the given task manager is blocked. */
    boolean isBlockedTaskManager(ResourceID resourceID);

    /** Get all blocked nodes. */
    Set<String> getBlockedNodes();

    /** Close the blocklist tracker. */
    void close();
}
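
A simplified in-memory tracker could look like the sketch below. It is not the proposed implementation: items are reduced to a hypothetical TrackedItem class with String identifiers, an existing item is replaced only when the new one has a later timestamp (an assumed update rule), and timeout removal is exposed as an explicit removeTimeoutItems call instead of being scheduled on the main thread executor.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified stand-in for a blocked item (assumption: String identifiers only). */
class TrackedItem {
    final boolean isNode; // true for NODE, false for TASK_MANAGER
    final String identifier;
    final long timestamp;

    TrackedItem(boolean isNode, String identifier, long timestamp) {
        this.isNode = isNode;
        this.identifier = identifier;
        this.timestamp = timestamp;
    }

    String key() {
        return (isNode ? "NODE:" : "TM:") + identifier;
    }
}

class SimpleBlocklistTracker {
    private final long timeoutMillis;
    private final Map<String, TrackedItem> items = new HashMap<>();

    SimpleBlocklistTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds new items or replaces older ones; returns the items that actually changed. */
    List<TrackedItem> addNewBlockedItems(Collection<TrackedItem> newItems) {
        List<TrackedItem> changed = new ArrayList<>();
        for (TrackedItem item : newItems) {
            TrackedItem existing = items.get(item.key());
            if (existing == null || existing.timestamp < item.timestamp) {
                items.put(item.key(), item);
                changed.add(item);
            }
        }
        return changed;
    }

    /** Only covers directly blocked task managers, not those on blocked nodes. */
    boolean isDirectlyBlockedTaskManager(String resourceId) {
        return items.containsKey("TM:" + resourceId);
    }

    Set<String> getBlockedNodes() {
        Set<String> nodes = new HashSet<>();
        for (TrackedItem item : items.values()) {
            if (item.isNode) {
                nodes.add(item.identifier);
            }
        }
        return nodes;
    }

    /** Drops every item whose age has reached the timeout; meant to be called periodically. */
    void removeTimeoutItems(long nowMillis) {
        items.values().removeIf(item -> nowMillis - item.timestamp >= timeoutMillis);
    }
}
```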
     

...

Code Block
titleBlocklistContext
public interface BlocklistContext {
    /** Apply the actions of the newly added or updated blocked items to resources. */
    void blocklistResources(Collection<BlockedItem<?>> newlyAddedOrUpdatedItems);
}
   

...

Code Block
titleBlocklistHandler & JobMasterBlocklistHandler
public interface BlocklistHandler extends BlocklistTracker {

    /** Add a new blocked node. */
    void blockNode(String nodeId, BlocklistAction action, Throwable cause);

    /** Add a new blocked task manager. */
    void blockTaskManager(ResourceID taskManagerId, BlocklistAction action, Throwable cause);
}

public interface JobMasterBlocklistHandler extends BlocklistHandler {
}

SlotPool

SlotPool should avoid allocating slots from blocked task managers. Blocked task managers include those directly blocked and those located on blocked nodes. To do that, our core idea is to keep the SlotPool in such a state: there is no slot in SlotPool that is both free (no task assigned) and blocked. Details are as follows:

  1. When SlotPool starts, a BlockedTaskManagerChecker will be passed in to check whether a task manager is blocked.
  2. When receiving slot offers from blocked task managers (including task managers on blocked nodes), all offers should be rejected.
  3. When a task manager is newly blocked, BlocklistContext will perform the following on SlotPool:
    1. If the action is MARK_BLOCKED, release all free (no task assigned) slots on the blocked task manager.
    2. If the action is MARK_BLOCKED_AND_EVACUATE_TASKS, release all slots on the blocked task manager.
  4. When a slot state changes from reserved (task assigned) to free (no task assigned), it will check whether the corresponding task manager is blocked. If yes, release the slot.
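
The rules above can be illustrated with a toy slot pool. Everything in this sketch is a simplification: Slot and SlotPoolSketch are hypothetical classes, task manager ids are plain strings, the checker is a java.util.function.Predicate, and "releasing" a slot simply removes it from an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

enum BlocklistAction { MARK_BLOCKED, MARK_BLOCKED_AND_EVACUATE_TASKS }

class Slot {
    final String taskManagerId;
    boolean reserved; // true while a task is assigned

    Slot(String taskManagerId, boolean reserved) {
        this.taskManagerId = taskManagerId;
        this.reserved = reserved;
    }
}

class SlotPoolSketch {
    // Rule 1: the checker is handed over when the pool is created.
    private final Predicate<String> blockedTaskManagerChecker;
    private final List<Slot> slots = new ArrayList<>();

    SlotPoolSketch(Predicate<String> blockedTaskManagerChecker) {
        this.blockedTaskManagerChecker = blockedTaskManagerChecker;
    }

    /** Rule 2: offers from blocked task managers are rejected. Returns true if accepted. */
    boolean offerSlot(Slot slot) {
        if (blockedTaskManagerChecker.test(slot.taskManagerId)) {
            return false;
        }
        slots.add(slot);
        return true;
    }

    /** Rule 3: release free slots only, or all slots when tasks must be evacuated. */
    void onTaskManagerBlocked(String taskManagerId, BlocklistAction action) {
        boolean evacuate = action == BlocklistAction.MARK_BLOCKED_AND_EVACUATE_TASKS;
        slots.removeIf(s -> s.taskManagerId.equals(taskManagerId) && (evacuate || !s.reserved));
    }

    /** Rule 4: a slot that becomes free on a blocked task manager is released. */
    void freeReservedSlot(Slot slot) {
        slot.reserved = false;
        if (blockedTaskManagerChecker.test(slot.taskManagerId)) {
            slots.remove(slot);
        }
    }

    int size() {
        return slots.size();
    }
}
```

After any of these operations the pool holds no slot that is both free and on a blocked task manager, which is the invariant stated above.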

...

Code Block
titleSlotPoolService
public interface BlockedTaskManagerChecker {
    /**
     * Returns whether the given task manager is blocked.
     */
    boolean isBlockedTaskManager(ResourceID resourceID);
}


public interface SlotPoolService {

    /** Start the encapsulated slot pool implementation. */
    void start(
            JobMasterId jobMasterId,
            String address,
            ComponentMainThreadExecutor mainThreadExecutor,
            BlockedTaskManagerChecker blockedTaskManagerChecker)
            throws Exception;

    /**
     * Releases all slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    /**
     * Releases all free slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    //...
}

...

ResourceManagerBlocklistHandler is a new component introduced in RM for the blocklist mechanism. It has only one sub-component: BlocklistTracker, which is responsible for managing cluster-level blocked items.

Code Block
titleResourceManagerBlocklistHandler
public interface ResourceManagerBlocklistHandler extends BlocklistHandler {
}

...

SlotManager should filter out blocked resources when allocating registered resources. To do that, the following changes are needed:

  1. When starting SlotManager, the BlockedTaskManagerChecker will be passed in to check whether a registered task manager is blocked.
  2. When trying to fulfill the slot requirements by registered task managers, the blocked ones will be filtered out.
  3. SlotManager will request new task managers from external resource managers if the registered resources cannot fulfill the requirements. The blocklist also takes effect when requesting new task managers; the details are described in the ResourceManagerDriver section.
Code Block
titleSlotManager
public interface SlotManager {

    /** Starts the slot manager with the given leader id and resource manager actions. */
    void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceActions newResourceActions,
            BlockedTaskManagerChecker newBlockedTaskManagerChecker);

    //...
}
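
As a rough illustration of the filtering step, the sketch below treats a task manager as blocked if it is either directly blocked or located on a blocked node, and keeps only the remaining registered task managers. RegisteredTaskManager and NodeAwareChecker are hypothetical names, and identifiers are plain strings.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

/** Stand-in for a registered task manager (assumption: id and node id only). */
class RegisteredTaskManager {
    final String resourceId;
    final String nodeId;

    RegisteredTaskManager(String resourceId, String nodeId) {
        this.resourceId = resourceId;
        this.nodeId = nodeId;
    }
}

class NodeAwareChecker {
    private final Set<String> blockedTaskManagers;
    private final Set<String> blockedNodes;

    NodeAwareChecker(Set<String> blockedTaskManagers, Set<String> blockedNodes) {
        this.blockedTaskManagers = blockedTaskManagers;
        this.blockedNodes = blockedNodes;
    }

    /** Blocked if directly blocked or located on a blocked node. */
    boolean isBlocked(RegisteredTaskManager tm) {
        return blockedTaskManagers.contains(tm.resourceId) || blockedNodes.contains(tm.nodeId);
    }

    /** Keeps only the task managers that may still be used to fulfill slot requirements. */
    List<RegisteredTaskManager> filterUsable(Collection<RegisteredTaskManager> registered) {
        List<RegisteredTaskManager> usable = new ArrayList<>();
        for (RegisteredTaskManager tm : registered) {
            if (!isBlocked(tm)) {
                usable.add(tm);
            }
        }
        return usable;
    }
}
```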

ResourceManagerDriver

When deploying Flink on Yarn or Kubernetes, the ResourceManagerDriver is responsible for requesting new task managers from the external resource manager. To avoid allocating task managers from blocked nodes, we need to pass the information of blocked nodes to ResourceManagerDriver at its initialization.

...

Code Block
titleResourceManagerDriver
public interface BlockedNodeRetriever {

    /** Retrieve blocked nodes. */
    Set<String> getBlockedNodes();
}

public interface ResourceManagerDriver {

    /** Initialize the deployment specific components. */
    void initialize(
            ResourceEventHandler<WorkerType> resourceEventHandler,
            ScheduledExecutor mainThreadExecutor,
            Executor ioExecutor,
            BlockedNodeRetriever blockedNodeRetriever)
            throws Exception;

    //...
}

...

The newly added or updated blocked items will be synchronized between JM and RM via RPC:

  1. Once blocked items are newly added (or updated) to the JobMasterBlocklistHandler, RM will be notified of these items via ResourceManagerGateway#notifyNewBlockedItems.
  2. When RM receives the blocked items notified by a JM, it will add them into ResourceManagerBlocklistHandler, and notify all JMs of the successfully added (or updated) items through JobMasterGateway#notifyNewBlockedItems.
  3. Similarly, when JM receives the blocked items notified by RM, it will also add them to JobMasterBlocklistHandler.
Code Block
titleBlocklistListener
public interface BlocklistListener {

    /** Notify new blocked items. */
    void notifyNewBlockedItems(Collection<BlockedItem<?>> newItems);
}

public interface JobMasterGateway extends BlocklistListener {
    //...
}

public interface ResourceManagerGateway extends BlocklistListener {     
    //...
}  
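
The synchronization steps can be sketched with direct method calls standing in for RPC. In this illustration blocked items are plain strings, ResourceManagerSketch and JobMasterSketch are hypothetical classes, and the JM does not echo items received from the RM back to it; that last point is an assumption, chosen here so that the exchange terminates.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified listener: blocked items are represented by their identifiers. */
interface BlocklistListener {
    void notifyNewBlockedItems(Collection<String> newItems);
}

class ResourceManagerSketch implements BlocklistListener {
    private final Set<String> blocklist = new HashSet<>();
    private final List<BlocklistListener> jobMasters = new ArrayList<>();

    void registerJobMaster(BlocklistListener jobMaster) {
        jobMasters.add(jobMaster);
    }

    /** Step 2: add the items, then tell every JM about those that were actually added. */
    @Override
    public void notifyNewBlockedItems(Collection<String> newItems) {
        List<String> added = new ArrayList<>();
        for (String item : newItems) {
            if (blocklist.add(item)) {
                added.add(item);
            }
        }
        if (added.isEmpty()) {
            return;
        }
        for (BlocklistListener jobMaster : jobMasters) {
            jobMaster.notifyNewBlockedItems(added);
        }
    }

    Set<String> blocklist() {
        return blocklist;
    }
}

class JobMasterSketch implements BlocklistListener {
    private final Set<String> blocklist = new HashSet<>();
    private final BlocklistListener resourceManager;

    JobMasterSketch(BlocklistListener resourceManager) {
        this.resourceManager = resourceManager;
    }

    /** Step 1: locally generated items are added and then reported to the RM. */
    void blockLocally(Collection<String> items) {
        List<String> added = new ArrayList<>();
        for (String item : items) {
            if (blocklist.add(item)) {
                added.add(item);
            }
        }
        if (!added.isEmpty()) {
            resourceManager.notifyNewBlockedItems(added);
        }
    }

    /** Step 3: items coming from the RM are only added locally (assumption: no echo). */
    @Override
    public void notifyNewBlockedItems(Collection<String> newItems) {
        blocklist.addAll(newItems);
    }

    Set<String> blocklist() {
        return blocklist;
    }
}
```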

Enrich TaskManagerLocation with node information

In order to support blocked nodes, it is necessary to know the node where the task is located. To do that, we need to add a node identifier into TaskManagerLocation. This node identifier should be set by the resource manager when requesting new task managers.

...

We propose to introduce the following Gauge metrics to ResourceManagerMetricGroup:

  1. numBlockedTaskManagers: The number of currently blocked task managers (including task managers on blocked nodes)
  2. numBlockedNodes: The number of currently blocked nodes

REST API

We will introduce the following REST APIs for the blocklist mechanism:

  1. REST API for querying blocklist information.
  2. REST API for adding new blocked items.
  3. REST API for removing existing blocked items.

Query

Add a REST API to obtain blocklist information. Each request will return all current blocklist items, which are obtained from ResourceManagerBlocklistHandler.

...

Code Block
titleResponse
{
  /** This group only contains directly blocked task managers */
  "blockedTaskManagers": [
      {
          "id" : "container_XXX_000002",
          "timestamp" : "XXX",
          "action" : "MARK_BLOCKED"
      },
      {
          "id" : "container_XXX_000003",
          "timestamp" : "XXX",
          "action" : "MARK_BLOCKED"
      },
      ...
  ],
  "blockedNodes": [
      {
          "id" : "node1",
          "timestamp" : "XXX",
          "action" : "MARK_BLOCKED",
          "taskManagers" : ["container_XXX_000004", "container_XXX_000005", ...]
      },
      ...
  ]
}

...

Code Block
titleRequest
{
  [
      {
          "id" : "nodeXXX/container_XXX",
          "action" : "MARK_BLOCKED"
      },
      {
          "id" : "nodeXXX/container_XXX",
          "action" : "MARK_BLOCKED"
      },
      ...
  ]
}

Response: {}

Remove

...