Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/03j5gk8d4vyr6jos005p18ym5bp6wds0

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink provides high availability in an "all or nothing" manner. A highly available setup offers two mechanisms: leader election/retrieval services for the JobManager and persistence services for job metadata. Both are defined in the HighAvailabilityServices interface. At runtime, Flink constructs an implementation of HighAvailabilityServices based on user configuration, e.g. KubernetesLeaderElectionHaServices or ZooKeeperLeaderElectionHaServices. As a result, the two mechanisms can only be enabled or disabled together.
However, in OLAP scenarios, we only need the leader election/retrieval services for the components in the JobManager. In our production environment, users submit many short queries through the SQL Gateway. These jobs typically complete within a few seconds, and when an error occurs during job execution, users simply resubmit the job. Therefore, there is no need to recover a job after a JobManager failure, persist its state for recovery, or perform leader election for a specific job. On the contrary, persisting job state reduces the cluster's throughput for short queries, as demonstrated by the HighAvailabilityServiceBenchmark (258 qps without HA vs. 69 qps with ZK HA). At the same time, in this scenario we treat the Flink session cluster as a service. To meet the SLA (Service Level Agreement), we rely on the JobManager's failover mechanism to minimize service downtime. Therefore, we still need leader election for the components in the JobManager process, e.g. ResourceManager and Dispatcher.
In this FLIP, we propose to let users adjust the job-related high availability strategies independently through configuration.

Public Interfaces

  • Introduce the high-availability.enable-job-recovery option to control the implementation of leader services and persistence services for the JobMaster. This option takes effect only in session mode and defaults to true.

  • Introduce the high-availability.blob-store.enabled option to control the implementation of blob services. It defaults to true, except that it falls back to false when it is not manually configured and high-availability.enable-job-recovery is set to false.
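
The defaulting rule for the two options can be sketched as follows. This is a minimal illustration that reads the options from a plain key/value map; the class HaOptionDefaults and its methods are hypothetical and are not part of Flink, and the session-mode restriction is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: resolves the two proposed options from a plain key/value config. */
final class HaOptionDefaults {
    static final String ENABLE_JOB_RECOVERY = "high-availability.enable-job-recovery";
    static final String BLOB_STORE_ENABLED = "high-availability.blob-store.enabled";

    /** Job recovery stays enabled unless the user explicitly sets the option to false. */
    static boolean jobRecoveryEnabled(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault(ENABLE_JOB_RECOVERY, "true"));
    }

    /** An explicit blob-store setting wins; otherwise the value follows job recovery. */
    static boolean blobStoreEnabled(Map<String, String> conf) {
        String explicit = conf.get(BLOB_STORE_ENABLED);
        if (explicit != null) {
            return Boolean.parseBoolean(explicit);
        }
        return jobRecoveryEnabled(conf);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(ENABLE_JOB_RECOVERY, "false");
        // The blob store is not configured manually, so it follows job recovery.
        System.out.println("blob store enabled: " + blobStoreEnabled(conf));
    }
}
```

With this rule, a user who disables job recovery but still wants the HA blob store only has to set high-availability.blob-store.enabled to true explicitly.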

Note: I don't mention HighAvailabilityServices here because it is not labeled as public, even though we do expose configurations to allow users to configure their own implementations.

Proposed Changes

We propose to introduce high-availability.enable-job-recovery to control the behavior related to job recovery when HA is enabled. The HA services should be split into LeaderServices and PersistenceServices in the future. However, as discussed in the Rejected Alternatives section, this refactoring is out of the scope of this FLIP.

HighAvailability for OLAP Scenarios

As mentioned above, in OLAP scenarios, we only require the leader election services for the Dispatcher, ResourceManager and RestEndpoint in the JobManager process. Leader election services and persistence services are redundant for jobs and may impact cluster performance. Thus, we propose the following:

  • Introduce the high-availability.enable-job-recovery parameter to generate HA services suitable for OLAP scenarios. When users enable HA with Kubernetes or ZooKeeper and set this option to false, we will:
    • Use the embedded versions of CheckpointStore, JobGraphStore and JobResultStore
    • Set high-availability.blob-store.enabled to false if it is not manually configured
  • After the JM is granted leadership, it no longer publishes leader information to the underlying system. Other components determine the leader status of the JM by listening to the leader information from the RM.
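
The resulting behavior can be summarized in a small decision sketch. The class OlapHaPlan and its fields are hypothetical names used only for illustration; PERSISTENT stands for the existing ZooKeeper-, Kubernetes- or file-system-backed implementations. Tying the publishing of JM leader information to the same flag is an assumption made here, since the text above does not state it explicitly.

```java
/** Illustrative only: which kind of implementation each HA component would use. */
final class OlapHaPlan {
    enum Backing { PERSISTENT, EMBEDDED }

    /** Leader election for Dispatcher, ResourceManager and RestEndpoint. */
    final Backing clusterLeaderElection;
    /** CheckpointStore, JobGraphStore and JobResultStore. */
    final Backing jobStores;
    /** Assumption: JM leader information is published only when job recovery is on. */
    final boolean jmPublishesLeaderInfo;

    private OlapHaPlan(Backing clusterLeaderElection, Backing jobStores, boolean jmPublishesLeaderInfo) {
        this.clusterLeaderElection = clusterLeaderElection;
        this.jobStores = jobStores;
        this.jmPublishesLeaderInfo = jmPublishesLeaderInfo;
    }

    /** Plan for a session cluster with ZooKeeper or Kubernetes HA enabled. */
    static OlapHaPlan forHaSessionCluster(boolean enableJobRecovery) {
        Backing stores = enableJobRecovery ? Backing.PERSISTENT : Backing.EMBEDDED;
        // Process-level leader election is kept in both cases so that JobManager failover still works.
        return new OlapHaPlan(Backing.PERSISTENT, stores, enableJobRecovery);
    }

    public static void main(String[] args) {
        OlapHaPlan plan = forHaSessionCluster(false);
        System.out.println(plan.clusterLeaderElection + " " + plan.jobStores + " " + plan.jmPublishesLeaderInfo);
    }
}
```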

Compatibility, Deprecation, and Migration Plan

As mentioned before, the functionality of HA services doesn’t change. Besides, this should improve readability and testability.

Test Plan

The existing tests need to be refactored according to the new hierarchy.

Rejected Alternatives

Refactoring of HighAvailabilityServices


The following class diagram illustrates the relationship between the classes and interfaces related to the HighAvailabilityServices:

  • The implementation of HighAvailabilityServices can be classified into two categories: HaServices and NonHaServices. It includes interfaces related to leader election/retrieval, job information persistence services, and interfaces for service shutdown and cleanup.

  • HaServices includes a BlobStore and JobResultStore implemented based on the Filesystem. It has two specific implementations: Kubernetes and ZooKeeper, which include the specific logic of the environment-related Checkpoint Store, JobGraph Store, and LeaderRetrievalDriver.

  • NonHaServices has two implementations: Embedded and Standalone. Embedded is used for the MiniCluster and implements in-memory election logic for multiple components running within the same process. Standalone is the default option when high availability is disabled, and it only returns the given leader address.

However, in fact, the difference between HighAvailabilityServices implementations lies only in the choice of leader service and persistence service. Different combinations result in implementations with different names. Thus, we can abstract these two parts of services as LeaderServices and PersistenceServices, and keep only one implementation for HighAvailabilityServices. Then, we can combine different LeaderServices and PersistenceServices to meet the requirements of various scenarios.
The following class diagram illustrates the classes and interfaces after refactoring:

  • The previous StandaloneHaServices and EmbeddedHaServices will be replaced by the combination of EmbeddedPersistenceServices with StandaloneLeaderServices or EmbeddedLeaderServices, respectively.

  • The previous Kubernetes and ZooKeeper scenarios will be unified using DefaultLeaderServices and DefaultPersistenceServices. The relevant materials for the scenarios, such as LeaderElectionDriverFactory and leader paths for each component, will be provided through the LeaderServiceMaterialGenerator. The specific implementations for this are KubernetesHaServicesMaterialProvider and ZooKeeperHaServicesMaterialProvider. These two classes also provide CheckpointRecoveryFactory and JobGraphStore.

Please note that this is a pure refactoring and should not affect the functionality of the current HA mechanism.
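
The composition idea described above could look roughly like the following sketch. The interfaces are deliberately reduced to a single describe() method and the class ComposedHaServices is a hypothetical name; the real LeaderServices and PersistenceServices would expose the election, retrieval and store accessors instead.

```java
/** Hypothetical, heavily simplified stand-ins for the two proposed abstractions. */
interface LeaderServices {
    String describe();
}

interface PersistenceServices {
    String describe();
}

/** A single HighAvailabilityServices-like class built from one part of each kind. */
final class ComposedHaServices {
    private final LeaderServices leaderServices;
    private final PersistenceServices persistenceServices;

    ComposedHaServices(LeaderServices leaderServices, PersistenceServices persistenceServices) {
        this.leaderServices = leaderServices;
        this.persistenceServices = persistenceServices;
    }

    String describe() {
        return leaderServices.describe() + " + " + persistenceServices.describe();
    }

    /** Replaces the previous StandaloneHaServices. */
    static ComposedHaServices standalone() {
        return new ComposedHaServices(() -> "StandaloneLeaderServices", () -> "EmbeddedPersistenceServices");
    }

    /** Replaces the previous EmbeddedHaServices. */
    static ComposedHaServices embedded() {
        return new ComposedHaServices(() -> "EmbeddedLeaderServices", () -> "EmbeddedPersistenceServices");
    }

    /** Unifies the previous Kubernetes and ZooKeeper implementations. */
    static ComposedHaServices zooKeeperOrKubernetes() {
        return new ComposedHaServices(() -> "DefaultLeaderServices", () -> "DefaultPersistenceServices");
    }

    public static void main(String[] args) {
        System.out.println(standalone().describe());
        System.out.println(embedded().describe());
        System.out.println(zooKeeperOrKubernetes().describe());
    }
}
```

Because the two parts are passed in independently, a new scenario would only need a new pairing rather than a new HighAvailabilityServices implementation.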

After discussion, we found that the refactoring of HA services involves the following items:
- Splitting LeaderServices and PersistenceServices. As Matthias mentioned, this allows for easier testing.
- Removal of deprecated interfaces, such as getWebMonitorLeaderElectionService.
- Reviewing the multiple existing close and cleanup interfaces.
- Integration of StandaloneHaServices and EmbeddedHaServices.

We decided to move this refactoring out of the scope of this FLIP, as it is big enough to warrant a separate discussion thread.
