...

To solve the above problems, we propose to introduce a blocklist mechanism for Flink to filter out resources on problematic nodes. Two block actions will be introduced:

  1. MARK_BLOCKED: Just mark a node as blocked. Future slots should not be allocated from the blocked node, but slots that are already allocated will not be affected.
  2. MARK_BLOCKED_AND_EVACUATE_TASKS: Mark a node as blocked, and evacuate all tasks on it. Evacuated tasks will be restarted on non-blocked nodes.

This design only supports manually specifying blocked resources via the REST API; auto-detection may be introduced in the future.

...

Metrics

...

We propose to introduce a new Gauge metric:

...

numBlockedNodes to report the number of currently blocked nodes

REST API

We propose to introduce the following REST APIs for the blocklist mechanism:

  1. REST API for querying all blocked nodes.
  2. REST API for adding new blocked nodes.
  3. REST API for removing existing blocked nodes.

Query

GET: http://{jm_rest_address:port}/blocklist

...

Code Block
titleResponse Example
{
    "node1":{
        "action":"MARK_BLOCKED",
        "startTimestamp":"1652313600000",
        "endTimestamp":"1652317200000",
        "cause":"Hot machine",
        /** The task managers on this blocked node */
        "taskManagers":[
            "container1",
            "container2"
        ]
    },
    "node2":{
        "action":"MARK_BLOCKED_AND_EVACUATE_TASKS",
        "startTimestamp":"1652315400000",
        "endTimestamp":"1652319000000",
        "cause":"No space left on device",
        /** The task managers on this blocked node */
        "taskManagers":[
            "container3",
            "container4"
        ]
    },
    ...
}

Field meanings in responses:

  1. id: A string value that represents the identifier of the blocked node.
  2. action: An enum value (MARK_BLOCKED or MARK_BLOCKED_AND_EVACUATE_TASKS) that represents the block action when a node is marked as blocked.
  3. startTimestamp: A long value (unix timestamp in milliseconds) that represents the start time of the blocking.
  4. endTimestamp: A long value (unix timestamp in milliseconds) that represents the end time of the blocking (at which time the node will become available again). If the blocking is permanent, this value will be Long.MAX_VALUE (9223372036854775807).
  5. cause: A string value that represents the cause for blocking this node.

Add

PUT: http://{jm_rest_address:port}/blocklist/nodes/<id>


Request

Request body:

Code Block
titleRequest Example
{
    "action":"MARK_BLOCKED",
    "endTimestamp":"1652317200000",
    "cause":"Hot machine",
    "allowMerge":"true"
}

...

  1. id: A string value that specifies the identifier of the blocked node.
  2. action: An enum value (MARK_BLOCKED or MARK_BLOCKED_AND_EVACUATE_TASKS) that specifies the block action when a node is marked as blocked.
  3. endTimestamp (optional): A long value (unix timestamp in milliseconds) that specifies the end time of the blocking (at which time the node will become available again). If not specified, the blocking is permanent.
  4. cause: A string value that specifies the cause for blocking this node.
  5. allowMerge (optional): A boolean value that specifies whether to merge when a conflict occurs. The default value is false.
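
As an illustration, the add request could be built with the JDK's `java.net.http` client. This is only a sketch: the class name, the address `localhost:8081` and the node id `node1` are placeholders that do not come from this proposal, and the request is built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds (but does not send) a blocklist "add" request. */
final class BlockNodeRequestSketch {

    /** Builds the PUT request for blocking the given node. */
    static HttpRequest buildRequest(String restAddress, String nodeId, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + restAddress + "/blocklist/nodes/" + nodeId))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        // Same fields as the request example above; address and node id are placeholders.
        String body = "{\"action\":\"MARK_BLOCKED\","
                + "\"endTimestamp\":\"1652317200000\","
                + "\"cause\":\"Hot machine\","
                + "\"allowMerge\":\"true\"}";
        HttpRequest request = buildRequest("localhost:8081", "node1", body);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Omitting `endTimestamp` from the body would, per the field description above, request a permanent blocking.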

When trying to block a node, if the corresponding node already exists in the blocklist, we propose to introduce two processing behaviors:

  1. If field allowMerge is false, return an error.
  2. If field allowMerge is true, the newly added one and the existing one will be merged into one. Regarding the 3 fields, the merging algorithm is as follows:

...

  1. If no conflict occurs, the response code will be 201 (CREATED), and the response body will be the information of the node.
  2. If a conflict occurs:
    1. If allowMerge is false, the response code will be 409 (CONFLICT), and an error is returned.
    2. If allowMerge is true, the response code will be 202 (ACCEPTED), and the response body will be the merged information of the node.
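
The response-code rules for the add request can be summarized in a few lines. The following is a minimal sketch, assuming a hypothetical helper class that is not part of the proposed API; it only encodes the three status codes named above.

```java
/** Hypothetical helper mapping the outcome of an add request to an HTTP status code. */
final class AddResponseCodeSketch {

    static final int CREATED = 201;
    static final int ACCEPTED = 202;
    static final int CONFLICT = 409;

    /**
     * @param alreadyBlocked whether the node already exists in the blocklist
     * @param allowMerge the allowMerge field of the request (false if absent)
     */
    static int responseCode(boolean alreadyBlocked, boolean allowMerge) {
        if (!alreadyBlocked) {
            return CREATED; // no conflict: the node is newly added
        }
        // conflict: merge if allowed, otherwise reject
        return allowMerge ? ACCEPTED : CONFLICT;
    }

    public static void main(String[] args) {
        System.out.println(responseCode(true, false)); // prints 409
    }
}
```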

Remove

DELETE: http://{jm_rest_address:port}/blocklist/node/<id>

Request

Request body: {}

Response

If the node identified by id does not exist, the response code will be 404 (NOT FOUND), and an error is returned. Otherwise, the response code will be 200 (OK), with an empty response body.

Proposed Changes

We introduce a new data structure, BlockedNode, to record the information of a blocked node. This information will be recorded in a special component and affect the resource allocation of Flink clusters. However, the blocking of nodes is not permanent; there will be a timeout for it. Once it times out, the nodes will become available again. The overall structure of the blocklist mechanism is shown in the figure below.

...

In JM, there will be a component (JobMasterBlocklistHandler) responsible for managing all BlockedNode(s) and performing them on SlotPool.

In RM, there will also be a component (ResourceManagerBlocklistHandler) responsible for managing the cluster-level blocklist. SlotManager will be aware of the blocklist information and filter out resources in the blocklist when allocating slots from registered task managers. Similarly, when the ResourceManagerDriver requests new resources from an external resource manager (Yarn or Kubernetes), it also needs to filter out the blocked nodes.

...

BlockedNode

A BlockedNode corresponds to the blocked information of a specific node, including the following 5 fields:

  1. id: The identifier of the blocked node.
  2. action: The block action when a node is marked as blocked: MARK_BLOCKED or MARK_BLOCKED_AND_EVACUATE_TASKS.
  3. startTimestamp: The start time of the blocking.
  4. endTimestamp: The end time of the blocking (at which time the node will become available again).
  5. cause: The cause for blocking this node.
Code Block
titleBlockedNode
/**
 * This class represents a blocked node.
 */
public class BlockedNode {

    public BlockAction getAction();

    public long getStartTimestamp();

    public long getEndTimestamp();

    public String getCause();

    public String getId();
}

Blocklist on JobMaster

JobMasterBlocklistHandler

JobMasterBlocklistHandler is the component in JM responsible for managing all blocked node information and performing them on SlotPool. It consists of two sub-components: BlocklistTracker and BlocklistContext. When receiving new blocked node information, the JobMasterBlocklistHandler will handle it as follows:

  1. Add the new blocked nodes to the BlocklistTracker.
  2. Synchronize the new blocked nodes to RM.
  3. Perform block actions on the resources via the BlocklistContext.
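
The three steps above can be sketched as follows. This is a hedged illustration, not the proposed implementation: the nested interfaces are simplified stand-ins (node ids as plain strings) for BlocklistTracker, the RM gateway and BlocklistContext, and the early return when nothing changed is an assumption.

```java
import java.util.Collection;

/** Hypothetical sketch of the three handling steps; not the actual Flink classes. */
final class HandlerFlowSketch {

    /** Stand-in for BlocklistTracker: returns the ids that were newly added or updated. */
    interface Tracker {
        Collection<String> addNewBlockedNodes(Collection<String> nodeIds);
    }

    /** Stand-in for the RPC call to the ResourceManager. */
    interface RmNotifier {
        void notifyNewBlockedNodes(Collection<String> nodeIds);
    }

    /** Stand-in for BlocklistContext. */
    interface Context {
        void blockResources(Collection<String> nodeIds);
    }

    private final Tracker tracker;
    private final RmNotifier rmNotifier;
    private final Context context;

    HandlerFlowSketch(Tracker tracker, RmNotifier rmNotifier, Context context) {
        this.tracker = tracker;
        this.rmNotifier = rmNotifier;
        this.context = context;
    }

    void onNewBlockedNodes(Collection<String> nodeIds) {
        // Step 1: record the nodes and learn which ones actually changed.
        Collection<String> changed = tracker.addNewBlockedNodes(nodeIds);
        if (changed.isEmpty()) {
            return; // Assumption: nothing to propagate if nothing changed.
        }
        // Step 2: synchronize the changed nodes to RM.
        rmNotifier.notifyNewBlockedNodes(changed);
        // Step 3: perform the block actions on the local resources.
        context.blockResources(changed);
    }
}
```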

...

BlocklistTracker is the component responsible for tracking blocked nodes. The tracker will regularly remove timeout blocked nodes.

Code Block
titleBlocklistTracker
public interface BlocklistTracker {

    /**
     * Add or update blocked nodes.
     *
     * @param nodes The nodes to add or update
     * @return Newly added or updated nodes.
     */
    Collection<BlockedNode> addNewBlockedNodes(Collection<BlockedNode> nodes);

    /** Returns whether the given task manager is located on blocked nodes. */
    boolean isBlockedTaskManager(ResourceID taskManagerId);

    /** Get all blocked nodes. */
    Set<String> getBlockedNodes();

    /** Remove the timeout nodes. */
    void removeTimeoutNodes();
}
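
To make the tracking and timeout behavior concrete, here is a minimal in-memory sketch. It is an assumption-laden illustration: the class and its nested `Node` type are hypothetical, only the id and end time are modelled, an existing entry is simply overwritten when its end time differs (the actual merge rules are not reproduced here), and the current time is passed in explicitly so the example is deterministic.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory tracker; a sketch, not the proposed implementation. */
final class InMemoryBlocklistTrackerSketch {

    /** Minimal stand-in for BlockedNode: only the id and the end time are modelled. */
    static final class Node {
        final String id;
        final long endTimestamp;

        Node(String id, long endTimestamp) {
            this.id = id;
            this.endTimestamp = endTimestamp;
        }
    }

    private final Map<String, Node> blockedNodes = new HashMap<>();

    /** Adds or overwrites nodes and returns those that were new or changed. */
    List<Node> addNewBlockedNodes(Collection<Node> nodes) {
        List<Node> changed = new ArrayList<>();
        for (Node node : nodes) {
            Node existing = blockedNodes.get(node.id);
            // Assumption: an entry counts as updated when its end time differs.
            if (existing == null || existing.endTimestamp != node.endTimestamp) {
                blockedNodes.put(node.id, node);
                changed.add(node);
            }
        }
        return changed;
    }

    /** Returns a copy of the ids of all currently blocked nodes. */
    Set<String> getBlockedNodes() {
        return new HashSet<>(blockedNodes.keySet());
    }

    /** Removes every node whose end time is at or before the given time. */
    void removeTimeoutNodes(long nowMillis) {
        blockedNodes.values().removeIf(node -> node.endTimestamp <= nowMillis);
    }
}
```

A permanently blocked node carries Long.MAX_VALUE as its end time, so it survives every timeout sweep in this sketch.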
     

BlocklistContext

BlocklistContext is the component responsible for performing block actions on SlotPool; the details will be described in SlotPool.

Code Block
titleBlocklistContext
public interface BlocklistContext {

    /** Perform block actions on resources on blocked nodes. */
    void blockResources(Collection<BlockedNode> newlyAddedOrUpdatedNodes);
}
   


Code Block
titleBlocklistHandler & JobMasterBlocklistHandler
public interface BlocklistHandler {

    /** Add a new blocked node. */
    void blockNode(String nodeId, BlockAction action, String cause, long startTimestamp, long endTimestamp);

    /** Returns whether the given task manager is located on blocked nodes. */
    boolean isBlockedTaskManager(ResourceID taskManagerId);

    /** Get all blocked nodes. */
    Set<String> getBlockedNodes();
}

public interface JobMasterBlocklistHandler extends BlocklistHandler {
}

...

SlotPool should avoid allocating slots from task managers located on blocked nodes. To do that, our core idea is to keep the SlotPool in such a state: there is no slot in SlotPool that is free (no task assigned) and located on blocked nodes. Details are as follows:

  1. When receiving slot offers from task managers located on blocked nodes, all offers should be rejected.
  2. When a node is newly blocked, BlocklistContext will perform the following on SlotPool:
    1. If the action is MARK_BLOCKED, release all free (no task assigned) slots on the blocked node. We need to find all task managers on blocked nodes and release all free slots on them by SlotPoolService#releaseFreeSlotsOnTaskManager.
    2. If the action is MARK_BLOCKED_AND_EVACUATE_TASKS, release all slots on the blocked node. We need to find all task managers on blocked nodes and release all slots on them by SlotPoolService#releaseSlotsOnTaskManager. Note that we should not unregister the task managers directly, because in the case of batch jobs, there may be unconsumed result partitions on them.
  3. When a slot state changes from reserved (task assigned) to free (no task assigned), it will check whether the corresponding task manager is blocked. If yes, release the slot.
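
The difference between the two block actions on SlotPool can be illustrated with a small selection function. This is a sketch under stated assumptions: the `Slot` type and the helper methods are hypothetical, slots are tagged directly with a node id, and the code only selects which slots would be released rather than calling the SlotPoolService methods named above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of which slots get released when a node is newly blocked. */
final class SlotReleaseSketch {

    enum BlockAction { MARK_BLOCKED, MARK_BLOCKED_AND_EVACUATE_TASKS }

    /** Minimal stand-in for a slot in the SlotPool. */
    static final class Slot {
        final String id;
        final String nodeId;
        final boolean free; // true if no task is assigned

        Slot(String id, String nodeId, boolean free) {
            this.id = id;
            this.nodeId = nodeId;
            this.free = free;
        }
    }

    /** Returns the ids of the slots to release when the given node is newly blocked. */
    static List<String> slotsToRelease(Collection<Slot> slots, String blockedNodeId, BlockAction action) {
        List<String> result = new ArrayList<>();
        for (Slot slot : slots) {
            if (!slot.nodeId.equals(blockedNodeId)) {
                continue; // slots on other nodes are never touched
            }
            // MARK_BLOCKED releases only free slots; the evacuating action releases all of them.
            if (action == BlockAction.MARK_BLOCKED_AND_EVACUATE_TASKS || slot.free) {
                result.add(slot.id);
            }
        }
        return result;
    }

    /** Slot offers from task managers on blocked nodes are rejected. */
    static boolean acceptOffer(String nodeId, Set<String> blockedNodes) {
        return !blockedNodes.contains(nodeId);
    }
}
```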

...

ResourceManagerBlocklistHandler is a new component introduced in RM for the blocklist mechanism. It has only one sub-component: BlocklistTracker, which is responsible for managing the cluster-level blocklist.

Code Block
titleResourceManagerBlocklistHandler
public interface ResourceManagerBlocklistHandler extends BlocklistHandler {
}

SlotManager

SlotManager should filter out blocked resources when allocating registered resources. To do that, we need the following changes:

  1. When starting SlotManager, the BlockedTaskManagerChecker will be passed in to check whether a registered task manager is located on blocked nodes.
  2. When trying to fulfill the slot requirements by registered task managers, the task managers located on blocked nodes will be filtered out.
  3. SlotManager will request new task managers from external resource managers if the registered resources cannot fulfill the requirements. The blocklist also takes effect when requesting new task managers; the details will be described in ResourceManagerDriver.
Code Block
titleBlockedTaskManagerChecker
/** This checker helps to query whether a given task manager is located on blocked nodes. */
public interface BlockedTaskManagerChecker {

    /** Returns whether the given task manager is located on blocked nodes. */
    boolean isBlockedTaskManager(ResourceID taskManagerId);
}
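
As a rough illustration of how such a checker could be used for filtering, the sketch below drops blocked task managers from a list of registered ones. It assumes a hypothetical `Checker` interface that mirrors BlockedTaskManagerChecker but takes plain string ids instead of ResourceID, so that the example is self-contained.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical sketch of filtering registered task managers by the blocklist. */
final class SlotManagerFilterSketch {

    /** Mirrors BlockedTaskManagerChecker, with String ids instead of ResourceID. */
    interface Checker {
        boolean isBlockedTaskManager(String taskManagerId);
    }

    /** Returns the registered task managers that are not located on blocked nodes. */
    static List<String> usableTaskManagers(Collection<String> registered, Checker checker) {
        List<String> usable = new ArrayList<>();
        for (String taskManagerId : registered) {
            if (!checker.isBlockedTaskManager(taskManagerId)) {
                usable.add(taskManagerId);
            }
        }
        return usable;
    }
}
```

Passing the checker as an interface keeps the slot allocation logic independent of how the blocklist itself is stored.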

...


Code Block
titleSlotManager
public interface SlotManager {

    /** Starts the slot manager with the given leader id and resource manager actions. */
    void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceActions newResourceActions,
            BlockedTaskManagerChecker newBlockedTaskManagerChecker);

    //...
}

...

Synchronize Blocklist between JM & RM

The newly added/updated blocked nodes will be synchronized between JM and RM via RPC:

  1. Once a few blocked nodes are newly added/updated to the JobMasterBlocklistHandler, RM will be notified of these nodes via ResourceManagerGateway#notifyNewBlockedNodes.
  2. When RM receives the blocked nodes notified by a JM, it will add them into ResourceManagerBlocklistHandler, and notify all JMs of the successfully added/updated nodes through JobMasterGateway#notifyNewBlockedNodes.
  3. Similarly, when JM receives the blocked nodes notified by RM, it will also add them to JobMasterBlocklistHandler.
Code Block
titleBlocklistListener
public interface BlocklistListener {

    /** Notify newly added/updated blocked nodes. */
    void notifyNewBlockedNodes(Collection<BlockedNode> newlyAddedOrUpdatedNodes);
}

public interface JobMasterGateway extends BlocklistListener {
    //...
}

public interface ResourceManagerGateway extends BlocklistListener {     
    //...
}  
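
The following sketch illustrates why forwarding only the newly added nodes lets the JM/RM exchange terminate instead of bouncing notifications back and forth. It is a simplified model, not the proposed implementation: all class names are hypothetical, RPC calls are replaced by direct method calls, and only node ids are tracked, so updates to an existing entry are not modelled.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical single-process model of blocklist synchronization between JMs and RM. */
final class BlocklistSyncSketch {

    /** Shared behavior: a local set of blocked node ids. */
    static class Participant {
        final Set<String> blocked = new HashSet<>();

        /** Adds the ids and returns only those that were not known before. */
        List<String> addAndGetChanged(Collection<String> nodeIds) {
            List<String> changed = new ArrayList<>();
            for (String id : nodeIds) {
                if (blocked.add(id)) {
                    changed.add(id);
                }
            }
            return changed;
        }
    }

    static final class ResourceManager extends Participant {
        final List<JobMaster> jobMasters = new ArrayList<>();

        void notifyNewBlockedNodes(Collection<String> nodeIds) {
            List<String> changed = addAndGetChanged(nodeIds);
            if (changed.isEmpty()) {
                return; // nothing new: stop the propagation here
            }
            for (JobMaster jobMaster : jobMasters) {
                jobMaster.notifyNewBlockedNodes(changed);
            }
        }
    }

    static final class JobMaster extends Participant {
        private final ResourceManager resourceManager;

        JobMaster(ResourceManager resourceManager) {
            this.resourceManager = resourceManager;
            resourceManager.jobMasters.add(this);
        }

        void notifyNewBlockedNodes(Collection<String> nodeIds) {
            List<String> changed = addAndGetChanged(nodeIds);
            if (!changed.isEmpty()) {
                resourceManager.notifyNewBlockedNodes(changed);
            }
        }
    }
}
```

In this model, blocking a node on one JobMaster reaches the ResourceManager and every other JobMaster, and each participant forwards a given node at most once.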

Enrich TaskManagerLocation with node information

In order to support blocked nodes, it is necessary to know the node where the task is located. To do that, we need to add a node identifier into TaskManagerLocation. This node identifier should be set by the resource manager when requesting new task managers.

...