...

When starting up, the broker will send a fetch request to the controller, getting the state of the metadata log.  If the broker's local copy of the metadata log is too far behind, the broker will fetch a complete snapshot from the controller, as described in KIP-630.  Otherwise, the broker will fetch just what it needs.  Once it has caught up to the high water mark of the controller (or at least, what the controller's high water mark used to be), it will be able to remove the log directories which it no longer needs.
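
To illustrate this decision, here is a sketch in Java.  The MetadataLog and ControllerClient types are hypothetical stand-ins; the real fetch path is defined by KIP-595 and KIP-630:

Code Block
languagejava
// Hypothetical types standing in for the broker's metadata fetch path.
interface ControllerClient {
    long logStartOffset();                // earliest offset still in the controller's log
    byte[] fetchSnapshot();               // complete snapshot, as in KIP-630
    byte[] fetchRecordsFrom(long offset); // incremental fetch, as in KIP-595
}

interface MetadataLog {
    long endOffset();
    void restoreSnapshot(byte[] snapshot);
    void appendRecords(byte[] records);
}

final class MetadataCatchUp {
    // Decide between a full snapshot and an incremental fetch at startup.
    static void catchUp(MetadataLog log, ControllerClient controller) {
        if (log.endOffset() < controller.logStartOffset()) {
            // Too far behind: the records we need have already been
            // compacted away, so fetch a complete snapshot.
            log.restoreSnapshot(controller.fetchSnapshot());
        } else {
            // Otherwise, fetch just the records the broker is missing.
            log.appendRecords(controller.fetchRecordsFrom(log.endOffset()));
        }
    }
}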

Broker

...

Registration and Heartbeats

Every distributed system needs a way of managing cluster membership.  Prior to KIP-500, Kafka brokers created ephemeral znodes in order to register themselves as part of the cluster.  The Kafka controller passively consumed the registration information from ZooKeeper.

In the post-KIP-500 world there is no ZooKeeper and no ephemeral znodes.  Instead, each broker registers itself with the active controller using a BrokerRegistrationRequest.  The active controller assigns the broker a new broker epoch, based on the latest committed offset in the log.  Subsequently, the broker sends a heartbeat request to the active controller every few seconds to keep the registration active.

The controller has a choice about whether to accept a broker's registration.  It will reject brokers whose metadata is too stale, or whose IDs have been claimed by another broker.  It will also reject brokers that do not support the minimum feature level of all KIP-584 features that are enabled.  When a broker's ID has already been claimed, the broker should shut down rather than trying to claim a broker ID which is already in use.

When the controller accepts a registration, it grants or renews a broker ID lease associating the broker process with its ID.  Leases are time-bounded; the lease period is specified by the controller in its response.  Once the period has elapsed, if the broker has not renewed its lease via a heartbeat, it must re-register.  Brokers that are rejected by the controller, or that are otherwise unable to renew their lease before it expires, enter the "fenced" state.

Finally, the broker registration and heartbeat mechanism gives the broker a chance to specify its target state, and the controller a chance to tell the broker what its actual state should be.
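
To make the lease mechanics concrete, here is a minimal sketch of how the active controller might track broker ID leases.  The names and structure are illustrative, not the actual controller code:

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of broker ID leases on the active controller.
final class BrokerLeaseManager {
    private static final long LEASE_TIMEOUT_MS = 18_000; // registration.lease.timeout.ms

    private static final class Lease {
        long brokerEpoch;
        long expirationTimeMs;
    }

    private final Map<Integer, Lease> leases = new HashMap<>();

    // Called when a registration or heartbeat arrives.  Returns the broker
    // epoch, or -1 if the ID is claimed by another live broker process.
    synchronized long registerOrRenew(int brokerId, long claimedEpoch,
                                      long nowMs, long latestCommittedOffset) {
        Lease lease = leases.get(brokerId);
        boolean live = lease != null && nowMs < lease.expirationTimeMs;
        if (live && lease.brokerEpoch != claimedEpoch) {
            return -1; // this ID belongs to another live broker process
        }
        if (!live) {
            // New registrations (and restored leases) get a new broker epoch,
            // based on the latest committed offset in the metadata log.
            lease = new Lease();
            lease.brokerEpoch = latestCommittedOffset;
            leases.put(brokerId, lease);
        }
        lease.expirationTimeMs = nowMs + LEASE_TIMEOUT_MS;
        return lease.brokerEpoch;
    }

    // A broker whose lease has expired is fenced.
    synchronized boolean isFenced(int brokerId, long nowMs) {
        Lease lease = leases.get(brokerId);
        return lease == null || nowMs >= lease.expirationTimeMs;
    }
}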

Fencing

Brokers that don't have a broker ID lease are said to be "fenced."  When a broker is fenced, it cannot process any client requests.  This prevents brokers which are not receiving metadata updates, or which are not receiving and processing them fast enough, from causing issues to clients.

...

This state machine ties together a few of the previous sections, such as broker registration and shutdown.

The broker heartbeat always contains the broker's epoch, unless the broker is in the INITIAL state.  When the broker is in the INITIAL state, it is waiting for the controller to assign it a new broker epoch.

NOT_RUNNING(0)

The broker starts up in this state.

If it needs to recover log directories, it transitions to RECOVERING_FROM_UNCLEAN_SHUTDOWN.

Otherwise, it transitions to REGISTERING.

On a control-C in this state, the broker process shuts down.

REGISTERING(1)

While in this state, the broker tries to register with the active controller.

If the active controller refuses the registration, the broker process shuts down.

Otherwise, the broker moves into the FENCED state.

On a control-C in this state, the broker process shuts down.

RECOVERING_FROM_UNCLEAN_SHUTDOWN(2)

While in this state, the broker is recovering log directories.

Once it is done, it transitions to REGISTERING.

On a control-C in this state, the broker process shuts down.

RUNNING(3)

While in this state, the broker is accepting and responding to client requests.

If the broker is unable to heartbeat to the active controller, it transitions to the FENCED state.

On a control-C in this state, the broker transitions into PENDING_CONTROLLED_SHUTDOWN.

FENCED(4)

While in this state, the broker does not respond to client requests.

It will try to re-register itself with the active controller.  If it succeeds, it will go into the RUNNING state.

On a control-C in this state, the broker process shuts down.

PENDING_CONTROLLED_SHUTDOWN(6)

While in this state, the broker is trying to shut down gracefully.

The controller will move the leadership of partitions off of this broker.

Once the controller tells the broker it can shut down (in a heartbeat response), it will shut down.
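
The states and transitions described above can be summarized in code.  The following enum is illustrative only; it mirrors this document rather than the Kafka source:

Code Block
languagejava
enum BrokerProcessState {
    NOT_RUNNING(0),
    REGISTERING(1),
    RECOVERING_FROM_UNCLEAN_SHUTDOWN(2),
    RUNNING(3),
    FENCED(4),
    PENDING_CONTROLLED_SHUTDOWN(6);

    final byte value;

    BrokerProcessState(int value) {
        this.value = (byte) value;
    }

    // On a control-C, only a RUNNING broker performs a controlled shutdown;
    // in every other state the broker process simply shuts down.
    boolean controlledShutdownOnInterrupt() {
        return this == RUNNING;
    }
}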

Public Interfaces

Command-Line Tools

There will be a new command-line tool, kafka-storage.sh.

Code Block
kafka-storage.sh: inspect and modify kafka storage directories
subcommands:
    info: print out information about kafka storage directories
        -c|--config [path]: the configuration file to use
    format: format the kafka storage directories
        -c|--config [path]: the configuration file to use
        -f|--force: format even if the directories are not empty.
        -d|--directory [path]: format only a single directory, rather than all directories

The info subcommand will display information about each directory, such as whether it exists, is formatted, etc.

The format subcommand will initialize the storage directories.

A cluster configured in KIP-500 mode will contain the following line in the meta.properties file in each directory:

Code Block
kip.500.mode=enabled

If the storage directories in a node have not been properly formatted, the node will not be able to start up in KIP-500 mode.  On the other hand, formatting will continue to be optional for legacy mode.
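
For example, a node starting up in KIP-500 mode could verify its directories with a check along these lines.  This is a sketch which assumes meta.properties is a standard Java properties file; it is not the actual startup code:

Code Block
languagejava
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Illustrative startup check for KIP-500 mode storage formatting.
final class StorageFormatCheck {
    // Returns true if the given directory was formatted for KIP-500 mode.
    static boolean isFormattedForKip500(Path logDir) throws IOException {
        Path metaProps = logDir.resolve("meta.properties");
        if (!Files.exists(metaProps)) {
            return false; // never formatted
        }
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(metaProps)) {
            props.load(reader);
        }
        return "enabled".equals(props.getProperty("kip.500.mode"));
    }

    static void checkAtStartup(Iterable<Path> logDirs) throws IOException {
        for (Path dir : logDirs) {
            if (!isFormattedForKip500(dir)) {
                // Fatal in KIP-500 mode; formatting stays optional in legacy mode.
                throw new IllegalStateException("Log directory " + dir +
                    " is not formatted for KIP-500 mode; run kafka-storage.sh format first.");
            }
        }
    }
}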

Configurations

process.roles

  Possible values: null, broker, controller, or broker,controller.

  If this is null (absent) then we are in legacy mode.  Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both.

controller.listener.names

  Possible values: if non-null, this must be a comma-separated list of listener names.

  A comma-separated list of the names of the listeners used by the KIP-500 controller.  This is required if this process is a KIP-500 controller; the legacy controller will not use this configuration.  When communicating with the controller quorum, the broker will always use the first listener in this list.  Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291.  The "control plane listener" is used on brokers, not on controllers.

controller.connect

  Possible values: if non-null, this must be a comma-separated list of all the controller voters, in the format {controller-id}@{controller-host}:{controller-port}.

  When in KIP-500 mode, each node must have this configuration, in order to find out how to communicate with the controller quorum.  Note that this replaces the "quorum.voters" config described in KIP-595.

controller.id

  Possible values: a 32-bit ID.

  The controller id for this server.  Only required if this server is a controller.

broker.id

  Possible values: a 32-bit ID.

  The broker id for this server.  Only required if this server is a broker.

registration.heartbeat.interval.ms

  Default: 2000.

  The length of time between broker heartbeats.

registration.lease.timeout.ms

  Default: 18000.

  The length of time that a broker lease lasts if no heartbeats are made.

metadata.log.dir

  Possible values: if set, this must be a path to a log directory.

  This configuration determines where we put the metadata log.  If it is not set, the metadata log is placed in the first log directory from log.dirs.

controller.quorum.fetch.timeout.ms

  Maximum time without a successful fetch from the current leader before a new election is started.  New name for quorum.fetch.timeout.ms.

controller.quorum.election.timeout.ms

  Maximum time without collecting a majority of votes during the candidate state before a new election is retried.  New name for quorum.election.timeout.ms.

controller.quorum.election.backoff.max.ms

  Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered.  New name for quorum.election.backoff.max.ms.

controller.quorum.request.timeout.ms

  Maximum time before a pending request is considered failed and the connection is dropped.  New name for quorum.request.timeout.ms.

controller.quorum.retry.backoff.ms

  Initial delay between request retries.  This config and the one below are used for retriable request errors or lost connectivity, and are different from the election.backoff configs above.  New name for quorum.retry.backoff.ms.

controller.quorum.retry.backoff.max.ms

  Max delay between requests.  Backoff will increase exponentially beginning from quorum.retry.backoff.ms.  New name for quorum.retry.backoff.max.ms.
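
As an illustration of how process.roles is interpreted, a node might determine its mode and roles as follows (a hypothetical helper, not Kafka code):

Code Block
languagejava
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper showing the semantics of process.roles.
final class ProcessRoles {
    enum Role { BROKER, CONTROLLER }

    // Returns the empty set in legacy mode, otherwise the KIP-500 roles.
    static Set<Role> parse(String processRoles) {
        if (processRoles == null) {
            return EnumSet.noneOf(Role.class); // null (absent) means legacy mode
        }
        EnumSet<Role> roles = EnumSet.noneOf(Role.class);
        for (String role : processRoles.split(",")) {
            roles.add(Role.valueOf(role.trim().toUpperCase(Locale.ROOT)));
        }
        return roles;
    }

    public static void main(String[] args) {
        System.out.println(parse("broker,controller")); // [BROKER, CONTROLLER]
        System.out.println(parse(null));                // [] => legacy mode
    }
}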

RPCs

Obsoleting the Metadata Propagation RPCs

As discussed earlier, the new controller will use FetchRequest to fetch metadata from the active controller.  The details of how Raft fetching will work are spelled out in KIP-595: A Raft Protocol for the Metadata Quorum.

Since we propagate the metadata via Raft, we will no longer need to send out LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest.  These requests will be sent out only when we're in legacy mode, not when we're in KIP-500 mode.  Eventually we will add some support for these requests to the new controller, in order to support rolling upgrade from a pre-KIP-500 release. However, for the purpose of this KIP, the new controller will not use these requests.

Obsoleting the Controlled Shutdown RPC

The broker heartbeat mechanism replaces the controlled shutdown RPC.  Therefore, we will not need to support this RPC any more in the controller, except for compatibility during upgrades, which will be described further in a follow-on KIP.

BrokerRegistration

Code Block
languagejs
{
  "apiKey": 57,
  "type": "request",
  "name": "BrokerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch, or -1 if one has not yet been assigned." },
    { "name": "CurMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The feature name." },
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported feature level." },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported feature level." }
      ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." }
  ]
}

{
  "apiKey": 57,
  "type": "response",
  "name": "BrokerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ActiveControllerId", "type": "int32", "versions": "0+",
      "about": "The ID of the active controller, or -1 if the controller doesn't know." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." },
    { "name": "LeaseDurationMs", "type": "int64", "versions": "0+",
      "about": "If BrokerEpoch is not -1, the number of milliseconds that we want the lease to last." }
  ]
}

BrokerHeartbeat

As described earlier, the broker periodically sends out a heartbeat request to the active controller.

Code Block
languagejs
{
  "apiKey": 58,
  "type": "request",
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TargetState", "type": "int8", "versions": "0+",
      "about": "The state that the broker wants to reach." },
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch." },
    { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." }
  ]
}

{
  "apiKey": 5058,
  "type": "response",
  "name": "BrokerHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ActiveControllerId", "type": "int32", "versions": "0+",
      "about": "The ID of the active controller, or -1 if the controller doesn't know." },
    { "name": "NextState", "type": "int8", "versions": "0+",
      "about": "The state to which the broker should transition controller, or -1 if the controller doesn't know." },
    { "name": "BrokerEpochNextState", "type": "int64int8", "versions": "0+", "default": "-1",
      "about": "The broker'sstate assignedto epoch,which orthe -1broker if none was assignedshould transition." },
    { "name": "LeaseDurationMs", "type": "int64", "versions": "0+",
      "about": "If BrokerEpoch is not -1, the number of milliseconds that we want the lease to last." }
  ]
}
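
To show how these fields fit together, here is a hypothetical broker-side heartbeat loop.  The ControllerChannel interface and the state constants are illustrative stand-ins for the real networking code:

Code Block
languagejava
// Hypothetical heartbeat loop; not the actual broker implementation.
final class HeartbeatLoop {
    interface ControllerChannel {
        // Sends a BrokerHeartbeatRequest and returns the NextState field
        // from the BrokerHeartbeatResponse.
        byte sendHeartbeat(int brokerId, long brokerEpoch, byte targetState,
                           long currentMetadataOffset) throws Exception;
    }

    // Values from the BrokerState enum below.
    static final byte ACTIVE = 3;
    static final byte SHUTDOWN = 4;

    static void run(ControllerChannel channel, int brokerId, long brokerEpoch,
                    long currentMetadataOffset, long heartbeatIntervalMs)
            throws InterruptedException {
        while (true) {
            try {
                byte nextState = channel.sendHeartbeat(
                    brokerId, brokerEpoch, ACTIVE, currentMetadataOffset);
                if (nextState == SHUTDOWN) {
                    return; // the controller granted our controlled shutdown
                }
            } catch (Exception e) {
                // A failed heartbeat is not fatal by itself; if the lease
                // expires before a heartbeat succeeds, the broker is fenced.
            }
            Thread.sleep(heartbeatIntervalMs); // registration.heartbeat.interval.ms
        }
    }
}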

Code Block
enum BrokerState {
    UNKNOWN(0),
    INITIAL(1),
    FENCED(2),
    ACTIVE(3),
    SHUTDOWN(4);
}

The controller will wait to unfence a broker until the broker has sent at least one heartbeat where its currentState is ACTIVE.  So a typical transition will look like this:

Code Block
                        INITIAL
sent initial heartbeat,   |
got back an epoch         |
                          V
                        ACTIVE -------------------------> SHUTDOWN
lease expired or revoked  |  ^                              ^
(stale metadata,          |  | lease restored               | controller granted
 id conflict,             |  | with new broker epoch        | controlled shutdown
 unable to communicate,   V  |                              |
 etc.)                  FENCED -----------------------------+

...