Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

 

Table of Contents

Goals

The main goal of this project is to have a thin consumer client design that can be easily adopted/reimplemented in non-Java languages. To achieve this we have decided to:

...

The Records returned by the poll() function will include metadata such as offset, partition, key and partition-key. This would also help removing the decompression/recompression in mirror maker (details can be found in KAFKA-1011).

 

Here is an exmaple of using the consumer API:

Code Block
Consumer

...

 consumer = new Consumer(props);
consumer.subscribe("topic");
long timeout = 1000;
while(true) {
  List<Record> messages = consumer.poll(timeout);
  process(messages);
  consumer.commit(true);
}

 

And the callback functions can be implemented as:

Code Block
void onPartitionDeassigned(partitions) {
  if (kafka.manage.offsets)
    commit(true);
  else
    offsets = getOffsets();
    // store offsets
}

void onPartitionAssigned(partitions) {
  offsets = new Map[(String, Int), Long]
  if (kafka.manage.offsets)
    offsets = getOffsets()
    assertEqual(offsets.keyset, partitions)
  else
    // read offsets from local disk, FS, database, etc..

  position(offsets) // ready to start fetching
}

 

Consumer Coordination Protocol Overview

Each consumer group will be assigned a consumer coordinator responsible for its membership management and offset management. Each broker will host a consumer coordinator so that the management of consumer groups will be distributed across all brokers. We follow the in-built offset management design described here to assign coordinators based on the "offset" topic partition leadership. That is, an "offset" topic will be used to store the offsets sent from each consumer group, and the partition id is based on the consumer group names. The leader of the partition that a consumer group's offsets commit to will be the coordinator of that group.

Since all the brokers cache the leadership for all the topic partitions, all brokers will know the coordinator for any given consumer groups. Upon receiving any requests from a consumer, the coordinator will only serve it if it is the assigned coordinator of the specified group this consumer belongs to; otherwise the request will be rejected.

 

For each consumer group, the coordinator stores the following information:

1) For each consumer group it currently serve, the group metadata containing:

  • Its subscribed topic list.
  • Group configs, including session timeout, etc.
  • Consumer metadata for each consumer in the group.
  • Current offsets for each consumed topic/partition.
  • Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.

2) For each existing topic, a list of consumer groups that are currently subscribing to it.

 

In addition, for each consumer group the coordinator also maintains the group config and partition ownership metadata in ZK as a json object. This node will be updated upon rebalance completion and read upon coordinator failover (details below).

 

Consumer Startup (Subscription Change)

On consumer startup, it will usually first try to subscribe to a list of topics.

If it does not subscribe to any topics, then poll() will always return empty set; if it has subscribed to specific topic/partitions, then it does not need to communicate with the coordinator but just start fetching data (will talk about how to start fetching data below).

Only if it has changed its topic subscription (including from an empty set on startup), it will need to coordinate with the coordinators. If consumer has not connected to the coordinator, it will first try to find the current coordinator and setup the connection.

Another note is that in this proposal we do not require consumer's client id in the config but leave it as an optional config, when it is not specified a default id of will be used, and if group management is invoked the coordinator assign consumer ids upon them successfully joining the group.

Coordinator Discovery

The coordinator discovery is through the coordinator request, which can be answered by any alive brokers and whose response will contain the coordinator id based on consumer group.

 

-----

Each consumer group will be assigned a consumer coordinator responsible for its membership management and offset management. Each broker will host a consumer coordinator so that the management of consumer groups will be distributed across all brokers. We follow the in-built offset management design described here to assign coordinators based on the "offset" topic partition leadership. That is, an "offset" topic will be used to store the offsets sent from each consumer group, and the partition id is based on the consumer group names. The leader of the partition that a consumer group's offsets commit to will be the coordinator of that group.

Since all the brokers cache the leadership for all the topic partitions, all brokers will know the coordinator for any given consumer groups. Upon receiving any requests from a consumer, the coordinator will only serve it if it is the assigned coordinator of the specified group this consumer belongs to; otherwise the request will be rejected.

 

For each consumer group, the coordinator stores the following information:

1) For each consumer group it currently serve, the group metadata containing:

  • Its subscribed topic list.
  • Group configs, including session timeout, etc.
  • Consumer metadata for each consumer in the group.
  • Current offsets for each consumed topic/partition.
  • Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.

2) For each existing topic, a list of consumer groups that are currently subscribing to it.

 

In addition, for each consumer group the coordinator also maintains the group config and partition ownership metadata in ZK as a json object. This node will be updated upon rebalance completion and read upon coordinator failover (details below).

 

Consumer Startup (Subscription Change)

On consumer startup, it will usually first try to subscribe to a list of topics.

If it does not subscribe to any topics, then poll() will always return empty set; if it has subscribed to specific topic/partitions, then it does not need to communicate with the coordinator but just start fetching data (will talk about how to start fetching data below).

Only if it has changed its topic subscription (including from an empty set on startup), it will need to coordinate with the coordinators. If consumer has not connected to the coordinator, it will first try to find the current coordinator and setup the connection.

Another note is that in this proposal we do not require consumer's client id in the config but leave it as an optional config, when it is not specified a default id of will be used, and if group management is invoked the coordinator assign consumer ids upon them successfully joining the group.

Coordinator Discovery

The coordinator discovery is through the coordinator request, which can be answered by any alive brokers and whose response will contain the coordinator id based on consumer group.

 

-----------------------------------------------------------------------------------

Any Broker (alive)                                                <-- (find_coordinator) -- Consumer 1

                                                                            -- (response) -->

                                                                            <-- (find_coordinator) -- Consumer 2

                                                                            -- (response) -->

------------------------------------------------------------------------------------------------

Any Broker (alive)                                                <-- (find_coordinator) -- Consumer 1

            

FindCoordinatorRequest

                                                               -- (response) -->

                                                                            <-- (find_coordinator) -- Consumer 2

                                                                            -- (response) -->

-----------------------------------------------------------------------------------

 

FindCoordinatorRequest

Code Block
{
  Version 
Code Block
{
  Version                         => int16
  CorrelationId          => int64
  ClientId               => String
  GroupId                => String
}

...

When the consumer has succesfully joined the group, it will keep sending the heartbeat request to the coordinator, this protocol is used both for consumer failure detection (by coordinator) and coordinator failure detection (by consumer).

Heartbeat

...

1) Consumers periodically send heartbeat request to the coordinator based on every session_timeout / heartbeat_frequency (which is an integer > 1).

...

Code Block
{
  Version                => int16
  CorrelationId          => int64
  offsets                => [TopicAndPartition -> OffsetAndError] 
}

 

 

Open Questions

Apart with the above proposal, we still have some open questions that worth discuss:

...