THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Step 2: Issue the OffsetCommitRequest or OffsetFetchRequest to the offset manager
Code Block |
---|
// Step 2 example: send an OffsetCommitRequest and then an OffsetFetchRequest over
// `channel` and react to the error codes carried in each response.
// NOTE(review): `channel`, `correlationId`, `MY_GROUP`, `MY_CLIENTID`, `testPartition0`
// and `testPartition1` are not declared in this excerpt -- presumably they are set up
// in step 1; confirm against the full page.

// How to commit offsets

long now = System.currentTimeMillis();
Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now));
offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now));
OffsetCommitRequest commitRequest = new OffsetCommitRequest(
        MY_GROUP,
        offsets,
        correlationId++,
        MY_CLIENTID,
        (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper
try {
    channel.send(commitRequest.underlying());
    OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
    if (commitResponse.hasError()) {
        // NOTE(review): the element is taken as Object and cast because the declared value
        // type of errors() is not visible here; narrow the loop variable to Short if the
        // API allows it.
        for (Object partitionError : commitResponse.errors().values()) {
            short partitionErrorCode = (Short) partitionError;
            if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
                // You must reduce the size of the metadata if you wish to retry
            } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()
                    || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                channel.disconnect();
                // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
            } else {
                // log and retry the commit
            }
        }
    }
} catch (IOException ioe) {
    channel.disconnect();
    // Go to step 1 and then retry the commit
}

// How to fetch offsets

List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
// populate partitions with the partitions for which we want to look up the last committed consumer offset
partitions.add(testPartition0);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
        MY_GROUP,
        partitions,
        (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
        correlationId,
        MY_CLIENTID);
try {
    channel.send(fetchRequest.underlying());
    OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
    // Only testPartition0 was requested above, so only its entry is inspected.
    OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
    short offsetFetchErrorCode = result.error();
    if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
        channel.disconnect();
        // Go to step 1 (offset manager has moved) and then retry the offset fetch to the new offset manager
    } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
        // retry the offset fetch (after backoff)
    } else {
        long retrievedOffset = result.offset();
        String retrievedMetadata = result.metadata();
    }
} catch (IOException ioe) {
    channel.disconnect();
    // Go to step 1 and then retry the offset fetch after backoff
}