Status

Current state: Adopted

Discussion thread: here

Vote thread: here

JIRA: KAFKA-5505

Released: AK 2.3.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Overall, as far as the rebalancing workflow is concerned, the implementation of Incremental Cooperative Rebalancing does not change the definition of the events that might trigger a rebalance. A rebalance can still be triggered by configuration updates, a Worker joining the group, or a Worker leaving the group, whether intentionally or due to a failure.

Embedded Protocol Serialization

The current KIP is suggesting a significant change in the format of subscription and assignment data that define Connect's embedded protocol on top of Kafka's group management protocol. The enhancement proposed here is yet another demonstration of how versatile and powerful the definition of an embedded protocol can be. By adding the notion of revoked assignment, Connect Workers can coordinate to achieve incremental rebalancing. In the future, further enhancements in the protocol could allow Connect to apply more sophisticated assignment policies. 

However, the inclusion of a new nested field (revoked assignment) and a single field (delay) in the assignment request, along with the inclusion of assigned connectors and tasks as a nested field in the subscription request, show that extending the Connect protocol in a compatible way can be challenging. In the section Compatibility, Deprecation, and Migration Plan, we describe a process that allows for the live migration of Connect Workers to the new Connect protocol version. However, future extensions could benefit significantly from the definition of a more permissive and backwards compatible serialization format for the embedded protocol. Such benefits can be seen during implementation, when code needs to change only slightly to account for additional or missing fields, as well as during runtime, with protocols being able to get upgraded during shorter and less coordinated rolling bounces of the deployed instances (here the Connect Workers). 

To achieve a good balance between simplicity and extensibility, this KIP suggests using flatbuffers as the serialization format of the new and future versions of the Connect protocol. The main reasons for selecting this particular serialization format are:

  • Ability to select only specific fields of interest without unpacking all the data
  • Unused fields are omitted entirely
  • Non-present fields can be detected easily and code can easily plan for their absence. 
  • Good evolution/compatibility story overall.
  • Good performance
  • Easy to process definition. Simple parsers. Wide language coverage. 

Fencing based on Rebalancing

Until now, the process of rebalancing has also implied a global synchronization barrier among the members of the group. With respect to the resources shared among the group, this barrier imposed a happens-before relationship between the release and the acquisition of a resource. In Connect this simply means that a task had to stop execution on a specific worker and save its progress before starting execution on another worker after a rebalance. This is a necessary property for resources and is still guaranteed under this proposal. However, instead of the rebalance being a flat global synchronization point for all the connectors and tasks, the happens-before relationships are enforced only on tasks that are revoked and subsequently reassigned, whenever this is required. This optimization, which resolves the stop-the-world effect (as the current form of rebalancing is also known), is possible because the barrier imposed by rebalancing is implicit: it concerns the Connect workers and is not exposed in Connect's API. Connect tasks run independently, and when a task stops and resumes progress it is not required that all tasks of a connector do the same. Of course, incremental cooperative rebalancing can be applied not only to individual tasks but also to a group of tasks, such as the tasks of a specific connector. However, in the changes proposed in this KIP, each task remains independent and is scheduled (assigned/revoked/reassigned) independently of other tasks.
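The narrower barrier described above can be illustrated with a small Java sketch. It is not Connect's implementation: the class name, method names, and the "connector-taskId" string convention are hypothetical. It only shows that, given a worker's current and next assignment, the tasks that must stop before being reassigned are the set difference, while tasks present in both assignments keep running.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class RevocationSketch {
    // Tasks the worker runs now but does not own in the next assignment.
    // Only these need the stop-then-start (happens-before) ordering.
    static Set<String> toRevoke(Set<String> current, Set<String> next) {
        Set<String> revoked = new TreeSet<>(current);
        revoked.removeAll(next);
        return revoked;
    }

    // Tasks present in both assignments; they are never stopped.
    static Set<String> untouched(Set<String> current, Set<String> next) {
        Set<String> kept = new TreeSet<>(current);
        kept.retainAll(next);
        return kept;
    }

    public static void main(String[] args) {
        // Hypothetical task ids in "connector-taskId" form.
        Set<String> current = new TreeSet<>(Arrays.asList("c1-0", "c1-1", "c2-0"));
        Set<String> next = new TreeSet<>(Arrays.asList("c1-0", "c2-0", "c3-0"));
        System.out.println("revoke:       " + toRevoke(current, next));
        System.out.println("keep running: " + untouched(current, next));
    }
}
```

In this example only c1-1 is revoked; c1-0 and c2-0 continue uninterrupted, which is the difference from a stop-the-world rebalance.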

Embedded Protocol Serialization

The new Connect protocol that will introduce incremental cooperative rebalancing will extend the existing protocol by appending its new fields as optional fields to the existing format. This is a backwards compatible and easy to integrate way to have both protocols handled in the same code base for as long as necessary. Introducing a different serialization format was also considered initially (see the short summary below under Rejected Alternatives). To read how the existing protocol is extended to support incremental cooperative rebalancing, see the section on Public Interfaces below.

The points listed earlier in favor of flatbuffers are not meant to serve as a general and exhaustive comparison between serialization protocols. Every piece of software has its own requirements and idiosyncrasies in terms of data representation, and such characteristics can lead to a different format selection per use case. These points are presented here to demonstrate that flatbuffers is a sufficient candidate for Connect's embedded protocol and could be used to describe other embedded protocols in the future.
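To illustrate why appending optional fields is backwards compatible, here is a minimal Java sketch. The length-prefixed layout, the class name, and the method names are assumptions made for this example and are not Connect's actual wire encoding. A writer may append the optional allocation bytes after the existing fields, and a reader treats their absence as null.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OptionalFieldSketch {
    // Writes url and configOffset, then appends allocation only when present.
    static ByteBuffer write(String url, long configOffset, byte[] allocation) {
        byte[] urlBytes = url.getBytes(StandardCharsets.UTF_8);
        int size = 2 + urlBytes.length + 8
                + (allocation == null ? 0 : 4 + allocation.length);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort((short) urlBytes.length).put(urlBytes).putLong(configOffset);
        if (allocation != null) {
            buf.putInt(allocation.length).put(allocation);
        }
        buf.flip();
        return buf;
    }

    // Reads the fixed fields, then the optional trailing field if any bytes remain.
    static byte[] readAllocation(ByteBuffer buf) {
        byte[] url = new byte[buf.getShort()];
        buf.get(url);
        buf.getLong(); // configOffset, unused in this sketch
        if (!buf.hasRemaining()) {
            return null; // payload from a writer that did not append the field
        }
        byte[] allocation = new byte[buf.getInt()];
        buf.get(allocation);
        return allocation;
    }

    public static void main(String[] args) {
        ByteBuffer oldStyle = write("http://worker1:8083", 42L, null);
        ByteBuffer newStyle = write("http://worker1:8083", 42L, new byte[] {1, 2, 3});
        System.out.println("old payload has allocation: " + (readAllocation(oldStyle) != null));
        System.out.println("new payload has allocation: " + (readAllocation(newStyle) != null));
    }
}
```

The same reader handles both payloads, which is the property that lets workers on different protocol versions coexist during a rolling bounce.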

Deferred Resolution of Imbalance under Different Rebalancing Scenarios 

...

Connect protocol's format

Subscription (Member → Leader):

...

Code Block
 Subscription => Url ConfigOffset Allocation
   Url                => [String]
   ConfigOffset       => Int64
   Allocation         => [Byte] (optional field, default null)



Allocation contains a serialized Assignment type, representing the current assignment of the worker.


Assignment (Leader → Member):

Code Block
 Assignment => Error Leader LeaderUrl Assignment Revoked ScheduledDelay
   Error              => Int16
   Leader             => [String]
   LeaderUrl          => [String]
   Assignment         => [Byte]
     Connector        => [String]
     Tasks            => [Int32]
   Revoked            => [Byte] (optional field, default null)
     Connector        => [String]
     Tasks            => [Int32]
   ScheduledDelay     => Int32 (optional field, default 0)

Configuration Properties

Along with the changes in Connect's protocol format, the addition of the following configuration properties is proposed:

  • scheduled.rebalance.max.delay.ms
    Type: Int32
    Default: 300000 (5min)
    This is a delay that the leader may set to tolerate departures of workers from the group by allowing a transient imbalance in connector and task assignments. During this delay a worker has the opportunity to return to the group and get reassigned the same or a similar amount of work as before. This property corresponds to the maximum delay that the leader may set in a single assignment. The actual delay used by the leader to hold off redistribution of connectors and tasks and maintain the imbalance may be less than or equal to this value.

  • connect.protocol 
    Type: Enum 
    Values: eager, compatible, cooperative
    Default: compatible
    This property defines which Connect protocol is enabled.
    • eager corresponds to the initial non-cooperative protocol that resolves imbalance with an immediate redistribution of connectors and tasks (version 0).
    • compatible corresponds to both eager (protocol version 0) and incremental cooperative (protocol version 1 or higher) protocols being enabled, with the incremental cooperative protocol being preferred if both are supported.
    • cooperative means that only an incremental cooperative protocol is enabled, one that tolerates imbalances in connectors and tasks up to a certain maximum delay.
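As a rough illustration of how a worker might interpret these two properties, consider the following Java sketch. The class, the ProtocolMode enum, and the helper methods are hypothetical and not part of Connect's API; the defaults are assumed to be compatible and 300000 ms. The last helper reflects the statement that the delay the leader actually uses never exceeds the configured maximum.

```java
import java.util.Locale;
import java.util.Properties;

public class WorkerConfigSketch {
    // Hypothetical enum mirroring the values listed for connect.protocol.
    enum ProtocolMode { EAGER, COMPATIBLE, COOPERATIVE }

    // Parses connect.protocol; throws IllegalArgumentException on unknown values.
    static ProtocolMode protocol(Properties props) {
        String value = props.getProperty("connect.protocol", "compatible");
        return ProtocolMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    // Reads scheduled.rebalance.max.delay.ms, assuming a 5 minute default.
    static int maxDelayMs(Properties props) {
        return Integer.parseInt(
                props.getProperty("scheduled.rebalance.max.delay.ms", "300000"));
    }

    // The delay a leader may schedule is capped by the configured maximum.
    static int effectiveDelayMs(int wantedMs, Properties props) {
        return Math.max(0, Math.min(wantedMs, maxDelayMs(props)));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("connect.protocol", "eager");
        props.setProperty("scheduled.rebalance.max.delay.ms", "60000");
        System.out.println(protocol(props));
        System.out.println(effectiveDelayMs(90000, props));
    }
}
```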

Compatibility, Deprecation, and Migration Plan

...

Migration of Connect Workers to the new version of the Connect protocol is supported without downtime. In order to perform a live migration, a two-phase rolling bounce process is preferred, as follows:

  • Bounce each Worker one-by-one after setting:

    Code Block
    connect.protocol = compatible
    When all Workers are up and running in compatible mode with the property set as above, repeat a rolling bounce round after setting on each Worker:


To downgrade your cluster from protocol version 1 or higher to protocol version 0 with the eager rebalancing policy, switch one of the workers back to eager mode.


Code Block
connect.protocol = eager

Once this worker joins, the group will downgrade to protocol version 0 and the eager rebalancing policy, with immediate release of resources upon joining the group. This process will require a one-time double rebalance: the leader detects the downgrade and first sends a downgraded assignment with empty assigned connectors and tasks, and from then on sends regular downgraded assignments.
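The downgrade behaviour can be sketched in Java as follows. This is a simplified model, not the leader's actual logic: it assumes the group settles on the lowest protocol version supported by all members, and every name in it is hypothetical. It shows why a single eager worker is enough to downgrade the group, and how the first assignment after a downgrade is empty while later ones are regular.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DowngradeSketch {
    // Assumption: the group uses the highest version every member supports,
    // i.e. the minimum of the members' maximum versions (list must be non-empty).
    static int groupVersion(List<Integer> memberMaxVersions) {
        return Collections.min(memberMaxVersions);
    }

    // On the rebalance that detects a downgrade to version 0, the leader sends
    // an empty assignment; afterwards it sends the planned one unchanged.
    static List<String> assignmentFor(int previousVersion, int currentVersion,
                                      List<String> planned) {
        boolean downgraded = previousVersion > 0 && currentVersion == 0;
        return downgraded ? Collections.<String>emptyList() : planned;
    }

    public static void main(String[] args) {
        // Two compatible workers (version 1) and one switched back to eager (0).
        int version = groupVersion(Arrays.asList(1, 1, 0));
        List<String> planned = Arrays.asList("c1-0", "c1-1");
        System.out.println("group version:    " + version);
        System.out.println("first rebalance:  " + assignmentFor(1, version, planned));
        System.out.println("second rebalance: " + assignmentFor(version, version, planned));
    }
}
```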

Test Plan

  • Parameterize existing unit tests to test all Connect protocols and compatibility modes. 
  • Add additional unit tests covering the new Connect protocol for Incremental Cooperative Rebalancing.
  • Write the first integration tests for Connect protocols using the integration test framework for Kafka Connect: https://issues.apache.org/jira/browse/KAFKA-7503
  • Write system tests that exercise different scenarios.

...

Sizing a Connect cluster, as well as selecting the number and type of connectors that run on it, should as much as possible be an architectural decision driven by business logic and not by system constraints. Connect is a lightweight framework with a mission to be scalable and to relieve operators from the burden of running individual services for every integration of Kafka with third-party systems. Connect's scalability has been demonstrated in numerous production deployments, and this KIP aims to alleviate a pain point in Connect's scalability that might occur more frequently under the situations described in the motivation section. Therefore, it seems preferable to address this gap rather than depend on ad-hoc workarounds and an exclusive dependence on external tooling.

Extend the existing Connect protocol format by appending additional required fields at the end of current version's format

Admittedly, the changes in the protocol fields are not very frequent, with version 1 aiming to succeed version 0 after more than 3 years since its initial release. The current proposal adds a number of non-trivial fields of variable size. At the same time, it's improbable that this is the final iteration on a protocol that enriches Connect's functionality in a powerful way. In anticipation of future extensions, and in order to avoid reinventing the wheel by implementing a general purpose serialization scheme with the current constructs, this KIP suggests that, starting with the next version, Connect will use a lightweight, performant and backwards compatible serialization format that will replace the format used in version 0.

Use a more versatile serialization format for the new and subsequent Connect protocols.

Initially, this KIP included a suggestion to express the new protocol format for Connect in the serialization format defined by flatbuffers. This suggestion was made to set up the protocol for more flexible upgrades in the future. However, after discussion on the Kafka developers' mailing list, it became apparent that introducing such a dependency with this KIP was not absolutely necessary at this point. Since this suggestion was orthogonal to the already significant changes brought by this proposal, consideration of a different serialization format for the Connect protocol was postponed to future versions, and only if it is deemed necessary at that point.

Reduce number of rebalances and differentiate between initial and successive rebalances

The current proposal aims to offer a solution to Connect's Worker rebalancing pain points that is scalable and easy to configure and use under a wide range of use cases. Without ruling out the existence of use cases that would appreciate more fine-grained control over specific phases of the Worker rebalance process (e.g. avoiding a redundant rebalance in some cases, or using a different delay for initial vs. successive group joins), this KIP errs on the side of retaining Connect's simplicity in terms of implementation and configuration demands, while addressing the majority of the use cases under which the current rebalance protocol is showing its limitations.