THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
The official Kafka documentation describes how the feature works and how to migrate offsets from ZooKeeper to Kafka. This wiki provides sample (Scala) code that shows how to use the new Kafka-based offset storage mechanism. The javaapi package does not currently support committing offsets to Kafka.
Step 1: Discover and connect to the offset manager for a consumer group by issuing a consumer metadata request to any broker
Code Block |
---|
import kafka.api.*; import kafka.cluster.Broker; import kafka.common.OffsetAndMetadata; import kafka.common.OffsetMetadataAndError; import kafka.common.TopicAndPartition; import kafka.javaapi.ConsumerMetadataResponse; import kafka.javaapi.OffsetCommitRequest; import kafka.javaapi.OffsetCommitResponse; import kafka.javaapi.OffsetFetchRequest; import kafka.javaapi.OffsetFetchResponse; import kafka.network.BlockingChannel offsetChannel; BlockingChannel queryChannel; // establish the query channel to some broker try { queryChannel; import java.util.*; ... try { BlockingChannel channel = new BlockingChannel("localhost", 9092, BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), 5000 /* read timeout in millis */); channel.connect(); final String MY_GROUP = "demoGroup"; final String MY_CLIENTID = "demoClientId"; int correlationId = 0; final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0); final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1); channel.send(new ConsumerMetadataRequest("myconsumergroup"MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, "myClientId"MY_CLIENTID)); ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(queryChannelchannel.receive().buffer()); if (metadataResponse.errorCode() == ErrorMapping.NoError()) && { Broker offsetManager = metadataResponse.coordinator() != null) { ; // if (metadataResponse.coordinator().host() == queryChannel.host() && metadataResponse.coordinator().port() == queryChannel.port) the coordinator is different, from the above channel's host then reconnect offsetChannel = queryChannelchannel.disconnect(); else { offsetChannelchannel = new BlockingChannel(metadataResponse.coordinator()offsetManager.host(), metadataResponse.coordinator()offsetManager.port(), BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), socketTimeoutMs); 5000 /* read timeout in millis */); queryChannelchannel.disconnectconnect(); } } else { // retry the query (after backoff) } } catch (IOException e) { // retry the query (after backoff) } |
Step 2: Issue the OffsetCommitRequest or OffsetFetchRequest to the offset manager
Code Block |
---|
// How to commit offsets long now = System.currentTimeMillis(); Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>(); offsets; // populate the offsets map OffsetCommitRequest ocRequest.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now)); offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now)); OffsetCommitRequest commitRequest = new OffsetCommitRequest("myconsumergroup", offsets, correlationId, clientId); try { offsetChannel.send(ocRequest); OffsetCommitResponse ocResponse MY_GROUP, offsets, correlationId++, MY_CLIENTID, (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper try { channel.send(commitRequest.underlying()); OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(offsetChannelchannel.receive().buffer()); if (commitResponse.hasError()) { for (partitionErrorCode: ocResponsecommitResponse.errors().values()) { if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) { // You must reduce the size of the metadata if you wish to retry } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) { offsetChannelchannel.disconnect(); // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager } else { // log and error("Error in commit"); retry the commit } // retry the commit } } } catch (IOException ioe) { offsetChannel channel.disconnect(); // Go to step 1 and then retry the commit } } // How to fetch offsets List<TopicAndPartition> partitionList partitions = new ArrayList<TopicAndPartition>(); // populate partitionList with list of partitions for which we want to look up last committed consumer offset OffsetFetchRequest ofRequest partitions.add(testPartition0); OffsetFetchRequest fetchRequest = new OffsetFetchRequest("myconsumergroup", partitionList); try { offsetChannel.send(ofRequest); OffsetFetchResponse ofResponse MY_GROUP, partitions, (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper correlationId, MY_CLIENTID); try { channel.send(fetchRequest.underlying()); OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(offsetChannelchannel.receive().buffer()); ShortOffsetMetadataAndError errorCoderesult = ofResponsefetchResponse.offsets.values.iterator.next()().get(testPartition0); short offsetFetchErrorCode = result.error(); if (errorCodeoffsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) { offsetChannel channel.disconnect(); // Go to step 1 (offset manager has moved) and then retry the offset fetch to the new offset manager } else if (errorCode == ErrorMapping.OffsetsLoadInProgress()) { // retry the offset fetch (after backoff) } else { long retrievedOffset = result.offset(); String retrievedMetadata = result.metadata(); } } catch (IOException ioee) { offsetChannel channel.disconnect(); // Go to step 1 and then retry the offset fetch after backoff } |