Current state: Under Discussion
Discussion thread: None
JIRA:
GitHub : https://github.com/apache/kafka/pull/12705
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
We are using Kafka as a messaging system (without streaming) with the following properties:
Some time ago, we used only one consumer per microservice, so we had to scale up the number of pods (if topics had multiple partitions) whenever we wanted to improve consumption performance.
Therefore, we decided to run multiple consumers in the same microservice (1 thread per consumer).
Then some questions came up:
Combining these hypotheses and requirements, we concluded that having as many consumers as topics is the best option, provided that all partitions of a topic are assigned to only one consumer.
We therefore needed a new partition assignment strategy, the TopicRoundRobinAssignor.
Suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1 and t1p2.
The assignment will be:
C0: [t0p0, t0p1, t0p2]
C1: [t1p0, t1p1, t1p2]
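The assignment above can be reproduced with a few lines of plain Java. This is a minimal sketch, not the code from the pull request: it assumes every consumer subscribes to every topic, orders topics and consumers by name, and hands out whole topics in round-robin order. The names `TopicRoundRobinSketch` and `assign` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TopicRoundRobinSketch {

    // Hypothetical helper: gives every partition of a topic to a single consumer,
    // cycling through the consumers topic by topic.
    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> assignment = new TreeMap<>();
        if (consumers.isEmpty()) {
            return assignment; // nothing to assign to
        }
        List<String> sortedConsumers = new ArrayList<>(consumers);
        Collections.sort(sortedConsumers);
        for (String consumer : sortedConsumers) {
            assignment.put(consumer, new ArrayList<>());
        }

        int next = 0;
        // A TreeMap iterates topics in name order, so the result is deterministic.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            String owner = sortedConsumers.get(next % sortedConsumers.size());
            for (int p = 0; p < topic.getValue(); p++) {
                assignment.get(owner).add(topic.getKey() + "p" + p);
            }
            next++;
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers, two topics with 3 partitions each, as in the example.
        System.out.println(assign(List.of("C0", "C1"), Map.of("t0", 3, "t1", 3)));
    }
}
```

With the inputs from the example, C0 receives the three partitions of t0 and C1 the three partitions of t1.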
If we use one of the four existing assignors, each consumer always ends up with a mix of partitions from different topics.
If one of the consumers fails, we keep in memory which partitions it was consuming so that the "corrupt" message is not propagated.
Otherwise, after rebalancing, the "corrupt" message would be consumed by another consumer, which would be stopped as well, and so on.
Let me take the same example.
Suppose there are 2 consumers C0 and C1, and two topics t0 and t1, each with 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1 and t1p2.
If I use the standard RoundRobinAssignor, the assignment will be:
C0: [t0p0, t0p2, t1p1]
C1: [t0p1, t1p0, t1p2]
If C0 fails, it will be stopped and the new assignment will be:
C1: [t0p1, t1p0, t1p2, t0p0, t0p2, t1p1]
As we keep in memory that we do not want to consume t0p0, t0p2 and t1p1, only records from t0p1, t1p0 and t1p2 will continue to be consumed.
Therefore, if we do not quickly fix the issue with the "corrupt" message, the data of each topic will diverge (some records will be consumed, others will not).
If we use the TopicRoundRobinAssignor, the assignment will be:
C0: [t0p0, t0p1, t0p2]
C1: [t1p0, t1p1, t1p2]
If C0 fails, it will be stopped and the new assignment will be:
C1: [t0p0, t0p1, t0p2, t1p0, t1p1, t1p2]
As we keep in memory that we do not want to consume t0p0, t0p1 and t0p2, only records from t1p0, t1p1 and t1p2 will continue to be consumed.
Therefore, I continue to consume all records of the same topic (here t1), and my data stays as consistent as possible.
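The difference between the two failure scenarios can be checked with a small simulation. The sketch below is plain Java with hypothetical names (`FailureImpactSketch`, `partiallyBlockedTopics`); it takes the partitions that stay blocked after the failed consumer is stopped and lists the topics for which only some partitions are still consumed, that is, the topics whose data would diverge. It assumes partition names of the form `<topic>p<index>`, as in the examples.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class FailureImpactSketch {

    // Extracts the topic from a name such as "t0p2" (naming convention of the examples).
    static String topicOf(String partition) {
        return partition.substring(0, partition.lastIndexOf('p'));
    }

    // Returns the topics that have both blocked and still-consumed partitions.
    static Set<String> partiallyBlockedTopics(List<String> allPartitions, Set<String> blocked) {
        Map<String, int[]> counts = new TreeMap<>(); // topic -> {blocked, consumed}
        for (String partition : allPartitions) {
            int[] c = counts.computeIfAbsent(topicOf(partition), t -> new int[2]);
            c[blocked.contains(partition) ? 0 : 1]++;
        }
        Set<String> diverging = new TreeSet<>();
        for (Map.Entry<String, int[]> e : counts.entrySet()) {
            if (e.getValue()[0] > 0 && e.getValue()[1] > 0) {
                diverging.add(e.getKey());
            }
        }
        return diverging;
    }

    public static void main(String[] args) {
        List<String> all = List.of("t0p0", "t0p1", "t0p2", "t1p0", "t1p1", "t1p2");
        // Partitions owned by C0 before it failed, under RoundRobinAssignor.
        System.out.println(partiallyBlockedTopics(all, Set.of("t0p0", "t0p2", "t1p1")));
        // Partitions owned by C0 before it failed, under TopicRoundRobinAssignor.
        System.out.println(partiallyBlockedTopics(all, Set.of("t0p0", "t0p1", "t0p2")));
    }
}
```

With the RoundRobinAssignor layout both t0 and t1 are partially blocked, whereas with the TopicRoundRobinAssignor layout no topic is: t0 is held back entirely and t1 is consumed entirely.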
If we use one of the four existing assignors, each consumer always ends up with a mix of partitions from different topics, so records of the same topic can be processed concurrently.
Let me take an example.
Suppose there are 2 consumers C0 and C1, and two topics t0 and t1, each with 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1 and t1p2.
If we use the standard RoundRobinAssignor, the assignment will be:
C0: [t0p0, t0p2, t1p1]
C1: [t0p1, t1p0, t1p2]
In that case, we must ensure that the processing triggered by records from t0 and t1 is thread safe, because those records arrive in parallel from two different threads, C0 and C1.
If we use the TopicRoundRobinAssignor, the assignment will be:
C0: [t0p0, t0p1, t0p2]
C1: [t1p0, t1p1, t1p2]
In that case, every record of a given topic is processed in the same thread, so there is less risk of concurrency issues.
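To make the threading argument concrete, the following sketch counts which consumers (one thread each) touch each topic under a given assignment. It is plain Java with hypothetical names (`TopicThreadsSketch`, `consumersPerTopic`) and again assumes partition names of the form `<topic>p<index>`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class TopicThreadsSketch {

    // For each topic, collects the consumers that own at least one of its partitions.
    static Map<String, Set<String>> consumersPerTopic(Map<String, List<String>> assignment) {
        Map<String, Set<String>> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
            for (String partition : entry.getValue()) {
                String topic = partition.substring(0, partition.lastIndexOf('p'));
                result.computeIfAbsent(topic, t -> new TreeSet<>()).add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Layout produced by RoundRobinAssignor in the example.
        System.out.println(consumersPerTopic(Map.of(
                "C0", List.of("t0p0", "t0p2", "t1p1"),
                "C1", List.of("t0p1", "t1p0", "t1p2"))));
        // Layout produced by TopicRoundRobinAssignor in the example.
        System.out.println(consumersPerTopic(Map.of(
                "C0", List.of("t0p0", "t0p1", "t0p2"),
                "C1", List.of("t1p0", "t1p1", "t1p2"))));
    }
}
```

A topic mapped to more than one consumer means its records are handled by several threads at once, so the handler for that topic has to be thread safe.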
With the TopicRoundRobinAssignor, the partitions are balanced uniformly across containers, so we can get better performance.
Suppose there are 2 instances of the same application, A0 and A1, each running 2 consumers C0 and C1, and two topics t0 and t1. t0 has 3 partitions and t1 has 2 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0 and t1p1.
If we use the TopicRoundRobinAssignor, the assignment will be:
A0: [C0: [t0p0, t0p2], C1: [t1p0]]
A1: [C0: [t0p1], C1: [t1p1]]
org.apache.kafka.clients.consumer.TopicRoundRobinAssignor
I propose adding the TopicRoundRobinAssignor as a possible partition assignment strategy; please refer to: https://github.com/apache/kafka/pull/12705
The TopicRoundRobinAssignor is unit tested; please refer to: https://github.com/apache/kafka/pull/12705
Anyone who needs the same partition assignment strategy can create their own assignor, or simply copy the content of the TopicRoundRobinAssignor into their code and use it by configuring the partition.assignment.strategy property.
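As an illustration, the consumer configuration could look like the sketch below. It only uses `java.util.Properties`; the broker address and group id are placeholders, the class name `AssignorConfigSketch` is hypothetical, and the assignor class name is the one proposed here, so it would have to be replaced by your own package name if you copy the class into your code.

```java
import java.util.Properties;

class AssignorConfigSketch {

    // Builds consumer properties that select the proposed assignor.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "my-microservice");         // hypothetical group id
        // Fully qualified class name of the assignor; adjust it if the class lives in your own package.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.TopicRoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProperties().getProperty("partition.assignment.strategy"));
    }
}
```

These properties, together with the usual key and value deserializer settings, would then be passed to the `KafkaConsumer` constructor.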