Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The official Kafka documentation describes how the feature works and how to migrate offsets from ZooKeeper to Kafka. This wiki provides sample code that shows how to use the new Kafka-based offset storage mechanism.

Step 1: Discover the offset manager for a consumer group by issuing a consumer metadata request to any broker

Code Block
BlockingChannel offsetChannel;
BlockingChannel queryChannel;
// Assume the query channel has already been established (connected) to some broker.
try {
  // Ask the broker which broker currently acts as the offset manager (coordinator) for this group.
  queryChannel.send(new ConsumerMetadataRequest("myconsumergroup"));
  ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(queryChannel.receive().buffer());
  if (metadataResponse.errorCode() == ErrorMapping.NoError() && metadataResponse.coordinator() != null) {
    // Host names are Strings: compare them by value with equals(), not by reference with ==.
    if (metadataResponse.coordinator().host().equals(queryChannel.host()) && metadataResponse.coordinator().port() == queryChannel.port()) {
      // The broker we queried is itself the offset manager, so reuse the already-open channel.
      offsetChannel = queryChannel;
    } else {
      offsetChannel = new BlockingChannel(metadataResponse.coordinator().host(), metadataResponse.coordinator().port(), BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), socketTimeoutMs);
      // A newly constructed channel must be connected before any request can be sent on it.
      offsetChannel.connect();
      // The query channel is no longer needed once the offset manager channel is set up.
      queryChannel.disconnect();
    }
  } else {
    // retry the query (after backoff)
  }
}
catch (IOException e) {
  // retry the query (after backoff)
}

Step 2: Issue the OffsetCommitRequest or OffsetFetchRequest to the offset manager

Code Block
// How to commit offsets

Map<TopicAndPartition, OffsetAndMetadata> offsets;
// populate the offsets map
OffsetCommitRequest ocRequest = new OffsetCommitRequest("myconsumergroup", offsets, correlationId, clientId);
try {
  offsetChannel.send(ocRequest);
  OffsetCommitResponse ocResponse = OffsetCommitResponse.readFrom(offsetChannel.receive().buffer());
  // The response carries one error code per committed partition; inspect each of them.
  for (short partitionErrorCode : ocResponse.commitStatus().values()) {
    if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
      // You must reduce the size of the metadata if you wish to retry
    } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
      offsetChannel.disconnect();
      // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
    } else {
      error("Error in commit");
      // retry the commit
    }
  }
}
catch (IOException ioe) {
  // The channel is in an unknown state after an I/O failure, so drop it before rediscovering.
  offsetChannel.disconnect();
  // Go to step 1 and then retry the commit
}



// How to fetch offsets

List<TopicAndPartition> partitionList;
// populate partitionList with list of partitions for which we want to look up last committed consumer offset
OffsetFetchRequest ofRequest = new OffsetFetchRequest("myconsumergroup", partitionList);
try {
  offsetChannel.send(ofRequest);
  OffsetFetchResponse ofResponse = OffsetFetchResponse.readFrom(offsetChannel.receive().buffer());
  // Only the first returned partition entry is inspected here; this assumes the response
  // contains at least one entry (i.e. partitionList was not empty).
  short errorCode = ofResponse.offsets().values().iterator().next().error();
  if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
    offsetChannel.disconnect();
    // Go to step 1 (offset manager has moved) and then retry the offset fetch to the new offset manager
  } else if (errorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
    // retry the offset fetch (after backoff)
  }
}
catch (IOException ioe) {
  // The channel is in an unknown state after an I/O failure, so drop it before rediscovering.
  offsetChannel.disconnect();
  // Go to step 1 and then retry the offset fetch
}