WARN: This is an obsolete design. The design that's implemented in Kafka 0.9.0 is described in this wiki.

 

...

Motivation

The motivation for moving to a new set of consumer client APIs with broker side co-ordination is laid out here.

...

The proposed consumer APIs are here. Several API usage examples are documented here.

Consumer HowTo

This page has design details on the new consumer. This level of detail may be more than is needed by people who are just trying to write a non-Java consumer client using the new protocol. This wiki provides a step-by-step guide for writing a non-Java 0.9 client.

Group management protocol

Rebalancing is the process where a group of consumer instances (belonging to the same group) co-ordinate to own a mutually exclusive set of partitions of the topics that the group is subscribed to. At the end of a successful rebalance operation for a consumer group, every partition for all subscribed topics will be owned by a single consumer instance within the group. The way rebalancing works is as follows. Every broker is elected as the co-ordinator for a subset of the consumer groups. The co-ordinator broker for a group is responsible for orchestrating a rebalance operation on consumer group membership changes or partition changes for the subscribed topics. It is also responsible for communicating the resulting partition ownership configuration to all consumers of the group undergoing a rebalance operation.
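
The ownership property described above (every partition of every subscribed topic ends up with exactly one owner in the group) can be illustrated with a minimal sketch. This is not the assignment algorithm used by the co-ordinator; it is a simple round-robin under assumed inputs, and the function name `assign_partitions`, the consumer ids and the topic names are all hypothetical.

```python
from collections import defaultdict


def assign_partitions(consumer_ids, partitions_by_topic):
    """Give every partition exactly one owner, round-robin (illustrative only).

    consumer_ids: consumer id strings for the members of one group.
    partitions_by_topic: dict of topic name -> partition count.
    Returns a dict of consumer id -> {topic: [partition, ...]}.
    """
    if not consumer_ids:
        raise ValueError("group has no consumers")
    # Sort so that the result is deterministic for a given membership.
    members = sorted(consumer_ids)
    ownership = {member: defaultdict(list) for member in members}
    index = 0
    for topic in sorted(partitions_by_topic):
        for partition in range(partitions_by_topic[topic]):
            owner = members[index % len(members)]
            ownership[owner][topic].append(partition)
            index += 1
    return {member: dict(topics) for member, topics in ownership.items()}


# Hypothetical group of two consumers subscribed to two topics.
assignment = assign_partitions(["c1", "c2"], {"orders": 3, "clicks": 2})
```

Re-running the function after a membership or partition-count change gives a new ownership map, which is roughly what a rebalance has to communicate back to each consumer.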

...

Stopped consumption - In this state, the consumer stops consumption and commits offsets until it joins the group again.


Co-ordinator

Following is a state diagram that describes the state changes on the co-ordinator for a particular group.

...

Code Block
{
  GroupId                => String
}

ConsumerMetadataResponse

Code Block
{
  ErrorCode              => int16
  Coordinator            => Broker
}
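
For readers writing a non-Java client, the following sketch shows how the request body above (a single `GroupId` string) might be serialized. It assumes the usual Kafka wire convention that a string is a big-endian int16 length followed by UTF-8 bytes, and it leaves out the common request header. The helper names are hypothetical.

```python
import struct


def encode_string(value):
    """Encode a string as a big-endian int16 length plus UTF-8 bytes (assumed convention)."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_consumer_metadata_request(group_id):
    """Encode only the request body: GroupId => String. The request header is omitted."""
    return encode_string(group_id)


# "my-group" is a hypothetical group id.
body = encode_consumer_metadata_request("my-group")
```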

JoinGroupRequest

Code Block
{
  GroupId                     => String
  SessionTimeout              => int32
  Topics                      => [String]
  ConsumerId                  => String
  PartitionAssignmentStrategy => String
}

JoinGroupResponse

Code Block
{
  ErrorCode              => int16
  GroupGenerationId      => int32
  ConsumerId             => String
  PartitionsToOwn        => [TopicName [Partition]]
}
TopicName => String
Partition => int32
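
As a rough illustration of the join exchange, the sketch below encodes a JoinGroupRequest body and decodes a JoinGroupResponse body. It assumes the field order GroupId, SessionTimeout, Topics, ConsumerId, PartitionAssignmentStrategy for the request and ErrorCode, GroupGenerationId, ConsumerId, PartitionsToOwn for the response. It also assumes the usual Kafka wire conventions (big-endian integers, int16-length-prefixed strings, int32-count-prefixed arrays) and omits the request and response headers. All function and class names are hypothetical.

```python
import struct


def encode_string(value):
    """int16 length prefix followed by UTF-8 bytes (assumed wire convention)."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_join_group_request(group_id, session_timeout, topics, consumer_id, strategy):
    """Encode the request body only; the request header is omitted."""
    out = encode_string(group_id)
    out += struct.pack(">i", session_timeout)
    out += struct.pack(">i", len(topics))  # array count, then the elements
    for topic in topics:
        out += encode_string(topic)
    out += encode_string(consumer_id)
    out += encode_string(strategy)
    return out


class Reader:
    """Sequential reader over a bytes buffer."""

    def __init__(self, buf):
        self.buf = buf
        self.pos = 0

    def _unpack(self, fmt):
        (value,) = struct.unpack_from(fmt, self.buf, self.pos)
        self.pos += struct.calcsize(fmt)
        return value

    def int16(self):
        return self._unpack(">h")

    def int32(self):
        return self._unpack(">i")

    def string(self):
        length = self.int16()
        value = self.buf[self.pos:self.pos + length].decode("utf-8")
        self.pos += length
        return value


def decode_join_group_response(buf):
    """Decode the response body only; the response header is omitted."""
    reader = Reader(buf)
    error_code = reader.int16()
    generation_id = reader.int32()
    consumer_id = reader.string()
    partitions_to_own = {}
    topic_count = reader.int32()
    for _ in range(topic_count):
        topic = reader.string()
        partition_count = reader.int32()
        partitions_to_own[topic] = [reader.int32() for _ in range(partition_count)]
    return {
        "error_code": error_code,
        "group_generation_id": generation_id,
        "consumer_id": consumer_id,
        "partitions_to_own": partitions_to_own,
    }
```

In this sketch a consumer joining for the first time passes an empty `consumer_id`; that choice is an assumption made for the example, not something stated in this section.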

 

HeartbeatRequest

Code Block
{
  GroupId                => String
  GroupGenerationId      => int32
  ConsumerId             => String
}

HeartbeatResponse

Code Block
{
  ErrorCode              => int16
}
 

...

OffsetCommitRequest (v1)

Code Block
OffsetCommitRequest => ConsumerGroup GroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
  ConsumerGroup => string
  GroupGenerationId => int32
  ConsumerId => string
  TopicName => string
  Partition => int32
  Offset => int64
  TimeStamp => int64
  Metadata => string

Configs

Server side configs

This list is still in progress.

max.session.timeout - The session timeout for any group must not exceed this value, which limits overhead on the brokers. If it does, the broker sends a SessionTimeoutTooHigh error code in the JoinGroupResponse.
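
The check described above could look roughly like the following on the broker side. This is a sketch only: the numeric error codes are not given here, so symbolic strings are used, and `NO_ERROR` and the function name are hypothetical.

```python
# Symbolic error names; the numeric codes are not specified in this section.
NO_ERROR = "NoError"  # hypothetical name for the success case
SESSION_TIMEOUT_TOO_HIGH = "SessionTimeoutTooHigh"


def validate_session_timeout(requested_timeout, max_session_timeout):
    """Return the error to place in the JoinGroupResponse for a requested timeout."""
    if requested_timeout > max_session_timeout:
        return SESSION_TIMEOUT_TOO_HIGH
    return NO_ERROR
```

A timeout equal to max.session.timeout is accepted in this sketch, since the config only says the value should not be higher than the limit.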

partition.assignment.strategies - Comma-separated list of properties that map each strategy's friendly name to the class that implements the strategy. This is used for any strategy implemented by the user and released to the Kafka cluster. By default, Kafka will include a set of strategies that can be used by the consumer. The consumer specifies a partitioning strategy in the JoinGroupRequest. It must use the friendly name of the strategy or it will receive an UnknownPartitionAssignmentStrategyException.
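
To make the friendly-name lookup concrete, here is a small sketch of how such a config value might be parsed and resolved. The exact syntax of each property is not specified above; the `name=class` form, the class names and the function names are assumptions made for illustration.

```python
class UnknownPartitionAssignmentStrategyException(Exception):
    """Raised when a consumer names a strategy the broker does not know."""


def parse_strategies(config_value):
    """Parse 'name=class,name=class' (assumed syntax) into a dict."""
    strategies = {}
    for entry in config_value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas
        name, separator, class_name = entry.partition("=")
        name, class_name = name.strip(), class_name.strip()
        if not separator or not name or not class_name:
            raise ValueError("malformed strategy entry: %r" % entry)
        strategies[name] = class_name
    return strategies


def resolve_strategy(strategies, friendly_name):
    """Map the friendly name sent in a JoinGroupRequest to its implementing class."""
    try:
        return strategies[friendly_name]
    except KeyError:
        raise UnknownPartitionAssignmentStrategyException(friendly_name) from None


# Hypothetical config value with two user-supplied strategies.
configured = parse_strategies("range=com.example.RangeStrategy, sticky=com.example.StickyStrategy")
```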

Client side configs

The client side configs can be found here.

Wildcard Subscription

...