Current state: "Under Discussion"
Discussion thread: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade
JIRA: KAFKA-6054
Released: 1.2
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Kafka Streams exchanges custom metadata during rebalance. This metadata has a magic version number encoded to allow upgrading of the metadata format. In the past, we upgraded the metadata format once (in the 0.10.1.0 release), but did not design a proper upgrade path. This implies that upgrading from 0.10.0.x versions is only possible if the whole application is shut down first (i.e., rolling bounces don't work).
In 1.2, we plan to upgrade the metadata format again and thus need to fix KAFKA-6054. The current implementation forces us to design an upgrade path with two rolling bounces. This also requires that users configure the application correctly during the upgrade (see details below). This is cumbersome and error-prone for the user. Thus, we also propose a simplified upgrade path that does not require users to configure Kafka Streams for the upgrade and that allows for single rolling bounce upgrades.
Note that this KIP is a prerequisite for KIP-258 and KIP-262.
Public Interfaces
For the upgrade from 0.10.0.x, ..., 1.1.x to 1.2, we need to add a new configuration parameter upgrade.from that is null by default and can take the following values:

- null: no upgrade required (if a new application is started or the upgrade was done already)
- "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", and "1.1": for upgrading from 0.10.0.x, ..., 1.1.x to 1.2

Note that the above proposal only fixes KAFKA-6054 in 1.2. If we want to have fixes for KAFKA-6054 in versions 0.10.1.x, ..., 1.1.x, we would need to backport only one required value:

- null: no upgrade required
- "0.10.0": for upgrading from 0.10.0.x to any version of 0.10.1.x, ..., 1.1.x

We add the above config for upgrading from 0.10.0.x, ..., 1.1.x to 1.2 and future releases. We describe the upgrade path in detail below. To enable single rolling bounce upgrades with no required configuration of the applications, we add a second magic byte encoding the "supported version" into the rebalance metadata. This allows us to implement a "version probing" step. If we want to change the metadata again in post-1.2 releases, this "version probing" step allows for single rolling bounce upgrades (details below).
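The effect of upgrade.from can be illustrated with a small mapping from each proposed config value to the Subscription metadata version that a 1.2 instance would send. This is a minimal sketch. The class and method names are hypothetical and not part of Kafka Streams. The version numbers follow the metadata versions stated in this KIP: 1 for 0.10.0.x, 2 for 0.10.1.x to 1.1.x, and 3 for 1.2.

```java
import java.util.Set;

// Hypothetical helper (not part of Kafka Streams): returns the Subscription
// metadata version a 1.2 instance would send for a given upgrade.from value.
final class UpgradeFrom {
    // Metadata version introduced by this KIP in 1.2.
    static final int LATEST_VERSION = 3;

    // Releases that use metadata version 2 (0.10.1.x, ..., 1.1.x).
    private static final Set<String> VERSION_2_RELEASES =
            Set.of("0.10.1", "0.10.2", "0.11.0", "1.0", "1.1");

    static int subscriptionVersion(String upgradeFrom) {
        if (upgradeFrom == null) {
            // Default: new application, or the upgrade is already done.
            return LATEST_VERSION;
        }
        if (upgradeFrom.equals("0.10.0")) {
            // 0.10.0.x instances only understand metadata version 1.
            return 1;
        }
        if (VERSION_2_RELEASES.contains(upgradeFrom)) {
            // Old leader understands version 2, so keep sending version 2.
            return 2;
        }
        throw new IllegalArgumentException("Unsupported upgrade.from value: " + upgradeFrom);
    }
}
```

Suppose upgrade.from is set to "0.10.0" in the application's java.util.Properties. Then `UpgradeFrom.subscriptionVersion(props.getProperty("upgrade.from"))` returns 1, while an unset property (null) yields 3.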
Note that the currently used rebalance metadata versions are 1 (0.10.0.x) and 2 (0.10.1.x, ..., 1.1.x). We increase the metadata version number to 3 in the 1.2 release.
    // Metadata v2
    Metadata => VersionNumber ClientId PreviousAssignedTasks StandbyTasks UserEndpoint
      VersionNumber => int32
      ClientId => int64 int64          // UUID most significant bits + least significant bits
      PreviousAssignedTasks => numTasks [TaskId]
      StandbyTasks => numTasks [TaskId]
      UserEndpoint => int32 String     // string length + string
      numTasks => int32
      TaskId => GroupId Partition
      GroupId => int32
      Partition => int32

    // Metadata v3 (adds SupportedVersionNumber)
    Metadata => VersionNumber SupportedVersionNumber ClientId PreviousAssignedTasks StandbyTasks UserEndpoint
      VersionNumber => int32
      SupportedVersionNumber => int32  // highest metadata version the sender supports
      // ClientId, PreviousAssignedTasks, StandbyTasks, and UserEndpoint as in v2
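The v3 layout can be illustrated with a minimal encoder/decoder sketch based on java.nio.ByteBuffer. It writes the fields in the order given by the grammar. It assumes UTF-8 for the UserEndpoint string. The class names are hypothetical, and this is not the actual Kafka Streams implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Sketch of Metadata v3:
// VersionNumber SupportedVersionNumber ClientId PreviousAssignedTasks StandbyTasks UserEndpoint
final class SubscriptionV3 {

    // TaskId => GroupId Partition (two int32 values)
    static final class TaskId {
        final int groupId;
        final int partition;

        TaskId(int groupId, int partition) {
            this.groupId = groupId;
            this.partition = partition;
        }
    }

    final int usedVersion;
    final int supportedVersion;
    final UUID clientId;
    final List<TaskId> previousTasks;
    final List<TaskId> standbyTasks;
    final String userEndpoint;

    SubscriptionV3(int usedVersion, int supportedVersion, UUID clientId,
                   List<TaskId> previousTasks, List<TaskId> standbyTasks, String userEndpoint) {
        this.usedVersion = usedVersion;
        this.supportedVersion = supportedVersion;
        this.clientId = clientId;
        this.previousTasks = previousTasks;
        this.standbyTasks = standbyTasks;
        this.userEndpoint = userEndpoint;
    }

    ByteBuffer encode() {
        byte[] endpoint = userEndpoint.getBytes(StandardCharsets.UTF_8); // assumed UTF-8
        int size = 4 + 4 + 16 + taskBytes(previousTasks) + taskBytes(standbyTasks) + 4 + endpoint.length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(usedVersion);                          // VersionNumber
        buf.putInt(supportedVersion);                     // SupportedVersionNumber
        buf.putLong(clientId.getMostSignificantBits());   // ClientId (UUID, 2 x int64)
        buf.putLong(clientId.getLeastSignificantBits());
        putTasks(buf, previousTasks);                     // PreviousAssignedTasks
        putTasks(buf, standbyTasks);                      // StandbyTasks
        buf.putInt(endpoint.length);                      // UserEndpoint: length + bytes
        buf.put(endpoint);
        buf.flip();                                       // ready for reading
        return buf;
    }

    static SubscriptionV3 decode(ByteBuffer buf) {
        int used = buf.getInt();
        int supported = buf.getInt();
        UUID id = new UUID(buf.getLong(), buf.getLong()); // arguments evaluate left to right
        List<TaskId> previous = getTasks(buf);
        List<TaskId> standby = getTasks(buf);
        byte[] endpoint = new byte[buf.getInt()];
        buf.get(endpoint);
        return new SubscriptionV3(used, supported, id, previous, standby,
                new String(endpoint, StandardCharsets.UTF_8));
    }

    private static int taskBytes(List<TaskId> tasks) {
        return 4 + tasks.size() * 8; // numTasks + (GroupId, Partition) per task
    }

    private static void putTasks(ByteBuffer buf, List<TaskId> tasks) {
        buf.putInt(tasks.size());
        for (TaskId task : tasks) {
            buf.putInt(task.groupId);
            buf.putInt(task.partition);
        }
    }

    private static List<TaskId> getTasks(ByteBuffer buf) {
        int numTasks = buf.getInt();
        List<TaskId> tasks = new ArrayList<>(numTasks);
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new TaskId(buf.getInt(), buf.getInt()));
        }
        return tasks;
    }
}
```

Both version numbers come first, so a receiver of this sketch's output can read them with `buf.getInt(0)` and `buf.getInt(4)` before deciding how to decode the rest of the payload.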
Upgrading from 0.10.0.x, ..., 1.1.x to 1.2 requires two rolling bounces:

- Prepare the first rolling bounce: set upgrade.from="<old.version>" for startup.
- Do the first rolling bounce to deploy the new 1.2 jar:
  - "upgrade.from" tells the application to send the Subscription using the old encoding (version 1 or 2), to be compatible with a potential version 1 or 2 leader in the group.
  - Instances receive an Assignment in the old version (1 or 2) in this stage.
- Prepare the second rolling bounce: upgrade.from must be removed for the new startup.
- Do the second rolling bounce (with upgrade.from removed):
  - Bounced instances send a version 3 Subscription, as we know that the leader understands version 3.
  - The leader sends an old-version Assignment back as long as at least one version 1 or 2 Subscription is received.
  - If the leader receives only version 3 Subscriptions, it sends a version 3 Assignment and the upgrade is completed.

In the current implementation, the group leader fails if it receives a subscription with a higher version number than it understands. We propose to change this: instead of failing, the leader sends an empty assignment back that encodes its supported version. This allows the upgraded follower to downgrade its subscription and rejoin the group, sending a subscription that the (not yet upgraded) leader understands. As we always encode the leader's supported version in the assignment, all other instances can switch back to the highest supported metadata version once the leader is upgraded and understands the new metadata version.
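The leader-side rules from the two-bounce upgrade and the proposed change above can be sketched as follows. The sketch assumes the leader already runs 1.2 or later, because older leaders cannot encode a supported version. The class names are hypothetical, and the sketch only selects version numbers; it leaves out the actual task assignment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Kafka's API): how a leader that supports metadata
// versions up to leaderSupportedVersion could pick the Assignment version per member.
final class LeaderVersionSelection {

    // Version fields of the Assignment the leader sends to one member.
    static final class AssignmentHeader {
        final int usedVersion;
        final int supportedVersion; // leader's supported version (only encoded from version 3 on)
        final boolean empty;        // true: probing reply without any tasks

        AssignmentHeader(int usedVersion, int supportedVersion, boolean empty) {
            this.usedVersion = usedVersion;
            this.supportedVersion = supportedVersion;
            this.empty = empty;
        }
    }

    // subscriptionVersions: member id -> used version of that member's Subscription
    static Map<String, AssignmentHeader> assign(int leaderSupportedVersion,
                                                Map<String, Integer> subscriptionVersions) {
        // Lowest version among the Subscriptions the leader can decode. Every
        // regular Assignment uses it, so instances on older versions can still read theirs.
        int minUnderstood = leaderSupportedVersion;
        for (int version : subscriptionVersions.values()) {
            if (version <= leaderSupportedVersion) {
                minUnderstood = Math.min(minUnderstood, version);
            }
        }

        Map<String, AssignmentHeader> result = new HashMap<>();
        for (Map.Entry<String, Integer> member : subscriptionVersions.entrySet()) {
            if (member.getValue() > leaderSupportedVersion) {
                // Too new to decode: instead of failing, reply with an empty
                // Assignment where used-version == supported-version.
                result.put(member.getKey(),
                        new AssignmentHeader(leaderSupportedVersion, leaderSupportedVersion, true));
            } else {
                result.put(member.getKey(),
                        new AssignmentHeader(minUnderstood, leaderSupportedVersion, false));
            }
        }
        return result;
    }
}
```

The sketch reproduces both cases. During the second bounce, a 1.2 leader that still sees a version 2 Subscription answers everyone with version 2. A leader supporting version 3 that sees a version 4 Subscription answers that member with an empty version 3 Assignment.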
In detail, for an upgrade from metadata version X to a newer version Y:

- An upgraded instance sends a Subscription with the latest version Y.
- The (not yet upgraded) leader sends an empty Assignment back to the instance that sent the newer Subscription it does not understand. This Assignment metadata only encodes both version numbers (used-version == supported-version) as the leader's supported-version X.
- For all other instances, the leader sends a regular Assignment in version X back.
- If an upgraded follower sends a version Y Subscription and receives a version X Assignment with "supported-version = X", it can downgrade to X (in-memory flag) and resend a new Subscription with the old version X to retry joining the group. To force an immediate second rebalance, the follower does an "unsubscribe()/subscribe()/poll()" sequence.
- As long as the leader (before or after its upgrade) receives at least one old version X Subscription, it always sends a version X Assignment back (the encoded supported version is X before the leader is upgraded and Y after the leader is upgraded).
- If an upgraded instance receives an Assignment, it always checks the leader's supported-version and updates its downgraded "used-version" if possible.

Thus, a single rolling bounce without any config settings is sufficient, as the leader lets instances probe its supported version instead of failing. Note that if the leader is bounced last, the metadata upgrade only finishes after one more rebalance. We can trigger this rebalance with one more "unsubscribe()/subscribe()/poll()" sequence. To make sure only one instance executes this, the leader should be responsible for triggering this final rebalance. If the leader is not bounced last, we can detect this and avoid the additional rebalance.
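On the follower side, the in-memory "used-version" is downgraded and later upgraded again. This bookkeeping can be sketched as below. The names are hypothetical. The actual rejoin (the unsubscribe()/subscribe()/poll() sequence) is only signalled through the return value, and the final leader-triggered rebalance is out of scope.

```java
// Hypothetical sketch (not Kafka's API) of the follower-side bookkeeping for
// "version probing"; the used version is an in-memory flag only.
final class ProbingFollower {
    private final int latestSupportedVersion; // highest version this instance's code understands
    private int usedVersion;                  // version of the next Subscription to send

    ProbingFollower(int latestSupportedVersion) {
        this.latestSupportedVersion = latestSupportedVersion;
        this.usedVersion = latestSupportedVersion; // start with the newest version
    }

    int usedVersion() {
        return usedVersion;
    }

    // Handles the version fields of a received Assignment. Returns true if the
    // instance must rejoin right away (the proposal uses an
    // unsubscribe()/subscribe()/poll() sequence for that).
    boolean onAssignment(int leaderSupportedVersion, boolean emptyProbeReply) {
        if (emptyProbeReply && leaderSupportedVersion < usedVersion) {
            // The leader could not decode our Subscription: downgrade and retry.
            usedVersion = leaderSupportedVersion;
            return true;
        }
        // The leader may have been upgraded: move back up, but never past what we support.
        int target = Math.min(leaderSupportedVersion, latestSupportedVersion);
        if (target > usedVersion) {
            usedVersion = target;
        }
        // No immediate rejoin here; the final rebalance is left to the leader.
        return false;
    }
}
```

For example, an instance supporting version 4 downgrades to 3 after an empty reply from a version 3 leader. It stays on 3 while the leader reports 3, and returns to 4 once an Assignment carries supported-version 4.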
Increasing the rebalance metadata version to 3 is not a backward-compatible change per se. However, the outlined upgrade path allows users to upgrade to 1.2 with zero downtime and two rolling bounces. Note that a simplified "offline upgrade" is also possible: instead of setting configs and doing two rolling bounces, all application instances can be stopped and afterwards restarted with the new 1.2 jar.
Test matrix:
from version | to 0.10.1.x | to 0.10.2.x | to 0.11.0.x | to 1.0.x | to 1.1.x | to 1.2.x | to post-1.2.x (simulated metadata version 4) |
---|---|---|---|---|---|---|---|
0.10.0.x | x (*) | x (*) | x (*) | x (*) | x (*) | x | x |
0.10.1.x | | x (*) | x (*) | x (*) | x (*) | x | x |
0.10.2.x | | | x (*) | x (*) | x (*) | x | x |
0.11.0.x | | | | x (*) | x (*) | x | x |
1.0.x | | | | | x (*) | x | x |
1.1.x | | | | | | x | x |
1.2.x | | | | | | | x (tests "version probing") |
(*): requires back porting of KAFKA-6054 to older branches
StreamsPartitionAssignor classes
upgrade.from