Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Producer idempotence could be done automatically and much more cheaply by optionally integrating support for this on the server.
  2. The existing high-level consumer doesn't expose a lot of the more fine grained control of offsets (e.g. to reset your position). We will be working on that soon

 

How do I accurately get offsets of messages for a certain timestamp using OffsetFetchRequest?

Kafka allows querying offsets of messages by time and it does so at segment granularity. The timestamp parameter is the unix timestamp and querying the offset by timestamp returns the latest possible offset of the message that is appended no later than the given timestamp. There are 2 special values of the timestamp - latest and earliest. For any other value of the unix timestamp, Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.

For more accurate results, you may configure the log segment size based on time (log.roll.ms) instead of size (log.segment.bytes). However care should be taken since doing so might increase the number of file handlers due to frequent log segment rolling.

Brokers

How does Kafka depend on Zookeeper?

Starting from 0.9, we are removing all the Zookeeper dependency from the clients (for details one can check this page). However, the brokers will continue to be heavily depend on Zookeeper for:

  1. Server failure detection.
  2. Data partitioning.
  3. In-sync data replication.
  4. Consumer membership management.

Once the Zookeeper quorum is down, brokers could result in a bad state and could not normally serve client requests, etc. Although when Zookeeper quorum recovers, the Kafka brokers should be able to resume to normal state automatically, there are still a few corner cases the they cannot and a hard kill-and-recovery is required to bring it back to normal. Hence it is recommended to closely monitor your zookeeper cluster and provision it so that it is performant.

Also note that if Zookeeper was hard killed previously, upon restart it may not successfully load all the data and update their creation timestamp. To resolve this you can clean-up the data directory of the Zookeeper before restarting (if you have critical metadata such as consumer offsets you would need to export / import them before / after you cleanup the Zookeeper data and restart the server).

Why does controlled shutdown fail?

If a controlled shutdown attempt fails, you will see error messages like the following in your broker logs

WARN [Kafka Server 0], Retrying controlled shutdown after the previous attempt failed... (kafka.server.KafkaServer)
WARN [Kafka Server 0], Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed

In addition to these error messages, if you also see SocketTimeoutExceptions, it indicates that the controller could not finish moving the leaders for all partitions on the broker within controller.socket.timeout.ms. The solution is to increase controller.socket.timeout.ms as well as increase controlled.shutdown.retry.backoff.ms and controlled.shutdown.max.retries to give enough time for the controlled shutdown to complete. If you don't see SocketTimeoutExceptions, it could indicate a problem in your cluster state or a bug as this happens when the controller is not able to move the leaders to another broker for several retries.

Why can't my consumers/producers connect to the brokers?

When a broker starts up, it registers its ip/port in ZK. You need to make sure the registered ip is consistent with what's listed in metadata.broker.list in the producer config. By default, the registered ip is given by InetAddress.getLocalHost.getHostAddress. Typically, this should return the real ip of the host. However, sometimes (e.g., in EC2), the returned ip is an internal one and can't be connected to from outside. The solution is to explicitly set the host ip to be registered in ZK by setting the "hostname" property in server.properties. In another rare case where the binding host/port is different from the host/port for client connection, you can set advertised.host.name and advertised.port for client connection.

Why partition leaders migrate themselves some times?

During a broker soft failure, e.g., a long GC, its session on ZooKeeper may timeout and hence be treated as failed. Upon detecting this situation, Kafka will migrate all the partition leaderships it currently hosts to other replicas. And once the broker resumes from the soft failure, it can only act as the follower replica of the partitions it originally leads.

To move the leadership back to the brokers, one can use the preferred-leader-election tool here. Also, in 0.8.2 a new feature will be added which periodically trigger this functionality (details here).

To reduce Zookeeper session expiration, either tune the GC or increase zookeeper.session.timeout.ms in the broker config.

How many topics can I have?

Unlike many messaging systems Kafka topics are meant to scale up arbitrarily. Hence we encourage fewer large topics rather than many small topics. So for example if we were storing notifications for users we would encourage a design with a single notifications topic partitioned by user id rather than a separate topic per user.

The actual scalability is for the most part determined by the number of total partitions across all topics not the number of topics itself (see the question below for details).

How do I choose the number of partitions for a topic?

There isn't really a right answer, we expose this as an option because it is a tradeoff. The simple answer is that the partition count determines the maximum consumer parallelism and so you should set a partition count based on the maximum consumer parallelism you would expect to need (i.e. over-provision). Clusters with up to 10k total partitions are quite workable. Beyond that we don't aggressively test (it should work, but we can't guarantee it).

Here is a more complete list of tradeoffs to consider:

  • A partition is basically a directory of log files.
  • Each partition must fit entirely on one machine. So if you have only one partition in your topic you cannot scale your write rate or retention beyond the capability of a single machine. If you have 1000 partitions you could potentially use 1000 machines.
  • Each partition is totally ordered. If you want a total order over all writes you probably want to have just one partition.
  • Each partition is not consumed by more than one consumer thread/process in each consumer group. This allows to have each process consume in a single threaded fashion to guarantee ordering to the consumer within the partition (if we split up a partition of ordered messages and handed them out to multiple consumers even though the messages were stored in order they would be processed out of order at times).
  • Many partitions can be consumed by a single process, though. So you can have 1000 partitions all consumed by a single process.
  • Another way to say the above is that the partition count is a bound on the maximum consumer parallelism.
  • More partitions will mean more files and hence can lead to smaller writes if you don't have enough memory to properly buffer the writes and coalesce them into larger writes
  • Each partition corresponds to several znodes in zookeeper. Zookeeper keeps everything in memory so this can eventually get out of hand.
  • More partitions means longer leader fail-over time. Each partition can be handled quickly (milliseconds) but with thousands of partitions this can add up.
  • When we checkpoint the consumer position we store one offset per partition so the more partitions the more expensive the position checkpoint is.
  • It is possible to later expand the number of partitions BUT when we do so we do not attempt to reorganize the data in the topic. So if you are depending on key-based semantic partitioning in your processing you will have to manually copy data from the old low partition topic to a new higher partition topic if you later need to expand.

Note that I/O and file counts are really about #partitions/#brokers, so adding brokers will fix problems there; but zookeeper handles all partitions for the whole cluster so adding machines doesn't help.

Why do I see lots of Leader not local exceptions on the broker during controlled shutdown?

This happens when the producer clients are using num.acks=0. When the leadership for a partition is changed, the clients (producer and consumer) gets an error when they try to produce or consume from the old leader when they wait for a response. The client then refreshes the partition metadata from zookeeper and gets the new leader for the partition and retries. This does not work for the producer client when ack = 0. This is because the producer does not wait for a response and hence does not know about the leadership change. The client would end up loosing messages till the shutdown broker is brought back up. This issue is fixed in KAFKA-955

How to reduce churns in ISR? When does a broker leave the ISR ?

ISR is a set of replicas that are fully sync-ed up with the leader. In other words, every replica in ISR has all messages that are committed. In an ideal system, ISR should always include all replicas unless there is a real failure. A replica will be dropped out of ISR if it diverges from the leader. This is controlled by two parameters: replica.lag.time.max.ms and replica.lag.max.messages. The former is typically set to a value that reliably detects the failure of a broker. We have a min fetch rate JMX in the broker. If that rate is n, set the former to a value larger than 1/n * 1000. The latter is typically set to the observed max lag (a JMX bean) in the follower. Note that if replica.lag.max.messages is too large, it can increase the time to commit a message. If latency becomes a problem, you can increase the number of partitions in a topic.

If a replica constantly drops out of and rejoins isr, you may need to increase replica.lag.max.messages. If a replica stays out of ISR for a long time, it may indicate that the follower is not able to fetch data as fast as data is accumulated at the leader. You can increase the follower's fetch throughput by setting a larger value for num.replica.fetchers.

After bouncing a broker, why do I see LeaderNotAvailable or NotLeaderForPartition exceptions on startup?

If you don't use controlled shutdown, some partitions that had leaders on the broker being bounced go offline immediately. The controller takes some time to elect leaders and notify the brokers to assume the new leader role. Following this, clients take some time to send metadata requests and discover the new leaders. If the broker is stopped and restarted quickly, clients that have not discovered the new leader keep sending requests to the newly restarted broker. The exceptions are throws since the newly restarted broker is not the leader for any partition.

Can I add new brokers dynamically to a cluster?

Brokers

How does Kafka depend on Zookeeper?

Starting from 0.9, we are removing all the Zookeeper dependency from the clients (for details one can check this page). However, the brokers will continue to be heavily depend on Zookeeper for:

  1. Server failure detection.
  2. Data partitioning.
  3. In-sync data replication.
  4. Consumer membership management.

Once the Zookeeper quorum is down, brokers could result in a bad state and could not normally serve client requests, etc. Although when Zookeeper quorum recovers, the Kafka brokers should be able to resume to normal state automatically, there are still a few corner cases the they cannot and a hard kill-and-recovery is required to bring it back to normal. Hence it is recommended to closely monitor your zookeeper cluster and provision it so that it is performant.

Also note that if Zookeeper was hard killed previously, upon restart it may not successfully load all the data and update their creation timestamp. To resolve this you can clean-up the data directory of the Zookeeper before restarting (if you have critical metadata such as consumer offsets you would need to export / import them before / after you cleanup the Zookeeper data and restart the server).

Why does controlled shutdown fail?

If a controlled shutdown attempt fails, you will see error messages like the following in your broker logs

WARN [Kafka Server 0], Retrying controlled shutdown after the previous attempt failed... (kafka.server.KafkaServer)
WARN [Kafka Server 0], Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed

In addition to these error messages, if you also see SocketTimeoutExceptions, it indicates that the controller could not finish moving the leaders for all partitions on the broker within controller.socket.timeout.ms. The solution is to increase controller.socket.timeout.ms as well as increase controlled.shutdown.retry.backoff.ms and controlled.shutdown.max.retries to give enough time for the controlled shutdown to complete. If you don't see SocketTimeoutExceptions, it could indicate a problem in your cluster state or a bug as this happens when the controller is not able to move the leaders to another broker for several retries.

Why can't my consumers/producers connect to the brokers?

When a broker starts up, it registers its ip/port in ZK. You need to make sure the registered ip is consistent with what's listed in metadata.broker.list in the producer config. By default, the registered ip is given by InetAddress.getLocalHost.getHostAddress. Typically, this should return the real ip of the host. However, sometimes (e.g., in EC2), the returned ip is an internal one and can't be connected to from outside. The solution is to explicitly set the host ip to be registered in ZK by setting the "hostname" property in server.properties. In another rare case where the binding host/port is different from the host/port for client connection, you can set advertised.host.name and advertised.port for client connection.

Why partition leaders migrate themselves some times?

During a broker soft failure, e.g., a long GC, its session on ZooKeeper may timeout and hence be treated as failed. Upon detecting this situation, Kafka will migrate all the partition leaderships it currently hosts to other replicas. And once the broker resumes from the soft failure, it can only act as the follower replica of the partitions it originally leads.

To move the leadership back to the brokers, one can use the preferred-leader-election tool here. Also, in 0.8.2 a new feature will be added which periodically trigger this functionality (details here).

To reduce Zookeeper session expiration, either tune the GC or increase zookeeper.session.timeout.ms in the broker config.

How many topics can I have?

Unlike many messaging systems Kafka topics are meant to scale up arbitrarily. Hence we encourage fewer large topics rather than many small topics. So for example if we were storing notifications for users we would encourage a design with a single notifications topic partitioned by user id rather than a separate topic per user.

The actual scalability is for the most part determined by the number of total partitions across all topics not the number of topics itself (see the question below for details).

How do I choose the number of partitions for a topic?

There isn't really a right answer, we expose this as an option because it is a tradeoff. The simple answer is that the partition count determines the maximum consumer parallelism and so you should set a partition count based on the maximum consumer parallelism you would expect to need (i.e. over-provision). Clusters with up to 10k total partitions are quite workable. Beyond that we don't aggressively test (it should work, but we can't guarantee it).

Here is a more complete list of tradeoffs to consider:

  • A partition is basically a directory of log files.
  • Each partition must fit entirely on one machine. So if you have only one partition in your topic you cannot scale your write rate or retention beyond the capability of a single machine. If you have 1000 partitions you could potentially use 1000 machines.
  • Each partition is totally ordered. If you want a total order over all writes you probably want to have just one partition.
  • Each partition is not consumed by more than one consumer thread/process in each consumer group. This allows to have each process consume in a single threaded fashion to guarantee ordering to the consumer within the partition (if we split up a partition of ordered messages and handed them out to multiple consumers even though the messages were stored in order they would be processed out of order at times).
  • Many partitions can be consumed by a single process, though. So you can have 1000 partitions all consumed by a single process.
  • Another way to say the above is that the partition count is a bound on the maximum consumer parallelism.
  • More partitions will mean more files and hence can lead to smaller writes if you don't have enough memory to properly buffer the writes and coalesce them into larger writes
  • Each partition corresponds to several znodes in zookeeper. Zookeeper keeps everything in memory so this can eventually get out of hand.
  • More partitions means longer leader fail-over time. Each partition can be handled quickly (milliseconds) but with thousands of partitions this can add up.
  • When we checkpoint the consumer position we store one offset per partition so the more partitions the more expensive the position checkpoint is.
  • It is possible to later expand the number of partitions BUT when we do so we do not attempt to reorganize the data in the topic. So if you are depending on key-based semantic partitioning in your processing you will have to manually copy data from the old low partition topic to a new higher partition topic if you later need to expand.

Note that I/O and file counts are really about #partitions/#brokers, so adding brokers will fix problems there; but zookeeper handles all partitions for the whole cluster so adding machines doesn't help.

Why do I see lots of Leader not local exceptions on the broker during controlled shutdown?

This happens when the producer clients are using num.acks=0. When the leadership for a partition is changed, the clients (producer and consumer) gets an error when they try to produce or consume from the old leader when they wait for a response. The client then refreshes the partition metadata from zookeeper and gets the new leader for the partition and retries. This does not work for the producer client when ack = 0. This is because the producer does not wait for a response and hence does not know about the leadership change. The client would end up loosing messages till the shutdown broker is brought back up. This issue is fixed in KAFKA-955

How to reduce churns in ISR? When does a broker leave the ISR ?

ISR is a set of replicas that are fully sync-ed up with the leader. In other words, every replica in ISR has all messages that are committed. In an ideal system, ISR should always include all replicas unless there is a real failure. A replica will be dropped out of ISR if it diverges from the leader. This is controlled by two parameters: replica.lag.time.max.ms and replica.lag.max.messages. The former is typically set to a value that reliably detects the failure of a broker. We have a min fetch rate JMX in the broker. If that rate is n, set the former to a value larger than 1/n * 1000. The latter is typically set to the observed max lag (a JMX bean) in the follower. Note that if replica.lag.max.messages is too large, it can increase the time to commit a message. If latency becomes a problem, you can increase the number of partitions in a topic.

If a replica constantly drops out of and rejoins isr, you may need to increase replica.lag.max.messages. If a replica stays out of ISR for a long time, it may indicate that the follower is not able to fetch data as fast as data is accumulated at the leader. You can increase the follower's fetch throughput by setting a larger value for num.replica.fetchers.

After bouncing a broker, why do I see LeaderNotAvailable or NotLeaderForPartition exceptions on startup?

If you don't use controlled shutdown, some partitions that had leaders on the broker being bounced go offline immediately. The controller takes some time to elect leaders and notify the brokers to assume the new leader role. Following this, clients take some time to send metadata requests and discover the new leaders. If the broker is stopped and restarted quickly, clients that have not discovered the new leader keep sending requests to the newly restarted broker. The exceptions are throws since the newly restarted broker is not the leader for any partition.

Can I add new brokers dynamically to a cluster?

Yes, new brokers can be added online to a cluster. Those new brokers won't have any data initially until either some new topics are created or some replicas are moved to them using the partition reassignment tool. 

How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?

 

Kafka allows querying offsets of messages by time and it does so at segment granularity. The timestamp parameter is the unix timestamp and querying the offset by timestamp returns the latest possible offset of the message that is appended no later than the given timestamp. There are 2 special values of the timestamp - latest and earliest. For any other value of the unix timestamp, Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.

 

For more accurate results, you may configure the log segment size based on time (log.roll.ms) instead of size (log.segment.bytes). However care should be taken since doing so might increase the number of file handlers due to frequent log segment rolling.

Yes, new brokers can be added online to a cluster. Those new brokers won't have any data initially until either some new topics are created or some replicas are moved to them using the partition reassignment tool. 

Build issues

How do I get Kafka dependencies to work in Play framework?

...