Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Migrated to Confluence 5.3

...

The official Kafka documentation describes how the feature works and how to migrate offsets from ZooKeeper to Kafka. This wiki provides sample code that shows how to use the new Kafka-based offset storage mechanism.

Step 1: Discover and connect to the offset manager for a consumer group by issuing a consumer metadata request to any broker

Code Block
import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
import java.util.*;

...

    try {
        // Open a channel to any live broker; it can answer the consumer
        // metadata (coordinator discovery) request for the group.
        BlockingChannel channel = new BlockingChannel("localhost", 9092,
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
        final String MY_GROUP = "demoGroup";
        final String MY_CLIENTID = "demoClientId";
        int correlationId = 0;
        final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
        final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
        // Ask the broker which broker currently manages offsets for this group.
        channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
        ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());

        if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
            Broker offsetManager = metadataResponse.coordinator();
            // if the coordinator is different from the above channel's host then reconnect
            channel.disconnect();
            channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                                          BlockingChannel.UseDefaultBufferSize(),
                                          BlockingChannel.UseDefaultBufferSize(),
                                          5000 /* read timeout in millis */);
            channel.connect();
        } else {
            // retry the query (after backoff)
        }
    }
    catch (IOException e) {
        // retry the query (after backoff)
    }

Step 2: Issue the OffsetCommitRequest or OffsetFetchRequest to the

...

offset manager

Code Block
// How to commit offsets:

        long now = System.currentTimeMillis();
        // Offsets to commit, keyed by partition; LinkedHashMap keeps a stable order.
        Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
        offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now));
        offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now));
        OffsetCommitRequest commitRequest = new OffsetCommitRequest(
                MY_GROUP,
                offsets,
                correlationId++,
                MY_CLIENTID,
                (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper
        try {
            channel.send(commitRequest.underlying());
            OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
            if (commitResponse.hasError()) {
                // Errors are reported per partition; handle each one individually.
                for (short partitionErrorCode : commitResponse.errors().values()) {
                    if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
                        // You must reduce the size of the metadata if you wish to retry
                    } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                        channel.disconnect();
                        // Go to step 1 (the offset manager has moved) and then retry the commit
                    } else {
                        // log and retry the commit
                    }
                }
            }
        }
        catch (IOException ioe) {
            channel.disconnect();
            // Go to step 1 and then retry the commit
        }

// How to fetch offsets

        List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
        partitions.add(testPartition0);
        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
                MY_GROUP,
                partitions,
                (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
                correlationId,
                MY_CLIENTID);
        try {
            channel.send(fetchRequest.underlying());
            OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
            OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
            short offsetFetchErrorCode = result.error();
            if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
                channel.disconnect();
                // Go to step 1 and retry the offset fetch
            } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
                // retry the offset fetch (after backoff) - the coordinator is still loading offsets
            } else {
                long retrievedOffset = result.offset();
                String retrievedMetadata = result.metadata();
            }
        }
        catch (IOException e) {
            channel.disconnect();
            // Go to step 1 and then retry offset fetch after backoff
        }