Status

...

Page properties


Discussion thread

...

JIRA:

...

ASF JIRA: FLINK-28130

Release: 1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

Flink jobs typically run in a distributed way. In a large cluster, nodes often encounter the following issues that affect running jobs:

  1. Unrecoverable problems, such as insufficient disk space, bad hardware, or network abnormalities. These problems result in continuous job failures. Currently, Flink users need to take the problematic node offline to solve this. However, taking a node offline can be a heavy process: users may need to contact cluster administrators to do it, and the operation can even be dangerous and disallowed during important business events.
  2. Recoverable problems, such as temporary node hotspots. These problems can slow down the jobs running on the node, but the node can recover after a period of time. In this case, users may just want to limit the load on the node rather than kill all the processes on it. Unfortunately, neither Flink itself nor external resource management systems can currently do this.

To solve the above problems, we propose to introduce a blocklist mechanism for Flink to block resources on problematic nodes. Two block actions will be introduced:

  1. MARK_BLOCKED: Just mark a node as blocked. Future slots should not be allocated from the blocked node. But slots that are already allocated will not be affected.
  2. MARK_BLOCKED_AND_EVACUATE_TASKS: Mark a node as blocked, and evacuate all tasks on it. Evacuated tasks will be restarted on non-blocked nodes.

This design only supports manually specifying blocked resources via the REST API; auto-detection may be introduced in the future.

Public Interfaces

Configurations

We propose to introduce the following configuration option for the blocklist:

  • cluster.resource-blocklist.enabled: Whether to enable the blocklist mechanism.

Metrics

We propose to introduce a new Gauge metric, numBlockedNodes, to report the number of currently blocked nodes.

REST API

We propose to introduce the following REST APIs for the blocklist mechanism:

  1. REST API for querying all blocked nodes.
  2. REST API for adding new blocked nodes.
  3. REST API for removing existing blocked nodes.

Query

GET: http://{jm_rest_address:port}/blocklist

Request

Request body: {}

Response

Response code: 200(OK)

Response body:

Code Block
titleResponse Example
{
    "node1":{
        "action":"MARK_BLOCKED",
        "startTimestamp":"1652313600000",
        "endTimestamp":"1652317200000",
        "cause":"Hot machine",
        /** The task managers on this blocked node */
        "taskManagers":[
            "container1",
            "container2"
        ]
    },
    "node2":{
        "action":"MARK_BLOCKED_AND_EVACUATE_TASKS",
        "startTimestamp":"1652315400000",
        "endTimestamp":"1652319000000",
        "cause":"No space left on device",
        /** The task managers on this blocked node */
        "taskManagers":[
            "container3",
            "container4"
        ]
    },
    ...
}

Field meanings in responses:

  1. id: A string value that represents the identifier of the blocked node. In the response body, it is the key of each entry.
  2. action: An enum value (MARK_BLOCKED or MARK_BLOCKED_AND_EVACUATE_TASKS) that represents the block action when a node is marked as blocked.
  3. startTimestamp: A long value (unix timestamp in milliseconds) that represents the start time of the blocking.
  4. endTimestamp: A long value (unix timestamp in milliseconds) that represents the end time of the blocking (at which time the node will become available again). If the blocking is permanent, this value will be Long.MAX_VALUE (9223372036854775807).
  5. cause: A string value that represents the cause for blocking this node.
  6. taskManagers: The task managers on this blocked node.

Add

PUT: http://{jm_rest_address:port}/blocklist/nodes/<id>

Request

Request body:

Code Block
titleRequest Example
{
    "action":"MARK_BLOCKED",
    "endTimestamp":"1652317200000",
    "cause":"Hot machine",
    "allowMerge":"true"
}

Field meanings in requests:

  1. id: A string value that specifies the identifier of the blocked node.
  2. action: An enum value(MARK_BLOCKED or MARK_BLOCKED_AND_EVACUATE_TASKS) that specifies the block action when a node is marked as blocked.
  3. endTimestamp(optional): A long value (unix timestamp in milliseconds) that represents the end time of the blocking (at which time the node will become available again). If not specified, it means that the blocking is permanent. 
  4. cause: A string value that specifies the cause for blocking this node.
  5. allowMerge(optional): A boolean value that specifies whether to merge when a conflict occurs. The default value is false.

When trying to block a node that already exists in the blocklist, we propose two processing behaviors:

  1. If allowMerge is false, return an error.
  2. If allowMerge is true, the newly added entry and the existing one are merged. The three fields are merged as follows:
    1. For action, merge(MARK_BLOCKED, MARK_BLOCKED_AND_EVACUATE_TASKS) = MARK_BLOCKED_AND_EVACUATE_TASKS.
    2. For endTimestamp, merge(endTimestampA, endTimestampB) = max(endTimestampA, endTimestampB).
    3. For cause, all causes are combined: merge("causeA", "causeB") = "causeA,causeB". If the two causes are the same, only one is kept.
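
The merge rules above can be illustrated with a minimal Java sketch. BlockEntry is a hypothetical holder class introduced only for this example, and treating the enum declaration order as the severity order is an assumption of the sketch, not part of the proposal. The sketch also only deduplicates causes that are exactly equal.

```java
/** Block actions from this proposal; declaration order encodes severity (an assumption of this sketch). */
enum BlockAction {
    MARK_BLOCKED,
    MARK_BLOCKED_AND_EVACUATE_TASKS
}

/** Hypothetical holder for the three mergeable fields of a blocklist entry. */
final class BlockEntry {
    final BlockAction action;
    final long endTimestamp;
    final String cause;

    BlockEntry(BlockAction action, long endTimestamp, String cause) {
        this.action = action;
        this.endTimestamp = endTimestamp;
        this.cause = cause;
    }

    /** Merges the existing entry with a newly added one. */
    static BlockEntry merge(BlockEntry existing, BlockEntry added) {
        // Rule 1: the more severe action wins.
        BlockAction action =
                existing.action.compareTo(added.action) >= 0 ? existing.action : added.action;
        // Rule 2: the later end time wins.
        long endTimestamp = Math.max(existing.endTimestamp, added.endTimestamp);
        // Rule 3: join the causes with a comma, keeping a single copy if they are identical.
        String cause =
                existing.cause.equals(added.cause)
                        ? existing.cause
                        : existing.cause + "," + added.cause;
        return new BlockEntry(action, endTimestamp, cause);
    }
}
```

For example, merging a MARK_BLOCKED entry for "Hot machine" with a MARK_BLOCKED_AND_EVACUATE_TASKS entry for "No space left on device" yields the evacuating action, the later end time, and both causes joined by a comma.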

Response

  1. If there is no conflict, the response code will be 201(CREATED) and the response body will be the information of the node.
  2. If a conflict occurs:
    1. If allowMerge is false, the response code will be 409(CONFLICT) and an error is returned.
    2. If allowMerge is true, the response code will be 202(ACCEPTED) and the response body will be the merged information of the node.

Remove

DELETE: http://{jm_rest_address:port}/blocklist/nodes/<id>

Request

Request body: {}

Response

If the node identified by id does not exist, the response code will be 404(NOT FOUND) and an error is returned. Otherwise, the response code will be 200(OK) with an empty response body.
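
To summarize the status codes of the Add and Remove endpoints, here is a small in-memory model in Java. It is only a sketch: BlocklistStatusModel and its methods are hypothetical names, and the model tracks node identifiers only, ignoring the request and response bodies.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory model of the status codes returned by the Add and Remove endpoints. */
final class BlocklistStatusModel {
    private final Set<String> blockedNodeIds = new HashSet<>();

    /** Returns the status code for a PUT on the given node id. */
    int add(String nodeId, boolean allowMerge) {
        if (blockedNodeIds.add(nodeId)) {
            return 201; // CREATED: the node was not in the blocklist yet
        }
        // Conflict: the node is already blocked.
        return allowMerge ? 202 : 409; // ACCEPTED (merged) or CONFLICT
    }

    /** Returns the status code for a DELETE on the given node id. */
    int remove(String nodeId) {
        return blockedNodeIds.remove(nodeId) ? 200 : 404; // OK or NOT FOUND
    }
}
```

Adding "node1" twice without allowMerge returns 201 and then 409. A third attempt with allowMerge set to true returns 202.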

Proposed Changes

We introduce a new data structure, BlockedNode, to record the information of a blocked node. This information will be recorded in a special component and will affect the resource allocation of Flink clusters. However, the blocking of nodes is not permanent: each blocked node has a timeout, and once it times out the node becomes available again. The overall structure of the blocklist mechanism is shown in the figure below.

[Figure: overall structure of the blocklist mechanism]

In JM, there will be a component (JobMasterBlocklistHandler) responsible for managing all BlockedNode(s) and applying them to the SlotPool.

In RM, there will also be a component (ResourceManagerBlocklistHandler) responsible for managing the cluster-level blocklist. SlotManager will be aware of the blocklist information and filter out resources in the blocklist when allocating slots from registered task managers. Similarly, when the ResourceManagerDriver requests new resources from an external resource manager (Yarn or Kubernetes), it also needs to filter out the blocked nodes.

BlockedNode

A BlockedNode corresponds to the blocked information of a specific node, including the following 5 fields:

  1. id: The identifier of the node.
  2. action: The block action when a node is marked as blocked, MARK_BLOCKED or MARK_BLOCKED_AND_EVACUATE_TASKS.
  3. startTimestamp: The start time of the blocking.
  4. endTimestamp: The end time of the blocking (at which time the node will become available again). 
  5. cause: The cause for blocking this node.
Code Block
titleBlockedNode
/**
 * This class represents a blocked node.
 */
public class BlockedNode {

    public BlockAction getAction();
    
    public long getStartTimestamp();

    public long getEndTimestamp();
    
    public String getCause();

    public String getId();
}
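
As an illustration of how the fields and the timeout fit together, here is a minimal concrete sketch of such a class. The name BlockedNodeSketch, the constructor, and the isExpiredAt helper are assumptions made for this example and are not part of the proposed interface.

```java
import java.util.Objects;

/** Block actions from this proposal. */
enum BlockAction {
    MARK_BLOCKED,
    MARK_BLOCKED_AND_EVACUATE_TASKS
}

/** Hypothetical immutable implementation of the BlockedNode accessors listed above. */
final class BlockedNodeSketch {
    private final String id;
    private final BlockAction action;
    private final String cause;
    private final long startTimestamp;
    private final long endTimestamp;

    BlockedNodeSketch(
            String id, BlockAction action, String cause, long startTimestamp, long endTimestamp) {
        if (endTimestamp < startTimestamp) {
            throw new IllegalArgumentException("endTimestamp must not precede startTimestamp");
        }
        this.id = Objects.requireNonNull(id);
        this.action = Objects.requireNonNull(action);
        this.cause = Objects.requireNonNull(cause);
        this.startTimestamp = startTimestamp;
        this.endTimestamp = endTimestamp;
    }

    String getId() { return id; }

    BlockAction getAction() { return action; }

    String getCause() { return cause; }

    long getStartTimestamp() { return startTimestamp; }

    long getEndTimestamp() { return endTimestamp; }

    /** Hypothetical helper: true once the node should become available again. */
    boolean isExpiredAt(long currentTimestamp) {
        return currentTimestamp >= endTimestamp;
    }
}
```

A permanent block can be represented with Long.MAX_VALUE as the end timestamp, in which case isExpiredAt stays false for any earlier timestamp.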

In order to support speculative execution for batch jobs (FLIP-168), we need a mechanism to block resources on nodes where the slow tasks are located. We propose to introduce a blocklist mechanism as follows: once a node is marked as blocked, future slots should not be allocated from the blocked node, but the slots that are already allocated will not be affected.

BlockedNode

A BlockedNode corresponds to the blocked information of a specific node, including the following 4 fields:

  1. id: The identifier of the node.
  2. startTimestamp: The start time of the blocking.
  3. endTimestamp: The end time of the blocking (at which time the node will become available again). 
  4. cause: The cause for blocking this node.
Code Block
titleBlockedNode
/**
 * This class represents a blocked node.
 */
public class BlockedNode {
    
    public long getStartTimestamp();

    public long getEndTimestamp();
    
    public String getCause();

    public String getId();
}

Blocklist on JobMaster

JobMasterBlocklistHandler

JobMasterBlocklistHandler is the component in JM responsible for managing all blocked node information and applying it to the SlotPool. It consists of two sub-components: BlocklistTracker and BlocklistContext. When receiving new blocked node information, the JobMasterBlocklistHandler handles it as follows:

  1. Add the new blocked nodes to the BlocklistTracker.
  2. Synchronize the new blocked nodes to RM.
  3. Block the resources on the newly added nodes via the BlocklistContext.
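
The three handling steps above can be sketched as follows. This is a simplified illustration under stated assumptions: nodes are represented by their identifiers, the tracker is a plain map from node id to end timestamp, and the nested Context and ResourceManagerNotifier interfaces are hypothetical stand-ins for BlocklistContext and the RM gateway. Skipping steps 2 and 3 when nothing was added or updated is also an assumption of this sketch.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the handling order inside a JobMasterBlocklistHandler. */
final class JobMasterBlocklistFlowSketch {
    /** Stand-in for BlocklistContext. */
    interface Context {
        void blockResources(Collection<String> nodeIds);
    }

    /** Stand-in for the notification sent to RM. */
    interface ResourceManagerNotifier {
        void notifyNewBlockedNodes(Collection<String> nodeIds);
    }

    // Stand-in for BlocklistTracker: node id -> end timestamp of the blocking.
    private final Map<String, Long> tracker = new HashMap<>();
    private final ResourceManagerNotifier resourceManager;
    private final Context context;

    JobMasterBlocklistFlowSketch(ResourceManagerNotifier resourceManager, Context context) {
        this.resourceManager = resourceManager;
        this.context = context;
    }

    void blockNode(String nodeId, long endTimestamp) {
        // Step 1: add the node to the tracker; stop if nothing was added or extended.
        Long previousEnd = tracker.get(nodeId);
        if (previousEnd != null && previousEnd >= endTimestamp) {
            return;
        }
        tracker.put(nodeId, endTimestamp);
        Collection<String> changed = Collections.singletonList(nodeId);
        // Step 2: synchronize the newly blocked node to RM.
        resourceManager.notifyNewBlockedNodes(changed);
        // Step 3: block the resources on the node via the context.
        context.blockResources(changed);
    }
}
```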

...

BlocklistContext is the component responsible for blocking slots in SlotPool; the details are described in the SlotPool section.

Code Block
titleBlocklistContext
public interface BlocklistContext {
    /** Block resources on the newly added or updated nodes. */
    void blockResources(Collection<BlockedNode> newlyAddedOrUpdatedNodes);
}
   

...

Code Block
titleBlocklistHandler & JobMasterBlocklistHandler
public interface BlocklistHandler {

    /** Add a new blocked node. */
    void blockNode(String nodeId, BlockAction action, String cause, long startTimestamp, long endTimestamp);

    /** Returns whether the given task manager is located on blocked nodes. */
    boolean isBlockedTaskManager(ResourceID taskManagerId); 

    /** Get all blocked nodes. */
    Set<String> getBlockedNodes();    
}

public interface JobMasterBlocklistHandler extends BlocklistHandler {
}

...

SlotPool should avoid allocating slots that are located on blocked nodes. To do that, our core idea is to keep the SlotPool in the following state: there is no slot in the SlotPool that is both free (no task assigned) and located on a blocked node. Details are as follows:

  1. When receiving slot offers from task managers located on blocked nodes, all offers should be rejected.
  2. When a node is newly blocked, BlocklistContext will perform the following on SlotPool:
    1. If the action is MARK_BLOCKED, release all free (no task assigned) slots on the blocked node. We need to find all task managers on blocked nodes and release all free slots on them via SlotPoolService#releaseFreeSlotsOnTaskManager.
    2. If the action is MARK_BLOCKED_AND_EVACUATE_TASKS, release all slots on the blocked node. We need to find all task managers on blocked nodes and release all slots on them via SlotPoolService#releaseSlotsOnTaskManager. Note that we should not unregister the task managers directly, because in the case of batch jobs, there may be unconsumed result partitions on them.
  3. When a slot state changes from reserved (task assigned) to free (no task assigned), check whether the corresponding task manager is blocked. If yes, release the slot.
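
The slot pool rules above can be modelled with a small in-memory sketch. BlocklistAwareSlotPoolSketch and all of its members are hypothetical names; slots are identified by strings, and the evacuateTasks flag stands in for the MARK_BLOCKED_AND_EVACUATE_TASKS action. It is not the proposed BlocklistSlotPool implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of a slot pool that honours a node blocklist. */
final class BlocklistAwareSlotPoolSketch {
    /** Minimal slot: the node it lives on and whether a task is assigned. */
    private static final class Slot {
        final String nodeId;
        boolean reserved;

        Slot(String nodeId) {
            this.nodeId = nodeId;
        }
    }

    private final Set<String> blockedNodes = new HashSet<>();
    private final Map<String, Slot> slots = new HashMap<>(); // slot id -> slot

    /** Offers from blocked nodes are rejected; returns true if the offer is accepted. */
    boolean offerSlot(String slotId, String nodeId) {
        if (blockedNodes.contains(nodeId)) {
            return false;
        }
        slots.put(slotId, new Slot(nodeId));
        return true;
    }

    /** Marks a slot as reserved (task assigned). */
    void reserve(String slotId) {
        slots.get(slotId).reserved = true;
    }

    /** Blocks a node: releases its free slots, or all of its slots when evacuating tasks. */
    void blockNode(String nodeId, boolean evacuateTasks) {
        blockedNodes.add(nodeId);
        slots.values().removeIf(s -> s.nodeId.equals(nodeId) && (evacuateTasks || !s.reserved));
    }

    /** A reserved slot becomes free; it is released if its node is blocked. */
    void freeReservedSlot(String slotId) {
        Slot slot = slots.get(slotId);
        slot.reserved = false;
        if (blockedNodes.contains(slot.nodeId)) {
            slots.remove(slotId);
        }
    }

    boolean contains(String slotId) {
        return slots.containsKey(slotId);
    }
}
```

With these rules, the pool never holds a slot that is both free and located on a blocked node, which is the invariant the design aims for.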

We will introduce a new slot pool implementation, BlocklistSlotPool, which extends the DefaultDeclarativeSlotPool and overrides some methods to implement the blocklist-related functions described above. The blocklist information will be passed in the constructor. This new implementation will only be used when the blocklist is enabled.

Code Block
titleSlotPoolService
public interface SlotPoolService {

   /**
     * Releases all slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    /**
     * Releases all free slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    //...
}

...

  1. Once blocked nodes are newly added/updated in the JobMasterBlocklistHandler, RM will be notified of these nodes via ResourceManagerGateway#notifyNewBlockedNodes.
  2. When RM receives the blocked nodes notified by a JM, it will add them to the ResourceManagerBlocklistHandler and notify all JMs of the successfully added/updated nodes through JobMasterGateway#notifyNewBlockedNodes.
  3. Similarly, when a JM receives the blocked nodes notified by RM, it will also add them to its JobMasterBlocklistHandler.
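
The synchronization steps above can be sketched in a simplified, single-threaded form. All class names here are hypothetical, nodes are represented by their identifiers rather than BlockedNode objects, and direct method calls stand in for RPC. The point of the sketch is that RM forwards only the nodes it actually added and a JM does not re-notify RM for nodes it received from RM, so the exchange terminates.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of blocklist synchronization between JMs and RM. */
final class BlocklistSyncSketch {
    /** Stand-in for BlocklistListener, using node ids instead of BlockedNode objects. */
    interface Listener {
        void notifyNewBlockedNodes(Collection<String> nodeIds);
    }

    static final class ResourceManagerSide implements Listener {
        private final Set<String> blocklist = new HashSet<>();
        private final List<Listener> jobMasters = new ArrayList<>();

        void register(Listener jobMaster) {
            jobMasters.add(jobMaster);
        }

        @Override
        public void notifyNewBlockedNodes(Collection<String> nodeIds) {
            // Keep only the nodes that were actually added to the cluster-level blocklist.
            List<String> added = new ArrayList<>();
            for (String nodeId : nodeIds) {
                if (blocklist.add(nodeId)) {
                    added.add(nodeId);
                }
            }
            if (added.isEmpty()) {
                return;
            }
            // Notify all JMs of the successfully added nodes.
            for (Listener jobMaster : jobMasters) {
                jobMaster.notifyNewBlockedNodes(added);
            }
        }

        boolean isBlocked(String nodeId) {
            return blocklist.contains(nodeId);
        }
    }

    static final class JobMasterSide implements Listener {
        private final Set<String> blocklist = new HashSet<>();
        private final Listener resourceManager;

        JobMasterSide(Listener resourceManager) {
            this.resourceManager = resourceManager;
        }

        /** A node is blocked locally first, then RM is notified. */
        void blockLocally(String nodeId) {
            if (blocklist.add(nodeId)) {
                resourceManager.notifyNewBlockedNodes(Collections.singletonList(nodeId));
            }
        }

        @Override
        public void notifyNewBlockedNodes(Collection<String> nodeIds) {
            // Nodes received from RM are added without notifying RM again.
            blocklist.addAll(nodeIds);
        }

        boolean isBlocked(String nodeId) {
            return blocklist.contains(nodeId);
        }
    }
}
```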
Code Block
titleBlocklistListener
public interface BlocklistListener {

    /** Notify newly added/updated blocked nodes. */
    void notifyNewBlockedNodes(Collection<BlockedNode> newlyAddedOrUpdatedNodes);
}

public interface JobMasterGateway extends BlocklistListener {
    //...
}

public interface ResourceManagerGateway extends BlocklistListener {     
    //...
}  

Enrich TaskManagerLocation with node information

In order to support blocked nodes, it is necessary to know the node where the task is located. To do that, we need to add a node identifier into TaskManagerLocation. This node identifier should be set by the resource manager when requesting new task managers.

Code Block
titleTaskManagerLocation
public class TaskManagerLocation {

    public String getNodeId();

    //...
}
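
To show why the node identifier is needed, the following sketch resolves a task manager to its node and checks that node against the blocklist, in the spirit of BlocklistHandler#isBlockedTaskManager. BlockedTaskManagerLookup is a hypothetical class, and plain strings stand in for ResourceID and TaskManagerLocation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical lookup from task managers to blocked nodes. */
final class BlockedTaskManagerLookup {
    // Task manager id -> node id, as it would be reported through TaskManagerLocation#getNodeId.
    private final Map<String, String> nodeIdByTaskManager = new HashMap<>();
    private final Set<String> blockedNodeIds = new HashSet<>();

    void registerTaskManager(String taskManagerId, String nodeId) {
        nodeIdByTaskManager.put(taskManagerId, nodeId);
    }

    void blockNode(String nodeId) {
        blockedNodeIds.add(nodeId);
    }

    /** Returns whether the given task manager is located on a blocked node. */
    boolean isBlockedTaskManager(String taskManagerId) {
        String nodeId = nodeIdByTaskManager.get(taskManagerId);
        // Unknown task managers are treated as not blocked in this sketch.
        return nodeId != null && blockedNodeIds.contains(nodeId);
    }
}
```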

Compatibility, Deprecation, and Migration Plan

The blocklist mechanism will be an optional feature which the user has to activate explicitly by setting the config option cluster.resource-blocklist.enabled: true. This entails that Flink's default behavior won't change.

Future improvements

Automatically detect abnormal resources

We may introduce an auto-detection mechanism in the future. To do that, we need to introduce a pluggable abnormal resources detector and allow users to load their own implementations. This means the detector needs to be exposed to users as a public interface, which requires more thought and discussion.

Compatibility, Deprecation, and Migration Plan

The blocklist mechanism is only used when speculative execution is enabled, which entails that Flink's default behavior won't change.

Limitations

No integration with Flink's web UI

This FLIP does not modify the web UI, but doing so is in our plan. Currently, we have a preliminary idea of the UI integration, which includes improving the display of slot information and adding a page to show the blocklist information.

No support for JM failover

Currently, the blocklist information will be lost after JM failover. In the future, we may persist the blocklist information in HA to support JM failover.


Test Plan

  1. The changes will be covered by unit tests and integration tests (IT cases).
  2. Test the functionality in a real Standalone/Yarn/Kubernetes cluster.

...