
Consumer Failure Handling

When the coordinator thinks a consumer has failed, it will do the following:

  1. Remove the consumer from the group metadata.
  2. Trigger the rebalance process.
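
The two steps above can be sketched as follows. This is only an illustration under assumptions: the `GroupCoordinator` class, its in-memory `groups` map, and the `trigger_rebalance` placeholder are hypothetical names that do not appear in this design.

```python
from dataclasses import dataclass, field


@dataclass
class GroupCoordinator:
    """Hypothetical in-memory coordinator state: group id -> set of consumer ids."""
    groups: dict = field(default_factory=dict)
    rebalances: list = field(default_factory=list)  # groups for which a rebalance was triggered

    def handle_consumer_failure(self, group_id: str, consumer_id: str) -> None:
        members = self.groups.get(group_id, set())
        if consumer_id not in members:
            return  # unknown or already removed; nothing to do
        # Step 1: remove the consumer from the group metadata.
        members.discard(consumer_id)
        # Step 2: trigger the rebalance process for the remaining members.
        self.trigger_rebalance(group_id)

    def trigger_rebalance(self, group_id: str) -> None:
        # Placeholder: a real coordinator would reassign partitions here.
        self.rebalances.append(group_id)


coordinator = GroupCoordinator(groups={"g1": {"c1", "c2"}})
coordinator.handle_consumer_failure("g1", "c1")
```

After the call, group `g1` contains only `c2` and one rebalance has been recorded for it.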

 

Coordinator Failure Handling

When the consumer thinks its current coordinator has failed, it will do the following:

  1. Refresh its metadata from the existing known brokers to get the new coordinator information (if the metadata request fails or times out, retry until it succeeds).
  2. Set up the connection to the new coordinator.
  3. Start sending heartbeats to the coordinator.

 

----------------------------------------- 1/2/3 ------------------------------------------

Coordinator (failed)    <-- (ping) --        Consumer 1 (alive)
                        (Have not heard back from coordinator, try to reconnect to the new coordinator)
                        <-- (ping) --        Consumer 2 (alive)
                        (Have not heard back from coordinator, try to reconnect to the new coordinator)

Broker (alive)          <-- (metadata) --    Consumer 1
                        -- (response) -->
                        <-- (metadata) --    Consumer 2
                        -- (response) -->

Coordinator (new)       <-- (ping) --        Consumer 1 (alive)
                        -- (response) -->
                        <-- (ping) --        Consumer 2 (alive)
                        -- (response) -->

-------------------------------------------------------------------------------------------
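
The consumer-side recovery described in this section could look roughly like the sketch below. The function name and the three callables (`fetch_coordinator`, `connect`, `send_heartbeat`) are hypothetical hooks introduced for illustration, and the fixed backoff between retry rounds is an assumption, not something this design specifies.

```python
import time


def recover_coordinator(fetch_coordinator, connect, send_heartbeat,
                        known_brokers, retry_backoff_s=0.1):
    """Run the three recovery steps; all callables are hypothetical hooks."""
    coordinator_id = None
    # Step 1: refresh metadata from the known brokers, retrying until one succeeds.
    while coordinator_id is None:
        for broker in known_brokers:
            try:
                coordinator_id = fetch_coordinator(broker)
                break
            except (ConnectionError, TimeoutError):
                continue  # this broker failed or timed out; try the next one
        if coordinator_id is None:
            time.sleep(retry_backoff_s)  # assumed pause before the next round
    # Step 2: set up the connection to the new coordinator.
    connection = connect(coordinator_id)
    # Step 3: start sending heartbeats to the new coordinator.
    send_heartbeat(connection)
    return coordinator_id
```

Passing the network operations in as callables keeps the retry logic separate from any particular client implementation.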

PingRequest

 

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerId             => String
}

 

PingResponse

{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
}
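
To make the two layouts concrete, here is a minimal encoding sketch. It assumes big-endian integers and strings encoded as an int16 length prefix followed by UTF-8 bytes; the layouts above do not state the wire encoding, so treat both as assumptions. The function names are hypothetical.

```python
import struct


def _encode_string(value: str) -> bytes:
    # Assumed encoding: int16 length prefix followed by UTF-8 bytes.
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_ping_request(version, correlation_id, group_id, consumer_id) -> bytes:
    # Field order follows the PingRequest layout: int16, int64, String, String.
    return (struct.pack(">hq", version, correlation_id)
            + _encode_string(group_id)
            + _encode_string(consumer_id))


def decode_ping_response(payload: bytes) -> dict:
    # PingResponse layout: Version (int16), CorrelationId (int64), ErrorCode (int16).
    version, correlation_id, error_code = struct.unpack(">hqh", payload)
    return {"Version": version, "CorrelationId": correlation_id, "ErrorCode": error_code}


request_bytes = encode_ping_request(0, 42, "g1", "c1")
response = decode_ping_response(struct.pack(">hqh", 0, 42, 0))
```

The `CorrelationId` lets the consumer match a response to the request it sent.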


Consumer Startup (Subscription Change)

On startup, the consumer first tries to subscribe to a list of topics; similarly, a change to the consumer's subscription list can be treated as a consumer restart.

Therefore, these two processes can be merged into one behavior that is triggered by a subscription change (possibly from an empty subscription).

If the consumer has not connected to the coordinator, it will first try to find the current coordinator through a metadata refresh and set up the connection.

Coordinator Discovery

Coordinator discovery is done through the metadata request, whose response currently contains 1) the cluster information and 2) the leader information for the specified topics. A third piece of metadata will be added to the response: the coordinator id, determined by the consumer group.

----------------------------------------- 1 ------------------------------------------

Any Broker              <-- (metadata) --    Consumer 1 (alive)
                        -- (response: {cluster, leaders, coordinator}) -->

---------------------------------------------------------------------------------------

MetadataRequest

 

{
  Version                => int16
  CorrelationId          => int64
  ClientId               => String
  Topics                 => [String]
  GroupIds               => [String]
}

 

This MetadataRequest inherits from the TopicMetadataRequest in 0.8.

MetadataResponse

{
  Version                => int16
  CorrelationId          => int64
  Brokers                => [Broker]
  TopicsMetadata         => [TopicMetadata]
  CoordinatorIds         => [int16]
}
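
As an illustration of how a consumer might use these two messages for coordinator discovery, the sketch below pairs each requested group id with a coordinator id. It assumes that `CoordinatorIds` is returned in the same order as the request's `GroupIds`, which the layouts above do not state; the helper name and the sample values are hypothetical.

```python
def coordinators_by_group(request_group_ids, response_coordinator_ids):
    """Pair each requested group id with its coordinator id (assumes matching order)."""
    if len(request_group_ids) != len(response_coordinator_ids):
        raise ValueError("GroupIds and CoordinatorIds differ in length")
    return dict(zip(request_group_ids, response_coordinator_ids))


# Sample messages written as plain dicts that mirror the field names above.
request = {"Version": 0, "CorrelationId": 1, "ClientId": "client-a",
           "Topics": ["orders"], "GroupIds": ["g1", "g2"]}
response = {"Version": 0, "CorrelationId": 1, "Brokers": [], "TopicsMetadata": [],
            "CoordinatorIds": [3, 5]}

lookup = coordinators_by_group(request["GroupIds"], response["CoordinatorIds"])
```

Here `lookup` maps `g1` to broker 3 and `g2` to broker 5.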

 ---------------------------------------------------------------------------------------

Consumer (re)Join-Group

If the consumer 1) does not know which partitions to consume (because it has just started, for example), or 2) detects a topic change, either from the metadata response or from a subscription change, it will send a join request to the coordinator.
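
The decision of when to join can be written as a small predicate. This is a sketch only; the function and parameter names are hypothetical, and representing "no known assignment" as `None` is an assumption.

```python
def needs_join(assigned_partitions, topic_change_in_metadata, subscription_changed):
    """Return True when the consumer should send a join request to the coordinator."""
    # Condition 1: no known assignment (for example, the consumer has just started).
    if assigned_partitions is None:
        return True
    # Condition 2: a topic change seen in the metadata response or in the subscription.
    return topic_change_in_metadata or subscription_changed
```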

...

 

{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
  Generation             => int16
  GroupId                => String 
  ConsumerId             => String
  PartitionsToOwn        => [<TopicAndPartition>]
}
Note that the consumer

 

 

...

id is assigned by the coordinator when the consumer joins the group, and is then used in the heartbeat protocol and offset management.

Consumer Group Rebalance

The coordinator will trigger rebalance under the following conditions:

...