Status

Current state: "Under Discussion"

Discussion thread: https://lists.apache.org/thread/fngkk52kjbc6b6v9nn0lkfq6hhsbgb1h

JIRA:

Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink jobs typically run in a distributed way. In large clusters, it is very common for cluster nodes to encounter issues such as insufficient disk space, bad hardware, or network abnormalities. These problems may cause job failures. Flink will take care of the failures and redeploy the relevant tasks. However, due to data locality and limited resources, the new tasks are very likely to be redeployed to the same nodes, which results in repeated task failures and affects job progress.

Currently, Flink users need to manually identify the problematic node and take it offline to solve this problem. This approach has the following disadvantages:

  1. Taking a node offline can be a heavy process. Users may need to contact cluster administrators to do this. The operation can even be dangerous and may not be allowed during some important business events.
  2. Identifying and solving this kind of problem manually is slow and a waste of human resources.

To solve this problem, we propose to introduce a blocklist mechanism for Flink to filter out problematic resources. The following two ways of blocklisting resources will be introduced:

  1. Manually specify the blocklisted resources through REST API. When users find abnormal nodes/TMs, they can manually blocklist them in this way.
  2. Automatically detect abnormal resources and blocklist them. Users can specify a blocklist strategy which identifies abnormal resources. In the first version, we only introduce the blocklist strategy interface and a no-op implementation. In the future, we will introduce a configurable blocklist strategy and plugin mechanism to load user-defined blocklist strategy implementations.

Public Interfaces

We propose to introduce the following configuration options for the blocklist:

  • cluster.resource-blocklist.enabled: Whether to enable the blocklist mechanism.
  • cluster.resource-blocklist.item.timeout: The timeout after which a blocklisted item is removed from the blocklist.
  • cluster.resource-blocklist.item.timeout-check-interval: The interval at which blocklisted items are checked for timeout.

Proposed Changes

In this design, two granularities of blocklisted resources are supported: task managers and nodes. A record of blocklist information is called a blocklisted item, which is generally generated by the scheduler according to the exceptions of the tasks. These blocklisted items will be recorded in a special component and affect the resource allocation of Flink clusters. However, blocklisted items are not permanent: each item has a timeout. Once an item times out, it will be removed, and the resource will become available again. The overall structure of the blocklist mechanism is shown in the figure below.

In JM, there will be a component (JobMasterBlocklistHandler) responsible for generating blocklisted items according to the exceptions notified by Scheduler, and managing them. SlotPool will be aware of the blocklist information and filter out blocklisted resources when allocating slots.

In RM, there will also be a component (ResourceManagerBlocklistHandler) responsible for managing the cluster-level blocklist. SlotManager will be aware of the blocklist information and filter out blocklisted resources when allocating slots from registered task managers. Similarly, when the ResourceManagerDriver requests new resources from an external resource manager (Yarn or Kubernetes), it also needs to filter out the blocklisted nodes.

Blocklisted Item

A blocklisted item corresponds to the blocklist information of a specific task manager or node, including the following 5 fields:

  1. type: The blocklisted item type, TASK_MANAGER or NODE
  2. timestamp: The creation timestamp of this item, which is used to check for timeout.
  3. cause: The cause for creating this item.
  4. identifier: The identifier of the blocklisted task manager or node.
  5. action: The action when a task manager/node is marked as blocklisted, including:
    1. MARK_BLOCKLISTED: Just mark the task manager or node as blocklisted. Future slots should not be allocated from the blocklisted task manager or node. But slots that are already allocated will not be affected.
    2. MARK_BLOCKLISTED_AND_EVACUATE_TASKS: Mark the task manager or node as blocklisted, and evacuate all tasks on it. Evacuated tasks will be restarted on non-blocklisted task managers.
BlocklistedItem
/**
 * This class represents a blocklisted item.
 *
 * @param <ID> Identifier of the blocklisted item.
 */
public abstract class BlocklistedItem<ID> {
    public BlocklistedItemType getType();

    public long getTimestamp();

    public BlocklistAction getAction();

    public Throwable getCause();

    public abstract ID getIdentifier();
}

/** This class represents a blocklisted node. */
public class BlocklistedNode extends BlocklistedItem<String> {
}

/** This class represents a blocklisted task manager. */
public class BlocklistedTaskManager extends BlocklistedItem<ResourceID> {
}

Blocklist on JobMaster

JobMasterBlocklistHandler

JobMasterBlocklistHandler is the component in JM responsible for generating and managing blocklisted items. It consists of three sub-components: BlocklistStrategy, BlocklistTracker and BlocklistContext. When receiving a new exception from Scheduler, the JobMasterBlocklistHandler will handle it as follows:

  1. Generate new blocklisted items by notifying the exception to the BlocklistStrategy.
  2. Add the new blocklisted items to the BlocklistTracker. 
  3. Synchronize the new blocklisted items to RM.
  4. Perform blocklist actions on the resources via the BlocklistContext.
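
The four steps above can be sketched roughly as follows. This is only an illustration under simplifying assumptions: all type names ending in Sketch are hypothetical stand-ins, task locations are plain strings, the RPC to RM is modelled as a callback, and skipping steps 3 and 4 when nothing changed is an assumption rather than something this proposal specifies.

```java
import java.util.Collection;

/** Hypothetical stand-in for a blocklisted item; only the identifier is modelled. */
final class ItemSketch {
    final String identifier;

    ItemSketch(String identifier) {
        this.identifier = identifier;
    }
}

/** Simplified strategy: locations are plain strings in this sketch. */
interface StrategySketch {
    Collection<ItemSketch> generateBlocklistedItems(
            Collection<String> locations, Throwable cause, long timestamp);
}

/** Simplified tracker: returns only the newly added or updated items. */
interface TrackerSketch {
    Collection<ItemSketch> addNewBlocklistedItems(Collection<ItemSketch> items);
}

/** Hypothetical callback standing in for the RPC to the ResourceManager. */
interface ResourceManagerNotifierSketch {
    void notifyNewBlocklistedItems(Collection<ItemSketch> newItems);
}

/** Simplified context that applies the blocklist actions to the SlotPool. */
interface ContextSketch {
    void blocklistResources(Collection<ItemSketch> newlyAddedOrUpdatedItems);
}

final class JobMasterBlocklistHandlerSketch {
    private final StrategySketch strategy;
    private final TrackerSketch tracker;
    private final ResourceManagerNotifierSketch resourceManager;
    private final ContextSketch context;

    JobMasterBlocklistHandlerSketch(
            StrategySketch strategy,
            TrackerSketch tracker,
            ResourceManagerNotifierSketch resourceManager,
            ContextSketch context) {
        this.strategy = strategy;
        this.tracker = tracker;
        this.resourceManager = resourceManager;
        this.context = context;
    }

    void notifyException(Collection<String> locations, Throwable cause) {
        // Step 1: let the strategy turn the exception into blocklisted items.
        Collection<ItemSketch> generated =
                strategy.generateBlocklistedItems(locations, cause, System.currentTimeMillis());
        // Step 2: record them; the tracker reports what actually changed.
        Collection<ItemSketch> changed = tracker.addNewBlocklistedItems(generated);
        if (changed.isEmpty()) {
            return; // Assumption: nothing to synchronize or act on.
        }
        // Step 3: synchronize the changed items to RM.
        resourceManager.notifyNewBlocklistedItems(changed);
        // Step 4: perform the blocklist actions on the resources.
        context.blocklistResources(changed);
    }
}
```

Passing only the changed items to steps 3 and 4 mirrors the return value of BlocklistTracker#addNewBlocklistedItems, which is described as returning newly added or updated items.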

BlocklistStrategy

BlocklistStrategy is the component responsible for generating blocklisted items according to the exceptions and their locations notified by Scheduler. We can introduce different BlocklistStrategy implementations to adapt to different scenarios. In the first version, we will introduce a no-op implementation as the default implementation. In the future, we will introduce a configurable blocklist strategy and a plugin mechanism to load user-defined blocklist strategy implementations. Details are described in Future improvements.

BlocklistStrategy
public interface BlocklistStrategy {
    /**
     * Generate blocklisted items according to the abnormal task's location and cause.
     *
     * @param locations the abnormal tasks’ locations.
     * @param cause the cause of blocklisted items.
     * @param timestamp the create timestamp of blocklisted items.
     * @return the generated blocklisted items.
     */
    Collection<BlocklistedItem<?>> generateBlocklistedItems(Collection<TaskManagerLocation> locations, Throwable cause, long timestamp);
}

BlocklistTracker

BlocklistTracker is the component responsible for tracking blocklisted items. The tracker will regularly remove timed-out blocklisted items.

BlocklistTracker
public interface BlocklistTracker {
    /** Starts the blocklist tracker. */
    void start(ComponentMainThreadExecutor mainThreadExecutor);

    /**
     * Add new blocklisted items or update existing items.
     *
     * @param items The items to add or update
     * @return Newly added or updated items.
     */
    Collection<BlocklistedItem<?>> addNewBlocklistedItems(Collection<BlocklistedItem<?>> items);

    /** Returns whether the given task manager is blocklisted. */
    boolean isBlocklistedTaskManager(ResourceID resourceID);

    /** Get all blocklisted nodes. */
    Set<String> getBlocklistedNodes();

    /** Close the blocklist tracker. */
    void close();
}
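
To illustrate the timeout behaviour, here is a minimal tracker sketch. It is not the proposed implementation: the class names are hypothetical, items are keyed by a plain string identifier, and the current time is passed in explicitly so the example is deterministic. A real tracker would presumably schedule the check on the main thread executor at the configured check interval. Treating an item as an update only when it carries a newer timestamp is also an assumption.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical item carrying only what the timeout check needs. */
final class TimedItemSketch {
    final String identifier;
    final long timestamp;

    TimedItemSketch(String identifier, long timestamp) {
        this.identifier = identifier;
        this.timestamp = timestamp;
    }
}

final class BlocklistTrackerSketch {
    private final long timeoutMillis;
    private final Map<String, TimedItemSketch> items = new HashMap<>();

    BlocklistTrackerSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds or updates items and returns those that actually changed the tracker. */
    Collection<TimedItemSketch> addNewBlocklistedItems(Collection<TimedItemSketch> newItems) {
        List<TimedItemSketch> changed = new ArrayList<>();
        for (TimedItemSketch item : newItems) {
            TimedItemSketch existing = items.get(item.identifier);
            // Assumption: only a newer timestamp counts as an update.
            if (existing == null || item.timestamp > existing.timestamp) {
                items.put(item.identifier, item);
                changed.add(item);
            }
        }
        return changed;
    }

    boolean isBlocklisted(String identifier) {
        return items.containsKey(identifier);
    }

    /** Removes items whose age has reached the timeout; returns the removed identifiers. */
    Collection<String> removeTimedOutItems(long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, TimedItemSketch>> it = items.entrySet().iterator();
        while (it.hasNext()) {
            TimedItemSketch item = it.next().getValue();
            if (nowMillis - item.timestamp >= timeoutMillis) {
                it.remove();
                removed.add(item.identifier);
            }
        }
        return removed;
    }
}
```

Once removeTimedOutItems has dropped an item, isBlocklisted returns false for it again, which corresponds to the resource becoming available again after the timeout.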
     

BlocklistContext

BlocklistContext is the component responsible for performing blocklist actions on SlotPool. The details are described in SlotPool.

BlocklistContext
public interface BlocklistContext {
    /** Perform the newly added or updated blocklist items on resources. */
    void blocklistResources(Collection<BlocklistedItem<?>> newlyAddedOrUpdatedItems);
}
   
BlocklistHandler & JobMasterBlocklistHandler
public interface BlocklistHandler extends BlocklistTracker {
}

public interface JobMasterBlocklistHandler extends BlocklistHandler {

    /**
     * Notify an exception that may generate blocklist items.
     *
     * @param locations locations of the exception
     * @param cause the exception
     */
    void notifyException(Collection<TaskManagerLocation> locations, Throwable cause);
}

SlotPool

SlotPool should avoid allocating slots from blocklisted task managers. Blocklisted task managers include those directly blocklisted and those located on blocklisted nodes. To do that, our core idea is to keep the SlotPool in the following state: no slot in SlotPool is both free (no task assigned) and blocklisted. Details are as follows:

  1. When SlotPool starts, BlocklistedTaskManagerChecker will be passed in to check whether a task manager is blocklisted.
  2. When receiving slot offers from blocklisted task managers (including task managers on blocklisted nodes), all offers should be rejected.
  3. When a task manager is newly blocklisted, BlocklistContext will perform the following on SlotPool:
    1. If the action is MARK_BLOCKLISTED, release all free (no task assigned) slots on the blocklisted task manager.
    2. If the action is MARK_BLOCKLISTED_AND_EVACUATE_TASKS, release all slots on the blocklisted task manager.
  4. When a slot state changes from reserved (task assigned) to free (no task assigned), it will check whether the corresponding task manager is blocklisted. If yes, release the slot.
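
The rules above can be illustrated with a heavily simplified slot pool. Everything in this sketch is hypothetical: slots and task managers are plain string ids, the checker is a java.util.function.Predicate instead of BlocklistedTaskManagerChecker, and releasing a slot simply removes it from the pool. It is meant only to show how the rules keep the pool free of slots that are both free and blocklisted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical, heavily simplified slot pool. */
final class SlotPoolSketch {
    /** slotId -> id of the owning task manager. */
    private final Map<String, String> slotOwners = new HashMap<>();
    /** slotId -> true if a task is assigned (reserved), false if free. */
    private final Map<String, Boolean> reserved = new HashMap<>();
    private final Predicate<String> blocklistedTaskManagerChecker;

    SlotPoolSketch(Predicate<String> blocklistedTaskManagerChecker) {
        this.blocklistedTaskManagerChecker = blocklistedTaskManagerChecker;
    }

    /** Rule 2: offers from blocklisted task managers are rejected. */
    boolean offerSlot(String slotId, String taskManagerId) {
        if (blocklistedTaskManagerChecker.test(taskManagerId)) {
            return false;
        }
        slotOwners.put(slotId, taskManagerId);
        reserved.put(slotId, false);
        return true;
    }

    void reserveSlot(String slotId) {
        if (reserved.containsKey(slotId)) {
            reserved.put(slotId, true);
        }
    }

    /** Rule 4: a slot that becomes free on a blocklisted task manager is released. */
    void freeReservedSlot(String slotId) {
        String owner = slotOwners.get(slotId);
        if (owner == null) {
            return;
        }
        if (blocklistedTaskManagerChecker.test(owner)) {
            removeSlot(slotId);
        } else {
            reserved.put(slotId, false);
        }
    }

    /** Rule 3.1 (MARK_BLOCKLISTED): release only the free slots. */
    void releaseFreeSlotsOnTaskManager(String taskManagerId) {
        releaseSlots(taskManagerId, true);
    }

    /** Rule 3.2 (MARK_BLOCKLISTED_AND_EVACUATE_TASKS): release all slots. */
    void releaseSlotsOnTaskManager(String taskManagerId) {
        releaseSlots(taskManagerId, false);
    }

    boolean containsSlot(String slotId) {
        return slotOwners.containsKey(slotId);
    }

    private void releaseSlots(String taskManagerId, boolean freeOnly) {
        // Collect first to avoid modifying the map while iterating over it.
        List<String> toRelease = new ArrayList<>();
        for (Map.Entry<String, String> entry : slotOwners.entrySet()) {
            boolean isReserved = reserved.get(entry.getKey());
            if (entry.getValue().equals(taskManagerId) && (!freeOnly || !isReserved)) {
                toRelease.add(entry.getKey());
            }
        }
        toRelease.forEach(this::removeSlot);
    }

    private void removeSlot(String slotId) {
        slotOwners.remove(slotId);
        reserved.remove(slotId);
    }
}
```

With MARK_BLOCKLISTED, a reserved slot stays in the pool until its task finishes; rule 4 then releases it at the moment it would otherwise become free.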


SlotPoolService
public interface BlocklistedTaskManagerChecker {
    /**
     * Returns whether the given task manager is blocklisted.
     */
    boolean isBlocklistedTaskManager(ResourceID resourceID);
}


public interface SlotPoolService {

    /** Start the encapsulated slot pool implementation. */
    void start(
            JobMasterId jobMasterId,
            String address,
            ComponentMainThreadExecutor mainThreadExecutor,
            BlocklistedTaskManagerChecker blocklistedTaskManagerChecker)
            throws Exception;

    /**
     * Releases all slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    /**
     * Releases all free slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    //...
}

Blocklist on ResourceManager

ResourceManagerBlocklistHandler

ResourceManagerBlocklistHandler is a new component introduced in RM for the blocklist mechanism. It has only one sub-component: BlocklistTracker, which is responsible for managing cluster-level blocklisted items.

ResourceManagerBlocklistHandler
public interface ResourceManagerBlocklistHandler extends BlocklistHandler {
}

SlotManager

SlotManager should filter out blocklisted resources when allocating registered resources. To do that, we need the following changes:

  1. When starting SlotManager, the BlocklistedTaskManagerChecker will be passed in to check whether a registered task manager is blocklisted.
  2. When trying to fulfill the slot requirements by registered task managers, the blocklisted ones will be filtered out.
  3. SlotManager will request new task managers from external resource managers if the registered resources cannot fulfill the requirements. The blocklist also takes effect when requesting new task managers. The details are described in ResourceManagerDriver.
SlotManager
public interface SlotManager {

    /** Starts the slot manager with the given leader id and resource manager actions. */
    void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceActions newResourceActions,
            BlocklistedTaskManagerChecker newBlocklistedTaskManagerChecker);

    //...
}
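
As a rough illustration of the filtering step, the following sketch selects the registered task managers that may still be used. The class and method names are hypothetical, and task managers and nodes are represented as plain strings. It treats a task manager as blocklisted if it is blocklisted directly or if it runs on a blocklisted node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper showing the filter applied to registered task managers. */
final class SlotManagerFilterSketch {

    /**
     * @param registeredTaskManagerNodes task manager id -> id of the node it runs on
     * @return ids of task managers that are neither blocklisted nor on a blocklisted node
     */
    static List<String> usableTaskManagers(
            Map<String, String> registeredTaskManagerNodes,
            Set<String> blocklistedTaskManagers,
            Set<String> blocklistedNodes) {
        List<String> usable = new ArrayList<>();
        for (Map.Entry<String, String> entry : registeredTaskManagerNodes.entrySet()) {
            boolean blocklisted =
                    blocklistedTaskManagers.contains(entry.getKey())
                            || blocklistedNodes.contains(entry.getValue());
            if (!blocklisted) {
                usable.add(entry.getKey());
            }
        }
        return usable;
    }
}
```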

ResourceManagerDriver

When deploying Flink on Yarn or Kubernetes, the ResourceManagerDriver is responsible for requesting new task managers from the external resource manager. To avoid allocating task managers from blocklisted nodes, we need to pass the information of blocklisted nodes to ResourceManagerDriver at its initialization. 

ResourceManagerDriver uses the following APIs to tell external resource managers about the blocklisted nodes:

  1. Yarn: AMRMClient#updateBlacklist 
  2. Kubernetes: NodeAffinity
ResourceManagerDriver
public interface BlocklistedNodeRetriever {

    /** Retrieve blocklisted nodes. */
    Set<String> getBlocklistedNodes();
}

public interface ResourceManagerDriver {

    /** Initialize the deployment specific components. */
    void initialize(
            ResourceEventHandler<WorkerType> resourceEventHandler,
            ScheduledExecutor mainThreadExecutor,
            Executor ioExecutor,
            BlocklistedNodeRetriever blocklistedNodeRetriever)
            throws Exception;

    //...
}
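
For the Yarn case, AMRMClient#updateBlacklist takes a list of additions and a list of removals rather than a full set, so a driver would likely need to diff the current blocklisted nodes against what it last reported. The sketch below shows only that diff. NodeBlocklistDiffSketch is a hypothetical helper, and how the driver remembers the previously reported set is an assumption not covered by this proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper computing the delta between two sets of blocklisted nodes. */
final class NodeBlocklistDiffSketch {
    /** Nodes that are blocklisted now but were not reported before. */
    final List<String> additions;
    /** Nodes that were reported before but are no longer blocklisted. */
    final List<String> removals;

    private NodeBlocklistDiffSketch(List<String> additions, List<String> removals) {
        this.additions = additions;
        this.removals = removals;
    }

    static NodeBlocklistDiffSketch between(Set<String> previouslyReported, Set<String> current) {
        // TreeSet is used only to give the resulting lists a deterministic order.
        Set<String> added = new TreeSet<>(current);
        added.removeAll(previouslyReported);
        Set<String> removed = new TreeSet<>(previouslyReported);
        removed.removeAll(current);
        return new NodeBlocklistDiffSketch(new ArrayList<>(added), new ArrayList<>(removed));
    }
}
```

The two lists could then be passed as the additions and removals arguments of the Yarn call, with current coming from BlocklistedNodeRetriever#getBlocklistedNodes.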

Synchronize Blocklist between JM & RM

The newly added or updated blocklisted items will be synchronized between JM and RM via RPC: 

  1. Once blocklisted items are newly added to (or updated in) the JobMasterBlocklistHandler, RM will be notified of these items via ResourceManagerGateway#notifyNewBlocklistedItems.
  2. When RM receives the blocklisted items notified by a JM, it will add them into ResourceManagerBlocklistHandler, and notify all JMs of the successfully added (or updated) items through JobMasterGateway#notifyNewBlocklistedItems. 
  3. Similarly, when JM receives the blocklisted items notified by RM, it will also add them to JobMasterBlocklistHandler.
BlocklistListener
public interface BlocklistListener {

    /** Notify new blocklisted items. */
    void notifyNewBlocklistedItems(Collection<BlocklistedItem<?>> newItems);
}

public interface JobMasterGateway extends BlocklistListener {
    //...
}

public interface ResourceManagerGateway extends BlocklistListener {     
    //...
}  
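
The synchronization flow can be sketched without any RPC machinery. In the example below, all type names are hypothetical, blocklisted items are reduced to a map from identifier to timestamp, and gateways are direct method calls. Skipping the broadcast when nothing was added or updated is an assumption, based on the statement that RM notifies JMs of the successfully added (or updated) items.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified listener: items are a map from identifier to creation timestamp. */
interface BlocklistListenerSketch {
    void notifyNewBlocklistedItems(Map<String, Long> newItems);
}

/** Minimal tracker that reports which entries were actually added or updated. */
final class SimpleTrackerSketch {
    private final Map<String, Long> items = new HashMap<>();

    Map<String, Long> addOrUpdate(Map<String, Long> incoming) {
        Map<String, Long> changed = new HashMap<>();
        for (Map.Entry<String, Long> entry : incoming.entrySet()) {
            Long existing = items.get(entry.getKey());
            // Assumption: only a newer timestamp counts as an update.
            if (existing == null || entry.getValue() > existing) {
                items.put(entry.getKey(), entry.getValue());
                changed.put(entry.getKey(), entry.getValue());
            }
        }
        return changed;
    }

    boolean contains(String identifier) {
        return items.containsKey(identifier);
    }
}

/** JM side: step 3, add whatever RM reports to the local handler. */
final class JobMasterSideSketch implements BlocklistListenerSketch {
    final SimpleTrackerSketch tracker = new SimpleTrackerSketch();

    @Override
    public void notifyNewBlocklistedItems(Map<String, Long> newItems) {
        tracker.addOrUpdate(newItems);
    }
}

/** RM side: step 2, add the items and forward the changed ones to all JMs. */
final class ResourceManagerSideSketch implements BlocklistListenerSketch {
    final SimpleTrackerSketch tracker = new SimpleTrackerSketch();
    private final List<BlocklistListenerSketch> jobMasters = new ArrayList<>();
    int broadcasts = 0;

    void registerJobMaster(BlocklistListenerSketch jobMaster) {
        jobMasters.add(jobMaster);
    }

    @Override
    public void notifyNewBlocklistedItems(Map<String, Long> newItems) {
        Map<String, Long> changed = tracker.addOrUpdate(newItems);
        if (changed.isEmpty()) {
            return; // Assumption: nothing new, so no JM needs to be notified.
        }
        broadcasts++;
        for (BlocklistListenerSketch jobMaster : jobMasters) {
            jobMaster.notifyNewBlocklistedItems(changed);
        }
    }
}
```

Forwarding only the changed items keeps repeated notifications of the same item from triggering further broadcasts.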

Enrich TaskManagerLocation with node information

In order to support blocklisted nodes, it is necessary to know the node where the task is located. To do that, we need to add a node identifier into TaskManagerLocation. This node identifier should be set by the resource manager when requesting new task managers.

TaskManagerLocation
public class TaskManagerLocation {

    public String getNodeId();

    //...
}

Metrics

We propose to introduce the following Gauge metrics to ResourceManagerMetricGroup:

  1. numBlocklistedTaskManagers: The number of currently blocklisted task managers (including task managers on blocklisted nodes)
  2. numBlocklistedNodes: The number of currently blocklisted nodes

REST API

We will introduce the following REST APIs for the blocklist mechanism:

  1. REST API for querying blocklist information.
  2. REST API for adding new blocklisted items.
  3. REST API for removing existing blocklisted items.

Query

Add a REST API to obtain blocklist information. Each request will return all current blocklist items, which are obtained from ResourceManagerBlocklistHandler.

GET: http://{jm_rest_address:port}/blocklist

Request: {}

Response:

Response
{
  /** This group only contains directly blocklisted task managers */
  "blocklistedTaskManagers": [
      {
          "id" : "container_XXX_000002",
          "timestamp" : "XXX",
          "action" : "MARK_BLOCKLISTED"
      },
      {
          "id" : "container_XXX_000003",
          "timestamp" : "XXX",
          "action" : "MARK_BLOCKLISTED"
      }, 
      ...
  ],
  "blocklistedNodes": [
      {
          "id" : "node1",
          "timestamp" : "XXX",
          "action" : "MARK_BLOCKLISTED",
          "taskManagers" : ["container_XXX_000004", "container_XXX_000005", ...]
      },
      ...
  ]
}


In order to get blocklist information, we need to add a method to ResourceManagerGateway:

ResourceManagerGateway
public interface ResourceManagerGateway {
    CompletableFuture<BlocklistInfo> requestBlocklist(@RpcTimeout Time timeout);
    // ...
}

Add

POST: http://{jm_rest_address:port}/blocklist/add

Request:

Request
{
  "newBlocklistedTaskManagers": [
      {
          "id" : "container_XXX_000002",
          "action" : "MARK_BLOCKLISTED"
      },
      {
          "id" : "container_XXX_000003",
          "action" : "MARK_BLOCKLISTED"
      }, 
      ...
  ],
  "newBlocklistedNodes": [
      {
          "id" : "node1",
          "action" : "MARK_BLOCKLISTED"
      },
      ...
  ]
}

Response: {}
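
As a rough client-side illustration, the request above could be built with the JDK's java.net.http API (Java 11 or later). The helper class is hypothetical, the REST address used in the usage note is only an example value, and the sketch assumes ids contain no characters that need JSON escaping. The endpoint itself exists only as proposed in this document.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper that builds the proposed "add" request. */
final class BlocklistAddRequestSketch {

    /** Renders ids as a JSON array of {"id", "action"} objects (no escaping; see assumption). */
    static String entries(List<String> ids, String action) {
        return ids.stream()
                .map(id -> "{\"id\":\"" + id + "\",\"action\":\"" + action + "\"}")
                .collect(Collectors.joining(",", "[", "]"));
    }

    /** Builds the request body with the field names used in this proposal. */
    static String body(List<String> taskManagers, List<String> nodes, String action) {
        return "{\"newBlocklistedTaskManagers\":" + entries(taskManagers, action)
                + ",\"newBlocklistedNodes\":" + entries(nodes, action) + "}";
    }

    /** Builds (but does not send) the POST request against the proposed endpoint. */
    static HttpRequest addRequest(String restAddress, String jsonBody) {
        return HttpRequest.newBuilder(URI.create("http://" + restAddress + "/blocklist/add"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```

For example, addRequest("localhost:8081", body(List.of("container_XXX_000002"), List.of("node1"), "MARK_BLOCKLISTED")) yields a request that could be sent with java.net.http.HttpClient once such an endpoint exists.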

Remove

POST: http://{jm_rest_address:port}/blocklist/remove

Request:

Request
{
  "blocklistedTaskManagersToRemove": [
      {
          "id" : "container_XXX_000002",
          "action" : "MARK_BLOCKLISTED"
      },
      {
          "id" : "container_XXX_000003",
          "action" : "MARK_BLOCKLISTED"
      }, 
      ...
  ],
  "blocklistedNodesToRemove": [
      {
          "id" : "node1",
          "action" : "MARK_BLOCKLISTED"
      },
      ...
  ]
}

Response: {}

Compatibility, Deprecation, and Migration Plan

The blocklist mechanism will be an optional feature which the user has to activate explicitly by setting the config option cluster.resource-blocklist.enabled: true. This entails that Flink's default behavior won't change.

Future improvements

Introduce a configurable blocklist strategy implementation

We intend to introduce a configurable blocklist strategy in the future. Users can specify the exceptions for which the strategy should blocklist resources. This requires adding several configuration options to configure the exceptions and the generated blocklisted items, which requires further design and discussion.

Introduce plugin mechanism for blocklist strategy

We may introduce a plugin mechanism in the future, which allows users to load their own blocklist strategy implementations. This means that BlocklistStrategy needs to be opened to users as a public interface, which requires more thought and discussion.

Test Plan

  1. The changes will be covered by unit tests and IT cases.
  2. Test the functionality in a real Standalone/Yarn/Kubernetes cluster.

Rejected Alternatives

No rejected alternatives yet.
