THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
// How to commit offsets: Map<TopicAndPartition, OffsetAndMetadata> offsets; // populate the offsets map OffsetCommitRequest ocRequest = new OffsetCommitRequest("myconsumergroup", offsets, correlationId, clientId); try { offsetChannel.send(ocRequest); OffsetCommitResponse ocResponse = OffsetCommitResponse.readFrom(offsetChannel.read().buffer()); for (partitionErrorCode: ocResponse.commitStatus.values()) { if (partitionErrorCode !== ErrorMapping.NoErrorOffsetMetadataTooLargeCode) { // You must reduce the size of the metadata if you can choose to either wish to retry } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode) { offsetChannel.disconnect(); // Go to step 1 (offset manager has moved) and then retry the entire commit or retry only the failed portion of the commit } } commit to the new offset manager } else { error("Error in commit"); // retry the commit } } } catch (IOException ioe) { offsetChannel.disconnect(); // Go to step 1 and then retry the commit } // How to fetch offsets List<TopicAndPartition> partitionList; // populate partitionList with list of partitions for which we want to look up last committed consumer offset OffsetFetchRequest ofRequest = new OffsetFetchRequest(groupId, partitionList); try { offsetChannel.send(ofRequest); OffsetFetchResponse ofResponse = OffsetFetchResponse.readFrom(offsetChannel.read().buffer()); Short errorCode = ofResponse.offsets.values.iterator.next().error(); if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode) { offsetChannel.disconnect(); // Go to step 1 (offset manager has moved) and then retry the offset fetch to the new offset manager } else if (errorCode == ErrorMapping.OffsetsLoadInProgress) { // retry the offset fetch (after backoff) } } catch (IOException ioe) { offsetChannel.disconnect(); // Go to step 1 and then retry the offset fetch } |