
...

By default, when a consumer is started for the very first time, it ignores all existing data in the topic and only consumes new data arriving after it starts. If this is the case, try sending some more data after the consumer has started. Alternatively, you can configure the consumer to read from the beginning by setting auto.offset.reset to "earliest" for the new consumer in 0.9, or "smallest" for the old consumer.
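For example, a minimal sketch of configuring the new (0.9) consumer to start from the earliest available offsets when the group has no committed offsets yet; the broker address, group id, and topic name are placeholders:
{code}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
props.put("group.id", "my-group");                  // a group with no committed offsets yet
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Start from the earliest available offset when no committed offset exists.
// The old consumer uses the value "smallest" for the same behavior.
props.put("auto.offset.reset", "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));      // placeholder topic
{code}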

Why does my consumer get InvalidMessageSizeException?

This typically means that the "fetch size" of the consumer is too small. Each time the consumer pulls data from the broker, it reads bytes up to a configured limit. If that limit is smaller than the largest single message stored in Kafka, the consumer can't decode the message properly and will throw an InvalidMessageSizeException. To fix this, increase the limit by setting the property "fetch.size" (0.7) / "fetch.message.max.bytes" (0.8) in config/consumer.properties. The default fetch.size is 300,000 bytes. For the new consumer in 0.9, the property to adjust is "max.partition.fetch.bytes," and the default is 1MB.
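As an illustration (the 2 MB value below is arbitrary and should be at least as large as your biggest message), the same limit expressed for the 0.8 and 0.9 consumers might look like:
{code}
import java.util.Properties;

// Old (0.8) high-level consumer: allow fetches large enough for the biggest message.
Properties oldConsumerProps = new Properties();
oldConsumerProps.put("fetch.message.max.bytes", "2097152");   // 2 MB, illustrative

// New (0.9) consumer: the equivalent limit is applied per partition.
Properties newConsumerProps = new Properties();
newConsumerProps.put("max.partition.fetch.bytes", "2097152"); // 2 MB, default is 1 MB
{code}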

Should I choose multiple group ids or a single one for the consumers?

...

If the consumer is in a different data center from the broker, you may need to tune the socket buffer size to amortize the long network latency. Specifically, for Kafka 0.7, you can increase socket.receive.buffer in the broker, and socket.buffersize and fetch.size in the consumer. For Kafka 0.8, the consumer properties are socket.receive.buffer.bytes and fetch.message.max.bytes.
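A rough sketch of such tuning for the 0.8 consumer; the values are only illustrative and should be sized to the link's bandwidth-delay product and your message sizes:
{code}
import java.util.Properties;

// Illustrative 0.8 consumer settings for a high-latency (cross-datacenter) link.
Properties props = new Properties();
props.put("socket.receive.buffer.bytes", "1048576");   // 1 MB TCP receive buffer
props.put("fetch.message.max.bytes", "2097152");       // larger fetches amortize round trips
{code}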

How can I rewind the offset in the consumer?

With the new consumer in 0.9, we have added a seek API to set the next position that will be fetched. You can either seek to the earliest position with seekToBeginning(), the latest with seekToEnd(), or to an arbitrary offset with seek().
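A minimal sketch, assuming `consumer` is an already-configured 0.9 KafkaConsumer and that "my-topic" partition 0 is a placeholder:
{code}
import java.util.Arrays;
import org.apache.kafka.common.TopicPartition;

// `consumer` is assumed to be an already-configured 0.9 KafkaConsumer.
TopicPartition partition = new TopicPartition("my-topic", 0);   // placeholder topic/partition
consumer.assign(Arrays.asList(partition));   // manual assignment, so we control the position ourselves

consumer.seekToBeginning(partition);   // rewind to the earliest available offset
consumer.seekToEnd(partition);         // or jump past the last message
consumer.seek(partition, 42L);         // or set an arbitrary offset (42 is just an example)
// The next poll() will fetch starting from the position set by the last seek.
{code}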

If you are using the older high level consumer, currently there is no API to reset the offsets in the consumer. The only way is to stop all consumers and reset the offsets for that consumer group in ZK manually. We do have an import/export offset tool that you can use (bin/kafka-run-class.sh kafka.tools.ImportZkOffsets and bin/kafka-run-class.sh kafka.tools.ExportZkOffsets). To get the offsets for importing, we have a GetOffsetShell tool (bin/kafka-run-class.sh kafka.tools.GetOffsetShell) that allows you to get the offsets before a given timestamp. The offsets returned there are the offsets corresponding to the first message of each log segment, so the granularity is very coarse.

I don't want my consumer's offsets to be committed automatically. Can I manually manage my consumer's offsets?

You can turn off the autocommit behavior (which is on by default) by setting auto.commit.enable=false in your consumer's config. There are a couple of caveats to keep in mind when doing this:

  • You will manually commit offsets using the consumer's commitOffsets API. Note that this will commit offsets for all partitions that the consumer currently owns. The consumer connector does not currently provide a more fine-grained commit API.
  • If a consumer rebalances for any reason it will fetch the last committed offsets for any partitions that it ends up owning. If you have not yet committed any offsets for these partitions, then it will use the latest or earliest offset depending on whether auto.offset.reset is set to largest or smallest (respectively).

For releases prior to 0.9, if you need more fine-grained control over offsets, you will need to use the SimpleConsumer and manage offsets on your own. We hope to address this deficiency in the client rewrite: https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI In 0.9, we have added a commitOffsets() API to the old consumer which takes a map with the specific offsets to be committed. A similar API is also available for the new consumer (commitSync() and commitAsync()).
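A minimal sketch of manual offset management with the new (0.9) consumer; the broker address, group id, and topic are placeholders, and the processing step is elided:
{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");           // placeholder
props.put("group.id", "my-group");                          // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");                   // new-consumer equivalent of auto.commit.enable=false

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder topic

ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
    // ... process the record ...

    // Commit only this record's partition; the committed offset is the next offset to read.
    consumer.commitSync(Collections.singletonMap(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)));
}
{code}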

What is the relationship between fetch.wait.max.ms and socket.timeout.ms on the consumer?

...

First you need to make sure these large messages can be accepted at the Kafka brokers: the broker property message.max.bytes controls the maximum size of a message that can be accepted at the broker, and any single message (including the wrapper message for a compressed message set) whose size is larger than this value will be rejected for producing. Then you need to make sure consumers can fetch such large messages from brokers. For the old consumer, you should use the property fetch.message.max.bytes, which controls the maximum number of bytes a consumer issues in one fetch. If it is less than a message's size, the fetch will get stuck on that message and keep retrying. The property for the new consumer is max.partition.fetch.bytes.
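For illustration, assuming the broker has been configured to accept larger messages via message.max.bytes (10 MB here is an arbitrary example), the matching consumer setting might look like:
{code}
import java.util.Properties;

// Broker side (config/server.properties, not Java): message.max.bytes=10485760  (10 MB, illustrative)

// Consumer side, new (0.9) consumer: the per-partition fetch limit must be at least as large
// as the largest message the broker will accept.
Properties props = new Properties();
props.put("max.partition.fetch.bytes", "10485760");   // 10 MB, illustrative value

// Old (0.8) high-level consumer equivalent:
// props.put("fetch.message.max.bytes", "10485760");
{code}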

How do we migrate to committing offsets to Kafka (rather than Zookeeper) in 0.8.2?

...