Discussion thread: https://lists.apache.org/thread/qrl881wykob3jnmzsof5ho8b9fgkklpt
Vote thread: https://lists.apache.org/thread/hdkz4v5phqwbr8b8971kqot31om8osfq

Release: 1.18.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

With FLINK-24038, we introduced a new way of handling leader election: instead of a per-component leader election, we introduced a per-process (i.e. per-JobManager-process) leader election. For Flink 1.15, we kept both approaches and allowed users to opt out of the new behavior through the configuration parameter high-availability.use-old-ha-services. As a result, the per-process implementation had to be made overly complicated so that it fit the components and interfaces of the per-component implementation.

With Flink 1.16, the old implementation was removed (FLINK-25806) but the complexity of the leader election implementation was preserved. This FLIP is about reducing the complexity of this part of the code. 

Additionally, some ownership changed with FLINK-24038: Before FLINK-24038, the LeaderElectionDriver (i.e. the connection to the HA backend) was implicitly handled by the LeaderContender. If the LeaderContender closed the LeaderElectionService, the connection to the HA backend was closed. With FLINK-24038, the lifecycle management of the LeaderElectionDriver moved into the HighAvailabilityServices. As a consequence, the connection to the HA backend is only closed when the HighAvailabilityServices are closed. This conceptual change was overlooked when FLINK-24038 was introduced; it would actually have deserved a FLIP. This FLIP addresses that shortcoming by providing a proper proposal for how to integrate the changes of FLINK-24038.

Public Interfaces

tl;dr: No public interfaces are affected.

The leader election is currently exposed by the HighAvailabilityServices interface. This interface is not considered public and can therefore be changed. However, changing the methods of this interface might affect projects that rely on a custom implementation.

Proposed Changes

This section is divided into three subsections:

  1. Per-component leader election (Flink 1.15-, pre-FLINK-24038) describes how leader election was implemented before FLINK-24038 was introduced.
  2. Per-process leader election (Flink 1.15+; FLINK-24038) describes the current state in Flink, with FLINK-24038 implemented and the legacy code partially removed by FLINK-25806.
  3. Code Cleanup: Merge MultipleComponentLeaderElectionService into LeaderElectionService is the proposed change that integrates multi-component/single-process leader election more deeply into Flink.

Per-component leader election (Flink 1.15-, pre-FLINK-24038)

The following class diagram illustrates the relationship between the classes and interfaces before FLINK-24038 was introduced.

DefaultLeaderElectionService orchestrates the communication between the LeaderContender and the LeaderElectionDriver. There’s a clear separation of concerns:

  • LeaderContender: The interface to inform the contender of the leadership change.
  • LeaderElectionService: The interface for confirming the leadership change and retrieving the current state. Additionally, the leader election can be started and stopped.
  • LeaderElectionEventHandler: This interface is used to communicate changes in the HA backend to the LeaderElectionService (i.e. leadership is granted or lost, or some other process changed the leader information).
  • LeaderElectionDriver: Interface for writing the component’s leader information to the HA backend and retrieving the information about whether leadership is currently acquired.
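The separation of concerns listed above can be sketched as a small in-memory model. This is only an illustration: the interface names follow the list, but the method signatures are simplified and are not Flink's actual ones. InMemoryDriver, SingleContenderService, the simulate* methods and the address "rpc://resource-manager" are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Simplified sketch of the per-component design; not Flink's actual signatures. */
final class PerComponentSketch {

    /** Informed about leadership changes. */
    interface LeaderContender {
        void grantLeadership(UUID leaderSessionId);
        void revokeLeadership();
    }

    /** Used by the driver to report HA backend events to the service. */
    interface LeaderElectionEventHandler {
        void onGrantLeadership(UUID leaderSessionId);
        void onRevokeLeadership();
    }

    /** Writes leader information to the HA backend and reports leadership. */
    interface LeaderElectionDriver {
        void writeLeaderInformation(UUID leaderSessionId, String address);
        boolean hasLeadership();
    }

    /** Hypothetical in-memory stand-in for a real HA backend driver. */
    static final class InMemoryDriver implements LeaderElectionDriver {
        private final LeaderElectionEventHandler handler;
        private boolean leader;
        String storedAddress; // what a real backend would persist

        InMemoryDriver(LeaderElectionEventHandler handler) { this.handler = handler; }

        void simulateGrant() { leader = true; handler.onGrantLeadership(UUID.randomUUID()); }
        void simulateRevoke() { leader = false; handler.onRevokeLeadership(); }

        @Override public void writeLeaderInformation(UUID id, String address) { storedAddress = address; }
        @Override public boolean hasLeadership() { return leader; }
    }

    /** One service, one contender, one driver. */
    static final class SingleContenderService implements LeaderElectionEventHandler {
        private LeaderContender contender;
        private UUID issuedSessionId;
        InMemoryDriver driver;

        void start(LeaderContender contender) {
            this.contender = contender;
            this.driver = new InMemoryDriver(this); // driver lifecycle follows the contender
        }

        void confirmLeadership(UUID leaderSessionId, String address) {
            // Only the session ID that was actually issued may be confirmed.
            if (driver.hasLeadership() && leaderSessionId.equals(issuedSessionId)) {
                driver.writeLeaderInformation(leaderSessionId, address);
            }
        }

        @Override public void onGrantLeadership(UUID leaderSessionId) {
            issuedSessionId = leaderSessionId;
            contender.grantLeadership(leaderSessionId);
        }

        @Override public void onRevokeLeadership() {
            issuedSessionId = null;
            contender.revokeLeadership();
        }
    }

    /** Runs one grant/revoke cycle and returns the observed events. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        SingleContenderService service = new SingleContenderService();
        service.start(new LeaderContender() {
            @Override public void grantLeadership(UUID leaderSessionId) {
                events.add("granted");
                service.confirmLeadership(leaderSessionId, "rpc://resource-manager");
            }

            @Override public void revokeLeadership() { events.add("revoked"); }
        });
        service.driver.simulateGrant();
        events.add("stored=" + service.driver.storedAddress);
        service.driver.simulateRevoke();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the sketch is the coupling: the service holds exactly one contender and creates its driver in start(LeaderContender), so every component needs its own service and its own backend connection.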

Per-process leader election (Flink 1.15+; FLINK-24038)

The following class diagram includes the changes introduced by FLINK-24038 (sorry that it doesn't match the other two diagrams):

MultipleComponentLeaderElectionService is introduced to orchestrate the communication between the HA backend (MultipleComponentLeaderElectionDriver) and the LeaderElectionService instances of each component (i.e. ResourceManager, Dispatcher, …). MultipleComponentLeaderElectionDriverAdapter is introduced as an abstraction layer that makes the new implementation fit the old interfaces LeaderElectionService and LeaderElectionDriver. This enabled us to use the per-component leader election and the per-process leader election side by side.

  • MultipleComponentLeaderElectionDriver is an interface analogous to the LeaderElectionDriver interface that can be used by MultipleComponentLeaderElectionService. It provides methods for changing LeaderInformation on a per-component level.
  • MultipleComponentLeaderElectionDriver.Listener is used by the MultipleComponentLeaderElectionDriver to inform the MultipleComponentLeaderElectionService about leadership changes for certain components.
  • MultipleComponentLeaderElectionService distributes the leader election events to the different LeaderContenders through their LeaderElectionServices.
  • MultipleComponentLeaderElectionDriverAdapter translates LeaderElectionDriver interfaces into something that can be processed by the MultipleComponentLeaderElectionService.
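The adapter idea described above can be illustrated with a reduced model. This is a sketch under assumptions, not the actual Flink classes: MultiComponentService and DriverAdapter are hypothetical stand-ins for MultipleComponentLeaderElectionService and MultipleComponentLeaderElectionDriverAdapter, the method names registerHandler, publishLeaderInformation and simulateLeadershipGranted are invented for this example, and leader information is reduced to a plain address string.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Reduced model of the adapter layer; all signatures are simplified assumptions. */
final class DriverAdapterSketch {

    /** Legacy per-component driver interface (simplified). */
    interface LeaderElectionDriver {
        void writeLeaderInformation(String leaderAddress);
        boolean hasLeadership();
    }

    /** Legacy callback interface (simplified). */
    interface LeaderElectionEventHandler {
        void onGrantLeadership();
        void onRevokeLeadership();
    }

    /** Toy per-process service: one leadership flag shared by many components. */
    static final class MultiComponentService {
        private final Map<String, LeaderElectionEventHandler> handlers = new HashMap<>();
        /** Stand-in for the HA backend: one entry per componentId. */
        final Map<String, String> backendEntries = new TreeMap<>();
        private boolean leader;

        void registerHandler(String componentId, LeaderElectionEventHandler handler) {
            handlers.put(componentId, handler);
            if (leader) {
                handler.onGrantLeadership();
            }
        }

        void publishLeaderInformation(String componentId, String leaderAddress) {
            if (leader) {
                backendEntries.put(componentId, leaderAddress);
            }
        }

        boolean hasLeadership() { return leader; }

        void simulateLeadershipGranted() {
            leader = true;
            handlers.values().forEach(LeaderElectionEventHandler::onGrantLeadership);
        }
    }

    /** Makes the per-process service look like a per-component driver. */
    static final class DriverAdapter implements LeaderElectionDriver {
        private final MultiComponentService service;
        private final String componentId;

        DriverAdapter(MultiComponentService service, String componentId,
                      LeaderElectionEventHandler handler) {
            this.service = service;
            this.componentId = componentId;
            service.registerHandler(componentId, handler);
        }

        @Override public void writeLeaderInformation(String leaderAddress) {
            service.publishLeaderInformation(componentId, leaderAddress); // adds the componentId
        }

        @Override public boolean hasLeadership() { return service.hasLeadership(); }
    }

    static String run() {
        MultiComponentService service = new MultiComponentService();
        int[] grants = {0};
        LeaderElectionEventHandler counting = new LeaderElectionEventHandler() {
            @Override public void onGrantLeadership() { grants[0]++; }
            @Override public void onRevokeLeadership() {}
        };
        LeaderElectionDriver rm = new DriverAdapter(service, "resource_manager", counting);
        LeaderElectionDriver dispatcher = new DriverAdapter(service, "dispatcher", counting);
        service.simulateLeadershipGranted();
        rm.writeLeaderInformation("rpc://resource-manager");
        dispatcher.writeLeaderInformation("rpc://dispatcher");
        return grants[0] + " grants, entries=" + service.backendEntries;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each legacy component still sees a driver of its own, while a single leadership decision fans out to all registered handlers. The extra indirection is the complexity this FLIP wants to remove.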

Code Cleanup: Merge MultipleComponentLeaderElectionService into LeaderElectionService

The flaw of the current implementation is that DefaultLeaderElectionService only supports a single contender. The lifecycle of the LeaderElectionDriver is closely coupled with the LeaderElectionService and LeaderContender. In contrast, the per-process leader election allows multiple contenders to be assigned to the same leader election instance. Adding LeaderContenders is independent from the lifecycle management of the LeaderElectionDriver through LeaderElectionService.start/stop.

This proposal is about making LeaderElectionService support multiple LeaderContender instances instead of a single one. This is achieved by introducing LeaderElection as an additional interface. The following class diagram illustrates the architecture:

This proposal brings back what was already provided before FLINK-24038 (Flink 1.15-), with one extension: the interfaces now take a componentId. This componentId needs to be unique among the components of a Flink cluster because the LeaderElectionService uses it to save the leading contender's RPC endpoint information in a dedicated entry (see the JavaDoc of ZooKeeperMultipleComponentLeaderElectionHaServices for an illustration of the data structure). The componentId is considered LeaderElectionService/HighAvailabilityServices-internal knowledge and doesn't need to be exposed to the LeaderContender.

The responsibilities of the interfaces are as follows:

  • LeaderElectionService provides lifecycle management methods for starting and stopping the internally used LeaderElectionDriver and the internally used executor. The driver will be instantiated as soon as the first contender is registered and closed as soon as the last contender is removed. Additionally, it provides a factory method for creating LeaderElection instances.
  • LeaderElection is used as an abstraction layer hiding the componentId from the LeaderContender. The LeaderContender doesn't need to know about the componentId as this is considered LeaderElectionService-internal knowledge. It has its own lifecycle methods for registering the LeaderContender in the LeaderElectionService (startLeaderElection(LeaderContender)) and deregistering it (close()). The LeaderElection interface provides all methods that are needed by the LeaderContender to communicate back to the LeaderElectionService.
  • LeaderContender: The LeaderContender is integrated as usual except that it accesses the LeaderElection instead of the LeaderElectionService. It's going to call startLeaderElection(LeaderContender) where, previously, LeaderElectionService.start(LeaderContender) was called.
  • LeaderElectionDriver is used to write the leader information into the HA backend. The interface matches more or less what MultipleComponentLeaderElectionDriver offered.
  • LeaderElectionDriver.Listener replaces LeaderElectionEventHandler and serves as the interface the driver uses to communicate with the LeaderElectionService. The interface matches more or less what MultipleComponentLeaderElectionDriver.Listener offered, i.e. passing in the componentId along with the leader information.
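The division of responsibilities above could look roughly like the following toy model. It is a sketch under assumptions rather than the proposed implementation: startLeaderElection(LeaderContender) and close() are taken from the text, while createLeaderElection, SharedLeaderElectionService, the simplified confirmLeadership signature and the example addresses are hypothetical. The driver is omitted, and a plain map stands in for the HA backend.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

/** Toy model of the proposed split; signatures are simplified assumptions. */
final class MultiContenderSketch {

    interface LeaderContender {
        void grantLeadership(UUID leaderSessionId);
        void revokeLeadership();
    }

    /** Per-contender handle that carries the componentId on the contender's behalf. */
    interface LeaderElection extends AutoCloseable {
        void startLeaderElection(LeaderContender contender);
        void confirmLeadership(UUID leaderSessionId, String address);
        @Override
        void close();
    }

    /** One shared leadership state for all registered contenders. */
    static final class SharedLeaderElectionService {
        private final Map<String, LeaderContender> contenders = new HashMap<>();
        /** Stand-in for the HA backend: one connection entry per componentId. */
        final Map<String, String> backendEntries = new TreeMap<>();
        private UUID leaderSessionId; // null while this process is not the leader

        /** Factory method: the componentId is chosen by the owner of the service. */
        LeaderElection createLeaderElection(String componentId) {
            return new LeaderElection() {
                @Override
                public void startLeaderElection(LeaderContender contender) {
                    contenders.put(componentId, contender);
                }

                @Override
                public void confirmLeadership(UUID sessionId, String address) {
                    // Only registered contenders with the current session ID may write.
                    if (sessionId.equals(leaderSessionId) && contenders.containsKey(componentId)) {
                        backendEntries.put(componentId, address);
                    }
                }

                @Override
                public void close() {
                    contenders.remove(componentId);
                }
            };
        }

        /** Called by the (omitted) driver listener when the process becomes leader. */
        void onGrantLeadership(UUID sessionId) {
            leaderSessionId = sessionId;
            contenders.values().forEach(c -> c.grantLeadership(sessionId));
        }
    }

    /** A contender that only knows its LeaderElection and its own RPC address. */
    static LeaderContender contender(LeaderElection election, String address) {
        return new LeaderContender() {
            @Override
            public void grantLeadership(UUID leaderSessionId) {
                election.confirmLeadership(leaderSessionId, address);
            }

            @Override
            public void revokeLeadership() {}
        };
    }

    static String run() {
        SharedLeaderElectionService service = new SharedLeaderElectionService();
        LeaderElection rm = service.createLeaderElection("resource_manager");
        LeaderElection dispatcher = service.createLeaderElection("dispatcher");
        rm.startLeaderElection(contender(rm, "rpc://resource-manager"));
        dispatcher.startLeaderElection(contender(dispatcher, "rpc://dispatcher"));
        service.onGrantLeadership(UUID.randomUUID());
        return service.backendEntries.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the contender code never mentions "resource_manager" or "dispatcher". Those identifiers are bound once when the LeaderElection is created, which is what keeps the componentId an internal detail of the service and its owner.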

As mentioned above, this FLIP only refactors the current implementation. The ownership changes introduced in FLINK-24038 are not touched. Therefore, the lifecycle of the LeaderElectionDriver is handled by the HighAvailabilityServices through the methods provided by LeaderElectionService: the HA backend connection is created lazily when the first contender is registered. Analogously, the connection is closed as soon as the last contender is removed. If the LeaderElectionService already holds leadership when a contender registers, that contender is informed right after registration. Conversely, deregistering a contender doesn’t stop the leader election process, because other contenders might still be participating in the leader election.
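The lifecycle rules in the paragraph above can be traced with a minimal sketch. Everything in it is an assumption made for illustration: Driver and Service are hypothetical classes, contenders are reduced to their componentId strings, and a log list records what would otherwise be backend connections and contender callbacks.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Traces the lazy driver lifecycle; class and method names are hypothetical. */
final class LazyDriverLifecycleSketch {

    /** Hypothetical stand-in for the connection to the HA backend. */
    static final class Driver implements AutoCloseable {
        private final List<String> log;

        Driver(List<String> log) {
            this.log = log;
            log.add("driver opened");
        }

        @Override
        public void close() {
            log.add("driver closed");
        }
    }

    static final class Service {
        private final List<String> log;
        private final Set<String> registered = new LinkedHashSet<>();
        private Driver driver; // null while no contender is registered
        private boolean hasLeadership;

        Service(List<String> log) {
            this.log = log;
        }

        void register(String componentId) {
            if (registered.isEmpty()) {
                driver = new Driver(log); // first contender: connect lazily
            }
            registered.add(componentId);
            if (hasLeadership) {
                log.add(componentId + " granted"); // late joiner is informed right away
            }
        }

        void remove(String componentId) {
            boolean wasRegistered = registered.remove(componentId);
            if (wasRegistered && registered.isEmpty()) {
                driver.close(); // last contender gone: disconnect
                driver = null;
                hasLeadership = false;
            }
        }

        /** Called by the driver when the process acquires leadership. */
        void onGrantLeadership() {
            hasLeadership = true;
            for (String componentId : registered) {
                log.add(componentId + " granted");
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Service service = new Service(log);
        service.register("resource_manager");
        service.onGrantLeadership();
        service.register("dispatcher");
        service.remove("resource_manager"); // election continues for the dispatcher
        service.remove("dispatcher");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this trace the driver is opened exactly once and closed exactly once, and removing the first contender has no visible effect on the second one.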

Compatibility, Deprecation, and Migration Plan

As mentioned above, the functionality doesn’t change. The leader election code just becomes more consistent, which should improve readability and testability.

Test Plan

We still have tests that were not migrated as part of FLINK-24038. The initial plan was to migrate them to cover the MultipleComponent* classes (FLINK-30338). With this FLIP, we could instead keep all these tests (with slight modifications) and migrate the tests of the MultipleComponent* test classes to cover the new FLIP-285 LeaderElectionService implementation.

Rejected Alternatives

Early initialization of the LeaderElectionDriver

Initially, the plan was to initialize the LeaderElectionDriver early in the DefaultLeaderElectionService constructor. See the sequence diagram below.

The sequence diagram above illustrates how the instantiation process is separated from starting the leader election. The two processes became more complex due to the introduction of the LeaderElection adapter interface. As mentioned before, LeaderElection is meant to hide the componentId ("resource_manager" in the above diagram) from the LeaderContender because this is an implementation detail of the LeaderElectionService and is owned by the HighAvailabilityServices. Only the HighAvailabilityServices "know" how the different components are organized, i.e. whether they all share the same LeaderElectionService (a per-JVM leader election) or whether individual components rely on their own LeaderElectionService (a more fine-grained leader election with more than one LeaderElectionDriver monitoring changes in the HA backend).

This approach was rejected during the implementation. Creating the driver in the DefaultLeaderElectionService constructor would have meant instantiating the service lazily in the HighAvailabilityServices implementations (rather than in the HAServices constructor), because the same HAServices are also used in the TaskManagers. Creating the DefaultLeaderElectionService in the HAServices constructor would initialize the driver of the LeaderElection and, therefore, make the TaskManager a participant in the leader election. That's not what we want: the JobManager is the master process, and the TaskManager should only monitor the leadership.

Reflect the per-process implementation in the interface

HighAvailabilityServices offers LeaderElectionService factory methods per component right now. The internal implementation does not support that. An initial proposal which was discussed on the mailing list suggested reducing the number of factory methods in HighAvailabilityServices. All components could rely on the same LeaderElectionService (analogously for the LeaderRetrievalService). This was rejected because it reduces the flexibility of future changes where we might want to introduce HighAvailabilityServices with per-component leader election again.

Updating the HA Backend Data Schema

There was an initial idea to reduce the connection information that is stored in the HA backend. The current data structure appeared to be redundant considering that we have a per-process instead of a per-component leader election with FLIP-285. The proposal included compacting the data structure as it is right now (see the JavaDoc of ZooKeeperMultipleComponentLeaderElectionHaServices for the following visualization of the current data structure):

 * /flink
 *      +/cluster_id_1/leader/latch
 *      |            |       /resource_manager/connection_info
 *      |            |       /dispatcher/connection_info
 *      |            |       /rest_server/connection_info
 *      |            |       /job-id-1/connection_info
 *      |            |       /job-id-2/connection_info
 *      |            |
 *      |            |
 *      |            +jobgraphs/job-id-1
 *      |            |         /job-id-2
 *      |            +jobs/job-id-1/checkpoints/latest
 *      |                 |                    /latest-1
 *      |                 |                    /latest-2
 *      |                 |       /checkpoint_id_counter

There was a plan to merge the connection_info of the components being collected under one leader election service instance assuming that they all include redundant information:

 * /flink
 *      +/cluster_id_1/leaders/leader-service-0/latch
 *      |            |        |                /connection_info
 *      |            |        +/leader-service-1/latch
 *      |            |                          /connection_info
 *      |            +jobgraphs/job-id-1
 *      |            |         /job-id-2
 *      |            +jobs/job-id-1/checkpoints/latest
 *      |                 |                    /latest-1
 *      |                 |                    /latest-2
 *      |                 |       /checkpoint_id_counter 

A similar thing could have been achieved for the Kubernetes implementation.

However, this approach does not work because the connection information actually differs between the components even though they run in the same JVM. The connection information comes from the RPC system, which most certainly requires differentiating between the RPC endpoints of the individual components (e.g. through different ports). Each connection information entry is therefore unique, and changing the HA backend data structure is rejected.
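A small check makes the argument concrete. The path layout is read off the current data structure shown above. The addresses, host name, ports and the helper names connectionInfoPath, canMergeLosslessly and exampleAddresses are made up for this sketch and are not taken from Flink.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrates why per-component entries cannot be merged; values are invented. */
final class ConnectionInfoSketch {

    /** Path of a component's entry, following the current layout shown above. */
    static String connectionInfoPath(String clusterId, String componentId) {
        return "/flink/" + clusterId + "/leader/" + componentId + "/connection_info";
    }

    /** Merging into one entry is only lossless if all components share one address. */
    static boolean canMergeLosslessly(Map<String, String> addressByComponent) {
        return new HashSet<>(addressByComponent.values()).size() <= 1;
    }

    /** Hypothetical RPC addresses; host, ports and endpoint names are made up. */
    static Map<String, String> exampleAddresses() {
        Map<String, String> addresses = new LinkedHashMap<>();
        addresses.put("resource_manager", "rpc://jobmanager-host:6123/resourcemanager");
        addresses.put("dispatcher", "rpc://jobmanager-host:6124/dispatcher");
        return addresses;
    }

    public static void main(String[] args) {
        Map<String, String> addresses = exampleAddresses();
        for (Map.Entry<String, String> entry : addresses.entrySet()) {
            String path = connectionInfoPath("cluster_id_1", entry.getKey());
            System.out.println(path + " -> " + entry.getValue());
        }
        System.out.println("mergeable without loss: " + canMergeLosslessly(addresses));
    }
}
```

As soon as two components report different endpoints, a single shared connection_info entry would have to drop one of them, which is why the per-component entries stay.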

Component ID (sometimes still referred to as contender ID in the following text) lives in the LeaderContender

The flaw of the current implementation is that DefaultLeaderElectionService only supports a single contender. The lifecycle of the LeaderElectionDriver is closely coupled with the LeaderElectionService and LeaderContender. In contrast, the per-process leader election allows multiple contenders to be assigned to the same leader election instance. Adding LeaderContenders is independent from the lifecycle management of the LeaderElectionDriver through LeaderElectionService.start/stop.

This proposal is about making LeaderElectionService support multiple LeaderContender instances instead of a single one. What is now LeaderElectionService.start/stop would become something like register/remove for registering and deregistering a single LeaderContender instance. In contrast, start/stop would be in charge of starting/stopping the HA backend leader election client (i.e. LeaderElectionDriver). We’re going to disconnect the lifecycle management of the LeaderElectionDriver from adding the LeaderContender: LeaderElectionDriver’s lifecycle is managed indirectly through the HighAvailabilityServices. The LeaderContender only registers and deregisters itself.

The LeaderElectionService now needs to know which LeaderContender checks for leadership or confirms the leadership: only registered LeaderContenders are allowed to access this information. A unique contender ID is introduced that identifies the contender and is used in the HA backend for storing this contender's connection information (analogously to how it's done in the MultipleComponentLeaderElectionService right now). There's a nice visualization of the data structure currently used in ZooKeeper (see the JavaDoc of ZooKeeperMultipleComponentLeaderElectionHaServices).

The following class diagram shows the proposal (it's close to the pre-FLINK-24038 class diagram):

This proposal gets back to how it was before FLINK-24038 (Flink 1.15-): the only change is that the MultipleComponent* interfaces are merged into the corresponding legacy interfaces (e.g. by taking the componentId into account).

  • LeaderContender: getDescription() will be removed. It was previously used for log messages; the same can be done using the componentId. Additionally, the componentId will be used as a reference for the connection information of this specific contender. The flaw of this approach is that the componentId is specified in the LeaderContender instead of the HighAvailabilityServices.
  • LeaderElectionService provides lifecycle management methods for starting and stopping the internally used LeaderElectionDriver (start(), close()). These methods are used by the HighAvailabilityServices implementation which owns the LeaderElectionService. Additionally, it provides methods for registering and removing LeaderContender instances. These methods are essentially called wherever LeaderElectionService.start(LeaderContender)/stop() are called right now. The leadership management methods (confirmLeadership, hasLeadership) are extended to also pass in the componentId to identify the exact connection information for the LeaderContender in question.
  • LeaderElectionEventHandler changes slightly in onLeaderInformationChange, where the LeaderElectionDriver is now able to pass in not only the LeaderInformation but also the componentId for which the leader information was changed.
  • LeaderElectionDriver changes for writing out the leader information, where we now also need the componentId to update the leader information for that specific contender.