THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
BlockingChannel offsetChannel; // suppose a query channel has been established to some broker try { queryChannel.send(new ConsumerMetadataRequest("myconsumergroup")); ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(queryChannel.read().buffer()); if (metadataResponse.errorCode == ErrorMapping.NoError && metadataResponse.coordinator != null) { if (metadataResponse.coordinator.host == queryChannel.host && metadataResponse.port == queryChannel.port) offsetChannel = queryChannel; else { offsetChannel = new BlockingChannel(coordinator.host, coordinator.port, BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), socketTimeoutMs); queryChannel.disconnect(); } } else { // retry the query (after backoff) } } catch (IOException e) { // retry the query (after backoff) } |
Step 2: Issue the OffsetCommitRequest or OffsetFetchRequest to the coordinator
Code Block |
---|
// How to commit offsets:
Map<TopicAndPartition, OffsetAndMetadata> offsets;
// populate the offsets map
OffsetCommitRequest ocRequest = new OffsetCommitRequest("myconsumergroup", offsets, correlationId, clientId);
try {
offsetChannel.send(ocRequest);
OffsetCommitResponse ocResponse = OffsetCommitResponse.readFrom(offsetChannel.read().buffer());
for (partitionErrorCode: ocResponse.commitStatus.values()) {
if (partitionErrorCode != ErrorMapping.NoError) {
// you can choose to either retry the entire commit or retry only the failed portion of the commit
}
}
|