Status

...

Page properties

Discussion thread: ...
Vote thread: ...
JIRA: FLINK-28130 (ASF JIRA)
Release: 1.16

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Flink jobs typically run in a distributed way. In large clusters, it’s very common for cluster nodes to encounter issues such as insufficient disk space, bad hardware, or network abnormalities. These problems may cause job failures. Flink will take care of the failures and redeploy the relevant tasks. However, due to data locality and limited resources, the new tasks are very likely to be redeployed to the same nodes, which results in repeated task failures and affects job progress.

Currently, Flink users need to manually identify the problematic node and take it offline to solve this problem. This approach has the following disadvantages:

  1. Taking a node offline can be a heavy process. Users may need to contact cluster administrators to do this. The operation can even be dangerous and may not be allowed during important business events.
  2. Identifying and solving this kind of problem manually is slow and a waste of human resources.

To solve this problem, we propose to introduce a blacklist mechanism for Flink to filter out problematic resources. Resources can be blacklisted in two ways:

  1. Manually specify the blacklisted resources through the REST API. When users find abnormal nodes/TMs, they can blacklist them manually in this way.
  2. Automatically detect abnormal resources and blacklist them. Users can specify a blacklist strategy which identifies abnormal resources. In the first version, we only introduce the blacklist strategy interface and a no-op implementation. In the future, we will introduce a configurable blacklist strategy and a plugin mechanism to load user-defined blacklist strategy implementations.

Public Interfaces

We propose to introduce the following configuration options for the blacklist:

  • cluster.resource-blacklist.enabled: Whether to enable the blacklist mechanism.
  • cluster.resource-blacklist.item.timeout: The timeout after which a blacklisted item is removed from the blacklist.
  • cluster.resource-blacklist.item.timeout-check-interval: The interval at which blacklisted items are checked for timeout.

Proposed Changes

In this design, two granularities of blacklisted resources are supported: task managers and nodes. A record of blacklist information is called a blacklisted item, which is generally generated by the scheduler according to the exceptions of the tasks. These blacklisted items will be recorded in a special component and affect the resource allocation of Flink clusters. However, blacklisted items are not permanent: each item has a timeout. Once an item times out, it will be removed, and the resource will become available again. The overall structure of the blacklist mechanism is shown in the figure below.

[Figure: overall structure of the blacklist mechanism]

In JM, there will be a component (JobMasterBlacklistHandler) responsible for generating blacklisted items according to the exceptions notified by Scheduler, and managing them. SlotPool will be aware of the blacklist information and filter out blacklisted resources when allocating slots.

In RM, there will also be a component (ResourceManagerBlacklistHandler) responsible for managing the cluster-level blacklist. SlotManager will be aware of the blacklist information  and filter out resources in the blacklist when allocating slots from registered task managers. Similarly, when the ResourceManagerDriver requests new resources from an external resource manager (Yarn or Kubernetes), it also needs to filter out the blacklisted nodes.

Blacklisted Item

A blacklisted item corresponds to the blacklist information of a specific task manager or node, including the following 5 fields:

  1. type: The blacklisted item type, TASK_MANAGER or NODE
  2. timestamp: The timestamp at which this item was created; it is used to check for timeout.
  3. cause: The cause for creating this item.
  4. identifier: The identifier of the blacklisted task manager or node.
  5. action: The action when a task manager/node is marked as blacklisted, including:
    1. MARK_BLACKLISTED: Just mark the task manager or node as blacklisted. Future slots should not be allocated from the blacklisted task manager or node. But slots that are already allocated will not be affected.
    2. MARK_BLACKLISTED_AND_EVACUATE_TASKS: Mark the task manager or node as blacklisted, and evacuate all tasks on it. Evacuated tasks will be restarted on non-blacklisted task managers.
Code Block
titleBlacklistedItem
/**
 * This class represents a blacklisted item.
 *
 * @param <ID> Identifier of the blacklisted item.
 */
public abstract class BlacklistedItem<ID> {
    public BlacklistedItemType getType();

    public long getTimestamp();

    public BlacklistAction getAction();

    public Throwable getCause();

    public abstract ID getIdentifier();
}

/** This class represents a blacklisted node. */
public class BlacklistedNode extends BlacklistedItem<String> {
}

/** This class represents a blacklisted task manager. */
public class BlacklistedTaskManager extends BlacklistedItem<ResourceID> {
}
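
The timestamp field and the cluster.resource-blacklist.item.timeout option together decide when an item expires. A minimal sketch of that check follows; the class name, the method name and the inclusive boundary are assumptions made for illustration and are not part of the proposed interfaces.

```java
import java.time.Duration;

/** Illustrative helper only; the name and the boundary handling are assumptions. */
final class BlacklistTimeoutSketch {

    /**
     * Returns true once at least {@code itemTimeout} has elapsed since the item was created.
     * Treating the exact boundary as "timed out" is an assumption of this sketch.
     */
    static boolean isTimedOut(long itemTimestampMillis, Duration itemTimeout, long nowMillis) {
        return nowMillis - itemTimestampMillis >= itemTimeout.toMillis();
    }
}
```

A periodic task running at the configured check interval could apply this predicate to every tracked item and drop the ones for which it returns true.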

Blacklist on JobMaster

JobMasterBlacklistHandler

JobMasterBlacklistHandler is the component in JM responsible for generating and managing blacklisted items. It consists of three sub-components: BlacklistStrategy, BlacklistTracker and BlacklistContext. When receiving a new exception from Scheduler, the JobMasterBlacklistHandler will handle it as follows:

  1. Generate new blacklisted items by notifying the exception to the BlacklistStrategy.
  2. Add the new blacklisted items to the BlacklistTracker. 
  3. Synchronize the new blacklisted items to RM.
  4. Perform blacklist actions on the resources via the BlacklistContext.

BlacklistStrategy

BlacklistStrategy is the component responsible for generating blacklisted items according to the exceptions and their locations notified by Scheduler. We can introduce different BlacklistStrategy implementations to adapt to different scenarios. In the first version, we will introduce a no-op implementation as the default implementation. In the future, we will introduce a configurable blacklist strategy and a plugin mechanism to load user-defined blacklist strategy implementations; details are described in Future improvements.

Code Block
titleBlacklistStrategy
public interface BlacklistStrategy {
    /**
     * Generate blacklisted items according to the abnormal task's location and cause.
     *
     * @param locations the abnormal tasks’ locations.
     * @param cause the cause of blacklisted items.
     * @param timestamp the create timestamp of blacklisted items.
     * @return the generated blacklisted items.
     */
    Collection<BlacklistedItem<?>> generateBlacklistedItems(Collection<TaskManagerLocation> locations, Throwable cause, long timestamp);
}
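
The no-op default mentioned above could look roughly like the following. This is a self-contained sketch: LocationStub stands in for TaskManagerLocation, the item type is left generic instead of using BlacklistedItem, and all class names are hypothetical.

```java
import java.util.Collection;
import java.util.Collections;

/** Hypothetical stand-in for TaskManagerLocation; only the node id is modelled. */
final class LocationStub {
    final String nodeId;

    LocationStub(String nodeId) {
        this.nodeId = nodeId;
    }
}

/** Simplified strategy contract mirroring generateBlacklistedItems. */
interface StrategySketch<ITEM> {
    Collection<ITEM> generateBlacklistedItems(
            Collection<LocationStub> locations, Throwable cause, long timestamp);
}

/** No-op strategy: it never produces a blacklisted item, whatever is reported. */
final class NoOpStrategySketch<ITEM> implements StrategySketch<ITEM> {
    @Override
    public Collection<ITEM> generateBlacklistedItems(
            Collection<LocationStub> locations, Throwable cause, long timestamp) {
        return Collections.emptyList();
    }
}
```

With such a default in place, only items added manually would ever reach the tracker.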

BlacklistTracker

BlacklistTracker is the component responsible for tracking blacklisted items. The tracker will regularly remove timed-out blacklisted items.

Code Block
titleBlacklistTracker
public interface BlacklistTracker {
    /** Starts the blacklist tracker. */
    void start(ComponentMainThreadExecutor mainThreadExecutor);

    /**
     * Add new blacklisted items or update existing items.
     *
     * @param items The items to add or update
     * @return Newly added or updated items.
     */
    Collection<BlacklistedItem<?>> addNewBlacklistedItems(Collection<BlacklistedItem<?>> items);

    /** Returns whether the given task manager is blacklisted. */
    boolean isBlacklistedTaskManager(ResourceID resourceID);

    /** Get all blacklisted nodes. */
    Set<String> getBlacklistedNodes();

    /** Close the blacklist tracker. */
    void close();
}
     

BlacklistContext

BlacklistContext is the component responsible for performing blacklist actions on SlotPool; the details are described in SlotPool.

In order to support speculative execution for batch jobs (FLIP-168), we need a mechanism to block resources on nodes where the slow tasks are located. We propose to introduce a blocklist mechanism as follows: once a node is marked as blocked, future slots should not be allocated from the blocked node, but the slots that are already allocated will not be affected.

Proposed Changes

We introduce a new data structure, BlockedNode, to record the information of a blocked node. This information will be recorded in a special component and affect the resource allocation of Flink clusters. However, the blocking of nodes is not permanent: each blocked node has a timeout. Once it times out, the node will become available again. The overall structure of the blocklist mechanism is shown in the figure below.

[Figure: overall structure of the blocklist mechanism]

In JM, there will be a component (JobMasterBlocklistHandler) responsible for managing all BlockedNode(s) and applying them to SlotPool.

In RM, there will also be a component (ResourceManagerBlocklistHandler) responsible for managing the cluster-level blocklist. SlotManager will be aware of the blocklist information and filter out resources in the blocklist when allocating slots from registered task managers. Similarly, when the ResourceManagerDriver requests new resources from an external resource manager (Yarn or Kubernetes), it also needs to filter out the blocked nodes.

BlockedNode

A BlockedNode corresponds to the blocked information of a specific node, including the following 4 fields:

  1. id: The identifier of the node.
  2. startTimestamp: The start time of the blocking.
  3. endTimestamp: The end time of the blocking (at which time the node will become available again). 
  4. cause: The cause for blocking this node.
Code Block
titleBlockedNode
/**
 * This class represents a blocked node.
 */
public class BlockedNode {
    
    public long getStartTimestamp();

    public long getEndTimestamp();
    
    public String getCause();

    public String getId();
}
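
As a concrete illustration of the four fields, the following sketch shows one way BlockedNode could be written as an immutable value class. The constructor validation and the isExpired helper are assumptions added for this example; only the four getters come from the listing above.

```java
import java.util.Objects;

/** Illustrative immutable variant of BlockedNode; not the proposed class itself. */
final class BlockedNodeSketch {
    private final String id;
    private final String cause;
    private final long startTimestamp;
    private final long endTimestamp;

    BlockedNodeSketch(String id, String cause, long startTimestamp, long endTimestamp) {
        if (endTimestamp < startTimestamp) {
            throw new IllegalArgumentException("endTimestamp must not precede startTimestamp");
        }
        this.id = Objects.requireNonNull(id);
        this.cause = Objects.requireNonNull(cause);
        this.startTimestamp = startTimestamp;
        this.endTimestamp = endTimestamp;
    }

    String getId() { return id; }

    String getCause() { return cause; }

    long getStartTimestamp() { return startTimestamp; }

    long getEndTimestamp() { return endTimestamp; }

    /** Hypothetical helper: the node becomes available again at endTimestamp. */
    boolean isExpired(long nowMillis) {
        return nowMillis >= endTimestamp;
    }
}
```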

Blocklist on JobMaster

JobMasterBlocklistHandler

JobMasterBlocklistHandler is the component in JM responsible for managing all blocked node information and applying it to SlotPool. It consists of two sub-components: BlocklistTracker and BlocklistContext. When receiving new blocked node information, the JobMasterBlocklistHandler will handle it as follows:

  1. Add the new blocked nodes to the BlocklistTracker. 
  2. Synchronize the new blocked nodes to RM.
  3. Block the resources on newly added nodes via the BlocklistContext.
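
The three steps above can be sketched as a small pipeline. In this sketch nodes are reduced to their string ids, and the tracker, the RM notification and the BlocklistContext are modelled as plain functional interfaces; these simplifications, the class name and the early return when nothing changed are all assumptions.

```java
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Function;

/** Illustrative flow of the JobMaster-side handler; not the proposed implementation. */
final class JobMasterBlocklistHandlerSketch {
    // Stands in for BlocklistTracker#addNewBlockedNodes: returns the newly added or updated nodes.
    private final Function<Collection<String>, Collection<String>> tracker;
    // Stands in for the RPC that synchronizes nodes to the ResourceManager.
    private final Consumer<Collection<String>> resourceManagerNotifier;
    // Stands in for BlocklistContext#blockResources.
    private final Consumer<Collection<String>> blocklistContext;

    JobMasterBlocklistHandlerSketch(
            Function<Collection<String>, Collection<String>> tracker,
            Consumer<Collection<String>> resourceManagerNotifier,
            Consumer<Collection<String>> blocklistContext) {
        this.tracker = tracker;
        this.resourceManagerNotifier = resourceManagerNotifier;
        this.blocklistContext = blocklistContext;
    }

    void blockNodes(Collection<String> nodeIds) {
        // Step 1: record the nodes and learn which ones actually changed.
        Collection<String> changed = tracker.apply(nodeIds);
        if (changed.isEmpty()) {
            return; // Assumption: nothing to propagate if the tracker state did not change.
        }
        // Step 2: synchronize the changed nodes to RM.
        resourceManagerNotifier.accept(changed);
        // Step 3: block the resources on the changed nodes.
        blocklistContext.accept(changed);
    }
}
```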

BlocklistTracker

BlocklistTracker is the component responsible for tracking blocked nodes. The tracker will regularly remove timed-out blocked nodes.

Code Block
titleBlocklistTracker
public interface BlocklistTracker {

    /**
     * Add or update blocked nodes.
     *
     * @param nodes The nodes to add or update
     * @return Newly added or updated nodes.
     */
    Collection<BlockedNode> addNewBlockedNodes(Collection<BlockedNode> nodes);

    /** Returns whether the given task manager is located on blocked nodes. */
    boolean isBlockedTaskManager(ResourceID taskManagerId);

    /** Get all blocked nodes. */
    Set<String> getBlockedNodes();

    /** Remove the timeout nodes. */
    void removeTimeoutNodes();
}
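
To make the tracker semantics more tangible, here is a minimal in-memory sketch. It deviates from the interface in a few labeled ways: ids are plain strings instead of ResourceID, the task-manager-to-node mapping is fed in through a hypothetical registerTaskManager method, removeTimeoutNodes takes the current time as a parameter, and an update is only counted when it extends the blocking period. All of these are assumptions of the example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative in-memory tracker; not the proposed implementation. */
final class BlocklistTrackerSketch {

    /** Reduced form of BlockedNode: only the id and the end of the blocking period. */
    static final class Node {
        final String id;
        final long endTimestamp;

        Node(String id, long endTimestamp) {
            this.id = id;
            this.endTimestamp = endTimestamp;
        }
    }

    private final Map<String, Node> blockedNodes = new HashMap<>();
    private final Map<String, String> taskManagerToNode = new HashMap<>();

    /** Hypothetical hook: tells the tracker on which node a task manager runs. */
    void registerTaskManager(String taskManagerId, String nodeId) {
        taskManagerToNode.put(taskManagerId, nodeId);
    }

    /** Adds or updates nodes and returns only those that were new or changed. */
    List<Node> addNewBlockedNodes(Collection<Node> nodes) {
        List<Node> changed = new ArrayList<>();
        for (Node node : nodes) {
            Node existing = blockedNodes.get(node.id);
            // Assumption: an update only takes effect if it extends the blocking period.
            if (existing == null || node.endTimestamp > existing.endTimestamp) {
                blockedNodes.put(node.id, node);
                changed.add(node);
            }
        }
        return changed;
    }

    boolean isBlockedTaskManager(String taskManagerId) {
        String nodeId = taskManagerToNode.get(taskManagerId);
        return nodeId != null && blockedNodes.containsKey(nodeId);
    }

    Set<String> getBlockedNodes() {
        return new HashSet<>(blockedNodes.keySet());
    }

    /** Drops every node whose blocking period has ended at the given time. */
    void removeTimeoutNodes(long nowMillis) {
        blockedNodes.values().removeIf(node -> nowMillis >= node.endTimestamp);
    }
}
```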
     

BlocklistContext

BlocklistContext is the component responsible for blocking slots in SlotPool; the details are described in SlotPool.

Code Block
titleBlocklistContext
public interface BlocklistContext {
    /** Block resources on the newly added nodes. */
    void blockResources(Collection<BlockedNode> newlyAddedOrUpdatedNodes);
}
   


Code Block
titleBlocklistHandler & JobMasterBlocklistHandler
public interface BlocklistHandler {

    /** Add a new blocked node. */
    void blockNode(String nodeId, String cause, long startTimestamp, long endTimestamp);

    /** Returns whether the given task manager is located on blocked nodes. */
    boolean isBlockedTaskManager(ResourceID taskManagerId); 

    /** Get all blocked nodes. */
    Set<String> getBlockedNodes();    
}

public interface JobMasterBlocklistHandler extends BlocklistHandler {
}

SlotPool

SlotPool should avoid allocating slots that are located on blocked nodes. To do that, our core idea is to keep the SlotPool in such a state: there is no slot in SlotPool that is free (no task assigned) and located on blocked nodes. Details are as follows:

  1. When receiving slot offers from task managers located on blocked nodes, all offers should be rejected.
  2. When a node is newly blocked, we should release all free (no task assigned) slots on it. We need to find all task managers on blocked nodes and release all free slots on them by SlotPoolService#releaseFreeSlotsOnTaskManager.
  3. When a slot state changes from reserved (task assigned) to free (no task assigned), SlotPool will check whether the corresponding task manager is blocked. If yes, release the slot.

We will introduce a new slot pool implementation: BlocklistSlotPool, which extends the DefaultDeclarativeSlotPool and overrides some methods to implement the blocklist-related functions described above, and the blocklist information will be passed in the constructor. This new implementation will be only used when the blocklist is enabled.
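
The invariant described in this section (no slot is both free and located on a blocked node) can be illustrated with a toy slot pool. The sketch below does not extend DefaultDeclarativeSlotPool; slot and task manager ids are strings, and every class and method name except releaseFreeSlotsOnTaskManager is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Toy slot pool that keeps the "no free slot on a blocked node" invariant. */
final class BlocklistSlotPoolSketch {
    enum State { FREE, RESERVED }

    private final Map<String, String> slotOwner = new HashMap<>(); // slotId -> taskManagerId
    private final Map<String, State> slotState = new HashMap<>();
    private final Predicate<String> blockedTaskManagerChecker;

    BlocklistSlotPoolSketch(Predicate<String> blockedTaskManagerChecker) {
        this.blockedTaskManagerChecker = blockedTaskManagerChecker;
    }

    /** Rule 1: offers from task managers on blocked nodes are rejected. */
    boolean offerSlot(String slotId, String taskManagerId) {
        if (blockedTaskManagerChecker.test(taskManagerId)) {
            return false;
        }
        slotOwner.put(slotId, taskManagerId);
        slotState.put(slotId, State.FREE);
        return true;
    }

    boolean reserve(String slotId) {
        if (slotState.get(slotId) != State.FREE) {
            return false;
        }
        slotState.put(slotId, State.RESERVED);
        return true;
    }

    /** Rule 2: when a node is newly blocked, free slots on its task managers are released. */
    void releaseFreeSlotsOnTaskManager(String taskManagerId) {
        slotOwner.entrySet().removeIf(entry -> {
            boolean release = entry.getValue().equals(taskManagerId)
                    && slotState.get(entry.getKey()) == State.FREE;
            if (release) {
                slotState.remove(entry.getKey());
            }
            return release;
        });
    }

    /** Rule 3: a slot that turns from reserved to free is released if its owner is blocked. */
    void freeReservedSlot(String slotId) {
        if (slotState.get(slotId) != State.RESERVED) {
            return;
        }
        if (blockedTaskManagerChecker.test(slotOwner.get(slotId))) {
            slotOwner.remove(slotId);
            slotState.remove(slotId);
        } else {
            slotState.put(slotId, State.FREE);
        }
    }

    boolean containsSlot(String slotId) {
        return slotOwner.containsKey(slotId);
    }
}
```

Reserved slots on a blocked task manager stay untouched until their task finishes, which matches the requirement that already allocated slots are not affected.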

Code Block
titleSlotPoolService
public interface SlotPoolService {
    /**
     * Releases all free slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    //...
}

Code Block
titleBlacklistContext
public interface BlacklistContext {
    /** Perform the newly added or updated blacklist items on resources. */
    void blacklistResources(Collection<BlacklistedItem<?>> newlyAddedOrUpdatedItems);
}


Code Block
titleBlocklistSlotPool
public class BlocklistSlotPool extends DefaultDeclarativeSlotPool {
    // ...
}

Code Block
titleBlacklistHandler & JobMasterBlacklistHandler
public interface BlacklistHandler extends BlacklistTracker {
}

public interface JobMasterBlacklistHandler extends BlacklistHandler {

    /**
     * Notify an exception that may generate blacklist items.
     *
     * @param locations locations of the exception
     * @param cause the exception
     */
    void notifyException(Collection<TaskManagerLocation> locations, Throwable cause);
}

SlotPool

SlotPool should avoid allocating slots from blacklisted task managers. Blacklisted task managers include those directly blacklisted and those located on blacklisted nodes. To do that, our core idea is to keep the SlotPool in such a state: there is no slot in SlotPool that is free (no task assigned) and blacklisted. Details are as follows:

  1. When SlotPool starts, BlacklistedTaskManagerChecker will be passed in to check whether a task manager is blacklisted.
  2. When receiving slot offers from blacklisted task managers (including task managers on blacklisted nodes), all offers should be rejected.
  3. When a task manager is newly blacklisted, BlacklistContext will perform the following on SlotPool:
    1. If the action is MARK_BLACKLISTED, release all free (no task assigned) slots on the blacklisted task manager.
    2. If the action is MARK_BLACKLISTED_AND_EVACUATE_TASKS, release all slots on the blacklisted task manager.
  4. When a slot state changes from reserved (task assigned) to free (no task assigned), SlotPool will check whether the corresponding task manager is blacklisted. If yes, release the slot.

Blocklist on ResourceManager

ResourceManagerBlocklistHandler

ResourceManagerBlocklistHandler is a new component introduced in RM for the blocklist mechanism. It has only one sub-component: BlocklistTracker, which is responsible for managing cluster-level blocklist.

Code Block
titleResourceManagerBlocklistHandler
public interface ResourceManagerBlocklistHandler extends BlocklistHandler {
}

SlotManager

SlotManager should filter out blocked resources when allocating registered resources. To do that, we need the following changes:

  1. When starting SlotManager, the BlockedTaskManagerChecker will be passed in to check whether a registered task manager is located on blocked nodes.
  2. When trying to fulfill the slot requirements with registered task managers, the task managers located on blocked nodes will be filtered out.
  3. SlotManager will request new task managers from external resource managers if the registered resources cannot fulfill the requirements. The blocklist also takes effect when requesting new task managers; the details are described in ResourceManagerDriver.
Code Block
titleBlockedTaskManagerChecker
/** This checker helps to query whether a given task manager is located on blocked nodes. */
public interface BlockedTaskManagerChecker {

    /** Returns whether the given task manager is located on blocked nodes. */
    boolean isBlockedTaskManager(ResourceID taskManagerId);
}
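
A rough illustration of how SlotManager could use such a checker: registered task managers are filtered first, and only the slots that the remaining ones cannot provide would be requested from the external resource manager. The class, both method names and the fixed slots-per-task-manager model are assumptions of this sketch.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Illustrative filtering logic; not the proposed SlotManager changes themselves. */
final class SlotManagerFilterSketch {

    /** Keeps only the registered task managers that are not located on blocked nodes. */
    static List<String> selectUsableTaskManagers(
            Collection<String> registeredTaskManagers, Predicate<String> isBlockedTaskManager) {
        return registeredTaskManagers.stream()
                .filter(isBlockedTaskManager.negate())
                .collect(Collectors.toList());
    }

    /** Slots that still have to come from newly requested task managers (never negative). */
    static int missingSlots(int requiredSlots, int usableTaskManagers, int slotsPerTaskManager) {
        return Math.max(0, requiredSlots - usableTaskManagers * slotsPerTaskManager);
    }
}
```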


Code Block
titleSlotManager
public interface SlotManager {

    /** Starts the slot manager with the given leader id and resource manager actions. */
    void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceActions newResourceActions,
            BlockedTaskManagerChecker newBlockedTaskManagerChecker);

    //...
}

ResourceManagerDriver

When deploying Flink on Yarn or Kubernetes, the ResourceManagerDriver is responsible for requesting new task managers from the external resource manager. To avoid allocating task managers from blocked nodes, we need to pass the information of blocked nodes to ResourceManagerDriver at its initialization.

ResourceManagerDriver uses the following APIs to tell external resource managers about the blocked nodes:

  1. Yarn: AMRMClient#updateBlacklist
  2. Kubernetes: NodeAffinity
Code Block
titleResourceManagerDriver
public interface BlockedNodeRetriever {

    /** Retrieve blocked nodes. */
    Set<String> getBlockedNodes();
}

public interface ResourceManagerDriver {

    /** Initialize the deployment specific components. */
    void initialize(
            ResourceEventHandler<WorkerType> resourceEventHandler,
            ScheduledExecutor mainThreadExecutor,
            Executor ioExecutor,
            BlockedNodeRetriever blockedNodeRetriever)
            throws Exception;

    //...
}

Code Block
titleSlotPoolService
public interface BlacklistedTaskManagerChecker {
    /**
     * Returns whether the given task manager is blacklisted.
     */
    boolean isBlacklistedTaskManager(ResourceID resourceID);
}


public interface SlotPoolService {

    /** Start the encapsulated slot pool implementation. */
    void start(
            JobMasterId jobMasterId,
            String address,
            ComponentMainThreadExecutor mainThreadExecutor,
            BlacklistedTaskManagerChecker blacklistedTaskManagerChecker)
            throws Exception;

    /**
     * Releases all slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    /**
     * Releases all free slots belonging to the owning TaskExecutor if it has been registered.
     *
     * @param taskManagerId identifying the TaskExecutor
     * @param cause cause for failing the slots
     */
    void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);

    //...
}

Blacklist on ResourceManager

ResourceManagerBlacklistHandler

ResourceManagerBlacklistHandler is a new component introduced in RM for the blacklist mechanism. It has only one sub-component: BlacklistTracker, which is responsible for managing cluster-level blacklisted items.

Code Block
titleResourceManagerBlacklistHandler
public interface ResourceManagerBlacklistHandler extends BlacklistHandler {
}

SlotManager

SlotManager should filter out blacklisted resources when allocating registered resources. To do that, we need the following changes:

  1. When starting SlotManager, the BlacklistedTaskManagerChecker will be passed in to check whether a registered task manager is blacklisted.
  2. When trying to fulfill the slot requirements with registered task managers, the blacklisted ones will be filtered out.
  3. SlotManager will request new task managers from external resource managers if the registered resources cannot fulfill the requirements. The blacklist also takes effect when requesting new task managers; the details are described in ResourceManagerDriver.
Code Block
titleSlotManager
public interface SlotManager {

    /** Starts the slot manager with the given leader id and resource manager actions. */
    void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceActions newResourceActions,
            BlacklistedTaskManagerChecker newBlacklistedTaskManagerChecker);

    //...
}

ResourceManagerDriver

When deploying Flink on Yarn or Kubernetes, the ResourceManagerDriver is responsible for requesting new task managers from the external resource manager. To avoid allocating task managers from blacklisted nodes, we need to pass the information of blacklisted nodes to ResourceManagerDriver at its initialization. 

ResourceManagerDriver uses the following APIs to tell external resource managers about the blacklisted nodes:

  1. Yarn: AMRMClient#updateBlacklist 
  2. Kubernetes: NodeAffinity
Code Block
titleResourceManagerDriver
public interface BlacklistedNodeRetriever {

    /** Retrieve blacklisted nodes. */
    Set<String> getBlacklistedNodes();
}

public interface ResourceManagerDriver {

    /** Initialize the deployment specific components. */
    void initialize(
            ResourceEventHandler<WorkerType> resourceEventHandler,
            ScheduledExecutor mainThreadExecutor,
            Executor ioExecutor,
            BlacklistedNodeRetriever blacklistedNodeRetriever)
            throws Exception;

    //...
}
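
On Yarn, AMRMClient#updateBlacklist takes a list of additions and a list of removals rather than a full snapshot, so a driver would need to turn the current set of blacklisted nodes into a delta against what it reported last time. The sketch below shows only that delta computation and does not call any Hadoop API; the class and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Illustrative delta between two snapshots of blacklisted node ids. */
final class BlacklistDeltaSketch {
    final List<String> additions;
    final List<String> removals;

    private BlacklistDeltaSketch(List<String> additions, List<String> removals) {
        this.additions = additions;
        this.removals = removals;
    }

    /** Computes what has to be added and removed to move from the old snapshot to the new one. */
    static BlacklistDeltaSketch between(
            Set<String> previouslyReported, Set<String> currentlyBlacklisted) {
        // TreeSet keeps the result order deterministic, which is convenient for logging and tests.
        Set<String> added = new TreeSet<>(currentlyBlacklisted);
        added.removeAll(previouslyReported);
        Set<String> removed = new TreeSet<>(previouslyReported);
        removed.removeAll(currentlyBlacklisted);
        return new BlacklistDeltaSketch(new ArrayList<>(added), new ArrayList<>(removed));
    }
}
```

The two resulting lists would then be handed to the Yarn client as the additions and removals arguments, and the current snapshot remembered for the next round.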

Synchronize Blacklist between JM & RM

The newly added or updated blacklisted items will be synchronized between JM and RM via RPC: 

  1. Once blacklisted items are newly added (or updated) to the JobMasterBlacklistHandler, RM will be notified of these items via ResourceManagerGateway#notifyNewBlacklistedItems.
  2. When RM receives the blacklisted items notified by a JM, it will add them into ResourceManagerBlacklistHandler, and notify all JMs of the successfully added (or updated) items through JobMasterGateway#notifyNewBlacklistedItems. 
  3. Similarly, when JM receives the blacklisted items notified by RM, it will also add them to JobMasterBlacklistHandler.
Code Block
titleBlacklistListener
public interface BlacklistListener {

    /** Notify new blacklisted items. */
    void notifyNewBlacklistedItems(Collection<BlacklistedItem<?>> newItems);
}

public interface JobMasterGateway extends BlacklistListener {
    //...
}

public interface ResourceManagerGateway extends BlacklistListener {     
    //...
}  

Enrich TaskManagerLocation with node information

In order to support blacklisting nodes, it is necessary to know the node where the task is located. To do that, we need to add a node identifier into TaskManagerLocation. This node identifier should be set by the resource manager when requesting new task managers.

Code Block
titleTaskManagerLocation
public class TaskManagerLocation {

    public String getNodeId();

    //...
}

Metrics

We propose to introduce the following Gauge metrics to ResourceManagerMetricGroup:

  1. numBlacklistedTaskManagers: The number of currently blacklisted task managers (including task managers on blacklisted nodes)
  2. numBlacklistedNodes: The number of currently blacklisted nodes
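
Because the first gauge also counts task managers that sit on blacklisted nodes, its value cannot be read from the list of directly blacklisted task managers alone. A minimal sketch of the computation follows, assuming the ResourceManager knows the node of every registered task manager and that only registered task managers are counted; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Set;

/** Illustrative gauge computation; not part of the proposed interfaces. */
final class BlacklistMetricsSketch {

    /**
     * Counts registered task managers that are blacklisted directly or via their node.
     *
     * @param taskManagerToNode registered task manager id mapped to its node id
     */
    static long numBlacklistedTaskManagers(
            Map<String, String> taskManagerToNode,
            Set<String> blacklistedTaskManagers,
            Set<String> blacklistedNodes) {
        return taskManagerToNode.entrySet().stream()
                .filter(entry -> blacklistedTaskManagers.contains(entry.getKey())
                        || blacklistedNodes.contains(entry.getValue()))
                .count();
    }
}
```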

REST API

We will introduce the following REST APIs for the blacklist mechanism:

  1. REST API for querying blacklist information.
  2. REST API for adding new blacklisted items.
  3. REST API for removing existing blacklisted items.

Query

Add a REST API to obtain blacklist information. Each request will return all current blacklist items, which are obtained from ResourceManagerBlacklistHandler.

GET: http://{jm_rest_address:port}/blacklist

Request: {}

Response:

Code Block
titleResponse
{
  /** This group only contains directly blacklisted task managers */
  "blacklistedTaskManagers": [
      {
          "id" : "container_XXX_000002",
          "timestamp" : "XXX",
          "action" : "MARK_BLACKLISTED"
      },
      {
          "id" : "container_XXX_000003",
          "timestamp" : "XXX",
          "action" : "MARK_BLACKLISTED"
      }, 
      ...
  ],
  "blacklistedNodes": [
      {
          "id" : "node1",
          "timestamp" : "XXX",
          "action" : "MARK_BLACKLISTED",
          "taskManagers" : ["container_XXX_000004", "container_XXX_000005", ...]
      },
      ...
  ]
}

In order to get blacklist information, we need to add a method to ResourceManagerGateway:

Code Block
titleResourceManagerGateway
public interface ResourceManagerGateway {
   CompletableFuture<BlacklistInfo> requestBlacklist(@RpcTimeout Time timeout);
   // ...
}

Add

POST: http://{jm_rest_address:port}/blacklist/add

Request:

Code Block
titleRequest
{
  "newBlacklistedTaskManagers": [
      {
          "id" : "container_XXX_000002",
          "action" : "MARK_BLACKLISTED"
      },
      {
          "id" : "container_XXX_000003",
          "action" : "MARK_BLACKLISTED"
      }, 
      ...
  ],
  "newBlacklistedNodes": [
      {
          "id" : "node1",
          "action" : "MARK_BLACKLISTED"
      },
      ...
  ]
}

Response: {}

Remove

POST: http://{jm_rest_address:port}/blacklist/remove

Request:

Code Block
titleRequest
{
  "blacklistedTaskManagersToRemove": [
      {
          "id" : "container_XXX_000002",
          "action" : "MARK_BLACKLISTED"
      },
      {
          "id" : "container_XXX_000003",
          "action" : "MARK_BLACKLISTED"
      }, 
      ...
  ],
  "blacklistedNodesToRemove": [
      {
          "id" : "node1",
          "action" : "MARK_BLACKLISTED"
      },
      ...
  ]
}

Response: {}


Synchronize Blocklist between JM & RM

The newly added/updated blocked nodes will be synchronized between JM and RM via RPC: 

  1. Once blocked nodes are newly added/updated to the JobMasterBlocklistHandler, RM will be notified of these nodes via ResourceManagerGateway#notifyNewBlockedNodes.
  2. When RM receives the blocked nodes notified by a JM, it will add them into ResourceManagerBlocklistHandler, and notify all JMs of the successfully added/updated nodes through JobMasterGateway#notifyNewBlockedNodes.
  3. Similarly, when JM receives the blocked nodes notified by RM, it will also add them to JobMasterBlocklistHandler.
Code Block
titleBlocklistListener
public interface BlocklistListener {

    /** Notify newly added/updated blocked nodes. */
    void notifyNewBlockedNodes(Collection<BlockedNode> newlyAddedOrUpdatedNodes);
}

public interface JobMasterGateway extends BlocklistListener {
    //...
}

public interface ResourceManagerGateway extends BlocklistListener {     
    //...
}  
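
The three synchronization steps can be simulated in memory. In the sketch below RPC calls are replaced by direct method calls, nodes are reduced to string ids (so updates of an existing node are not modelled), and the class names are hypothetical. A JM that receives nodes from RM only records them and does not notify RM again, which is what keeps the exchange from looping.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Illustrative in-memory model of the JM/RM blocklist synchronization. */
final class BlocklistSyncSketch {

    interface Listener {
        void notifyNewBlockedNodes(Collection<String> newlyAddedOrUpdatedNodes);
    }

    /** RM side: stores the nodes and broadcasts the actually added ones to all JMs. */
    static final class ResourceManagerSide implements Listener {
        private final Set<String> blocked = new TreeSet<>();
        private final List<Listener> jobMasters = new ArrayList<>();

        void registerJobMaster(Listener jobMaster) {
            jobMasters.add(jobMaster);
        }

        @Override
        public void notifyNewBlockedNodes(Collection<String> nodes) {
            List<String> added = new ArrayList<>();
            for (String node : nodes) {
                if (blocked.add(node)) {
                    added.add(node);
                }
            }
            if (added.isEmpty()) {
                return; // Nothing changed, so there is nothing to broadcast.
            }
            for (Listener jobMaster : jobMasters) {
                jobMaster.notifyNewBlockedNodes(added);
            }
        }

        Set<String> getBlockedNodes() {
            return blocked;
        }
    }

    /** JM side: forwards locally blocked nodes to RM and records nodes received from RM. */
    static final class JobMasterSide implements Listener {
        private final Set<String> blocked = new TreeSet<>();
        private final Listener resourceManager;

        JobMasterSide(Listener resourceManager) {
            this.resourceManager = resourceManager;
        }

        /** Step 1: nodes blocked by this JM are reported to RM. */
        void blockNodes(Collection<String> nodes) {
            List<String> added = new ArrayList<>();
            for (String node : nodes) {
                if (blocked.add(node)) {
                    added.add(node);
                }
            }
            if (!added.isEmpty()) {
                resourceManager.notifyNewBlockedNodes(added);
            }
        }

        /** Step 3: nodes received from RM are only recorded, not forwarded again. */
        @Override
        public void notifyNewBlockedNodes(Collection<String> nodes) {
            blocked.addAll(nodes);
        }

        Set<String> getBlockedNodes() {
            return blocked;
        }
    }
}
```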

Enrich TaskManagerLocation with node information

In order to support blocked nodes, it is necessary to know the node where the task is located. To do that, we need to add a node identifier into TaskManagerLocation. This node identifier should be set by the resource manager when requesting new task managers.

Code Block
titleTaskManagerLocation
public class TaskManagerLocation {

    public String getNodeId();

    //...
}


Compatibility, Deprecation, and Migration Plan

The blacklist mechanism will be an optional feature which the user has to activate explicitly by setting the config option cluster.resource-blacklist.enabled: true. The blocklist mechanism is only used when speculative execution is enabled. Either way, Flink's default behavior won't change.

Future improvements

Introduce a configurable blacklist strategy implementation

We intend to introduce a configurable blacklist strategy in the future. Users can specify the exceptions for which the strategy should blacklist the resources. This requires adding several configuration options to configure the exceptions and the generated blacklisted items, which requires further design and discussion.

Introduce plugin mechanism for blacklist strategy

We may introduce a plugin mechanism in the future, which allows users to load their own blacklist strategy implementations. This means that BlacklistStrategy needs to be opened to users as a public interface, which requires more thought and discussion.

Limitations

No integration with Flink's web UI

This FLIP does not modify the web UI, but it's in our plan. Currently, we have a preliminary idea of the UI integration, which includes improving the display of slot information and adding a page to show the blocklist information.

No support for JM failover

Currently, the blocklist information will be lost after JM failover. In the future, we may persist the blocklist information in HA to support JM failover.


Test Plan

  1. The changes will be covered by unit and IT cases.
  2. Test the functionality in a real Standalone/Yarn/Kubernetes cluster.

...