...

Embedded Protocol Serialization

This KIP proposes a significant change in the format of the subscription and assignment data that define Connect's embedded protocol on top of Kafka's group management protocol. The enhancement proposed here is yet another demonstration of how versatile and powerful the definition of an embedded protocol can be. By adding the notion of revoked assignment, Connect Workers can coordinate to achieve incremental rebalancing. In the future, further enhancements to the protocol could allow Connect to apply more sophisticated assignment policies.

However, the inclusion of a new nested field (revoked assignment) and a single field (delay) in the assignment request, along with the inclusion of assigned connectors and tasks as a nested field in the subscription request, shows that extending the Connect protocol in a compatible way can be challenging. The section Compatibility, Deprecation, and Migration Plan describes a process that allows for the live migration of Connect Workers to the new Connect protocol version. Still, future extensions could benefit significantly from a more permissive and backwards compatible serialization format for the embedded protocol. Such benefits appear during implementation, when code needs to change only slightly to account for additional or missing fields. They also appear at runtime, when protocols can be upgraded through shorter and less coordinated rolling bounces of the deployed instances (here, the Connect Workers).

To achieve a good balance between simplicity and extensibility, this KIP proposes using FlatBuffers as the serialization format for the new and future versions of the Connect protocol. The main reasons for selecting this particular serialization format are:

  • Ability to select only specific fields of interest without unpacking all the data.
  • Unused fields are omitted entirely.
  • Absent fields can be detected easily, so code can plan for their absence.
  • Good evolution and compatibility story overall.
  • Good performance.
  • Easy-to-process schema definitions, simple parsers, and wide language coverage.

The above points are not meant to serve as a general and exhaustive comparison between serialization formats. Every software system has its own requirements and idiosyncrasies in terms of data representation, and such characteristics can lead to a different choice of format for each use case. These points are presented here to demonstrate that FlatBuffers is a sufficient candidate for Connect's embedded protocol and could be used to describe other embedded protocols in the future.

The new Connect protocol, which introduces incremental cooperative rebalancing, will extend the existing protocol by appending its new fields as optional fields to the existing format. This is a backwards compatible and easy-to-integrate way for both protocols to be handled by, and live in, the same code base for as long as necessary. Introducing a different serialization format was also considered initially (a short summary is given below under Rejected Alternatives). The section on Public Interfaces below describes how the existing protocol is extended to support incremental cooperative rebalancing.

Deferred Resolution of Imbalance under Different Rebalancing Scenarios 

...

Subscription (Member → Leader):

 Subscription => Version Url ConfigOffset Allocation
   Version         => Int16
   Url             => [String]
   ConfigOffset    => Int64
   Allocation      => [Byte] (optional field, default null)



Allocation contains a serialized Assignment type, representing the current assignment of the worker.
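The following minimal Python sketch illustrates what appending an optional field means for the Subscription schema above. It assumes Kafka's protocol primitive encodings: big-endian Int16 and Int64, a String encoded as an Int16 length followed by UTF-8 bytes, and nullable bytes encoded as an Int32 length where -1 denotes null. `Subscription`, `encode_subscription`, `decode_subscription` and the `supported_version` parameter are hypothetical names, and the sketch does not reflect Connect's actual implementation.

```python
import struct
from dataclasses import dataclass
from typing import Optional


@dataclass
class Subscription:
    version: int
    url: str
    config_offset: int
    allocation: Optional[bytes] = None  # optional field, default null


def encode_subscription(sub: Subscription) -> bytes:
    """Encode Version, Url and ConfigOffset, then append Allocation for version >= 1."""
    url = sub.url.encode("utf-8")
    out = struct.pack(">hh", sub.version, len(url)) + url  # Int16 version, then String url
    out += struct.pack(">q", sub.config_offset)             # Int64 config offset
    if sub.version >= 1:
        if sub.allocation is None:
            out += struct.pack(">i", -1)                    # null is encoded as length -1
        else:
            out += struct.pack(">i", len(sub.allocation)) + sub.allocation
    return out


def decode_subscription(data: bytes, supported_version: int = 1) -> Subscription:
    """Decode a subscription. A reader that only supports version 0 never looks
    past ConfigOffset, so bytes appended by newer workers are simply ignored."""
    version, url_len = struct.unpack_from(">hh", data, 0)
    pos = 4
    url = data[pos:pos + url_len].decode("utf-8")
    pos += url_len
    (config_offset,) = struct.unpack_from(">q", data, pos)
    pos += 8
    allocation = None
    # Read the appended optional field only if both sides know about it
    # and the sender actually included it.
    if min(version, supported_version) >= 1 and len(data) - pos >= 4:
        (length,) = struct.unpack_from(">i", data, pos)
        pos += 4
        if length >= 0:
            allocation = data[pos:pos + length]
    return Subscription(version, url, config_offset, allocation)
```

For example, decoding a version-1 subscription with `supported_version=0` yields the same Url and ConfigOffset and leaves `allocation` as `None`. This mirrors how an older worker can coexist with newer ones while the new field is only appended.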


Assignment (Leader → Member):

 Assignment => Version Error Leader LeaderUrl Assignment Revoked ScheduledDelay
   Version         => Int16
   Error           => Int16
   Leader          => [String]
   LeaderUrl       => [String]
   Assignment      => [Byte]
   Revoked         => [Byte] (optional field, default null)
   ScheduledDelay  => Int32 (optional field, default 0)
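The following minimal Python sketch shows how a reader might handle the optional trailing fields of the Assignment schema above. It assumes the same Kafka-style primitive encodings: big-endian integers, Int16-prefixed UTF-8 strings, and Int32-prefixed byte arrays where -1 denotes null. `Assignment`, `Reader` and `decode_assignment` are hypothetical names, not Connect's actual classes.

```python
import struct
from dataclasses import dataclass
from typing import Optional


@dataclass
class Assignment:
    version: int
    error: int
    leader: str
    leader_url: str
    assignment: bytes
    revoked: Optional[bytes] = None  # optional field, default null
    scheduled_delay: int = 0         # optional field, default 0


class Reader:
    """Sequentially reads big-endian, Kafka-style primitives from a buffer."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.pos = 0

    def remaining(self) -> int:
        return len(self.data) - self.pos

    def _fixed(self, fmt: str) -> int:
        (value,) = struct.unpack_from(fmt, self.data, self.pos)
        self.pos += struct.calcsize(fmt)
        return value

    def int16(self) -> int:
        return self._fixed(">h")

    def int32(self) -> int:
        return self._fixed(">i")

    def string(self) -> str:
        length = self.int16()
        value = self.data[self.pos:self.pos + length].decode("utf-8")
        self.pos += length
        return value

    def nullable_bytes(self) -> Optional[bytes]:
        length = self.int32()
        if length < 0:
            return None  # length -1 encodes null
        value = self.data[self.pos:self.pos + length]
        self.pos += length
        return value


def decode_assignment(data: bytes) -> Assignment:
    r = Reader(data)
    version = r.int16()
    error = r.int16()
    leader = r.string()
    leader_url = r.string()
    assignment = r.nullable_bytes() or b""  # treat a null Assignment as empty
    result = Assignment(version, error, leader, leader_url, assignment)
    # Revoked and ScheduledDelay are appended after the existing fields, so a
    # message that stops early keeps the documented defaults.
    if version >= 1 and r.remaining() >= 4:
        result.revoked = r.nullable_bytes()
    if version >= 1 and r.remaining() >= 4:
        result.scheduled_delay = r.int32()
    return result
```

A message that ends right after Assignment decodes with `revoked` as `None` and `scheduled_delay` as `0`. Such messages therefore need no separate code path, which is the property that appending optional fields is meant to provide.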

Configuration Properties

Along with the changes in Connect's protocol format, the addition of the following configuration properties is proposed:

...

Sizing a Connect cluster, as well as selecting the number and type of connectors that run on it, should be an architectural decision driven by business logic and, as much as possible, not by system constraints. Connect is a lightweight framework whose mission is to be scalable and to relieve operators from the burden of running individual services for every integration of Kafka with third-party systems. Connect's scalability has been demonstrated in numerous production deployments, and this KIP aims to alleviate a pain point in Connect's scalability that might occur more frequently in the situations described in the motivation section. Therefore, it seems preferable to address this gap rather than depend on ad-hoc workarounds or rely exclusively on external tooling.

Extend the existing Connect protocol format by appending additional required fields at the end of current version's format

Use a more versatile serialization format for the new and subsequent Connect protocols.

Initially, this KIP included a suggestion to express the new Connect protocol format using the serialization format defined by FlatBuffers. This suggestion was made to set up the protocol for more flexible upgrades in the future. However, after discussion on the Kafka developers' mailing list, it became apparent that introducing such a dependency with this KIP was not strictly necessary at this point. Since this suggestion was orthogonal to the already significant changes brought by this proposal, consideration of a different serialization format for the Connect protocol was postponed to future versions, and only if it is deemed necessary then. Admittedly, changes to the protocol fields are not very frequent, with version 1 set to succeed version 0 more than 3 years after the latter's initial release. The current proposal adds a number of non-trivial fields of variable size. At the same time, it is improbable that this is the final iteration on a protocol that enriches Connect's functionality in such a powerful way. In anticipation of future extensions, and to avoid reinventing the wheel by implementing a general-purpose serialization scheme with the current constructs, this KIP suggests that, starting with the next version, Connect will use a lightweight, performant and backwards compatible serialization format that will replace the format used in version 0.

Reduce number of rebalances and differentiate between initial and successive rebalances

The current proposal aims to offer a scalable solution to Connect's Worker rebalancing pain points that is easy to configure and use across a wide range of use cases. This does not rule out use cases that would benefit from finer-grained control over specific phases of the Worker rebalance process (e.g. avoiding a redundant rebalance in some cases, or using a different delay for initial versus successive group joins). Even so, this KIP errs on the side of retaining Connect's simplicity in terms of implementation and configuration demands, while addressing the majority of the use cases in which the current rebalance protocol shows its limitations.