Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
BlockingChannel offsetChannel;
BlockingChannel queryChannel;
// establish the query channel to some broker
try {
  queryChannel.send(new ConsumerMetadataRequest("myconsumergroup"));
  ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(queryChannel.readreceive().buffer());
  if (metadataResponse.errorCode() == ErrorMapping.NoError() && metadataResponse.coordinator() != null) {
    if (metadataResponse.coordinator().host() == queryChannel.host() && metadataResponse.coordinator().port() == queryChannel.port)
      offsetChannel = queryChannel;
    else {
      offsetChannel = new BlockingChannel(metadataResponse.coordinator().host(), metadataResponse.coordinator().port(),
                                          BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), socketTimeoutMs);
      queryChannel.disconnect();
    }
  } else {
    // retry the query (after backoff)
  }
}
catch (IOException e) {
  // retry the query (after backoff)
}

...

Code Block
// How to commit offsets

Map<TopicAndPartition, OffsetAndMetadata> offsets;
// populate the offsets map
OffsetCommitRequest ocRequest = new OffsetCommitRequest("myconsumergroup", offsets, correlationId, clientId);
try {
  offsetChannel.send(ocRequest);
  OffsetCommitResponse ocResponse = OffsetCommitResponse.readFrom(offsetChannel.readreceive().buffer());
  for (partitionErrorCode: ocResponse.commitStatuserrors().values()) {
    if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
      // You must reduce the size of the metadata if you wish to retry
    } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
      offsetChannel.disconnect();
      // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
    } else {
      error("Error in commit");
      // retry the commit
    }
  }
}
catch (IOException ioe) {
  offsetChannel.disconnect();
  // Go to step 1 and then retry the commit
} 



// How to fetch offsets

List<TopicAndPartition> partitionList;
// populate partitionList with list of partitions for which we want to look up last committed consumer offset
OffsetFetchRequest ofRequest = new OffsetFetchRequest("myconsumergroup", partitionList);
try {
  offsetChannel.send(ofRequest);
  OffsetFetchResponse ofResponse = OffsetFetchResponse.readFrom(offsetChannel.readreceive().buffer());
  Short errorCode = ofResponse.offsets.values.iterator.next().error();
  if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
    offsetChannel.disconnect();
    // Go to step 1 (offset manager has moved) and then retry the offset fetch to the new offset manager
  } else if (errorCode == ErrorMapping.OffsetsLoadInProgress()) {
    // retry the offset fetch (after backoff)
  }
}
catch (IOException ioe) {
  offsetChannel.disconnect();
  // Go to step 1 and then retry the offset fetch
}