...
The existing built-in Assignor implementations will be updated accordingly:
Assignor | Highest Version | Supported Strategy | Notes |
---|---|---|---|
RangeAssignor | 0 | Eager | Current default value. |
RoundRobinAssignor | 0 | Eager | |
StickyAssignor | 0 | Eager | |
CooperativeStickyAssignor | 0 | Eager, Cooperative | To be default value in 3.0 |
StreamsAssignor | 4 | Eager, Cooperative | |
We do not make "range" and "round-robin" support cooperative rebalancing because the protocol implicitly relies on the assignor being reasonably sticky: it trades an extra rebalance for the benefit of leaving most partitions with their current owners. These two assignors are not sticky (range may occasionally happen to reassign partitions back to their old owners, but it makes no best-effort attempt to do so), so we decided not to make them selectable for the cooperative protocol. The existing StickyAssignor was deliberately not changed to support Cooperative, so that users follow the smooth upgrade path outlined below and do not run into trouble if they already use the StickyAssignor and upgrade blindly.
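The stickiness argument can be illustrated with a minimal sketch. It assumes that, under the cooperative protocol, a member gives up only the partitions it owned before the rebalance that are absent from its new assignment. The class and method names are hypothetical and partitions are plain integers; this is not Kafka's API.

```java
import java.util.Set;
import java.util.TreeSet;

final class CooperativeRevocationSketch {
    // Partitions a member must give up: owned before, absent from the new assignment.
    static Set<Integer> revoked(Set<Integer> owned, Set<Integer> assigned) {
        Set<Integer> result = new TreeSet<>(owned);
        result.removeAll(assigned);
        return result;
    }

    public static void main(String[] args) {
        Set<Integer> owned = Set.of(0, 1, 2);
        // Sticky-style assignment: the member keeps most of what it already owns.
        System.out.println(revoked(owned, Set.of(0, 1)));   // prints [2]
        // Non-sticky assignment: everything may move, so everything is revoked.
        System.out.println(revoked(owned, Set.of(3, 4)));   // prints [0, 1, 2]
    }
}
```

With a sticky assignor the revoked set stays small, so the extra rebalance is cheap. With a non-sticky assignor nearly every partition moves, and the cooperative protocol pays for the extra rebalance without saving much.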
...
The specific upgrade path is described below. Note that it differs depending on whether you have a plain consumer app or a Streams app, so make sure to follow the appropriate one.
...
From the user's perspective, upgrading to the new protocol is the same as switching to a new assignor. For example, assume the current version of the Kafka consumer is 2.2 and the "range" assignor is specified in the config. The upgrade path would be:
...
The key point behind these two rolling bounces is to avoid the situation where the leader is still on the old byte code and recognizes only "eager", yet, for compatibility reasons, can still deserialize the new protocol data from newer-versioned members. Such a leader would simply go ahead and perform the assignment even though the newer-versioned members did not revoke their partitions before joining the group. Note the difference from KIP-415 here: on the consumer we do not have the luxury of relying on a list of built-in assignors, since the assignor is user-customizable and is therefore a black box to the consumer coordinator. As a result we need two rolling bounces to complete the upgrade, whereas Connect needs only one.
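As a rough illustration of the two rolling bounces, the sketch below builds the consumer configuration for each phase. It assumes that the first bounce lists both the cooperative and the old assignor and that the second bounce drops the old one. The relative order of the two assignors in the first bounce is an assumption of this sketch, and the class and helper names are hypothetical. Only `java.util.Properties` is used, so the assignor classes appear as plain strings.

```java
import java.util.Properties;

final class TwoBounceUpgradeSketch {
    static final String STRATEGY_KEY = "partition.assignment.strategy";
    static final String RANGE =
        "org.apache.kafka.clients.consumer.RangeAssignor";
    static final String COOPERATIVE_STICKY =
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";

    // Builds a config whose assignor list is the given classes, comma-separated.
    static Properties strategy(String... assignors) {
        Properties props = new Properties();
        props.setProperty(STRATEGY_KEY, String.join(",", assignors));
        return props;
    }

    // Starting point: only the eager "range" assignor is configured.
    static Properties beforeUpgrade() { return strategy(RANGE); }

    // First bounce: new byte code, both assignors listed (order is an assumption).
    static Properties firstBounce() { return strategy(COOPERATIVE_STICKY, RANGE); }

    // Second bounce: the eager-only assignor is removed.
    static Properties secondBounce() { return strategy(COOPERATIVE_STICKY); }

    public static void main(String[] args) {
        System.out.println(beforeUpgrade().getProperty(STRATEGY_KEY));
        System.out.println(firstBounce().getProperty(STRATEGY_KEY));
        System.out.println(secondBounce().getProperty(STRATEGY_KEY));
    }
}
```

While any member still advertises only "range", the group has to settle on an assignor that every member supports, so the cooperative assignor can take effect only once the second bounce has completed.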
...
...
These changes should all be fairly transparent to Streams apps, as there are no semantic changes, only improved rebalancing performance. However, users using Interactive Queries (IQ) or implementing a StateListener will notice that Streams spends less time in the REBALANCING state, as we will not transition to that state until the end of the rebalance. This means all owned stores will remain open for IQ while the rebalance is in progress, and Streams will continue to restore active tasks if there are any that are not yet running, and will process standbys if there aren't.
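One way a StateListener user could observe this is by measuring the time spent in REBALANCING. The sketch below is self-contained and does not depend on the Streams library: `State` and `StateListener` are local stand-ins that only mirror the shape of `KafkaStreams.State` and `KafkaStreams.StateListener#onChange(newState, oldState)`, and `RebalancingTimer` is a hypothetical helper.

```java
import java.util.function.LongSupplier;

final class RebalancingTimeSketch {
    // Stand-in for KafkaStreams.State; only the states this sketch needs.
    enum State { CREATED, REBALANCING, RUNNING }

    // Stand-in mirroring the shape of KafkaStreams.StateListener.
    interface StateListener {
        void onChange(State newState, State oldState);
    }

    // Accumulates the total time spent in REBALANCING, using an injectable clock.
    static final class RebalancingTimer implements StateListener {
        private final LongSupplier clockMs;
        private long enteredAtMs = -1;
        private long totalMs = 0;

        RebalancingTimer(LongSupplier clockMs) { this.clockMs = clockMs; }

        @Override
        public void onChange(State newState, State oldState) {
            long now = clockMs.getAsLong();
            if (oldState == State.REBALANCING && enteredAtMs >= 0) {
                totalMs += now - enteredAtMs;   // leaving REBALANCING
                enteredAtMs = -1;
            }
            if (newState == State.REBALANCING) {
                enteredAtMs = now;              // entering REBALANCING
            }
        }

        long totalRebalancingMs() { return totalMs; }
    }

    public static void main(String[] args) {
        RebalancingTimer timer = new RebalancingTimer(System::currentTimeMillis);
        timer.onChange(State.REBALANCING, State.CREATED);
        timer.onChange(State.RUNNING, State.REBALANCING);
        System.out.println("ms in REBALANCING: " + timer.totalRebalancingMs());
    }
}
```

In a real application the same logic would sit in a listener registered on the Streams instance. The injectable clock is there only to make the sketch easy to test.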
Looking into the Future: Allow Consumer to Return Records in Rebalance
As summarized in the corresponding Jira ticket, a possible future improvement is to allow the consumer to return records while a rebalance is in progress.
To do this, we would need to allow the consumer#commit API to throw RebalanceInProgressException if it commits offsets while a rebalance is in progress.
```java
/**
 * ...
 *
 * @throws org.apache.kafka.common.errors.RebalanceInProgressException if the consumer instance is in the middle of a rebalance
 *            so it is not yet determined which partitions would be assigned to the consumer. In such cases you can first
 *            complete the rebalance by calling {@link #poll(Duration)} and commit can be reconsidered afterwards.
 *            NOTE when you reconsider committing after the rebalance, the assigned partitions may have changed,
 *            and also for those partitions that are still assigned their fetch positions may have changed too
 *            if more records are returned from the {@link #poll(Duration)} call.
 * ...
 */
@Override
public void commitSync() {
    commitSync(Duration.ofMillis(defaultApiTimeoutMs));
}
```
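From the caller's side, the Javadoc above suggests a pattern of catching the exception, calling poll to let the rebalance complete, and then reconsidering the commit. The following is a minimal sketch of that pattern, not the consumer's actual implementation. To stay self-contained it uses local stand-ins: `RebalanceInProgressException` here is a nested class rather than the real `org.apache.kafka.common.errors` type, and `OffsetCommitter`, `commitWithRetry`, and the 100 ms poll timeout are hypothetical.

```java
import java.time.Duration;

final class CommitDuringRebalanceSketch {
    // Stand-in for org.apache.kafka.common.errors.RebalanceInProgressException.
    static final class RebalanceInProgressException extends RuntimeException {}

    // Stand-in for the two consumer calls used here; not the real Consumer interface.
    interface OffsetCommitter {
        void poll(Duration timeout);
        void commitSync();
    }

    // Tries to commit; on a rebalance, polls once and retries. True on success.
    static boolean commitWithRetry(OffsetCommitter consumer, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                consumer.commitSync();
                return true;
            } catch (RebalanceInProgressException e) {
                // Let the rebalance progress; assignment and positions may change.
                consumer.poll(Duration.ofMillis(100));
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Fake consumer: rebalancing until the first poll, then commits succeed.
        OffsetCommitter fake = new OffsetCommitter() {
            private boolean rebalancing = true;
            @Override public void poll(Duration timeout) { rebalancing = false; }
            @Override public void commitSync() {
                if (rebalancing) throw new RebalanceInProgressException();
            }
        };
        System.out.println(commitWithRetry(fake, 3)); // prints true
    }
}
```

The stand-in `poll` returns nothing. With the real consumer, `poll` may return records that must be handled before the commit is reconsidered, and the assigned partitions may have changed, as the NOTE in the Javadoc points out.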
Looking into the Future: Heartbeat Communicated Protocol
...
The existing built-in Assignor implementations will then be updated to:
Assignor | Highest Version | Supported Strategy | Notes |
---|---|---|---|
RangeAssignor | 0 | Eager | Current default. |
RoundRobinAssignor | 0 | Eager | |
StickyAssignor (old) | 0 | Eager | |
StickyAssignor (new) | 0 | Cooperative | Will be new default in 3.0 |
StreamsAssignor (old) | 4 | Eager | |
StreamsAssignor (new) | 4 | Cooperative | |
Although this makes the upgrade path simpler, since the consumer would no longer need the "rebalance.protocol" config and would just encode multiple assignors during the first rolling bounce of the upgrade path, it requires a duplicated assignor class, which is a bit cumbersome (of course, the new class could simply extend the old one, so not many lines of code would be duplicated).
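The duplicated-class approach could look roughly like the sketch below, where the new class extends the old one and overrides only what differs. All types here are hypothetical stand-ins, including the `RebalanceProtocol` enum, the class names, and the assignor name strings; they are not the actual Kafka assignor classes.

```java
import java.util.List;

final class DuplicatedAssignorSketch {
    enum RebalanceProtocol { EAGER, COOPERATIVE }

    // Stand-in for the existing sticky assignor: supports only the eager protocol.
    static class StickyAssignorOld {
        String name() { return "sticky"; }

        List<RebalanceProtocol> supportedProtocols() {
            return List.of(RebalanceProtocol.EAGER);
        }
    }

    // Stand-in for the duplicate: inherits the assignment logic, changes the protocol.
    static class StickyAssignorNew extends StickyAssignorOld {
        @Override
        String name() { return "cooperative-sticky"; }

        @Override
        List<RebalanceProtocol> supportedProtocols() {
            return List.of(RebalanceProtocol.COOPERATIVE);
        }
    }

    public static void main(String[] args) {
        for (StickyAssignorOld a : List.of(new StickyAssignorOld(), new StickyAssignorNew())) {
            System.out.println(a.name() + " -> " + a.supportedProtocols());
        }
    }
}
```

Little code is duplicated, but every assignor that is meant to support both protocols would need such a pair of classes, which is the cumbersome part noted above.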
...