THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
BlockingChannel offsetChannel; BlockingChannel queryChannel; // establish the query channel to some broker try { queryChannel.send(new ConsumerMetadataRequest("myconsumergroup")); ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(queryChannel.readreceive().buffer()); if (metadataResponse.errorCode() == ErrorMapping.NoError() && metadataResponse.coordinator() != null) { if (metadataResponse.coordinator().host() == queryChannel.host() && metadataResponse.coordinator().port() == queryChannel.port) offsetChannel = queryChannel; else { offsetChannel = new BlockingChannel(metadataResponse.coordinator().host(), metadataResponse.coordinator().port(), BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), socketTimeoutMs); queryChannel.disconnect(); } } else { // retry the query (after backoff) } } catch (IOException e) { // retry the query (after backoff) } |
...
Code Block |
---|
// How to commit offsets Map<TopicAndPartition, OffsetAndMetadata> offsets; // populate the offsets map OffsetCommitRequest ocRequest = new OffsetCommitRequest("myconsumergroup", offsets, correlationId, clientId); try { offsetChannel.send(ocRequest); OffsetCommitResponse ocResponse = OffsetCommitResponse.readFrom(offsetChannel.readreceive().buffer()); for (partitionErrorCode: ocResponse.commitStatuserrors().values()) { if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) { // You must reduce the size of the metadata if you wish to retry } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) { offsetChannel.disconnect(); // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager } else { error("Error in commit"); // retry the commit } } } catch (IOException ioe) { offsetChannel.disconnect(); // Go to step 1 and then retry the commit } // How to fetch offsets List<TopicAndPartition> partitionList; // populate partitionList with list of partitions for which we want to look up last committed consumer offset OffsetFetchRequest ofRequest = new OffsetFetchRequest("myconsumergroup", partitionList); try { offsetChannel.send(ofRequest); OffsetFetchResponse ofResponse = OffsetFetchResponse.readFrom(offsetChannel.readreceive().buffer()); Short errorCode = ofResponse.offsets.values.iterator.next().error(); if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode()) { offsetChannel.disconnect(); // Go to step 1 (offset manager has moved) and then retry the offset fetch to the new offset manager } else if (errorCode == ErrorMapping.OffsetsLoadInProgress()) { // retry the offset fetch (after backoff) } } catch (IOException ioe) { offsetChannel.disconnect(); // Go to step 1 and then retry the offset fetch } |