Status

Current state: COMING SOON

Discussion thread: here

Vote thread: COMING SOON

JIRA: KAFKA-12446

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

It is not necessary to review this as part of the KIP process but for those who are interested, here is an early pull request with the proposed changes.

Affected Public Interfaces

KGroupedTable interface 

As a result of the proposed change, the methods exposed via the `KGroupedTable` interface (i.e. `aggregate`, `reduce`, and `count`) will no longer produce inconsistent states, transient or otherwise, in cases where the grouping key has not changed. This reflects the improved semantics of a single "atomic" update. For example, consider the following simple topology:

...

Users should implement `.equals` method for key

Another important part of the proposed logic is detecting whether the key has changed. To do this, we depend on a correct implementation of the `.equals` method on the key, which users will need to provide.

Note, however, that this is not a strict new requirement for users: if users do not implement the `.equals` method, they should generally still get the old behaviour, in which the oldValue and the newValue are sent to the repartition topic as two separate messages, i.e. nothing breaks. In this way, our changes can be considered backwards compatible with existing code where the key type does not implement the `.equals` method. There is one edge case where this does not hold true, as follows.

Since the default `.equals` implementation for an `Object` compares by reference, if a user's `groupBy` returns the same reference for the key, then the oldKey and the newKey will naturally be equal according to `.equals`. This will result in a single event being sent to the repartition topic. This change in behaviour should be considered a "bug-fix" rather than a "breaking change": the semantics of the operation remain unchanged, and the only thing that changes for users is that they no longer see transient "inconsistent" states. In the worst case, users in this situation will need to update any strict tests that check specifically for the presence of transient "inconsistent" states.
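The key-equality behaviour described in this section can be illustrated with a small, self-contained sketch. It is not the code from the pull request: `RegionKey`, `LegacyKey`, and the `keyChanged` helper are hypothetical names introduced only for illustration, and the check is assumed to boil down to a plain `.equals` comparison of the old and new grouping keys.

```java
import java.util.Objects;

// Hypothetical grouping key with value-based equality.
// hashCode is overridden together with equals, as the Java contract requires.
final class RegionKey {
    private final String region;

    RegionKey(String region) {
        this.region = region;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof RegionKey)) return false;
        return region.equals(((RegionKey) other).region);
    }

    @Override
    public int hashCode() {
        return region.hashCode();
    }
}

// Hypothetical key that does NOT override equals, so it inherits
// Object's reference-based comparison.
final class LegacyKey {
    private final String region;

    LegacyKey(String region) {
        this.region = region;
    }
}

final class KeyChangeCheck {
    // Illustrative helper (an assumption, not Kafka Streams API):
    // the key is treated as changed when old and new keys are not equal.
    static boolean keyChanged(Object oldKey, Object newKey) {
        return !Objects.equals(oldKey, newKey);
    }
}
```

Under these assumptions, two distinct `RegionKey` instances for the same region count as unchanged, two distinct `LegacyKey` instances for the same region count as changed (the old two-message behaviour), and a `LegacyKey` compared against the very same reference counts as unchanged, which is the edge case described above.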

Compatibility, Deprecation, and Migration Plan

...

  1. In the first rolling bounce, we replace the byte code (i.e. swap the jars), set the config `upgrade.from="older version"` (possible values are "0.10.0" - "3.4"), and then bounce each instance to upgrade it.

    The upgrade.from="older version"  config will ensure we are still writing out only the old serialization format until all instances are on the new byte code, at which point we can be sure that all instances in the group will be able to successfully deserialize the new format if we were to start writing it. 
  2. The second rolling bounce is to simply remove the `upgrade.from="older version"` config and bounce each instance so that it begins writing in the new serialization format.
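The two bounces can be sketched as a pair of configuration steps. This is a minimal illustration using plain `java.util.Properties`; the `RollingUpgradeConfig` class and its method names are hypothetical, and the version string passed in stands for whatever "older version" the application is actually upgrading from.

```java
import java.util.Properties;

// Hypothetical helper that builds the Streams configuration for each bounce.
final class RollingUpgradeConfig {
    // Config key named in the upgrade steps above.
    static final String UPGRADE_FROM = "upgrade.from";

    // First bounce: new jars, but keep writing the old serialization format.
    static Properties firstBounce(Properties base, String olderVersion) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(UPGRADE_FROM, olderVersion);
        return props;
    }

    // Second bounce: drop upgrade.from so instances start writing the new format.
    static Properties secondBounce(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.remove(UPGRADE_FROM);
        return props;
    }
}
```

Each instance would be restarted with the result of `firstBounce(...)` during the first pass, and with the result of `secondBounce(...)` only once every instance in the group is running the new byte code.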

...