Producers

Why do I get QueueFullException in my producer when running in async mode?

This typically happens when the producer is trying to send messages faster than the broker can handle them. If the producer cannot block, you will have to add enough brokers so that they can jointly handle the load. If the producer can block, you can set queue.enqueueTimeout.ms to -1 in the producer config. This way, if the queue is full, the producer will block instead of dropping messages.
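
A minimal sketch of such an async producer configuration (queue.enqueueTimeout.ms is the property mentioned above; producer.type is the property that enables async mode in the older producer, so check the documentation of your release):

# producer.properties (sketch): run the producer in async mode and
# block, rather than drop messages, when the internal queue is full
producer.type=async
queue.enqueueTimeout.ms=-1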

Consumers

Why does my consumer get InvalidMessageSizeException?

This typically means that the "fetch size" of the consumer is too small. Each time the consumer pulls data from the broker, it reads bytes up to a configured limit. If that limit is smaller than the largest single message stored in Kafka, the consumer can't decode the message properly and will throw an InvalidMessageSizeException. To fix this, increase the limit by setting the property "fetch.size" properly in config/consumer.properties. The default fetch.size is 300,000 bytes.

On EC2, why can't my high-level consumers connect to the brokers?

When a broker starts up, it registers its host IP in ZooKeeper. The high-level consumer later uses the registered IP to establish the socket connection to the broker. By default, the registered IP is given by InetAddress.getLocalHost.getHostAddress. Typically, this returns the real IP of the host. In EC2, however, the returned IP is an internal one and cannot be reached from outside. The solution is to explicitly set the address to be registered in ZooKeeper by setting the "hostname" property in server.properties.
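
A minimal sketch of the relevant server.properties setting, using a made-up public EC2 hostname as the example value:

# server.properties (sketch): register a publicly reachable address in ZooKeeper
# (ec2-203-0-113-25.compute-1.amazonaws.com is a hypothetical placeholder;
# use your broker's public EC2 hostname or Elastic IP)
hostname=ec2-203-0-113-25.compute-1.amazonaws.com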

Should I choose multiple group ids or a single one for the consumers?

If all consumers use the same group id, messages in a topic are distributed among those consumers. In other words, each consumer will get a non-overlapping subset of the messages. Having more consumers in the same group increases the degree of parallelism and the overall throughput of consumption. See the next question for the choice of the number of consumer instances. On the other hand, if each consumer is in its own group, each consumer will get a full copy of all messages.
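 
For example, two consumers that should share the work of a topic would use the same group id, while a consumer that needs its own full copy of the messages would use a different one. The group names below are arbitrary examples; the property is named group.id in newer consumers, while older releases call it groupid.

# consumers A and B: same group, so they split the partitions between them
group.id=consumer-group1
# consumer C: its own group, so it also receives every message
group.id=consumer-group2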

Why do some of the consumers in a consumer group never receive any messages?

Currently, a topic partition is the smallest unit over which we distribute messages among consumers in the same consumer group. So, if the number of consumers in a group is larger than the total number of partitions in the Kafka cluster (across all brokers), some consumers will never get any data. For example, with 8 partitions and 10 consumers in the same group, 2 consumers will always be idle. The solution is to increase the number of partitions on the broker.
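
One way to do this, assuming your topics pick up the broker-side default when they are created (a sketch; per-topic settings are also possible), is to raise the default partition count in server.properties so that new topics get at least as many partitions as the number of consumers you expect:

# server.properties (sketch): default number of partitions per topic
num.partitions=8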

Why are there many rebalances in my consumer log?

A typical reason for many rebalances is consumer-side GC pauses. If this is the case, you will see ZooKeeper session expirations in the consumer log (grep for Expired). Occasional rebalances are fine, but too many rebalances can slow down consumption, and you will then need to tune the Java GC settings.
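
For example, assuming the consumer writes its log to a file named consumer.log (the file name is an assumption; use whatever your log4j configuration writes to), you can check for session expirations with:

grep Expired consumer.log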

Can I predict the results of the consumer rebalance?

During the rebalance process, each consumer executes the same deterministic algorithm to range-partition a sorted list of topic-partitions over a sorted list of consumer instances. This makes the whole rebalancing process deterministic. For example, if you have only one partition for a specific topic and two consumers consuming this topic, only one consumer will get data from that partition; and because the consumer ids are sorted, even if the consumer named "Consumer1" registers after the consumer named "Consumer2", it will replace "Consumer2" as the owner of the partition in the rebalance.

My consumer seems to have stopped, why?

First, try to figure out whether the consumer has really stopped or is just slow, using our ConsumerOffsetChecker tool:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group consumer-group1 --zkconnect zkhost:zkport --topic topic1
consumer-group1,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
Owner = consumer-group1-consumer1
Consumer offset = 70121994703
= 70,121,994,703 (65.31G)
Log size = 70122018287
= 70,122,018,287 (65.31G)
Consumer lag = 23584
= 23,584 (0.00G)

If the consumer offset is not moving after some time, then the consumer has likely stopped. If the consumer offset is moving, but the consumer lag (the difference between the end of the log and the consumer offset) is increasing, the consumer is slower than the producer. If the consumer is slow, the typical solution is to increase the degree of parallelism in the consumer. This may require increasing the number of partitions of a topic.

The high-level consumer will also block if there are no more messages available (it has caught up with the end of the log) or if the next message is larger than the configured fetch size (see the InvalidMessageSizeException question above).

Why are messages delayed in my consumer?

This could be a general throughput issue. If so, you can use more consumer streams (may need to increase # partitions) or make the consumption logic more efficient.

Another potential issue arises when multiple topics are consumed through the same consumer connector. Internally, there is an in-memory queue for each topic, which feeds the consumer iterators. There is a single fetcher thread per broker that issues multi-fetch requests for all topics. The fetcher thread iterates over the fetched data and tries to put the data for each topic into its corresponding in-memory queue. If one of the consumers is slow, its queue will eventually fill up. The fetcher thread will then block on putting data into that queue, and until that queue has free space, no data will be put into the queues for the other topics. Consumption of those other topics is therefore delayed, even if their volume is low. To address this issue, either make sure that all consumers can keep up, or use separate consumer connectors for different topics.

Brokers

How many topics can I have?

Unlike many messaging systems, Kafka topics are meant to scale up arbitrarily. Hence we encourage fewer large topics rather than many small topics. For example, if we were storing notifications for users, we would encourage a design with a single notifications topic partitioned by user id rather than a separate topic per user.

The actual scalability is for the most part determined by the total number of partitions across all topics, not by the number of topics itself (see the question below for details).

How do I choose the number of partitions for a topic?

There isn't really a right answer; we expose this as an option because it is a tradeoff. The simple answer is that the partition count determines the maximum consumer parallelism, so you should set the partition count based on the maximum consumer parallelism you expect to need (i.e. over-provision). Clusters with up to 10k total partitions are quite workable. Beyond that we don't aggressively test (it should work, but we can't guarantee it).
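
For example, assuming a release that ships the kafka-topics.sh tool, you can over-provision partitions when creating a topic (the partition and replication counts below are arbitrary examples):

bin/kafka-topics.sh --create --zookeeper zkhost:zkport --topic topic1 --partitions 8 --replication-factor 2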

Here is a more complete list of tradeoffs to consider:

Note that I/O and file counts are really about #partitions/#brokers, so adding brokers will fix problems there; but ZooKeeper handles all partitions for the whole cluster, so adding brokers doesn't help with that.