Status
Current state: Adopted
Discussion thread: here
Vote thread: here
JIRA: KAFKA-5505
Released: AK 2.3.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Offering a first improvement to Connect's behavior under the above scenarios is the primary motivation of this KIP.
Proposed Changes
Out of the policies that are listed in Incremental Cooperative Rebalancing, this KIP proposes an implementation that applies deferred resolution of imbalance (Design II in Incremental Cooperative Rebalancing). This policy appears to strike a good balance for a first implementation of the Incremental Cooperative Connect protocol, since it aims to help all the aforementioned use cases and keeps imbalance resolution simple across consecutive rebalance rounds.
It's worth mentioning here that, compared to other recent approaches that aim to alleviate the symptoms of an expensive rebalancing process basically by avoiding rebalancing altogether (for example see KIP-345), this KIP attempts to make rebalancing significantly lighter, even if that means that, in some cases, the number of actual rebalances increases. Connect provides a good platform for a first implementation of Incremental Cooperative Rebalancing and its reassignment heuristics, because connectors and tasks traditionally do not depend on local state that has to be restored upon restart. The changes necessary to implement an incremental and cooperative approach to Connect's rebalance protocol are described in the rest of the section.
Changes to Connect's Rebalancing Process
In Connect, rebalancing is processed independently of, and asynchronously to, the Worker threads that run tasks and connectors. Group membership and rebalance coordination are driven by the threads of the DistributedHerder instead. This decoupling of responsibilities facilitates the main proposition of this KIP, which is to change when revocation of resources (connectors and tasks) happens in a Connect cluster. Following the general approach described in Incremental Cooperative Rebalancing, Connect Workers should now be allowed to keep running connectors and tasks until these resources are explicitly revoked by the Leader of the group. In brief, this leads to the following changes:
- When a Worker is prepared to join the group, it does not revoke (stop) any connectors or tasks that it currently runs (previously a Worker would stop all connectors and tasks in preparation for joining its group).
- When a Worker sends its metadata as part of the join request, it includes its current assignment (previously the metadata included in a join request did not include any assignment information).
- When a Worker is elected as Leader, it computes a new assignment, describing both assigned and revoked connectors and tasks (previously the Leader computed an assignment from scratch without defining revoked resources).
- When a Worker receives its assignment, if this assignment includes any revoked connectors or tasks, it stops (releases) these resources and then immediately rejoins the group with an assignment that excludes revoked resources (previously, upon receipt of assignment, the Worker started the connectors and tasks and operated in the new generation of the group protocol until the next rebalance some time in the future).
- Normally, in the next assignment round, the Leader will assign resources according to its policy and there will be no revoked resources in any of the Workers. If that's not the case, the previous steps will be repeated until the group converges to an assignment without revocations.
- When a Worker receives an assignment without any revocations, it starts the assigned connectors and tasks, and defers rejoining the group for the amount of time given in the ScheduledDelay field of the assignment included in the sync response.
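The Worker-side steps above can be sketched as follows. This is a minimal, hypothetical illustration, not Connect's actual code: the class `WorkerAssignmentHandler`, its nested `Assignment` and `Action` records, and the use of plain string ids such as `AC0` are assumptions made for the example. It shows the two branches: revocations cause only the revoked resources to stop plus an immediate rejoin, while an assignment without revocations starts new work and defers the next join by the delay.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: how a Worker might react to an assignment under
// incremental cooperative rebalancing. Not Connect's actual implementation.
class WorkerAssignmentHandler {

    // Simplified assignment: connectors and tasks are plain ids such as "AC0" or "AT1".
    record Assignment(Set<String> assigned, Set<String> revoked, int scheduledDelayMs) {}

    // What the Worker decided to do after handling one assignment.
    record Action(List<String> stopped, List<String> started, boolean rejoinNow, int rejoinAfterMs) {}

    private final Set<String> running = new LinkedHashSet<>();

    WorkerAssignmentHandler(Set<String> initiallyRunning) {
        running.addAll(initiallyRunning);
    }

    Set<String> running() {
        return running;
    }

    Action onAssignment(Assignment a) {
        if (!a.revoked().isEmpty()) {
            // Stop (release) only the revoked resources; everything else keeps running.
            List<String> stopped = new ArrayList<>();
            for (String id : a.revoked()) {
                if (running.remove(id)) {
                    stopped.add(id);
                }
            }
            // Rejoin right away, now reporting an assignment without the revoked resources.
            return new Action(stopped, List.of(), true, 0);
        }
        // No revocations: start whatever is newly assigned ...
        List<String> started = new ArrayList<>();
        for (String id : a.assigned()) {
            if (running.add(id)) {
                started.add(id);
            }
        }
        // ... and defer the next join by the ScheduledDelay carried in the assignment.
        return new Action(List.of(), started, false, a.scheduledDelayMs());
    }
}
```

Keeping the set of running resources on the Worker is what lets it report its current assignment when it rejoins.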
Overall, as far as the rebalancing workflow is concerned, implementing Incremental Cooperative Rebalancing does not change the set of events that might trigger a rebalance. A rebalance can still be triggered by configuration updates, a Worker joining the group, or a Worker leaving the group, either intentionally or due to a failure.
Fencing based on Rebalancing
Until now, the process of rebalancing has also implied a global synchronization barrier among the members of the group. With respect to the resources shared among the group, this barrier imposed a happens-before relationship between the release and the acquisition of a resource. In Connect, this simply means that a task had to stop execution on a specific Worker and save its progress before starting execution on another Worker after a rebalance. This is a necessary property and is still guaranteed under this proposal. However, instead of the rebalance being a flat global synchronization point for all connectors and tasks, the happens-before relationships are enforced only on tasks that are revoked and subsequently reassigned, whenever this is required. This optimization, which removes the stop-the-world effect of the current form of rebalancing, is possible because the barrier imposed by rebalancing is implicit: it concerns the Connect Workers and is not exposed in Connect's API. Connect tasks run independently, and when a task stops and resumes progress, it is not required that all tasks of a connector do the same. Of course, incremental cooperative rebalancing can be applied not only to individual tasks but also to a group of tasks, such as the tasks of a specific connector. However, in the changes proposed in this KIP, each task remains independent and is scheduled (assigned/revoked/reassigned) independently of other tasks.
Embedded Protocol Serialization
The new Connect protocol that introduces incremental cooperative rebalancing extends the existing protocol by appending its new fields as optional fields to the existing format. This is a backwards-compatible and easy-to-integrate way for both protocols to be handled by, and coexist in, the same code base for as long as necessary. Introducing a different serialization format was also considered initially (see Rejected Alternatives below for a short summary). For details on how the existing protocol is extended to support incremental cooperative rebalancing, see the section on Public Interfaces below.
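Appending optional fields works because an older reader consumes only the fields it knows, while a newer reader treats a missing trailing field as its default. The sketch below illustrates that idea with plain `java.nio.ByteBuffer` calls; it is not Kafka's own Schema/Struct code, and the choice of fields is an assumption: `Error` (Int16) stands in for the existing fields and `ScheduledDelay` (Int32, default 0) for an appended one. The class name `OptionalTrailingFields` is hypothetical.

```java
import java.nio.ByteBuffer;

// Illustrative sketch (not Kafka's Schema/Struct code) of appending an optional field
// to an existing format. Error (Int16) stands in for the existing fields and
// ScheduledDelay (Int32, default 0) for the appended one.
class OptionalTrailingFields {

    record Decoded(short error, int scheduledDelayMs) {}

    static final int DEFAULT_SCHEDULED_DELAY_MS = 0;

    // Writes the existing field, then the new field only if the writer knows about it.
    static ByteBuffer encode(short error, Integer scheduledDelayMs) {
        int size = Short.BYTES + (scheduledDelayMs == null ? 0 : Integer.BYTES);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort(error);
        if (scheduledDelayMs != null) {
            buf.putInt(scheduledDelayMs);
        }
        buf.flip();
        return buf;
    }

    // Newer reader: an absent trailing field falls back to its default.
    static Decoded decode(ByteBuffer buf) {
        short error = buf.getShort();
        int delay = buf.remaining() >= Integer.BYTES ? buf.getInt() : DEFAULT_SCHEDULED_DELAY_MS;
        return new Decoded(error, delay);
    }

    // Older reader: consumes only the fields it knows and ignores any trailing bytes.
    static short decodeOld(ByteBuffer buf) {
        return buf.getShort();
    }
}
```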
Deferred Resolution of Imbalance under Different Rebalancing Scenarios
...
Initial group and assignment: W1([AC0, AT1, AT2, BC0, BT1])
Config topic contains: AC0, AT1, AT2, BC0, BT1
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, BC0, BT1]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1], revoked: [AT2, BC0, BT1])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1], revoked: [])
W2(delay: 0, assigned: [AT2, BC0], revoked: [])
W3(delay: 0, assigned: [BT1], revoked: [])
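To make this walkthrough concrete, here is a hypothetical sketch of one way a leader could compute per-Worker targets, revoke the excess in one round, and hand the revoked resources out in the next. The heuristic (the heaviest Workers keep the extra slot, ties broken by Worker name) and the class `BalancingSketch` are assumptions for illustration; the KIP does not prescribe this exact algorithm. With the inputs of the walkthrough it happens to reproduce the assignments shown above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical balancing heuristic for illustration; not the KIP's exact algorithm.
class BalancingSketch {

    // Target load per Worker: floor(total / workers); the heaviest (total % workers)
    // Workers get one extra slot. Ties are broken by Worker name.
    static Map<String, Integer> targets(Map<String, List<String>> current, int total) {
        List<String> workers = new ArrayList<>(current.keySet());
        workers.sort(Comparator.comparingInt((String w) -> current.get(w).size())
                .reversed()
                .thenComparing(Comparator.<String>naturalOrder()));
        int base = total / workers.size();
        int extra = total % workers.size();
        Map<String, Integer> result = new LinkedHashMap<>();
        for (int i = 0; i < workers.size(); i++) {
            result.put(workers.get(i), base + (i < extra ? 1 : 0));
        }
        return result;
    }

    // Round 1: revoke whatever exceeds each Worker's target; the first items listed are kept.
    static Map<String, List<String>> revocations(Map<String, List<String>> current,
                                                 Map<String, Integer> targets) {
        Map<String, List<String>> revoked = new LinkedHashMap<>();
        targets.forEach((worker, target) -> {
            List<String> owned = current.get(worker);
            int keep = Math.min(target, owned.size());
            revoked.put(worker, new ArrayList<>(owned.subList(keep, owned.size())));
        });
        return revoked;
    }

    // Round 2: hand out unassigned items in order, filling each Worker up to its target.
    static Map<String, List<String>> assignUnassigned(Map<String, List<String>> current,
                                                      Map<String, Integer> targets,
                                                      List<String> unassigned) {
        Map<String, List<String>> added = new LinkedHashMap<>();
        int next = 0;
        for (Map.Entry<String, Integer> e : targets.entrySet()) {
            List<String> mine = new ArrayList<>();
            int room = e.getValue() - current.get(e.getKey()).size();
            while (room > 0 && next < unassigned.size()) {
                mine.add(unassigned.get(next++));
                room--;
            }
            added.put(e.getKey(), mine);
        }
        return added;
    }
}
```

Splitting the work into a revocation round and an assignment round mirrors the walkthrough: W1 first releases AT2, BC0 and BT1, and only after it rejoins are those resources given to W2 and W3.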
...
Initial group and assignment: W1([AC0, AT1]), W2([AT2, BC0]), W3([BT1])
Config topic contains: AC0, AT1, AT2, BC0, BT1
W1 is current leader
W2 leaves
Rebalance is triggered
W1 joins with assignment: [AC0, AT1]
W3 joins with assignment: [BT1]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: d, assigned: [AC0, AT1], revoked: [])
W3(delay: d, assigned: [BT1], revoked: [])
After delay d:
W1 joins with assignment: [AC0, AT1]
W3 joins with assignment: [BT1]
Rebalance is triggered
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, AT2], revoked: [])
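In this walkthrough the leader holds off redistribution for a delay d; if the group changes again before d expires (as in the next walkthrough), the leader reports only the remaining delay. A minimal sketch of that bookkeeping follows. It assumes the leader always starts with the full maximum (the KIP allows any value up to `scheduled.rebalance.max.delay.ms`) and passes the current time in explicitly; `DelayTracker` and `delayForRound` are hypothetical names.

```java
// Hypothetical sketch of the leader's delay bookkeeping; names are illustrative only.
// maxDelayMs plays the role of scheduled.rebalance.max.delay.ms, and the current time
// is passed in explicitly so the logic is deterministic.
class DelayTracker {

    private final int maxDelayMs;
    private long deadlineMs = -1; // -1 means no delay is in progress

    DelayTracker(int maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    // Returns the delay to put in every member's assignment for this round.
    // 0 means no delay: either nothing is lost, or lost work is redistributed now.
    int delayForRound(boolean hasLostAssignments, long nowMs) {
        if (!hasLostAssignments) {
            deadlineMs = -1; // nothing is missing, so there is nothing to wait for
            return 0;
        }
        if (deadlineMs < 0) {
            // First round that notices lost work: start the delay (d in the walkthrough).
            deadlineMs = nowMs + maxDelayMs;
            return maxDelayMs;
        }
        long remaining = deadlineMs - nowMs;
        if (remaining > 0) {
            return (int) remaining; // a later round before expiry reports what is left (d', ...)
        }
        deadlineMs = -1; // delay expired: redistribute now
        return 0;
    }
}
```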
...
Initial group and assignment: W1([AC0, AT1]), W2([AT2, BC0]), W3([BT1])
Config topic contains: AC0, AT1, AT2, BC0, BT1
W1 is current leader
W2 leaves
Rebalance is triggered
W1 joins with assignment: [AC0, AT1]
W3 joins with assignment: [BT1]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: d, assigned: [AC0, AT1], revoked: [])
W3(delay: d, assigned: [BT1], revoked: [])
Before delay d expires:
W2 joins with assignment: []
Rebalance is triggered
W1 becomes leader
W1 computes and sends assignments:
W1(delay: d', assigned: [AC0, AT1], revoked: [])
W2(delay: d', assigned: [], revoked: [])
W3(delay: d', assigned: [BT1], revoked: [])
d' is the remaining delay
After delay d':
W1 joins with assignment: [AC0, AT1]
W2 joins with assignment: []
W3 joins with assignment: [BT1]
Rebalance is triggered
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1], revoked: [])
W2(delay: 0, assigned: [AT2, BC0], revoked: [])
W3(delay: 0, assigned: [BT1], revoked: [])
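In this and the previous walkthrough, the connectors and tasks to redistribute after the delay are those listed in the config topic that no member reports as running. Below is a hypothetical sketch of detecting them and giving each one to the currently least-loaded Worker (ties broken by Worker name). With the inputs above it reproduces the final assignments of both walkthroughs, but it is an illustration rather than necessarily the leader's actual heuristic, and `LostAssignments` is an invented name.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; class and method names are invented for illustration.
class LostAssignments {

    // Lost = configured connectors/tasks that no member reported as currently running.
    static List<String> lost(List<String> configured, Map<String, List<String>> reported) {
        Set<String> owned = new HashSet<>();
        reported.values().forEach(owned::addAll);
        List<String> result = new ArrayList<>();
        for (String id : configured) {
            if (!owned.contains(id)) {
                result.add(id);
            }
        }
        return result;
    }

    // Gives each lost item to the currently least-loaded Worker (ties: smallest name).
    // Assumes at least one Worker is in the group.
    static Map<String, List<String>> redistribute(List<String> lost, Map<String, List<String>> reported) {
        Map<String, List<String>> added = new LinkedHashMap<>();
        Map<String, Integer> load = new LinkedHashMap<>();
        reported.forEach((worker, ids) -> {
            added.put(worker, new ArrayList<>());
            load.put(worker, ids.size());
        });
        for (String id : lost) {
            String target = null;
            for (Map.Entry<String, Integer> e : load.entrySet()) {
                if (target == null
                        || e.getValue() < load.get(target)
                        || (e.getValue().equals(load.get(target)) && e.getKey().compareTo(target) < 0)) {
                    target = e.getKey();
                }
            }
            added.get(target).add(id);
            load.merge(target, 1, Integer::sum);
        }
        return added;
    }
}
```

Because a returning Worker rejoins with an empty assignment, it is the least-loaded member, so a least-loaded policy tends to hand it back a similar amount of work.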
...
Initial group and assignment: W1([AC0, AT1]), W2([AT2, BC0]), W3([BT1])
Config topic contains: AC0, AT1, AT2, BC0, BT1
W1 is current leader
W1, which is the leader, leaves
Rebalance is triggered
W2 joins with assignment: [AT2, BC0]
W3 joins with assignment: [BT1]
W3 becomes leader.
No delay is in progress.
W3 computes and sends assignments:
W2(delay: 0, assigned: [AT2, BC0, AC0], revoked: [])
W3(delay: 0, assigned: [BT1, AT1], revoked: [])
...
Initial group and assignment: W1([AC0, AT1]), W2([AT2, BC0]), W3([BT1])
Config topic contains: AC0, AT1, AT2, BC0, BT1
W1 is current leader
W2 leaves
Rebalance is triggered
W1 joins with assignment: [AC0, AT1]
W3 joins with assignment: [BT1]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: d, assigned: [AC0, AT1], revoked: [])
W3(delay: d, assigned: [BT1], revoked: [])
Before delay d expires:
W2 joins with assignment: []
Rebalance is triggered
W1 becomes leader
W1 computes and sends assignments:
W1(delay: d', assigned: [AC0, AT1], revoked: [])
W2(delay: d', assigned: [], revoked: [])
W3(delay: d', assigned: [BT1], revoked: [])
d' is the remaining delay
W1, which is the leader, leaves
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: [BT1]
W3 becomes leader.
There's an active delay in progress.
W3 computes and sends assignments:
W2(delay: d'', assigned: [], revoked: [])
W3(delay: d'', assigned: [BT1], revoked: [])
d'' is the remaining delay
Before delay d'' expires:
W1 joins with assignment: []
Rebalance is triggered
W3 becomes leader
W3 computes and sends assignments:
W1(delay: d''', assigned: [], revoked: [])
W2(delay: d''', assigned: [], revoked: [])
W3(delay: d''', assigned: [BT1], revoked: [])
After delay d''':
W1 joins with assignment: []
W2 joins with assignment: []
W3 joins with assignment: [BT1]
Rebalance is triggered
W3 becomes leader
W3 computes and sends assignments:
W1(delay: 0, assigned: [BC0, AT2], revoked: [])
W2(delay: 0, assigned: [AC0, AT1], revoked: [])
W3(delay: 0, assigned: [BT1], revoked: [])
...
Subscription (Member → Leader):
Subscription => Version Url ConfigOffset Allocation
  Version      => Int16
  Url          => [String]
  ConfigOffset => Int64
  Allocation   => [Byte] (optional field, default null)
Allocation contains a serialized Assignment type, representing the current assignment of the worker.
Assignment (Leader → Member):
Assignment => Version Error Leader LeaderUrl Assignment Revoked ScheduledDelay
  Version        => Int16
  Error          => Int16
  Leader         => [String]
  LeaderUrl      => [String]
  Assignment     => [Byte]
  Revoked        => [Byte] (optional field, default null)
  ScheduledDelay => Int32 (optional field, default 0)
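The Subscription layout above can be illustrated with a small round-trip encoder. This is a sketch with plain `ByteBuffer` calls, not Kafka's Schema/Struct code, and it assumes Kafka-style primitives: a string is an Int16 length followed by UTF-8 bytes, and nullable bytes are an Int32 length (-1 for null) followed by the bytes. `SubscriptionCodec` is a hypothetical name, and `Allocation` is treated as opaque bytes here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative encoder/decoder for the Subscription layout, assuming Kafka-style
// primitives (String = Int16 length + UTF-8; nullable bytes = Int32 length, -1 for null).
class SubscriptionCodec {

    record Subscription(short version, String url, long configOffset, byte[] allocation) {}

    static ByteBuffer encode(Subscription s) {
        byte[] url = s.url().getBytes(StandardCharsets.UTF_8);
        int allocLen = s.allocation() == null ? 0 : s.allocation().length;
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + url.length + 8 + 4 + allocLen);
        buf.putShort(s.version());
        buf.putShort((short) url.length);
        buf.put(url);
        buf.putLong(s.configOffset());
        if (s.allocation() == null) {
            buf.putInt(-1); // null Allocation encoded as length -1
        } else {
            buf.putInt(s.allocation().length);
            buf.put(s.allocation());
        }
        buf.flip();
        return buf;
    }

    static Subscription decode(ByteBuffer buf) {
        short version = buf.getShort();
        byte[] url = new byte[buf.getShort()];
        buf.get(url);
        long configOffset = buf.getLong();
        byte[] allocation = null;
        // Allocation is optional: a payload without it simply ends after ConfigOffset.
        if (buf.remaining() >= Integer.BYTES) {
            int len = buf.getInt();
            if (len >= 0) {
                allocation = new byte[len];
                buf.get(allocation);
            }
        }
        return new Subscription(version, new String(url, StandardCharsets.UTF_8), configOffset, allocation);
    }
}
```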
Configuration Properties
Along with the changes in Connect's protocol format, the addition of the following configuration properties is proposed:
scheduled.rebalance.max.delay.ms
Type: Int32
Default: 300000 (5 min)
This is a delay that the leader may set to tolerate departures of Workers from the group by allowing a transient imbalance in connector and task assignments. During this delay, a Worker has the opportunity to return to the group and be reassigned the same or a similar amount of work as before. This property corresponds to the maximum delay that the leader may set in a single assignment. The actual delay that the leader uses to hold off redistribution of connectors and tasks and maintain the imbalance may be less than or equal to this value.
connect.protocol
Type: Enum
Values: eager, compatible
Default: compatible
This property defines which Connect protocol is enabled.
eager corresponds to the initial non-cooperative protocol that resolves imbalance with an immediate redistribution of connectors and tasks (version 0).
compatible corresponds to both eager (protocol version 0) and incremental cooperative (protocol version 1 or higher) protocols being enabled, with the incremental cooperative protocol being preferred if both are supported (version 1 or version 0).
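The two settings can be read as the sets of protocol versions a Worker is willing to speak. The sketch below is a simplification in which the group settles on the highest version that every member supports; the enum `ConnectProtocolSetting` and both methods are hypothetical, and in practice the choice is made through Kafka's group membership protocol rather than by a helper like this.

```java
import java.util.List;

// Hypothetical sketch of mapping connect.protocol to supported versions and of
// picking the highest version that every group member supports.
class ProtocolSelection {

    enum ConnectProtocolSetting { EAGER, COMPATIBLE }

    // Versions a Worker advertises, most preferred first.
    static List<Integer> supportedVersions(ConnectProtocolSetting setting) {
        return switch (setting) {
            case EAGER -> List.of(0);          // eager: version 0 only
            case COMPATIBLE -> List.of(1, 0);  // compatible: prefer version 1, fall back to 0
        };
    }

    // Highest version supported by all members, or -1 if there is none.
    static int groupVersion(List<ConnectProtocolSetting> members) {
        for (int candidate = 1; candidate >= 0; candidate--) {
            final int v = candidate;
            if (members.stream().allMatch(m -> supportedVersions(m).contains(v))) {
                return v;
            }
        }
        return -1;
    }
}
```

With every Worker on compatible the group speaks version 1; switching a single Worker to eager drops the common version to 0, which is the downgrade path described under Compatibility, Deprecation, and Migration Plan.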
Compatibility, Deprecation, and Migration Plan
...
Migration of Connect Workers to the new version of the Connect protocol is supported without downtime. To perform a live migration, a rolling bounce is preferred, as follows:
Bounce each Worker one-by-one after setting:
connect.protocol = compatible
To downgrade your cluster from protocol version 1 or higher to protocol version 0 with the eager rebalancing policy, switch one of the Workers back to eager mode:
connect.protocol = eager
Once this Worker joins, the group will downgrade to protocol version 0 and the eager rebalancing policy, with immediate release of resources upon joining the group. This process requires a one-time double rebalance: the leader detects the downgrade and first sends a downgraded assignment with empty assigned connectors and tasks, and from then on sends regular downgraded assignments.
Test Plan
- Parameterize existing unit tests to test all Connect protocols and compatibility modes.
- Add additional unit tests covering the new Connect protocol for Incremental Cooperative Rebalancing.
- Write the first integration tests for Connect protocols using the integration test framework for Kafka Connect: https://issues.apache.org/jira/browse/KAFKA-7503.
- Write system tests that exercise different scenarios.
...
Sizing a Connect cluster, as well as selecting the number and type of connectors that run on it, should be an architectural decision driven by business logic and, as much as possible, not by system constraints. Connect is a lightweight framework with a mission to be scalable and to relieve operators from the burden of running individual services for every integration of Kafka with third-party systems. Connect's scalability has been demonstrated in numerous production deployments, and this KIP aims to alleviate a pain point in Connect's scalability that might occur more frequently under the situations described in the motivation section. Therefore, it seems preferable to address this gap rather than depend on ad-hoc workarounds and exclusively on external tooling.
Use a more versatile serialization format for the new and subsequent Connect protocols.
Initially, this KIP included a suggestion to express the new Connect protocol format using the serialization format defined by flatbuffers. This suggestion was made to set up the protocol for more flexible upgrades in the future. However, after discussion on the Kafka developers' mailing list, it became apparent that introducing such a dependency with this KIP was not strictly necessary at this point. Since this suggestion was orthogonal to the already significant changes brought by this proposal, consideration of a different serialization format for the Connect protocol was postponed to future versions, and only if it is deemed necessary then.
Reduce number of rebalances and differentiate between initial and successive rebalances
The current proposal aims to offer a scalable, easy-to-configure and easy-to-use solution to Connect's Worker rebalancing pain points under a wide range of use cases. Without ruling out the existence of use cases that would benefit from more fine-grained control over specific phases of the Worker rebalance process (e.g. avoiding a redundant rebalance in some cases, or using a different delay for initial vs. successive group joins), this KIP errs on the side of retaining Connect's simplicity in terms of implementation and configuration demands, while addressing the majority of the use cases under which the current rebalance protocol is showing its limitations.