...

  • there are no more messages available
    • The ConsumerOffsetChecker will show that the log offset of the partitions being consumed does not change on the broker
  • the next message available is larger than the maximum fetch size you have specified
    • One possible cause of a stalled consumer is that the fetch size in the consumer is smaller than the largest message in the broker. You can use the DumpLogSegments tool to figure out the largest message size and set fetch.size in the consumer config accordingly.
  • your client code simply stops pulling messages from the iterator (the blocking queue will fill up).
    • One of the typical causes is that the application code that consumes messages somehow died and therefore killed the consumer thread. We recommend using a try/catch clause to log all Throwable in the consumer logic (a minimal sketch follows this list).
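
A minimal sketch of that pattern, using the newer Java KafkaConsumer API (the original advice refers to the older high-level consumer's iterator, but the idea is the same; the topic name, group id, and broker address below are placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NotificationsConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("group.id", "notifications-group");        // hypothetical consumer group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("notifications"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        process(record);                      // application logic
                    } catch (Throwable t) {
                        // Log everything, including Errors, so a failure in the
                        // processing code does not silently kill the consumer loop.
                        System.err.println("Failed to process record at offset "
                                + record.offset() + ": " + t);
                    }
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder for the application's message handling
    }
}
```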

Brokers

How many topics can I have?

Unlike many messaging systems, Kafka topics are meant to scale up arbitrarily. Hence we encourage fewer, larger topics rather than many small topics. So, for example, if we were storing notifications for users we would encourage a design with a single notifications topic partitioned by user id rather than a separate topic per user.
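
For instance, a minimal sketch of that design using the Java producer API (the KafkaProducer API shown here is newer than the client this FAQ was originally written against; the topic name, key, and broker address are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NotificationsProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A single shared "notifications" topic; the user id is the message key,
            // so all notifications for one user hash to the same partition and stay ordered.
            producer.send(new ProducerRecord<>("notifications", "user-42",
                    "You have a new follower"));
        }
    }
}
```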

The actual scalability is for the most part determined by the number of total partitions across all topics not the number of topics itself (see the question below for details).

How do I choose the number of partitions for a topic?

There isn't really a right answer; we expose this as an option because it is a tradeoff. The simple answer is that the partition count determines the maximum consumer parallelism, so you should set a partition count based on the maximum consumer parallelism you would expect to need (i.e. over-provision). Clusters with up to 10k total partitions are quite workable. Beyond that we don't aggressively test (it should work, but we can't guarantee it).
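
As a concrete illustration, here is a sketch of creating a topic with an explicit partition count using the Java AdminClient (an API newer than the tooling mentioned elsewhere in this FAQ; the topic name, partition count, and replication factor are made-up values, not recommendations):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateNotificationsTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // Over-provision partitions for the consumer parallelism you expect to need;
            // 24 partitions and replication factor 3 are illustrative values.
            NewTopic topic = new NewTopic("notifications", 24, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```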

Here is a more complete list of tradeoffs to consider:

  • A partition is basically a directory of log files.
  • Each partition must fit entirely on one machine. So if you have only one partition in your topic you cannot scale your write rate or retention beyond the capability of a single machine. If you have 1000 partitions you could potentially use 1000 machines.
  • Each partition is totally ordered. If you want a total order over all writes you probably want to have just one partition.
  • Each partition is not consumed by more than one consumer thread/process in each consumer group. This allows each process to consume in a single-threaded fashion to guarantee ordering to the consumer within the partition (if we split up a partition of ordered messages and handed them out to multiple consumers, then even though the messages were stored in order they would at times be processed out of order).
  • Many partitions can be consumed by a single process, though. So you can have 1000 partitions all consumed by a single process.
  • Another way to say the above is that the partition count is a bound on the maximum consumer parallelism.
  • More partitions will mean more files and hence can lead to smaller writes if you don't have enough memory to properly buffer the writes and coalesce them into larger writes.
  • Each partition corresponds to several znodes in ZooKeeper. ZooKeeper keeps everything in memory, so this can eventually get out of hand.
  • More partitions means longer leader fail-over time. Each partition can be handled quickly (milliseconds) but with thousands of partitions this can add up.
  • When we checkpoint the consumer position we store one offset per partition, so the more partitions, the more expensive the position checkpoint is.
  • It is possible to later expand the number of partitions, BUT when we do so we do not attempt to reorganize the data in the topic. So if you are depending on key-based semantic partitioning in your processing, you will have to manually copy data from the old low-partition topic to a new higher-partition topic if you later need to expand (the sketch after this list shows why the key-to-partition mapping changes).
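
To make the key-based partitioning caveat concrete, here is a small sketch of the kind of mapping a default partitioner performs (the hash below is plain String.hashCode for illustration only; Kafka's actual partitioner uses a different hash function, but the modulo step is what matters): the same key lands on a different partition once the partition count changes, so previously written data is no longer where key-based readers expect it.

```java
public class KeyToPartition {
    // Illustration only: hash the key and take it modulo the partition count.
    // The real default partitioner uses a different hash function, but the same
    // modulo step means the mapping shifts whenever the partition count changes.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String userId = "user-42";
        System.out.println(partitionFor(userId, 8));   // partition assignment with 8 partitions
        System.out.println(partitionFor(userId, 12));  // usually a different partition with 12
    }
}
```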

Note that I/O and file counts are really about #partitions/#brokers, so adding brokers will fix problems there; but ZooKeeper handles all partitions for the whole cluster, so adding machines doesn't help.

Having more partitions increases I/O parallelism for writes and thus leads to higher producer throughput. It also increases the degree of parallelism for consumers (see the previous question). On the other hand, more partitions add some overhead: (a) there will be more segment files and thus more open file handles in the broker; (b) there are more offsets to be checkpointed by consumers, which can increase the load on ZooKeeper. So one needs to balance these tradeoffs.