Status
Current state: "Under Discussion"
Discussion thread: TODO
JIRA: KAFKA-6054
Released: 1.2
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Streams exchanges custom metadata during rebalance. This metadata encodes a magic version number to allow upgrading the metadata format. In the past, we upgraded the metadata format once (in the 0.10.1.0 release) but did not design a proper upgrade path. As a result, upgrading from 0.10.0.x is only possible if the whole application is shut down first (i.e., rolling bounces don't work).
In 1.2, we plan to upgrade the metadata format again and thus need to fix KAFKA-6054. The current implementation forces us to design an upgrade path with two rolling bounces. This also requires users to configure the application correctly during the upgrade (see details below), which is cumbersome and error-prone. Thus, we also propose a simplified upgrade path that requires no special configuration from users and allows for a single rolling bounce upgrade.
Note that this KIP is a prerequisite for KIP-258 and KIP-262.
Public Interfaces
For the upgrade from versions 0.10.0.x, ..., 1.1.x to version 1.2, we need to add a new configuration parameter upgrade.from that is null by default and can take the following values:
- null: no upgrade required (if a new application is started or the upgrade was already done)
- "0.10.0.x", "0.10.1.x", "0.10.2.x", "0.11.0.x", "1.0.x", and "1.1.x": for upgrading from 0.10.0.x, ..., 1.1.x to 1.2
Note that the above proposal only fixes KAFKA-6054 in 1.2. If we want to fix KAFKA-6054 for versions 0.10.1.x, ..., 1.1.x as well, we only need to back-port the config with the following values:
- null: no upgrade required
- "0.10.0.x": for upgrading from 0.10.0.x to any version of 0.10.1.x, ..., 1.1.x
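To make the accepted values concrete, here is a minimal Java sketch of how a 1.2 instance might validate upgrade.from. The class UpgradeFromValidator and its methods are hypothetical and not part of the Kafka Streams API; only the config name and the accepted values are taken from this KIP.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical validator for the proposed "upgrade.from" config (not a Kafka API).
final class UpgradeFromValidator {

    // Non-null values accepted in 1.2, as listed in this KIP.
    static final List<String> ACCEPTED_VALUES = Collections.unmodifiableList(Arrays.asList(
            "0.10.0.x", "0.10.1.x", "0.10.2.x", "0.11.0.x", "1.0.x", "1.1.x"));

    private UpgradeFromValidator() {}

    // null (the default) means "no upgrade required" and is always valid.
    static boolean isValid(String upgradeFrom) {
        return upgradeFrom == null || ACCEPTED_VALUES.contains(upgradeFrom);
    }

    // Intended to be called at startup, so a misconfigured value fails fast.
    static void validate(String upgradeFrom) {
        if (!isValid(upgradeFrom)) {
            throw new IllegalArgumentException("Invalid value for upgrade.from: '" + upgradeFrom
                    + "'; expected null or one of " + ACCEPTED_VALUES);
        }
    }

    public static void main(String[] args) {
        validate(null);       // new application, or upgrade already done
        validate("0.10.0.x"); // first rolling bounce when upgrading from 0.10.0.x
        System.out.println("upgrade.from values OK");
    }
}
```

For a back-port to 0.10.1.x, ..., 1.1.x, the accepted non-null list would shrink to just "0.10.0.x".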
Proposed Changes
We add the above config for upgrading from 0.10.0.x, ..., 1.1.x to 1.2 and future releases. We describe the upgrade path in detail below. To enable a single rolling bounce upgrade with no required configuration of the applications, we also add a second magic byte encoding the "supported version" into the rebalance metadata, which allows us to implement a "version probing" step. If we want to change the metadata again in releases after 1.2, this "version probing" step allows for a single rolling bounce upgrade (details below).
Note that the currently used rebalance metadata versions are 1 (0.10.0.x) and 2 (0.10.1.x, ..., 1.1.x). We increase the metadata version number to 3 in the 1.2 release.
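The "supported version" field can be illustrated with a small encoder/decoder for the metadata header. This is a sketch under assumptions: each version number is assumed to be written as a 4-byte big-endian int at the start of the metadata, versions 1 and 2 are assumed to carry only the used version, and version 3 and later are assumed to append the supported version. MetadataHeader is a hypothetical class, not the actual Streams code.

```java
import java.nio.ByteBuffer;

// Hypothetical header of the rebalance metadata (layout is an assumption, see above).
final class MetadataHeader {

    // First metadata version that also encodes the sender's supported version.
    static final int FIRST_VERSION_WITH_SUPPORTED_VERSION = 3;

    final int usedVersion;      // version the rest of the metadata is encoded with
    final int supportedVersion; // highest version the sender understands

    MetadataHeader(int usedVersion, int supportedVersion) {
        this.usedVersion = usedVersion;
        this.supportedVersion = supportedVersion;
    }

    ByteBuffer encode() {
        boolean withSupported = usedVersion >= FIRST_VERSION_WITH_SUPPORTED_VERSION;
        ByteBuffer buf = ByteBuffer.allocate(withSupported ? 8 : 4);
        buf.putInt(usedVersion);
        if (withSupported) {
            buf.putInt(supportedVersion);
        }
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    static MetadataHeader decode(ByteBuffer buf) {
        int used = buf.getInt();
        // Version 1 and 2 senders do not encode a supported version; the best we
        // know is that they support the version they used.
        int supported = used >= FIRST_VERSION_WITH_SUPPORTED_VERSION ? buf.getInt() : used;
        return new MetadataHeader(used, supported);
    }

    public static void main(String[] args) {
        MetadataHeader h = decode(new MetadataHeader(3, 3).encode());
        System.out.println("used=" + h.usedVersion + ", supported=" + h.supportedVersion);
    }
}
```

Keeping the used version as the first field means a reader can determine the encoding before parsing the rest of the metadata.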
Upgrading to 1.2:
- prepare a jar hot swap from the old version to 1.2; Kafka Streams needs to be configured with upgrade.from="<old.version>" for startup
- do a rolling bounce to get the new jar and config in place for each instance
  - the config "upgrade.from" tells the application to send a Subscription using the old encoding (version 1 or 2), to be compatible with a potential version 1 or 2 leader in the group
  - instances will receive a corresponding version 1 or 2 Assignment in this stage
- the user prepares a second round of rebalance; this time, the configuration parameter upgrade.from must be removed for new startup
- do a second rolling bounce for each instance to get the new config
  - a bounced instance can send a version 3 Subscription, as we know that the leader will understand version 3
  - the leader sends a version 1 or 2 Assignment back as long as at least one version 1 or 2 Subscription is received
  - if the leader receives only version 3 Subscriptions, it sends a version 3 Assignment and the upgrade is completed
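The version choices in the two-bounce upgrade can be summarized in a few lines of Java. This sketch is illustrative only: TwoBounceUpgrade and its methods are hypothetical names, and the mapping follows the metadata versions given in this KIP (1 for 0.10.0.x, 2 for 0.10.1.x, ..., 1.1.x, and 3 for 1.2).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical summary of the version decisions during the two-bounce upgrade.
final class TwoBounceUpgrade {

    static final int LATEST_VERSION = 3; // metadata version introduced in 1.2

    // Subscription version a 1.2 instance sends, depending on upgrade.from.
    static int subscriptionVersion(String upgradeFrom) {
        if (upgradeFrom == null) {
            return LATEST_VERSION; // second bounce (or no upgrade): use version 3
        }
        switch (upgradeFrom) {
            case "0.10.0.x":
                return 1;
            case "0.10.1.x":
            case "0.10.2.x":
            case "0.11.0.x":
            case "1.0.x":
            case "1.1.x":
                return 2;
            default:
                throw new IllegalArgumentException("Unsupported upgrade.from: " + upgradeFrom);
        }
    }

    // A 1.2 leader answers with the lowest subscription version it received, so it keeps
    // sending version 1 or 2 assignments until every member sends version 3.
    static int assignmentVersion(Collection<Integer> receivedSubscriptionVersions) {
        int version = LATEST_VERSION;
        for (int v : receivedSubscriptionVersions) {
            version = Math.min(version, v);
        }
        return version;
    }

    public static void main(String[] args) {
        // Mid second bounce: two members already send version 3, one still sends version 2.
        List<Integer> received = Arrays.asList(3, 2, 3);
        System.out.println("assignment version = " + assignmentVersion(received));
    }
}
```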
Future upgrades with "version probing":
In the current implementation, the group leader fails if it receives a subscription with a higher version number than it understands. We propose to change this: instead of failing, the leader will send an empty assignment back encoding its supported version. This allows the upgraded follower to downgrade its subscription and rejoin the group, sending a subscription that the (not yet upgraded) leader understands. As we always encode the leader's supported version in the assignment, after the leader is upgraded and understands the new metadata version, all other instances can switch back to the highest supported metadata version.
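A minimal sketch of the leader side of version probing, assuming a hypothetical ProbingLeader class that only tracks version numbers (the real assignment also carries tasks and partitions, which are omitted here). Following the detailed protocol in this section, regular assignments use the lowest received subscription version, capped at the leader's own supported version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical leader logic for version probing (versions only, no task payload).
final class ProbingLeader {

    static final class Assignment {
        final int usedVersion;
        final int supportedVersion;
        final boolean empty; // true: no tasks, the assignment only reports versions

        Assignment(int usedVersion, int supportedVersion, boolean empty) {
            this.usedVersion = usedVersion;
            this.supportedVersion = supportedVersion;
            this.empty = empty;
        }
    }

    // subscriptionVersions: memberId -> subscription version received from that member
    static Map<String, Assignment> assign(int leaderSupportedVersion,
                                          Map<String, Integer> subscriptionVersions) {
        // Regular assignments use the lowest received version, capped at the leader's version.
        int usedVersion = leaderSupportedVersion;
        for (int v : subscriptionVersions.values()) {
            usedVersion = Math.min(usedVersion, v);
        }
        Map<String, Assignment> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : subscriptionVersions.entrySet()) {
            if (e.getValue() > leaderSupportedVersion) {
                // Instead of failing, tell the newer member which version this leader supports.
                result.put(e.getKey(), new Assignment(leaderSupportedVersion, leaderSupportedVersion, true));
            } else {
                result.put(e.getKey(), new Assignment(usedVersion, leaderSupportedVersion, false));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> subscriptions = new LinkedHashMap<>();
        subscriptions.put("instance-1", 3); // not yet bounced
        subscriptions.put("instance-2", 4); // bounced, optimistically sends version 4
        Assignment a = assign(3, subscriptions).get("instance-2");
        System.out.println("empty=" + a.empty + ", supported=" + a.supportedVersion);
    }
}
```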
Detailed upgrade protocol from metadata version X to Y.
- On startup/rolling bounce, an instance does not know which version the leader understands and (optimistically) sends a Subscription with the latest version Y.
- The (old, i.e., not yet upgraded) leader sends an empty Assignment back to each instance that sent a newer Subscription it does not understand. The Assignment metadata only encodes both version numbers (used-version == supported-version), set to the leader's supported version X.
- For all other instances, the leader sends a regular Assignment in version X back.
- If an upgraded follower sends a version Y Subscription and receives a version X Assignment with "supported-version = X", it can downgrade to X (in-memory flag) and resend a new Subscription with the old version X to retry joining the group. To force an immediate second rebalance, the follower does an "unsubscribe()/subscribe()/poll()" sequence.
- As long as the leader (before or after upgrade) receives at least one old version X Subscription, it always sends a used-version X Assignment back (the encoded supported version is X before the leader is upgraded and Y after the leader is upgraded).
- If an upgraded instance receives an Assignment, it always checks the leader's supported-version and updates its downgraded "used-version" if possible.
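The follower side of these steps can be sketched as a small state holder. ProbingFollower and onAssignment are hypothetical names; the boolean return value stands in for the "unsubscribe()/subscribe()/poll()" sequence, which this sketch does not perform.

```java
// Hypothetical follower state for version probing (versions only, no consumer calls).
final class ProbingFollower {

    final int latestSupportedVersion; // Y: the newest version this instance's code understands
    int usedVersion;                  // in-memory flag; starts at Y on every restart

    ProbingFollower(int latestSupportedVersion) {
        this.latestSupportedVersion = latestSupportedVersion;
        this.usedVersion = latestSupportedVersion; // optimistic first subscription
    }

    // Handles the version part of a received assignment. Returns true if the instance
    // must rejoin right away (the unsubscribe()/subscribe()/poll() sequence in this KIP).
    boolean onAssignment(int leaderSupportedVersion, boolean emptyAssignment) {
        if (emptyAssignment && leaderSupportedVersion < usedVersion) {
            // The leader did not understand our subscription: downgrade and retry.
            usedVersion = leaderSupportedVersion;
            return true;
        }
        // Otherwise track the leader's supported version, so a downgraded instance moves
        // back up once the leader is upgraded (never above what this instance supports).
        usedVersion = Math.min(latestSupportedVersion, leaderSupportedVersion);
        return false;
    }

    public static void main(String[] args) {
        ProbingFollower follower = new ProbingFollower(4);
        boolean rejoin = follower.onAssignment(3, true); // old leader supports only 3
        System.out.println("rejoin=" + rejoin + ", used=" + follower.usedVersion);
        follower.onAssignment(4, false);                  // leader has been upgraded
        System.out.println("used=" + follower.usedVersion);
    }
}
```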
Thus, only a single rolling bounce without any config settings is required, as the leader allows instances to probe its supported version instead of failing. Note that if the leader is bounced last, the metadata upgrade only finishes after one more rebalance. We can trigger this rebalance with one more "unsubscribe()/subscribe()/poll()" sequence. To make sure only one instance executes this, the leader should be responsible for triggering this final rebalance; if the leader is not bounced last, we can detect this and avoid the additional rebalance.
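The KIP does not spell out how the leader detects that it was bounced last. One possible rule, stated here purely as an assumption: because version 3 and later subscriptions also encode the sender's supported version, the leader could trigger the extra rebalance only if every member reports supporting the leader's version while at least one member still used an older one. FollowUpRebalanceCheck and its method are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical check for the final rebalance when the leader is bounced last.
final class FollowUpRebalanceCheck {

    // Versions decoded from one member's subscription.
    static final class SubscriptionVersions {
        final int usedVersion;
        final int supportedVersion;

        SubscriptionVersions(int usedVersion, int supportedVersion) {
            this.usedVersion = usedVersion;
            this.supportedVersion = supportedVersion;
        }
    }

    // Assumed rule: trigger one more rebalance only if all members could use the
    // leader's version but at least one of them still used an older one.
    static boolean needsFollowUpRebalance(int leaderSupportedVersion,
                                          Collection<SubscriptionVersions> subscriptions) {
        boolean someoneDowngraded = false;
        for (SubscriptionVersions s : subscriptions) {
            if (s.supportedVersion < leaderSupportedVersion) {
                return false; // some instance still runs old code; the rolling bounce is not finished
            }
            if (s.usedVersion < leaderSupportedVersion) {
                someoneDowngraded = true;
            }
        }
        return someoneDowngraded;
    }

    public static void main(String[] args) {
        // Leader bounced last: the others downgraded to 3 while an old leader was active.
        List<SubscriptionVersions> subs = Arrays.asList(
                new SubscriptionVersions(4, 4), // the leader's own subscription
                new SubscriptionVersions(3, 4),
                new SubscriptionVersions(3, 4));
        System.out.println("follow-up rebalance needed: " + needsFollowUpRebalance(4, subs));
    }
}
```

If the leader is not bounced last, the other instances have already learned the leader's supported version in an earlier rebalance and send it on the next one, so the check returns false and no extra rebalance is triggered.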
Compatibility, Deprecation, and Migration Plan
Increasing the rebalance metadata version to 3 is not a backward compatible change per se. However, the outlined upgrade path allows users to upgrade to 1.2 with zero downtime and two rolling bounces. Note that a simplified "offline upgrade" is also possible: instead of setting configs and doing two rolling bounces, all application instances are stopped and afterwards restarted with the new 1.2 jar.
Test Plan
- unit and integration tests for StreamsPartitionAssignor to verify it reacts correctly to configs and received subscriptions
- system tests that perform rolling bounce upgrades as described above
  - this should include failure scenarios during the upgrade
  - this should include "simulated upgrades" to metadata version 4, to ensure that the implementation works correctly for future changes
Rejected Alternatives
- use the consumer's built-in protocol upgrade mechanism (i.e., register multiple "assignment strategies")
  - has the disadvantage that we need to implement two StreamsPartitionAssignor classes
  - increased network traffic during rebalance
  - encoding the "supported version" in the metadata (i.e., version probing) subsumes this approach for future releases
  - if we want to "disable" the old protocol, a second rebalance is required, too
- Don't fix KAFKA-6054
  - it's a simple fix to include: just add one more accepted value to parameter upgrade.from
  - it's a fair question how many people will run with Streams 0.10.0; note, though, that people who are "stuck" with 0.10.0 brokers cannot use Streams 0.10.1 or newer, as it's not backward compatible with 0.10.0 brokers; thus, there might be more such users than expected
- Fix KAFKA-6054 only for the 1.2 release
  - it's a relatively simple fix for older releases (the main design work is already covered, and writing the code is not too complex because it's only the rebalance metadata version change)
  - it's unclear, though, whether we will have bug-fix releases for older versions; thus, nobody might ever be able to get this code (unless they build from the corresponding dev branches themselves)