Comment: Reverted from v. 6

...

The official Kafka documentation describes how the feature works and how to migrate offsets from ZooKeeper to Kafka. This wiki provides sample code, written in Java against the Scala client classes, that shows how to use the new Kafka-based offset storage mechanism. The javaapi package does not currently support committing offsets to Kafka.

Step 1: Discover the offset manager for a consumer group by issuing a consumer metadata request to any broker

Code Block
BlockingChannel offsetChannel = null;
BlockingChannel queryChannel;
// establish the query channel to some broker (construct it and call connect())
try {
  // ask the broker which node is the offset manager (coordinator) for this group
  queryChannel.send(new ConsumerMetadataRequest("myconsumergroup", ConsumerMetadataRequest.CurrentVersion(), correlationId, "myClientId"));
  ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(queryChannel.receive().buffer());
  if (metadataResponse.errorCode() == ErrorMapping.NoError() && metadataResponse.coordinator() != null) {
    // compare host names with equals(): == on Strings only compares references
    if (metadataResponse.coordinator().host().equals(queryChannel.host()) && metadataResponse.coordinator().port() == queryChannel.port())
      // the broker we queried is itself the offset manager, so reuse the channel
      offsetChannel = queryChannel;
    else {
      // open a dedicated channel to the offset manager and drop the query channel
      offsetChannel = new BlockingChannel(metadataResponse.coordinator().host(), metadataResponse.coordinator().port(),
                                          BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), socketTimeoutMs);
      offsetChannel.connect();
      queryChannel.disconnect();
    }
  } else {
    // retry the query (after backoff)
  }
}
catch (IOException e) {
  // retry the query (after backoff)
}
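
The "retry the query (after backoff)" comments above leave the retry policy open. One possible way to structure it is sketched below. The class `BackoffRetry`, its `retry` method, and all of its parameters are hypothetical names introduced for illustration and are not part of Kafka. The sketch assumes that a `null` result or an `IOException` means the attempt failed, and it doubles the wait between attempts up to a cap, which is only one of several reasonable backoff policies.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical helper (not part of Kafka): retries an action with exponential backoff. */
public final class BackoffRetry {
    private BackoffRetry() {}

    /**
     * Calls the action until it returns a non-null result or maxAttempts is reached.
     * A null result or an IOException counts as a failed attempt; any other
     * exception is propagated immediately.
     */
    public static <T> T retry(Callable<T> action, int maxAttempts,
                              long initialBackoffMs, long maxBackoffMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoffMs = initialBackoffMs;
        IOException lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                T result = action.call();
                if (result != null) {
                    return result;          // success, stop retrying
                }
            } catch (IOException e) {
                lastError = e;              // remember the failure and retry
            }
            if (attempt < maxAttempts) {
                Thread.sleep(backoffMs);    // wait before the next attempt
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
            }
        }
        // lastError may be null if every attempt simply returned null
        throw new IOException("gave up after " + maxAttempts + " attempts", lastError);
    }
}
```

With a helper like this, the body of the `try` block in Step 1 could be wrapped in a `Callable<BlockingChannel>` that returns the offset channel on success and `null` when the response carries an error code, so that both failure paths share the same backoff logic.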

...