...

For the KIP to be effective, consumers with `group.instance.id` set will not send a leave group request when they go offline, which means we rely solely on `session.timeout.ms` to trigger a group rebalance. Otherwise the proposed rebalance protocol would trigger a rebalance on every intermittent in-and-out, which is not ideal. With static membership we delegate consumer group health management to the platform running the client application, such as k8s. It is therefore also advised to make the session timeout large enough that the broker does not trigger rebalances too frequently as members come and go. With a handy admin tool, users can proactively remove members at runtime if the session timeout is too long.
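As a rough illustration of the configuration described above, the sketch below builds the consumer properties for a static member as a plain dictionary. The property names `group.instance.id` and `session.timeout.ms` are the consumer settings discussed here; the helper name `static_member_config`, the group name, and the 5-minute timeout are assumptions chosen only for the example.

```python
def static_member_config(instance_id: str, session_timeout_ms: int = 300_000) -> dict:
    """Build consumer properties for a static member (illustrative helper, not a Kafka API)."""
    if not instance_id:
        # Without an instance id the consumer would behave as a dynamic member.
        raise ValueError("group.instance.id must be non-empty for static membership")
    return {
        "group.id": "example-group",               # hypothetical group name
        "group.instance.id": instance_id,          # must be unique per instance in the group
        "session.timeout.ms": session_timeout_ms,  # assumed value, large enough to cover a restart
    }


config = static_member_config("consumer-A")
```

The timeout should be sized against how long a restart of one instance typically takes, since it is the only trigger for a rebalance once leave group requests are suppressed.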

Since the member id is randomly generated by the broker, the persistence behavior of static membership will be hindered, because the leader doesn't know whether a given member is new or old. To let the leader make better assignment decisions, we are attaching `group.instance.id` to the response members within the join group response.

One example (thanks to Jason for the idea!):

Suppose we have three consumers in the group with static instance ids: A, B, and C. Assume we have a stable group and the respective memberIds are 1, 2, and 3. So inside the group coordinator, we have the following state:
members: {A=1, B=2, C=3}
generation: 5

In fact, the consumer leader of the group is not aware of the instance ids of the members. So it sees the membership as:
members: {1, 2, 3}.
generation: 5

Now suppose that A does a rolling restart. After restarting, the coordinator will assign a new memberId to A and let it continue using the previous assignment. So we now have the following state:
members: {A=4, B=2, C=3}
generation: 5


The leader on the other hand still sees the members in the group as {1, 2, 3} because it does not know that member A restarted and was given a new memberId. Suppose that eventually something causes the group to rebalance (e.g. maybe a new topic was created).
When the leader attempts its assignment, it will see the members {2, 3, 4}. Because it only sees member ids, it cannot tell that member 4 is the restarted instance A.

Appending `group.instance.id` to the join group response provides some benefit even for the simple partition assignors. Consider the default range assignor, for example. It works by sorting the members in the group and then assigning partition ranges to achieve balance. Suppose we have a topic with 9 partitions. If the membership were {1, 2, 3}, then the assignment would be the following:
memberId: 1, assignment: {0, 1, 2}
memberId: 2, assignment: {3, 4, 5}
memberId: 3, assignment: {6, 7, 8}

Now when the membership changes to {2, 3, 4}, then all the assignments change as well:
memberId: 2, assignment: {0, 1, 2}
memberId: 3, assignment: {3, 4, 5}
memberId: 4, assignment: {6, 7, 8}

So all of the assignments change even though the static members are the same. However, if we use the instanceId as the first sort key, then we can compute the assignment consistently even across restarts:
instanceId: A, memberId: 1, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}

And after the restart:
instanceId: A, memberId: 4, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}
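
The comparison above can be reproduced with a small sketch of a range-style assignment. This is a simplified stand-in for the range assignor, not its actual implementation: the function `range_assign` and the dictionary layout are assumptions, and member ids are kept as the small integers used in the example rather than the strings a broker would generate.

```python
def range_assign(members, num_partitions, sort_key):
    """Sort members, then hand out contiguous partition ranges (simplified sketch)."""
    ordered = sorted(members, key=sort_key)
    per_member, extra = divmod(num_partitions, len(ordered))
    assignment, start = {}, 0
    for i, member in enumerate(ordered):
        # The first `extra` members take one additional partition when the split is uneven.
        count = per_member + (1 if i < extra else 0)
        # Key the result by instance id so assignments can be compared across restarts.
        assignment[member["instance_id"]] = list(range(start, start + count))
        start += count
    return assignment


before = [{"instance_id": "A", "member_id": 1},
          {"instance_id": "B", "member_id": 2},
          {"instance_id": "C", "member_id": 3}]
# After the restart, A comes back with member id 4.
after = [{"instance_id": "A", "member_id": 4},
         {"instance_id": "B", "member_id": 2},
         {"instance_id": "C", "member_id": 3}]


def by_member(m):
    return m["member_id"]


def by_instance(m):
    return (m["instance_id"], m["member_id"])


# Sorting by member id: every instance ends up with a different range after the restart.
moved = range_assign(before, 9, by_member) != range_assign(after, 9, by_member)
# Sorting by instance id first: the assignment is identical before and after.
stable = range_assign(before, 9, by_instance) == range_assign(after, 9, by_instance)
```

Keying the result by instance id makes the effect visible: with the member id as sort key, instance A moves from partitions 0-2 to 6-8, while with the instance id as first sort key it keeps 0-2.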


The full benefit of static assignment can only be realized if the assignor knows the instance ids of the members in the group. It shouldn't be necessary to do anything fancy with additional metadata.


Kafka Streams Change

Kafka Streams uses the stream thread as its consumer unit. For a Streams instance configured with `num.stream.threads` = 16, there would be 16 main consumers running on a single instance. If the user specifies a client id, the stream consumer client id will look like: user client id + "-StreamThread-" + thread id + "-consumer". If the user client id is not set, we will use the process id instead. Our plan is to reuse the consumer client id to define `group.instance.id`, so effectively a Streams instance will be able to use static membership if the end user defines a unique `client.id` for each Streams instance.
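
A minimal sketch of the naming scheme described above, to show why a unique `client.id` per Streams instance yields unique instance ids per thread. The function name `stream_consumer_client_id`, the sample client id, and the thread numbering starting at 1 are assumptions for illustration only.

```python
def stream_consumer_client_id(user_client_id: str, thread_id: int) -> str:
    """Compose the per-thread consumer client id following the pattern in the text."""
    return f"{user_client_id}-StreamThread-{thread_id}-consumer"


# Hypothetical Streams instance with client.id "orders-app-1" and three stream threads.
instance_ids = [stream_consumer_client_id("orders-app-1", t) for t in range(1, 4)]
```

Each thread gets a distinct id as long as the user client id differs between Streams instances; two instances sharing the same `client.id` would produce colliding ids.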

...