THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Step 1: Discover the offset manager for a consumer group by issuing a consumer metadata request to any broker
Code Block |
---|
import kafka.api.ConsumerMetadataRequest;
// ...

// Step 1: ask any broker which broker is the offset manager (coordinator) for the
// consumer group, then make offsetChannel point at that broker.
// consumerGroupString, clientIdString, correlationId and socketTimeoutMs are supplied
// by the surrounding application code (not shown in this snippet).
BlockingChannel offsetChannel;
BlockingChannel queryChannel;
// establish the query channel to some broker
try {
    queryChannel.send(new ConsumerMetadataRequest(consumerGroupString,
                                                  ConsumerMetadataRequest.CurrentVersion(),
                                                  correlationId,
                                                  clientIdString));
    ConsumerMetadataResponse metadataResponse =
            ConsumerMetadataResponse.readFrom(queryChannel.receive().buffer());

    if (metadataResponse.errorCode() == ErrorMapping.NoError() && metadataResponse.coordinator() != null) {
        // Host names are Strings: compare by value with equals(), not by reference with ==.
        if (metadataResponse.coordinator().host().equals(queryChannel.host())
                && metadataResponse.coordinator().port() == queryChannel.port()) {
            // The broker we queried is itself the offset manager; reuse the open channel.
            offsetChannel = queryChannel;
        } else {
            offsetChannel = new BlockingChannel(metadataResponse.coordinator().host(),
                                                metadataResponse.coordinator().port(),
                                                BlockingChannel.UseDefaultBufferSize(),
                                                BlockingChannel.UseDefaultBufferSize(),
                                                socketTimeoutMs);
            // A freshly constructed channel must be connected before it can send requests.
            offsetChannel.connect();
            queryChannel.disconnect();
        }
    } else {
        // retry the query (after backoff)
    }
} catch (IOException e) {
    // retry the query (after backoff)
}
...
Code Block |
---|
// How to commit offsets import kafka.javaapi.OffsetCommitRequest; ... Map<TopicAndPartition, OffsetAndMetadata> offsets; // populate the offsets map OffsetCommitRequest ocRequest = new OffsetCommitRequest("myconsumergroup"consumerGroupString, offsets, correlationId, clientIdString, clientId); 1 /* version */); /* note that version 0 commits offsets to ZooKeeper */ try { offsetChannel.send(ocRequest); OffsetCommitResponse ocResponse = OffsetCommitResponse.readFrom(offsetChannel.receive().buffer()); for (partitionErrorCode: ocResponse.errors().values()) { if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) { // You must reduce the size of the metadata if you wish to retry } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) { offsetChannel.disconnect(); // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager } else { error("Error in commit"); // retry the commit } } } catch (IOException ioe) { offsetChannel.disconnect(); // Go to step 1 and then retry the commit } // How to fetch offsets List<TopicAndPartition> partitionList; // populate partitionList with list of partitions for which we want to look up last committed consumer offset OffsetFetchRequest ofRequest = new OffsetFetchRequest("myconsumergroup"consumerGroupString, partitionList); try { offsetChannel.send(ofRequest); OffsetFetchResponse ofResponse = OffsetFetchResponse.readFrom(offsetChannel.receive().buffer()); Short errorCode = ofResponse.offsets.values.iterator.next().error(); if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode()) { offsetChannel.disconnect(); // Go to step 1 (offset manager has moved) and then retry the offset fetch to the new offset manager } else if (errorCode == ErrorMapping.OffsetsLoadInProgress()) { // retry the offset fetch (after backoff) } } catch (IOException ioe) { offsetChannel.disconnect(); // Go to step 1 and 
then retry the offset fetch } |