Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder discussionAdopted.

Discussion threadhere

Vote thread: Not started yet. here

JIRAKAFKA-5505

Released: N/A AK 2.3.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Offering a first improvement to Connect's behavior under the above scenarios is the primary motivation of this KIP. 

Proposed Changes

Out of the policies that are listed in Incremental Cooperative Rebalancing, this KIP is proposing an implementation that applies deferred resolution of imbalance (Design II in Incremental Cooperative Rebalancing). This policies policy appears to strike a good balance for a first implementation of Incremental Cooperative Connect protocol, since it aims to help all the aforementioned use cases and keeps imbalance resolution simple across consecutive rebalance rounds. The necessary changes to Connect's rebalance protocol are described in the rest of the section. 

Changes to Connect's Rebalancing Process

In Connect, processing of rebalancing procedures happens independently and asynchronously of the Worker threads that run tasks and connectors. In order to drive group membership and coordinate rebalancing, the threads of the DistributedHerder are used instead. This decoupling of responsibilities facilitates the main proposition of this KIP, which is to change when revocation of resources (connectors and tasks) is happening in a Connect cluster. Following the general approach described in Incremental Cooperative Rebalancing, Connect Workers should now be allowed to keep running connectors and tasks until these resources are explicitly revoked by the Leader of the group. Briefly this leads to the following changes: 

It's worth mentioning here that, compared to other recent approaches that aim to alleviate the symptoms of an expensive rebalancing process basically by avoiding rebalancing altogether (for example see KIP-345), this KIP attempts to make rebalancing significantly lighter, even if that means that, in some cases, the number of actual rebalances increases. Connect provides a good platform for a first implementation of Incremental Cooperative Rebalancing and its reassignment heuristics, because connectors and tasks traditionally do not depend on local state that has to be restored upon restart. The changes necessary to implement an incremental and cooperative approach to Connect's rebalance protocol are described in the rest of the section. 

Changes to Connect's Rebalancing Process

In Connect, processing of rebalancing procedures happens independently and asynchronously of the Worker threads that run tasks and connectors. In order to drive group membership and coordinate rebalancing, the threads of the DistributedHerder are used instead. This decoupling of responsibilities facilitates the main proposition of this KIP, which is to change when revocation of resources (connectors and tasks) is happening in a Connect cluster. Following the general approach described in Incremental Cooperative Rebalancing, Connect Workers should now be allowed to keep running connectors and tasks until these resources are explicitly revoked by the Leader of the group. Briefly this leads to the following changes: 

  • When a Worker is prepared to join the group, When a Worker is prepared to join the group, it does not revoke (stop) any connectors or tasks that it currently runs (previously a Worker would stop all connectors and tasks in preparation of joining its group).
  • When a Worker sends its metadata as part of the join request, it includes its current assignment (previously the metadata included in a join request did not include any assignment information)
  • When a Worker is elected as Leader, it computes a new assignment, describing both assigned and revoked connectors and tasks (previously the Leader computed an assignment from scratch without defining revoked resources). 
  • When a Worker receives its assignment, if this assignment includes any revoked connectors or tasks, it stops (releases) these resources and then immediately rejoins the group with an assignment that excludes revoked resources (previously, upon receipt of assignment, the Worker started the connectors and tasks and operated in the new generation of the group protocol until the next rebalance some time in the future). 
  • Normally in the next assignment round, the Leader will assign resources according to its policy and there will be no revoked resources in any of the Workers. If that's not the case, the previous steps will be repeated until the group converges into an assignment without revocations.
  • When a Worker receives assignments without any revocations, it starts the assigned connector and tasks, and defers rejoining the group for time equal to the amount of time included in the ScheduledRebalanceDelay field of the assignment included in the sync response. 

Overall, as far as the rebalancing workflow is concerned, implementation of Incremental Cooperative Rebalancing does not change the definition of events that might trigger a rebalance. Rebalance can still be triggered by configuration updates, a Worker joining the group, or a Worker leaving the group - intentionally or due to a failure. 

Embedded Protocol Serialization

The current KIP is suggesting a significant change in the format of subscription and assignment data that define Connect's embedded protocol on top of Kafka's group management protocol. The enhancement proposed here is yet another demonstration of how versatile and powerful the definition of an embedded protocol can be. By adding the notion of revoked assignment, Connect Workers can coordinate to achieve incremental rebalancing. In the future, further enhancements in the protocol could allow Connect to apply more sophisticated assignment policies. 

However, the inclusion of a new nested field (revoked assignment) and a single field (delay) in the assignment request, along with the inclusion of assigned connectors and tasks as a nested field in the subscription request, show that extending the Connect protocol in a compatible way can be challenging. In the section Compatibility, Deprecation, and Migration Plan, we describe a process that allows for the live migration of Connect Workers to the new Connect protocol version. However, future extensions could benefit significantly from the definition of a more permissive and backwards compatible serialization format for the embedded protocol. Such benefits can be seen during implementation, when code needs to change only slightly to account for additional or missing fields, as well as during runtime, with protocols being able to get upgraded during shorter and less coordinated rolling bounces of the deployed instances (here the Connect Workers). 

To achieve a good balance between simplicity and extensibility, the current KIP is making the suggestion to use flatbuffers as the serialization format of the new and future versions of the Connect protocol. The main reasons for selecting this particular serialization format are: 

  • Ability to select only specific fields of interest without unpacking all the data
  • Unused fields are omitted entirely
  • Non-present fields can be detected easily and code can easily plan for their absence. 
  • Good evolution/compatibility story overall.
  • Good performance
  • Easy to process definition. Simple parsers. Wide language coverage. 

Fencing based on Rebalancing

Until now the process of rebalancing has also implied a global synchronization barrier among the members of the group. With respect to the shared resources among the group, this barrier imposed a happens-before relationship between the release and the acquisition of a resource. In Connect this simply means that a task had to stop execution on a specific worker and save its progress, before starting execution on another worker after a rebalance. This is a necessary property for resources and is still guaranteed under this proposal. However, instead of the rebalance being a flat global synchronization point for all the connectors and tasks, the happens-before relationships are enforced only on tasks that are revoked and subsequently reassigned, whenever this is required. This optimization, that allows resolution of the stop-the-world effect (as it's also known in the current form of rebalancing), is allowed because the barrier imposed by rebalancing is implicit, it concerns the Connect workers and is not exposed in Connect's API. Connect tasks run independently and when a task stops and resumes progress it is not required that all tasks of a connector do the same. Of course, incremental cooperative rebalancing can be applied not only on individual tasks but also in a group of tasks, such as the tasks of a specific connector. However, in the changes proposed in this KIP, each task remains independent and is scheduled (assigned/revoked/reassigned) independently of other tasks. 

Embedded Protocol Serialization

The new Connect protocol that will introduce incremental cooperative rebalancing will extend the existing protocol by appending its new fields as optional fields on the existing format. This is a backwards compatible and easy to integrate way to have both protocols handled and live in the same code base for as much as it is necessary. Introducing a different serialization format was also considered initially (read below under Rejected Alternatives a short summary). To read how the existing protocol is extended to support incremental cooperative rebalancing read below the section on Public InterfacesThe above points are not meant to serve as a general and exhaustive comparison between serialization protocols. Every software has its own requirements and idiosyncrasies in terms of data representation and such characteristics can lead to different format selection per use case. These points are presented here to demonstrate that flatbuffers is a sufficient candidate for Connect's embedded protocol and could be used to describe other embedded protocols in the future

Deferred Resolution of Imbalance under Different Rebalancing Scenarios 

...

Code Block
languagetext
titleNon-first new member joins
Initial group and assignment: W1([AC0, AT1, AT2, BC0, BT1])
Config topic contains: AC0, AT1, AT2, BC0, BT1
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered 
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, BC0, BT1]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1], revoked: [AT2, BC0, BT1])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1], revoked: [])
W2(delay: 0, assigned: [AT2, BC0], revoked: [])
W3(delay: 0, assigned: [BT1], revoked: [])

...

Code Block
languagetext
titleWorker leaves
Initial group and assignment: W1([AC0, AT1]), W2([AT2, BC0]), W3([BT1])
Config topic contains: AC0, AT1, AT2, BC0, BT1
W1 is current leader
W2 leaves
Rebalance is triggered 
W1 joins with assignment: [AC0, AT1]
W3 joins with assignment: [BT1]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: d, assigned: [AC0, AT1], revoked: [])
W3(delay: d, assigned: [BT1], revoked: [])
After delay d:
W1 joins with assignment: [AC0, AT1]
W3 joins with assignment: [BT1]
Rebalance is triggered
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, AT2], revoked: [])

...

Code Block
languagetext
titleWorker bounces
Initial group and assignment: W1([AC0, AT1]), W2([AT2, BC0]), W3([BT1])
Config topic contains: AC0, AT1, AT2, BC0, BT1
W1 is current leader
W2 leaves
Rebalance is triggered 
W1 joins with assignment: [AC0, AT1]
W3 joins with assignment: [BT1]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: d, assigned: [AC0, AT1], revoked: [])
W3(delay: d, assigned: [BT1], revoked: [])
Before delay d expires:
W2 joins with assignment: []
Rebalance is triggered
W1 becomes leader
W1 computes and sends assignments:
W1(delay: d', assigned: [AC0, AT1], revoked: [])
W2(delay: d', assigned: [], revoked: [])
W3(delay: d', assigned: [BT1], revoked: [])
d' is the remaining delay
After delay d':
W1 joins with assignment: [AC0, AT1]
W2 joins with assignment: []
W3 joins with assignment: [BT1]
Rebalance is triggered
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1], revoked: [])
W2(delay: 0, assigned: [AT2, BC0], revoked: [])
W3(delay: 0, assigned: [BT1], revoked: [])

...

Code Block
languagetext
titleLeader leaves
Initial group and assignment: W1([AC0, AT1]), W2([AT2, BC0]), W3([BT1])
Config topic contains: AC0, AT1, AT2, BC0, BT1
W1 is current leader
W1, which is the leader, leaves
Rebalance is triggered 
W2 joins with assignment: [AT2, BC0]
W3 joins with assignment: [BT1]
W3 becomes leader. 
No delay is in progress.
W3 computes and sends assignments:
W2(delay: 0, assigned: [AT2, BC0, AC0], revoked: [])
W3(delay: 0, assigned: [BT1, AT1], revoked: [])

...

Code Block
languagetext
titleLeader bounces
Initial group and assignment: W1([AC0, AT1]), W2([AT2, BC0]), W3([BT1])
Config topic contains: AC0, AT1, AT2, BC0, BT1
W1 is current leader
W2 leaves
Rebalance is triggered 
W1 joins with assignment: [AC0, AT1]
W3 joins with assignment: [BT1]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: d, assigned: [AC0, AT1], revoked: [])
W3(delay: d, assigned: [BT1], revoked: [])
Before delay d expires:
W2 joins with assignment: []
Rebalance is triggered
W1 becomes leader
W1 computes and sends assignments:
W1(delay: d', assigned: [AC0, AT1], revoked: [])
W2(delay: d', assigned: [], revoked: [])
W3(delay: d', assigned: [BT1], revoked: [])
d' is the remaining delay
W1, which is the leader, leaves
Rebalance is triggered 
W2 joins with assignment: []
W3 joins with assignment: [BT1]
W3 becomes leader. 
There's an active delay in progress.
W3 computes and sends assignments:
W2(delay: d'', assigned: [], revoked: [])
W3(delay: d'', assigned: [BT1, AT1], revoked: [])
d'' is the remaining delay
Before delay d'' expires:
W1 joins with assignment: []
Rebalance is triggered
W3 becomes leader
W3 computes and sends assignments:
W1(delay: d''', assigned: [], revoked: [])
W2(delay: d''', assigned: [], revoked: [])
W3(delay: d''', assigned: [BT1], revoked: [])
After delay d''':
W1 joins with assignment: []
W2 joins with assignment: []
W3 joins with assignment: [BT1]
Rebalance is triggered
W3 becomes leader
W3 computes and sends assignments:
W1(delay: 0, assigned: [BC0, AT2], revoked: [])
W2(delay: 0, assigned: [AC0, AT1], revoked: [])
W3(delay: 0, assigned: [BT1], revoked: [])

...

Subscription (Member → Leader):

Code Block
 Subscription => Version SubscriptionFlatBuffer
   Version      Subscription => Version ConfigOffset Allocation
   Version         => Int16
   SubscriptionFlatBuffer  => [Bytes]
Code Block
 SubscriptionFlatBuffer => Url ConfigOffset ConnectAssignmentInt16
   Url	                   => [String]
   ConfigOffset            => Int64
   AssignedConnectorsAllocation      => [Connectors and Tasks]
     Connector     => [StringByte]
 (optional field, default null)



Allocation contains Tasksa serialized Assignment type, representing the current assignment of => [Int32]the worker.


Assignment (Leader → Member):

Code Block
 Assignment => Version AssignmentFlatBuffer
Error Leader LeaderUrl VersionAssignment Revoked ScheduledDelay
   Version            => Int16
   AssignmentFlatBuffer    => [Bytes]
Code Block
 AssignmentFlatBuffer =>Int16
   Error Leader LeaderUrl ScheduledRebalanceDelay AssignedConnectors RevokedConnectors
   Error                        => Int16
   Leader                       => [String]
   LeaderUrl    
   LeaderUrl             => [String]
   Assignment  ScheduledRebalanceDelay	=> Int32
   AssignedConnectors 		=> [Connector and TasksByte]
     Connector              => [String]
     Tasks                  => [int32]
   RevokedConnectorsRevoked			 		=> [Connector and Tasks]Byte] (optional field, default null)
     Connector              => [String] 
     Tasks                  => [Int32]
   ]

   ScheduledDelay		=> Int32 (optional field, default 0)

Configuration Properties

Along with the changes in Connect's protocol format, the addition of the following configuration properties is proposed:

  • scheduled.rebalance.max.delay.ms
    Type: Int32
    Default: 300000 (5min)
    This is a delay that the leader may set to tolerate departures of workers from the group by allowing a transient imbalance connector and task assignments. During this delay a worker has the opportunity to return to the group and get reassigned the same or similar amount of work as before. This property corresponds to the maximum delay that the Leader will leader may set in a single assignment. The actual delay used by the Leader can actual delay used by the leader to hold off redistribution of connectors and tasks and maintain imbalance may be less or equal to this value.

  • connect.protocol 
    Type: Enum 
    Values: eager, compatible, cooperative
    Default: eager compatible
    This property defines which Connect protocol is enabled.
    • eager corresponds to current functionality the initial non-cooperative protocol that resolves imbalance with an immediate redistribution of connectors and tasks (version 0). 
    • compatible corresponds to both eager (protocol version 0) and incremental cooperative (protocol version 1 or higher) protocols being enabled with Incremental Cooperative Connect the incremental cooperative protocol being preferred if both are supported (version 1 or version 0).cooperative cmeans that only Incremental Cooperative Connect protocol is enabled both are supported (version 1 or higherversion 0).

Compatibility, Deprecation, and Migration Plan

...

Migration of Connect Workers to the new version of the Connect protocol is supported without down time. In order to perform live migration a two phase rolling bounce process should be followedis preferred as follows:

  • Bounce each Worker one-by-one after setting:

    Code Block
    languagejava
    connect.protocol = compatible
    When all Workers are up and running with the property as set above in {{compat}} mode, repeat a rolling bounce round after setting on each Worker:


To downgrade your cluster to use protocol version 0 from version 1 or higher with eager rebalancing policy what is required is to switch one of the workers back to eager mode. 


Code Block
languagejava
connect.protocol = 

...

eager

Once this worker joins, the group will downgrade to protocol version 0 and eager rebalancing policy, with immediately release of resources upon joining the group. This process will require a one-time double rebalancing, with the leader detecting the downgrade and first sending a downgraded assignment with empty assigned connectors and tasks and from then on just regular downgraded assignments. 

Test Plan

  • Parameterize existing unit tests to test all Connect protocols and compatibility modes. 
  • Add additional unit tests covering the new Connect protocol for Incremental Cooperative Rebalancing.
  • Write the first integration tests for Connect protocols using the integration test framework for Kafka Connect: https://issues.apache.org/jira/browse/KAFKA-7503
  • Write system tests that exercise different scenarios.

...

Sizing a Connect cluster as well as selecting the number and type of connectors that run on it should be an architectural decision driven by business logic and not by system constraints as much as possible. Connect is a lightweight framework with a mission to be scalable and relieve operators from the burden of running individual services for every integration of Kafka with third-party systems. Connect's scalability has been demonstrated on numerous production deployments and this KIP aims to alleviate a pain point in Connect's scalability that might occur more frequently under the situations described in the motivation section. Therefore, seems preferable to address this gap rather than depend on ad-hoc workarounds and an exclusive dependence on external tooling. 

Extend the existing Connect protocol format by appending additional required fields at the end of current version's format

Use a more versatile serialization format for the new and subsequent Connect protocols.

Initially, this KIP included a suggestion to express the new protocol format for Connect on the serialization format defined by flatbuffers. This suggestion was made to set up the protocol for more flexible upgrades in the future. However, after discussion on the Kafka developer's mailing list, it became apparent that introducing such a dependency with this KIP did not seem absolutely necessary at this point. Since this suggestion was orthogonal to the - already significant - changes brought up by this proposal, consideration of a different serialization format for the Connect protocol was postponed for future versions, and only if that's deemed necessary then. Admittedly, the changes in the protocol fields are not very frequent, with version 1 aiming to succeed version 0 after more than 3 years since its initial release. The current proposal adds a number of non-trivial fields of variable size. At the same time, it's improbable that this is the final iteration on a protocol that enriches Connect's functionality in a powerful way. In anticipation of future extensions and in order to avoid rediscovering the wheel by implementing a general purpose serialization scheme with the current constructs, this KIP suggests that, starting with the next version, Connect will use a lightweight, performant and backwards compatible serialization format that will replace the format used in version 0.

Reduce number of rebalances and differentiate between initial and successive rebalances

The current proposal aims to offer a scalable and easy to configure and use solution to Connect's Worker rebalancing pain points under a wide range of use cases. Without ruling out the existence of use cases that would appreciate a more fine grain control over specific phases of the Worker rebalance process (e.g. avoid a redundant rebalance in some cases or use a different delay for initial vs successive group joins), this KIP errs on the side of retaining Connect's simplicity in terms of implementation and configuration demands, while addressing the majority of the use cases under which the current rebalance protocol is showing its limitations.