Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
BlockingChannel offsetChannel;
// suppose a query channel has been established to some broker
try {
  queryChannel.send(new ConsumerMetadataRequest("myconsumergroup"));
  ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(queryChannel.read().buffer());
  if (metadataResponse.errorCode == ErrorMapping.NoError && metadataResponse.coordinator != null) {
    if (metadataResponse.coordinator.host == queryChannel.host && metadataResponse.port == queryChannel.port)
      offsetChannel = queryChannel;
    else {
      offsetChannel = new BlockingChannel(coordinator.host, coordinator.port, BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), socketTimeoutMs);
      queryChannel.disconnect();
    }
  } else {
    // retry the query (after backoff)
  }
}
catch (IOException e) {
  // retry the query (after backoff)
}

Step 2: Issue the OffsetCommitRequest or OffsetFetchRequest to the coordinator

Code Block
// How to commit offsets:
Map<TopicAndPartition, OffsetAndMetadata> offsets;
// populate the offsets map
OffsetCommitRequest ocRequest = new OffsetCommitRequest("myconsumergroup", offsets, correlationId, clientId);
try {
  offsetChannel.send(ocRequest);
  OffsetCommitResponse ocResponse = OffsetCommitResponse.readFrom(offsetChannel.read().buffer());
  for (partitionErrorCode: ocResponse.commitStatus.values()) {
    if (partitionErrorCode != ErrorMapping.NoError) {
      // you can choose to either retry the entire commit or retry only the failed portion of the commit
    }
  }