...

  • cluster.resource-blocklist.enabled: Whether to enable the blocklist mechanism.
  • cluster.resource-blocklist.item.timeout: Timeout after which blocklisted items are removed from the blocklist.
  • cluster.resource-blocklist.item.timeout-check-interval: The interval at which blocklisted items are checked for timeout.

Proposed Changes

In this design, two granularities of blocklisted resources are supported: task managers and nodes. A record of blocklist information is called a blocklisted item, which is generally generated by the scheduler according to the exceptions of tasks. These blocklisted items are recorded in a special component and affect the resource allocation of Flink clusters. However, blocklisted items are not permanent: each item has a timeout. Once an item times out, it is removed and the resource becomes available again. The overall structure of the blocklist mechanism is shown in the figure below.

...

In JM, there will be a component (JobMasterBlocklistHandler) responsible for generating blocklisted items according to the exceptions notified by Scheduler, and for managing them. SlotPool will be aware of the blocklist information and filter out blocklisted resources when allocating slots.

In RM, there will also be a component (ResourceManagerBlocklistHandler) responsible for managing the cluster-level blocklist. SlotManager will be aware of the blocklist information and filter out resources in the blocklist when allocating slots from registered task managers. Similarly, when the ResourceManagerDriver requests new resources from an external resource manager (Yarn or Kubernetes), it also needs to filter out the blocklisted nodes.

...

Blocklisted Item

A blocklisted item corresponds to the blocklist information of a specific task manager or node, including the following 5 fields:

  1. type: The blocklisted item type, TASK_MANAGER or NODE.
  2. timestamp: The timestamp at which this item was created; it is used to check for timeout.
  3. cause: The cause for creating this item.
  4. identifier: The identifier of the blocklisted task manager or node.
  5. action: The action taken when a task manager/node is marked as blocklisted, one of:
    1. MARK_BLOCKLISTED: Just mark the task manager or node as blocklisted. Future slots should not be allocated from the blocklisted task manager or node, but slots that are already allocated will not be affected.
    2. MARK_BLOCKLISTED_AND_EVACUATE_TASKS: Mark the task manager or node as blocklisted, and evacuate all tasks on it. Evacuated tasks will be restarted on non-blocklisted task managers.
Code Block: BlocklistedItem
/**
 * This class represents a blocklisted item.
 *
 * @param <ID> Identifier of the blocklisted item.
 */
public abstract class BlocklistedItem<ID> {
    public BlocklistedItemType getType();

    public long getTimestamp();

    public BlocklistAction getAction();

    public Throwable getCause();

    public abstract ID getIdentifier();
}

/** This class represents a blocklisted node. */
public class BlocklistedNode extends BlocklistedItem<String> {
}

/** This class represents a blocklisted task manager. */
public class BlocklistedTaskManager extends BlocklistedItem<ResourceID> {
}
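
The timestamp field, together with the cluster.resource-blocklist.item.timeout option, determines when an item expires. A minimal sketch of that check follows. It assumes a simplified, hypothetical item class (SketchBlocklistedNode) and a timeout passed in milliseconds; neither is part of the proposed API.

```java
/** Hypothetical, simplified stand-in for a blocklisted node item; not the proposed class. */
final class SketchBlocklistedNode {
    private final String nodeId;
    private final long timestamp; // creation time in milliseconds
    private final Throwable cause;

    SketchBlocklistedNode(String nodeId, long timestamp, Throwable cause) {
        this.nodeId = nodeId;
        this.timestamp = timestamp;
        this.cause = cause;
    }

    String getIdentifier() {
        return nodeId;
    }

    long getTimestamp() {
        return timestamp;
    }

    Throwable getCause() {
        return cause;
    }

    /** An item counts as expired once at least timeoutMillis has elapsed since its creation. */
    boolean isExpired(long nowMillis, long timeoutMillis) {
        return nowMillis - timestamp >= timeoutMillis;
    }
}
```

Whether the boundary case (exactly timeoutMillis elapsed) counts as expired is a choice made for this sketch only.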

...

Blocklist on JobMaster

...

JobMasterBlocklistHandler

JobMasterBlocklistHandler is the component in JM responsible for generating and managing blocklisted items. It consists of three sub-components: BlocklistStrategy, BlocklistTracker and BlocklistContext. When receiving a new exception from Scheduler, the JobMasterBlocklistHandler will handle it as follows:

  1. Generate new blocklisted items by notifying the exception to the BlocklistStrategy.
  2. Add the new blocklisted items to the BlocklistTracker.
  3. Synchronize the new blocklisted items to RM.
  4. Perform blocklist actions on the resources via the BlocklistContext.
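
The four steps above can be sketched as a single method. This is only an illustration under simplifying assumptions: items and locations are plain strings, the nested interfaces and the class name HandlerFlowSketch are hypothetical, and skipping steps 3 and 4 when the tracker reports no change is an assumption of the sketch rather than something this design specifies.

```java
import java.util.Collection;

/** Hypothetical sketch of the four handling steps; every name here is illustrative. */
final class HandlerFlowSketch {
    /** Step 1: turns an exception and its location into blocklisted item identifiers. */
    interface Strategy {
        Collection<String> generate(String location, Throwable cause);
    }

    /** Step 2: stores items and returns those that were newly added or updated. */
    interface Tracker {
        Collection<String> addOrUpdate(Collection<String> items);
    }

    /** Step 3: forwards newly added or updated items to the RM. */
    interface ResourceManagerNotifier {
        void notifyNewItems(Collection<String> items);
    }

    /** Step 4: applies the blocklist actions to the resources. */
    interface Context {
        void blockResources(Collection<String> items);
    }

    private final Strategy strategy;
    private final Tracker tracker;
    private final ResourceManagerNotifier notifier;
    private final Context context;

    HandlerFlowSketch(
            Strategy strategy, Tracker tracker, ResourceManagerNotifier notifier, Context context) {
        this.strategy = strategy;
        this.tracker = tracker;
        this.notifier = notifier;
        this.context = context;
    }

    void notifyException(String location, Throwable cause) {
        Collection<String> generated = strategy.generate(location, cause);
        Collection<String> changed = tracker.addOrUpdate(generated);
        if (changed.isEmpty()) {
            return; // assumption: nothing to synchronize or act on
        }
        notifier.notifyNewItems(changed);
        context.blockResources(changed);
    }
}
```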

...

BlocklistStrategy

BlocklistStrategy is the component responsible for generating blocklisted items according to the exceptions and their locations notified by Scheduler. We can introduce different BlocklistStrategy implementations to adapt to different scenarios. In the first version, we will introduce a no-op implementation as the default implementation. In the future, we will introduce a configurable blocklist strategy and a plugin mechanism to load user-defined blocklist strategy implementations; details are described in Future improvements.

Code Block: BlocklistStrategy
public interface BlocklistStrategy {
    /**
     * Generate blocklisted items according to the abnormal tasks' locations and cause.
     *
     * @param locations the abnormal tasks' locations.
     * @param cause the cause of blocklisted items.
     * @param timestamp the create timestamp of blocklisted items.
     * @return the generated blocklisted items.
     */
    Collection<BlocklistedItem<?>> generateBlocklistedItems(
            Collection<TaskManagerLocation> locations, Throwable cause, long timestamp);
}
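
To illustrate how strategy implementations could differ, the sketch below shows a no-op strategy next to a simple exception-based one. It is not the proposed interface: locations and items are reduced to node names, and the names SketchBlocklistStrategy, NoOpStrategySketch and IoFailureStrategySketch, as well as the rule "blocklist on IOException", are assumptions made purely for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/** Hypothetical simplified strategy: locations and items are plain node names. */
interface SketchBlocklistStrategy {
    Collection<String> generateBlocklistedNodes(
            Collection<String> nodes, Throwable cause, long timestamp);
}

/** Never blocklists anything, mirroring the no-op default described above. */
final class NoOpStrategySketch implements SketchBlocklistStrategy {
    @Override
    public Collection<String> generateBlocklistedNodes(
            Collection<String> nodes, Throwable cause, long timestamp) {
        return Collections.emptyList();
    }
}

/** Illustrative only: blocklists every reported node when the cause is an IOException. */
final class IoFailureStrategySketch implements SketchBlocklistStrategy {
    @Override
    public Collection<String> generateBlocklistedNodes(
            Collection<String> nodes, Throwable cause, long timestamp) {
        if (cause instanceof IOException) {
            return new ArrayList<String>(nodes);
        }
        return Collections.emptyList();
    }
}
```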

...

BlocklistTracker

BlocklistTracker is the component responsible for tracking blocklisted items. The tracker will periodically remove timed-out blocklisted items.

Code Block: BlocklistTracker
public interface BlocklistTracker {
    /** Starts the blocklist tracker. */
    void start(ComponentMainThreadExecutor mainThreadExecutor);

    /**
     * Add new blocklisted items or update existing items.
     *
     * @param items The items to add or update
     * @return Newly added or updated items.
     */
    Collection<BlocklistedItem<?>> addNewBlocklistedItems(Collection<BlocklistedItem<?>> items);

    /** Returns whether the given task manager is blocklisted. */
    boolean isBlocklistedTaskManager(ResourceID resourceID);

    /** Get all blocklisted nodes. */
    Set<String> getBlocklistedNodes();

    /** Close the blocklist tracker. */
    void close();
}
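
The periodic timeout removal can be sketched with an in-memory map. This is a simplified illustration, not the proposed implementation: the class TrackerSketch is hypothetical, it tracks only node names, and the caller is assumed to invoke removeTimedOutItems once per check interval from the main thread.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical in-memory tracker keyed by node name; values are creation timestamps. */
final class TrackerSketch {
    private final Map<String, Long> nodeTimestamps = new HashMap<String, Long>();
    private final long timeoutMillis;

    TrackerSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds a node, or refreshes its timestamp if it is already tracked. */
    void addOrUpdate(String node, long timestamp) {
        nodeTimestamps.put(node, timestamp);
    }

    boolean isBlocklisted(String node) {
        return nodeTimestamps.containsKey(node);
    }

    /** Removes every item whose age has reached the timeout. */
    void removeTimedOutItems(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = nodeTimestamps.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= timeoutMillis) {
                it.remove();
            }
        }
    }
}
```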
     

...

BlocklistContext

BlocklistContext is the component responsible for performing blocklist actions on SlotPool; the details are described in SlotPool.

Code Block: BlocklistContext
public interface BlocklistContext {
    /** Perform the newly added or updated blocklist items on resources. */
    void blocklistResources(Collection<BlocklistedItem<?>> newlyAddedOrUpdatedItems);
}
   

...

Code Block: BlocklistHandler & JobMasterBlocklistHandler
public interface BlocklistHandler extends BlocklistTracker {
}

public interface JobMasterBlocklistHandler extends BlocklistHandler {

    /**
     * Notify an exception that may generate blocklist items.
     *
     * @param locations locations of the exception
     * @param cause the exception
     */
    void notifyException(Collection<TaskManagerLocation> locations, Throwable cause);
}

...

SlotPool should avoid allocating slots from blocklisted task managers. Blocklisted task managers include those directly blocklisted and those located on blocklisted nodes. To do that, our core idea is to keep the SlotPool in the following state: there is no slot in SlotPool that is both free (no task assigned) and blocklisted. Details are as follows:

  1. When SlotPool starts, a BlocklistedTaskManagerChecker will be passed in to check whether a task manager is blocklisted.
  2. When receiving slot offers from blocklisted task managers (including task managers on blocklisted nodes), all offers should be rejected.
  3. When a task manager is newly blocklisted, BlocklistContext will perform the following on SlotPool:
    1. If the action is MARK_BLOCKLISTED, release all free (no task assigned) slots on the blocklisted task manager.
    2. If the action is MARK_BLOCKLISTED_AND_EVACUATE_TASKS, release all slots on the blocklisted task manager.
  4. When a slot state changes from reserved (task assigned) to free (no task assigned), SlotPool will check whether the corresponding task manager is blocklisted. If so, it releases the slot.
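
The invariant above (no slot is both free and blocklisted) can be sketched with two maps. This is a toy model under stated assumptions: SlotPoolSketch and its methods are hypothetical, slots and task managers are plain strings, the checker from rule 1 is replaced by an internal set, and "release" simply drops the slot from the bookkeeping.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical toy slot pool: maps slot id to the owning task manager id. */
final class SlotPoolSketch {
    private final Map<String, String> freeSlots = new HashMap<String, String>();
    private final Map<String, String> reservedSlots = new HashMap<String, String>();
    private final Set<String> blocked = new HashSet<String>();

    /** Rule 2: offers from blocklisted task managers are rejected. */
    boolean offerSlot(String slotId, String taskManager) {
        if (blocked.contains(taskManager)) {
            return false;
        }
        freeSlots.put(slotId, taskManager);
        return true;
    }

    /** Assigns a task to a free slot. */
    void reserve(String slotId) {
        String taskManager = freeSlots.remove(slotId);
        if (taskManager != null) {
            reservedSlots.put(slotId, taskManager);
        }
    }

    /** Rule 3: release free slots; with evacuation, release reserved slots as well. */
    void block(String taskManager, boolean evacuateTasks) {
        blocked.add(taskManager);
        freeSlots.values().removeIf(taskManager::equals);
        if (evacuateTasks) {
            reservedSlots.values().removeIf(taskManager::equals);
        }
    }

    /** Rule 4: a slot that becomes free on a blocklisted task manager is released. */
    void free(String slotId) {
        String taskManager = reservedSlots.remove(slotId);
        if (taskManager != null && !blocked.contains(taskManager)) {
            freeSlots.put(slotId, taskManager);
        }
    }

    int freeCount() {
        return freeSlots.size();
    }

    int reservedCount() {
        return reservedSlots.size();
    }
}
```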

...

Code Block: SlotPoolService
public interface BlocklistedTaskManagerChecker {
    /**
     * Returns whether the given task manager is blocklisted.
     */
    boolean isBlocklistedTaskManager(ResourceID resourceID);
}


public interface SlotPoolService {

    /** Start the encapsulated slot pool implementation. */
    void start(
            JobMasterId jobMasterId,
            String address,
            ComponentMainThreadExecutor mainThreadExecutor,
            BlocklistedTaskManagerChecker blocklistedTaskManagerChecker)
            throws Exception;

    /**
     * Releases all slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    /**
     * Releases all free slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    //...
}

...


Blocklist on ResourceManager

...

ResourceManagerBlocklistHandler

ResourceManagerBlocklistHandler is a new component introduced in RM for the blocklist mechanism. It has only one sub-component: BlocklistTracker, which is responsible for managing cluster-level blocklisted items.

Code Block: ResourceManagerBlocklistHandler
public interface ResourceManagerBlocklistHandler extends BlocklistHandler {
}

SlotManager

SlotManager should filter out blocklisted resources when allocating registered resources. To do that, we need the following changes:

  1. When starting SlotManager, the BlocklistedTaskManagerChecker will be passed in to check whether a registered task manager is blocklisted.
  2. When trying to fulfill the slot requirements with registered task managers, the blocklisted ones will be filtered out.
  3. SlotManager will request new task managers from external resource managers if the registered resources cannot fulfill the requirements. The blocklist also takes effect when requesting new task managers; the details are described in ResourceManagerDriver.
Code Block: SlotManager
public interface SlotManager {

    /** Starts the slot manager with the given leader id and resource manager actions. */
    void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceActions newResourceActions,
            BlocklistedTaskManagerChecker newBlocklistedTaskManagerChecker);

    //...
}
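
The filtering step for registered task managers amounts to a predicate check against the checker that was passed in at start. A minimal sketch, assuming task managers are identified by plain strings and using the hypothetical names TaskManagerCheckerSketch and SlotManagerFilterSketch:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical simplified checker; the proposed interface takes a ResourceID instead. */
interface TaskManagerCheckerSketch {
    boolean isBlocklisted(String taskManagerId);
}

final class SlotManagerFilterSketch {
    /** Returns the registered task managers that may still be used to fulfill requirements. */
    static List<String> usableTaskManagers(
            Collection<String> registered, TaskManagerCheckerSketch checker) {
        List<String> usable = new ArrayList<String>();
        for (String taskManager : registered) {
            if (!checker.isBlocklisted(taskManager)) {
                usable.add(taskManager);
            }
        }
        return usable;
    }
}
```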

ResourceManagerDriver

When deploying Flink on Yarn or Kubernetes, the ResourceManagerDriver is responsible for requesting new task managers from the external resource manager. To avoid allocating task managers from blocklisted nodes, we need to pass the information of blocklisted nodes to ResourceManagerDriver at its initialization.

ResourceManagerDriver uses the following APIs to tell external resource managers about blocklisted nodes:

  1. Yarn: AMRMClient#updateBlacklist 
  2. Kubernetes: NodeAffinity
Code Block: ResourceManagerDriver
public interface BlocklistedNodeRetriever {

    /** Retrieve blocklisted nodes. */
    Set<String> getBlocklistedNodes();
}

public interface ResourceManagerDriver {

    /** Initialize the deployment specific components. */
    void initialize(
            ResourceEventHandler<WorkerType> resourceEventHandler,
            ScheduledExecutor mainThreadExecutor,
            Executor ioExecutor,
            BlocklistedNodeRetriever blocklistedNodeRetriever)
            throws Exception;

    //...
}
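
Yarn's AMRMClient#updateBlacklist takes two lists, the nodes to add and the nodes to remove, so a driver would need to turn the retrieved set of blocklisted nodes into a delta against what it last reported. The sketch below shows only that delta computation. The classes BlocklistDelta and YarnBlocklistDeltaSketch are hypothetical, and how the driver actually tracks the reported state is not specified in this design.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical holder for the two arguments of AMRMClient#updateBlacklist. */
final class BlocklistDelta {
    final List<String> additions;
    final List<String> removals;

    BlocklistDelta(List<String> additions, List<String> removals) {
        this.additions = additions;
        this.removals = removals;
    }
}

/** Hypothetical helper that remembers which nodes were last reported to Yarn. */
final class YarnBlocklistDeltaSketch {
    private Set<String> lastReported = new HashSet<String>();

    BlocklistDelta computeDelta(Set<String> currentlyBlocked) {
        // Nodes blocked now but not yet reported must be added.
        List<String> additions = new ArrayList<String>(currentlyBlocked);
        additions.removeAll(lastReported);
        // Nodes reported earlier but no longer blocked must be removed.
        List<String> removals = new ArrayList<String>(lastReported);
        removals.removeAll(currentlyBlocked);
        lastReported = new HashSet<String>(currentlyBlocked);
        // Sorted only to make the result deterministic.
        Collections.sort(additions);
        Collections.sort(removals);
        return new BlocklistDelta(additions, removals);
    }
}
```

A driver could then pass delta.additions and delta.removals to updateBlacklist before requesting new containers.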

Synchronize

...

Blocklist between JM & RM

The newly added or updated blocklisted items will be synchronized between JM and RM via RPC:

  1. Once blocklisted items are newly added (or updated) to the JobMasterBlocklistHandler, RM will be notified of these items via ResourceManagerGateway#notifyNewBlocklistedItems.
  2. When RM receives the blocklisted items notified by a JM, it will add them to the ResourceManagerBlocklistHandler, and notify all JMs of the successfully added (or updated) items through JobMasterGateway#notifyNewBlocklistedItems.
  3. Similarly, when JM receives the blocklisted items notified by RM, it will also add them to the JobMasterBlocklistHandler.
Code Block: BlocklistListener
public interface BlocklistListener {

    /** Notify new blocklisted items. */
    void notifyNewBlocklistedItems(Collection<BlocklistedItem<?>> newItems);
}

public interface JobMasterGateway extends BlocklistListener {
    //...
}

public interface ResourceManagerGateway extends BlocklistListener {
    //...
}
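
The synchronization round trip can be sketched with direct method calls in place of RPC. Everything here is a simplification: RmSketch and JmSketch are hypothetical, items are plain strings without timestamps (so "updated" items are not modeled), and the RM forwards only items that were new to it, which is what keeps the exchange from looping.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the RM side of the synchronization. */
final class RmSketch {
    private final Set<String> blocklist = new HashSet<String>();
    private final List<JmSketch> jobMasters = new ArrayList<JmSketch>();

    void register(JmSketch jobMaster) {
        jobMasters.add(jobMaster);
    }

    /** Step 2: add the items, then notify all JMs of those that were actually new. */
    void notifyNewBlocklistedItems(Collection<String> items) {
        List<String> added = new ArrayList<String>();
        for (String item : items) {
            if (blocklist.add(item)) {
                added.add(item);
            }
        }
        if (added.isEmpty()) {
            return;
        }
        for (JmSketch jobMaster : jobMasters) {
            jobMaster.notifyNewBlocklistedItems(added);
        }
    }

    boolean isBlocklisted(String item) {
        return blocklist.contains(item);
    }
}

/** Hypothetical stand-in for the JM side of the synchronization. */
final class JmSketch {
    private final Set<String> blocklist = new HashSet<String>();
    private final RmSketch resourceManager;

    JmSketch(RmSketch resourceManager) {
        this.resourceManager = resourceManager;
    }

    /** Step 1: items generated locally are recorded and reported to the RM. */
    void addLocally(Collection<String> items) {
        List<String> added = new ArrayList<String>();
        for (String item : items) {
            if (blocklist.add(item)) {
                added.add(item);
            }
        }
        if (!added.isEmpty()) {
            resourceManager.notifyNewBlocklistedItems(added);
        }
    }

    /** Step 3: items received from the RM are recorded but not reported back. */
    void notifyNewBlocklistedItems(Collection<String> items) {
        blocklist.addAll(items);
    }

    boolean isBlocklisted(String item) {
        return blocklist.contains(item);
    }
}
```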

Enrich TaskManagerLocation with node information

In order to support blocklisted nodes, it is necessary to know the node where the task is located. To do that, we need to add a node identifier into TaskManagerLocation. This node identifier should be set by the resource manager when requesting new task managers.

...

We propose to introduce the following Gauge metrics to ResourceManagerMetricGroup:

  1. numBlocklistedTaskMangers: The number of currently blocklisted task managers (including task managers on blocklisted nodes)
  2. numBlocklistedNodes: The number of currently blocklisted nodes

REST API

We will introduce the following REST APIs for the blocklist mechanism:

  1. REST API for querying blocklist information.
  2. REST API for adding new blocklisted items.
  3. REST API for removing existing blocklisted items.

Query

Add a REST API to obtain blocklist information. Each request will return all current blocklisted items, which are obtained from the ResourceManagerBlocklistHandler.

GET: http://{jm_rest_address:port}/blocklist

Request: {}

Response:

Code Block: Response
{
  /** This group only contains directly blocklisted task managers */
  "blocklistedTaskManagers": [
      {
          "id" : "container_XXX_000002",
          "timestamp" : "XXX",
          "action" : "MARK_BLOCKLISTED"
      },
      {
          "id" : "container_XXX_000003",
          "timestamp" : "XXX",
          "action" : "MARK_BLOCKLISTED"
      },
      ...
  ],
  "blocklistedNodes": [
      {
          "id" : "node1",
          "timestamp" : "XXX",
          "action" : "MARK_BLOCKLISTED",
          "taskManagers" : ["container_XXX_000004", "container_XXX_000005", ...]
      },
      ...
  ]
}


In order to get blocklist information, we need to add an interface to ResourceManagerGateway:

Code Block: ResourceManagerGateway
public interface ResourceManagerGateway {
    CompletableFuture<BlocklistInfo> requestBlocklist(@RpcTimeout Time timeout);
    // ...
}

...

POST: http://{jm_rest_address:port}/blocklist/add

Request:

Code Block: Request
{
  "newBlocklistedTaskManagers": [
      {
          "id" : "container_XXX_000002",
          "action" : "MARK_BLOCKLISTED"
      },
      {
          "id" : "container_XXX_000003",
          "action" : "MARK_BLOCKLISTED"
      },
      ...
  ],
  "newBlocklistedNodes": [
      {
          "id" : "node1",
          "action" : "MARK_BLOCKLISTED"
      },
      ...
  ]
}

Response: {}

Remove

POST: http://{jm_rest_address:port}/blocklist/remove

Request:

Code Block: Request
{
  "blocklistedTaskManagersToRemove": [
      {
          "id" : "container_XXX_000002",
          "action" : "MARK_BLOCKLISTED"
      },
      {
          "id" : "container_XXX_000003",
          "action" : "MARK_BLOCKLISTED"
      },
      ...
  ],
  "blocklistedNodesToRemove": [
      {
          "id" : "node1",
          "action" : "MARK_BLOCKLISTED"
      },
      ...
  ]
}

Response: {}

Compatibility, Deprecation, and Migration Plan

The blocklist mechanism will be an optional feature which the user has to activate explicitly by setting the config option cluster.resource-blocklist.enabled: true. This entails that Flink's default behavior won't change.

Future improvements

Introduce a configurable blocklist strategy implementation

We intend to introduce a configurable blocklist strategy in the future. Users can specify exceptions for which the strategy should blocklist the resources. This requires adding several configuration options to configure the exceptions and the generated blocklisted items, which requires further design and discussion.

Introduce plugin mechanism for blocklist strategy

We may introduce a plugin mechanism in the future, which allows users to load their own blocklist strategy implementations. This means that BlocklistStrategy needs to be opened to users as a public interface, which requires more thought and discussion.

...