
In Kafka releases through 0.8.1.1, consumers commit their offsets to ZooKeeper. ZooKeeper does not scale particularly well, especially for writes, when there is a large number of offsets (i.e., consumer-count * partition-count). Fortunately, Kafka now provides an ideal mechanism for storing consumer offsets: consumers can commit their offsets in Kafka by writing them to a durable (replicated) and highly available topic. Consumers can fetch offsets by reading from this topic, although we also provide an in-memory offsets cache for faster access. In other words, offset commits are regular producer requests (which are inexpensive) and offset fetches are fast memory lookups.
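To make the two access paths concrete, here is a toy, in-process model in Java. It is a sketch under simplifying assumptions and not Kafka's offset manager. `ToyOffsetManager` and `OffsetCommitRecord` are hypothetical names, an append-only `List` stands in for the replicated offsets topic, and a `HashMap` stands in for the in-memory offsets cache. A commit appends a record and updates the cache, while a fetch is a single map lookup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, not Kafka's implementation: one committed offset.
final class OffsetCommitRecord {
    final String group;
    final String topic;
    final int partition;
    final long offset;

    OffsetCommitRecord(String group, String topic, int partition, long offset) {
        this.group = group;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical toy model: an append-only list stands in for the durable
// offsets topic, and a HashMap stands in for the in-memory offsets cache.
final class ToyOffsetManager {
    private final List<OffsetCommitRecord> log = new ArrayList<>();
    private final Map<String, Long> cache = new HashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    // A commit appends a record (like a produce request) and updates the cache.
    void commit(String group, String topic, int partition, long offset) {
        log.add(new OffsetCommitRecord(group, topic, partition, offset));
        cache.put(key(group, topic, partition), offset);
    }

    // A fetch is a memory lookup; -1 means no offset has been committed yet.
    long fetch(String group, String topic, int partition) {
        return cache.getOrDefault(key(group, topic, partition), -1L);
    }

    // Returns a copy of the log, oldest commit first.
    List<OffsetCommitRecord> log() {
        return new ArrayList<>(log);
    }

    // Rebuilds a manager from a log; later commits for a key overwrite earlier ones.
    static ToyOffsetManager replay(List<OffsetCommitRecord> records) {
        ToyOffsetManager manager = new ToyOffsetManager();
        for (OffsetCommitRecord r : records) {
            manager.commit(r.group, r.topic, r.partition, r.offset);
        }
        return manager;
    }
}
```

The log keeps every commit, but the cache keeps only the latest offset per group/topic/partition. Because of this, the cache can be rebuilt from the durable log, which is what `replay` does in this model.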

The official Kafka documentation describes how the feature works and how to migrate offsets from ZooKeeper to Kafka. This wiki page provides sample code that shows how to use the new Kafka-based offset storage mechanism.

Step 1: Discover the offset manager for a consumer group

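import kafka.api.ConsumerMetadataRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.network.BlockingChannel;

// Suppose queryChannel is a BlockingChannel already connected to some broker,
// and socketTimeoutMs holds the desired socket read timeout.
BlockingChannel offsetChannel = null;
int correlationId = 0;
try {
  // "myclientid" is an illustrative client id; Java callers must pass all four arguments
  queryChannel.send(new ConsumerMetadataRequest("myconsumergroup",
      ConsumerMetadataRequest.CurrentVersion(), correlationId++, "myclientid"));
  ConsumerMetadataResponse metadataResponse =
      ConsumerMetadataResponse.readFrom(queryChannel.receive().buffer());
  Broker coordinator = metadataResponse.coordinator();
  if (metadataResponse.errorCode() == ErrorMapping.NoError() && coordinator != null) {
    // Reuse the query channel if it is already connected to the offset manager
    if (coordinator.host().equals(queryChannel.host()) && coordinator.port() == queryChannel.port())
      offsetChannel = queryChannel;
    else {
      offsetChannel = new BlockingChannel(coordinator.host(), coordinator.port(),
          BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), socketTimeoutMs);
      offsetChannel.connect();
      queryChannel.disconnect();
    }
  } else {
    // retry the query (after backoff)
  }
}
catch (Exception e) {
  // network errors (e.g., java.io.IOException) surface here; retry the query (after backoff)
}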
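The discovery sample leaves "retry the query (after backoff)" as a comment. One way to fill that in is a bounded retry loop with exponential backoff, sketched below. `BackoffRetry.retry` is a hypothetical helper and not a Kafka API. The attempt count and backoff values are arbitrary assumptions, so pick values that suit your deployment.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper (not part of Kafka): retries an attempt, doubling the
// wait between attempts up to a cap.
final class BackoffRetry {
    private BackoffRetry() {}

    /**
     * Calls attempt until it yields a value or maxAttempts is reached.
     * An empty Optional or a thrown exception both count as a failed attempt.
     */
    static <T> Optional<T> retry(Supplier<Optional<T>> attempt, int maxAttempts,
                                 long initialBackoffMs, long maxBackoffMs) {
        long backoffMs = initialBackoffMs;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                Optional<T> result = attempt.get();
                if (result.isPresent()) {
                    return result;
                }
            } catch (Exception e) {
                // e.g., a network error surfaced by the channel; treat as retriable
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return Optional.empty();
                }
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
            }
        }
        return Optional.empty();
    }
}
```

In the discovery step, the attempt would send the `ConsumerMetadataRequest`. It would return `Optional.ofNullable(coordinator)` only when the error code is `ErrorMapping.NoError()`, and `Optional.empty()` otherwise, so the loop backs off and asks again.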