...

Flink jobs typically run in a distributed way. In large clusters, it is very common for cluster nodes to encounter issues such as insufficient disk space, bad hardware, or network abnormalities. These problems can cause tasks to fail or run slowly, which may in turn cause job failures. Flink will take care of the failures and redeploy the relevant tasks. However, due to data locality and limited resources, the new tasks are very likely to be redeployed to the same nodes, which will result in continuous task abnormalities and affect job progress.

Currently, Flink users need to manually identify the problematic node and take it offline to solve this problem. But this approach has the following disadvantages:

  1. Taking a node offline can be a heavy process. Users may need to contact cluster administrators to do this. The operation can even be dangerous and may not be allowed during some important business events.
  2. Identifying and solving this kind of problem manually is slow and a waste of human resources.

To solve this problem, we propose to introduce a blacklist mechanism for Flink to filter out problematic resources. Once a resource is judged to be abnormal, it will be blacklisted to avoid assigning tasks to it. We will introduce the following two ways to blacklist resources:

  1. Manually specify the blacklisted resources through the REST API. When users find abnormal nodes/TMs, they can manually blacklist them in this way.
  2. Automatically detect abnormal resources and blacklist them. Users can specify a blacklist strategy which identifies abnormal resources according to the received exception and related locations. In the first version, we only introduce the blacklist strategy interface and a no-op implementation. In the future, we will introduce a configurable blacklist strategy and plugin mechanism to load user-defined blacklist strategy implementations.
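The filtering idea described above can be sketched as follows. This is only an illustration under stated assumptions: the class `ResourceBlacklist`, its methods, and the use of plain string node IDs are hypothetical and are not part of the proposed Flink interfaces.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not a Flink class: tracks blacklisted node IDs and
// filters them out of a list of candidate nodes.
final class ResourceBlacklist {
    private final Set<String> blacklistedNodes = new HashSet<>();

    // Marks a node as blacklisted, e.g. after a manual request or a strategy decision.
    void add(String nodeId) {
        blacklistedNodes.add(nodeId);
    }

    boolean isBlacklisted(String nodeId) {
        return blacklistedNodes.contains(nodeId);
    }

    // Returns the candidates that are not blacklisted, preserving their order.
    List<String> filterUsable(Collection<String> candidates) {
        List<String> usable = new ArrayList<>();
        for (String candidate : candidates) {
            if (!isBlacklisted(candidate)) {
                usable.add(candidate);
            }
        }
        return usable;
    }
}
```

Both ways of blacklisting would feed the same set in this sketch: a manual request and an automatic strategy would each end up calling `add`.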

Public Interfaces

We propose to introduce the following configuration options for the blacklist:

...

BlacklistStrategy is the component responsible for generating blacklist items according to the exceptions and their locations, as notified by the Scheduler. We can introduce different BlacklistStrategy implementations to adapt to different scenarios. In the first version, we will introduce a no-op implementation as the default implementation. In the future, we will introduce a configurable blacklist strategy and a plugin mechanism to load user-defined blacklist strategy implementations; details will be described in 

BlacklistStrategy
public interface BlacklistStrategy {
    /**
     * Generate blacklisted items according to the abnormal tasks' locations and cause.
     *
     * @param locations the abnormal tasks' locations.
     * @param cause the cause of the blacklisted items.
     * @param timestamp the creation timestamp of the blacklisted items.
     * @return the generated blacklisted items.
     */
    Collection<BlacklistedItem<?>> generateBlacklistedItems(Collection<TaskManagerLocation> locations, Throwable cause, long timestamp);
}
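
As an illustration of the default behavior described above, a no-op strategy could look like the following minimal sketch. The class name `NoOpBlacklistStrategy` is an assumption, and `TaskManagerLocation` and `BlacklistedItem` are reduced to hypothetical stand-ins so that the example compiles on its own; the real Flink types carry more information.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-in for Flink's TaskManagerLocation (assumption: only a host name).
final class TaskManagerLocation {
    final String host;

    TaskManagerLocation(String host) {
        this.host = host;
    }
}

// Hypothetical stand-in for the proposed BlacklistedItem type.
final class BlacklistedItem<T> {
    final T identifier;
    final Throwable cause;
    final long timestamp;

    BlacklistedItem(T identifier, Throwable cause, long timestamp) {
        this.identifier = identifier;
        this.cause = cause;
        this.timestamp = timestamp;
    }
}

// Same method shape as the proposed interface, redeclared so the sketch is self-contained.
interface BlacklistStrategy {
    Collection<BlacklistedItem<?>> generateBlacklistedItems(
            Collection<TaskManagerLocation> locations, Throwable cause, long timestamp);
}

// Assumed name: a strategy that never blacklists anything, whatever the cause.
final class NoOpBlacklistStrategy implements BlacklistStrategy {
    @Override
    public Collection<BlacklistedItem<?>> generateBlacklistedItems(
            Collection<TaskManagerLocation> locations, Throwable cause, long timestamp) {
        return Collections.emptyList();
    }
}
```

With such a default, enabling the feature without choosing another strategy would leave only manual blacklisting in effect.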

...

Compatibility, Deprecation, and Migration Plan

The blacklist mechanism will be an optional feature which the user has to activate explicitly by setting the config option cluster.resource-blacklist.enabled: true. This entails that Flink's default behavior won't change.

Future improvements

Introduce a configurable blacklist strategy implementation

We intend to introduce a configurable blacklist strategy in the future. Users can specify the exceptions for which the strategy should blacklist resources. This requires adding several configuration options to configure the exceptions and the generated blacklist items, which requires further design and discussion.
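
One possible shape for such a strategy is sketched below, purely as an illustration. The design is still open, so everything here is an assumption: the class name `ExceptionTypeBlacklistStrategy`, matching on exception classes, and blacklisting by host name are hypothetical choices, and `TaskManagerLocation` and `BlacklistedItem` are simplified stand-ins for the real types.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for Flink's TaskManagerLocation (assumption: only a host name).
final class TaskManagerLocation {
    final String host;

    TaskManagerLocation(String host) {
        this.host = host;
    }
}

// Hypothetical stand-in for the proposed BlacklistedItem type.
final class BlacklistedItem<T> {
    final T identifier;
    final Throwable cause;
    final long timestamp;

    BlacklistedItem(T identifier, Throwable cause, long timestamp) {
        this.identifier = identifier;
        this.cause = cause;
        this.timestamp = timestamp;
    }
}

// Same method shape as the proposed interface, redeclared so the sketch is self-contained.
interface BlacklistStrategy {
    Collection<BlacklistedItem<?>> generateBlacklistedItems(
            Collection<TaskManagerLocation> locations, Throwable cause, long timestamp);
}

// Assumed design: blacklist the hosts of all reported locations when the cause
// is an instance of one of the configured exception types.
final class ExceptionTypeBlacklistStrategy implements BlacklistStrategy {
    private final Set<Class<? extends Throwable>> blacklistedCauses;

    ExceptionTypeBlacklistStrategy(Collection<Class<? extends Throwable>> causes) {
        this.blacklistedCauses = new HashSet<>(causes);
    }

    @Override
    public Collection<BlacklistedItem<?>> generateBlacklistedItems(
            Collection<TaskManagerLocation> locations, Throwable cause, long timestamp) {
        if (!matches(cause)) {
            return Collections.emptyList();
        }
        Collection<BlacklistedItem<?>> items = new ArrayList<>();
        for (TaskManagerLocation location : locations) {
            items.add(new BlacklistedItem<String>(location.host, cause, timestamp));
        }
        return items;
    }

    // True if the cause is an instance of any configured exception type (subclasses included).
    private boolean matches(Throwable cause) {
        for (Class<? extends Throwable> type : blacklistedCauses) {
            if (type.isInstance(cause)) {
                return true;
            }
        }
        return false;
    }
}
```

In this sketch the set of exception classes stands in for whatever configuration options the final design settles on.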

Introduce plugin mechanism for blacklist strategy

We may introduce a plugin mechanism in the future, which allows users to load their own blacklist strategy implementations. This means that BlacklistStrategy needs to be opened to users as a public interface, which requires more thought and discussion.

Test Plan

  1. The changes will be covered by unit tests and IT cases.
  2. Test the functionality in a real Standalone/Yarn/Kubernetes cluster.

...