Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
// How to commit offsets:
//
// Sends one OffsetCommitRequest for the consumer group over offsetChannel, then
// inspects the per-partition error code in the response to decide whether the
// commit succeeded, must be shrunk, must be redirected to a new offset manager,
// or should simply be retried.

Map<TopicAndPartition, OffsetAndMetadata> offsets;
// populate the offsets map
OffsetCommitRequest ocRequest = new OffsetCommitRequest("myconsumergroup", offsets, correlationId, clientId);
try {
  offsetChannel.send(ocRequest);
  OffsetCommitResponse ocResponse = OffsetCommitResponse.readFrom(offsetChannel.read().buffer());
  // NOTE(review): error codes are assumed to be shorts - confirm against OffsetCommitResponse
  for (short partitionErrorCode : ocResponse.commitStatus.values()) {
    if (partitionErrorCode == ErrorMapping.NoError) {
      // This partition's offset was committed; nothing to do
      continue;
    }
    if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode) {
      // You must reduce the size of the metadata if you wish to retry
    } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode) {
      offsetChannel.disconnect();
      // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
    } else {
      error("Error in commit");
      // retry the commit
    }
  }
}
catch (IOException ioe) {
  // The connection can no longer be trusted after an I/O failure, so drop it
  offsetChannel.disconnect();
  // Go to step 1 and then retry the commit
}



// How to fetch offsets
//
// Sends one OffsetFetchRequest for the consumer group over offsetChannel and
// uses the error code of the first entry in the response to decide whether the
// fetch must be redirected to a new offset manager or retried after a backoff.

List<TopicAndPartition> partitionList;
// populate partitionList with list of partitions for which we want to look up last committed consumer offset
OffsetFetchRequest ofRequest = new OffsetFetchRequest(groupId, partitionList);
try {
  offsetChannel.send(ofRequest);
  OffsetFetchResponse ofResponse = OffsetFetchResponse.readFrom(offsetChannel.read().buffer());
  // Only the first returned entry is inspected. Held as a primitive short so the
  // == checks below always compare numeric values, never boxed references.
  short errorCode = ofResponse.offsets.values.iterator.next().error();
  if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode) {
    offsetChannel.disconnect();
    // Go to step 1 (offset manager has moved) and then retry the offset fetch to the new offset manager
  } else if (errorCode == ErrorMapping.OffsetsLoadInProgress) {
    // retry the offset fetch (after backoff)
  }
  // Any other error code falls through here without being handled
}
catch (IOException ioe) {
  // The connection can no longer be trusted after an I/O failure, so drop it
  offsetChannel.disconnect();
  // Go to step 1 and then retry the offset fetch
}