...
```
$ cat /tmp/kafka-logs/meta.properties
#
#Tue Dec 01 10:08:08 PST 2020
cluster.id=3Db5QLSqSZieL3rJBUUegA
version=1
broker.id=0
```
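The sample above looks like a Java-properties-style file: a `#` comment header with a timestamp, followed by `key=value` lines. Below is a minimal Python reader for exactly that shape. The helper names are hypothetical. The required-key check is modelled only on this one sample, not on a documented rule.

```python
def parse_meta_properties(text: str) -> dict[str, str]:
    """Parse the simple key=value lines seen in the sample meta.properties.

    Simplification: Java properties escapes, line continuations, and the
    ':' / whitespace separators are not handled.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue  # blank line or comment, e.g. the timestamp header
        key, sep, value = line.partition("=")  # split on the first '=' only
        if not sep:
            raise ValueError(f"malformed line: {raw!r}")
        props[key.strip()] = value.strip()
    return props


def read_identity(props: dict[str, str]) -> tuple[str, int]:
    """Hypothetical check modelled only on the sample: version=1 plus both IDs."""
    if props.get("version") != "1":
        raise ValueError(f"unexpected meta.properties version: {props.get('version')!r}")
    missing = [k for k in ("cluster.id", "broker.id") if k not in props]
    if missing:
        raise ValueError(f"missing keys: {missing}")
    return props["cluster.id"], int(props["broker.id"])


SAMPLE = """#
#Tue Dec 01 10:08:08 PST 2020
cluster.id=3Db5QLSqSZieL3rJBUUegA
version=1
broker.id=0
"""

if __name__ == "__main__":
    print(read_identity(parse_meta_properties(SAMPLE)))  # ('3Db5QLSqSZieL3rJBUUegA', 0)
```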
...
kafka-storage.sh
There will be a new command-line tool, kafka-storage.sh.
...
```
$ ./bin/kafka-storage.sh random-uuid
51380268-1036-410d-a8fc-fb3b55f48033
```
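`random-uuid` prints the canonical 36-character UUID form. The `cluster.id` in the meta.properties sample, by contrast, is a 22-character string. Sixteen UUID bytes encoded as URL-safe base64 with the `=` padding removed come out at exactly 22 characters, which matches how Kafka's `org.apache.kafka.common.Uuid` class renders IDs. This section does not say whether the tool converts between the two forms.

The sketch below shows that conversion. `to_compact` and `from_compact` are hypothetical names.

```python
import base64
import uuid


def to_compact(u: uuid.UUID) -> str:
    """16 raw UUID bytes -> URL-safe base64 with the '=' padding stripped (22 chars)."""
    return base64.urlsafe_b64encode(u.bytes).decode("ascii").rstrip("=")


def from_compact(s: str) -> uuid.UUID:
    """Inverse of to_compact: restore the padding, then decode the 16 bytes."""
    padded = s + "=" * (-len(s) % 4)
    raw = base64.urlsafe_b64decode(padded)
    if len(raw) != 16:
        raise ValueError(f"expected 16 bytes, got {len(raw)}")
    return uuid.UUID(bytes=raw)


if __name__ == "__main__":
    canonical = uuid.UUID("51380268-1036-410d-a8fc-fb3b55f48033")
    compact = to_compact(canonical)
    print(canonical, "->", compact)
    print(from_compact("3Db5QLSqSZieL3rJBUUegA"))  # the sample cluster.id, canonical form
```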
...
kafka-cluster.sh
There will also be a new command-line tool named kafka-cluster.sh.
```
$ ./bin/kafka-cluster.sh -h
usage: kafka-cluster [-h] {cluster-id,decommission} ...

The Kafka cluster tool.

positional arguments:
  {cluster-id,decommission}
    cluster-id          Get information about the ID of a cluster.
    decommission        Decommission a broker ID.

optional arguments:
  -h, --help            show this help message and exit
```
kafka-cluster.sh will have two subcommands: cluster-id and decommission.
cluster-id
```
$ ./bin/kafka-cluster.sh cluster-id -h
usage: kafka-cluster cluster-id [-b,-c,-h]

optional arguments:
  -b, --bootstrap-server  a list of host/port pairs to use for establishing the connection to the Kafka cluster.
  -c, --config            a property file containing configs to be passed to Admin Client.
  -h, --help              show this help message and exit
```
The cluster-id command prints out the cluster ID.
decommission
```
$ ./bin/kafka-cluster.sh decommission -h
usage: kafka-cluster decommission [-b,-c,-f,-h,-i]

optional arguments:
  -b, --bootstrap-server  a list of host/port pairs to use for establishing the connection to the Kafka cluster.
  -c, --config            a property file containing configs to be passed to Admin Client.
  -f, --force             true if we should force the decommissioning even if this broker is the only replica for some partitions.
  -h, --help              show this help message and exit
  -i, --id                the ID of the broker to decommission
```
The decommission command removes the registration of a specific broker ID. Note that this will also remove the broker ID from all replica assignments.
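The help output above has the shape of an argparse-style parser with two subcommands. The Python argparse mirror below is a sketch of that command-line surface only, not of how the tool is implemented. It shows the subcommand layout, the shared `-b`/`-c` options, and the decommission-only `-f`/`-i` options. Making `-i` mandatory is an assumption, because the help text does not say whether it is.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical Python mirror of the kafka-cluster help text."""
    parser = argparse.ArgumentParser(prog="kafka-cluster", description="The Kafka cluster tool.")
    subcommands = parser.add_subparsers(dest="command", required=True)

    cluster_id = subcommands.add_parser(
        "cluster-id", help="Get information about the ID of a cluster.")
    decommission = subcommands.add_parser(
        "decommission", help="Decommission a broker ID.")

    # Both subcommands share the Admin Client connection options.
    for sub in (cluster_id, decommission):
        sub.add_argument("-b", "--bootstrap-server",
                         help="host/port pairs used to connect to the Kafka cluster")
        sub.add_argument("-c", "--config",
                         help="property file with configs passed to the Admin Client")

    # Assumption: -i is required, since decommissioning needs a target broker.
    decommission.add_argument("-i", "--id", type=int, required=True,
                              help="the ID of the broker to decommission")
    decommission.add_argument("-f", "--force", action="store_true",
                              help="decommission even if this broker is the only replica "
                                   "for some partitions")
    return parser


if __name__ == "__main__":
    print(build_parser().parse_args(["decommission", "-b", "localhost:9092", "-i", "3"]))
```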
Configurations
Configuration Name | Possible Values | Notes |
---|---|---|
process.roles | null / broker / controller / broker,controller | If this is null (absent), then we are in legacy mode. Otherwise, we are in KIP-500 mode, and this configuration determines what roles this process should play: broker, controller, or both. |
controller.listener.names | If non-null, this must be a comma-separated list of listener names. When communicating with the controller quorum, the broker will always use the first listener in this list. | A comma-separated list of the names of the listeners used by the KIP-500 controller. This configuration is required if this process is a KIP-500 controller. The legacy controller will not use this configuration. Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291. |
listeners | A comma-separated list of the configured listeners. For example, INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094 | This configuration is now required. |
controller.connect | If non-null, this must be a comma-separated list of all the controller voters, in the format: {controller-id}@{controller-host}:{controller-port} | When in KIP-500 mode, each node must have this configuration in order to find out how to communicate with the controller quorum. Note that this replaces the "quorum.voters" config described in KIP-595. This configuration is required for both brokers and controllers. |
controller.id | a 32-bit ID | The controller id for this server. Only required if this server is a controller. |
broker.id | a 32-bit ID | The broker id for this server. Only required if this server is a broker. |
initial.broker.registration.timeout.ms | 60000 | When initially registering with the controller quorum, the number of milliseconds to wait before declaring failure and exiting the broker process. |
broker.heartbeat.interval.ms | 3000 | The length of time between broker heartbeats. |
broker.registration.timeout.ms | 18000 | The length of time that a broker lease lasts if no heartbeats are made. |
metadata.log.dir | If set, this must be a path to a log directory. | This configuration determines where we put the metadata log. If it is not set, the metadata log is placed in the first log directory from log.dirs. |
controller.quorum.fetch.timeout.ms | Maximum time without a successful fetch from the current leader before a new election is started. | New name for quorum.fetch.timeout.ms |
controller.quorum.election.timeout.ms | Maximum time without collecting a majority of votes during the candidate state before a new election is retried. | New name for quorum.election.timeout.ms |
controller.quorum.election.backoff.max.ms | Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered. | New name for quorum.election.backoff.max.ms |
controller.quorum.request.timeout.ms | Maximum time before a pending request is considered failed and the connection is dropped. | New name for quorum.request.timeout.ms |
controller.quorum.retry.backoff.ms | Initial delay between request retries. This config and the one below are used for retriable request errors or lost connectivity, and are different from the election backoff configs above. | New name for quorum.retry.backoff.ms |
controller.quorum.retry.backoff.max.ms | Maximum delay between request retries. Backoff will increase exponentially beginning from controller.quorum.retry.backoff.ms. | New name for quorum.retry.backoff.max.ms |
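The table implies a few mechanical checks:

- process.roles selects legacy or KIP-500 mode.
- controller.connect is a list of `id@host:port` voters.
- Some keys are required depending on the roles.

The sketch below encodes those rules. It also covers the capped exponential retry backoff described for controller.quorum.retry.backoff.ms and controller.quorum.retry.backoff.max.ms. All function names are hypothetical. The doubling factor is an assumption, because the table only says the backoff grows exponentially. Only KIP-500 mode is checked.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Voter:
    controller_id: int
    host: str
    port: int


def parse_roles(value: Optional[str]) -> frozenset[str]:
    """process.roles: absent (None) means legacy mode, returned as an empty set."""
    if value is None:
        return frozenset()
    roles = frozenset(role.strip() for role in value.split(","))
    if not roles <= {"broker", "controller"}:
        raise ValueError(f"invalid process.roles: {value!r}")
    return roles


def parse_controller_connect(value: str) -> list[Voter]:
    """Parse '{controller-id}@{controller-host}:{controller-port}' entries."""
    voters = []
    for entry in value.split(","):
        ident, _, address = entry.strip().partition("@")
        host, _, port = address.rpartition(":")
        if not ident or not host or not port:
            raise ValueError(f"malformed voter: {entry!r}")
        voters.append(Voter(int(ident), host, int(port)))
    return voters


def missing_configs(props: dict[str, str]) -> list[str]:
    """Keys the table marks as required in KIP-500 mode for the configured roles."""
    roles = parse_roles(props.get("process.roles"))
    if not roles:
        return []  # legacy mode: not checked by this sketch
    required = ["listeners", "controller.connect"]
    if "controller" in roles:
        required += ["controller.listener.names", "controller.id"]
    if "broker" in roles:
        required.append("broker.id")
    return [key for key in required if key not in props]


def retry_backoff_ms(attempt: int, initial_ms: int, max_ms: int) -> int:
    """Delay before retry number `attempt` (0-based); doubling is an assumption."""
    return min(max_ms, initial_ms * 2 ** attempt)


if __name__ == "__main__":
    print(parse_controller_connect("1@192.1.1.8:9094, 2@192.1.1.9:9094"))
    print(missing_configs({"process.roles": "broker,controller", "listeners": "CONTROLLER://192.1.1.8:9094"}))
```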
Deprecated Configurations
Configuration Name | Reason |
---|---|
control.plane.listener.name | We no longer need to maintain a separate listener for messages from the controller, since the controller does not send messages out any more (it receives them). |
broker.id.generation.enable | Automatic broker ID generation is no longer supported. |
zookeeper.* | We no longer need configurations for ZooKeeper. |
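A startup check could flag leftover deprecated keys. The hypothetical helper below only reports them. This table does not specify whether a KIP-500 node should warn about such keys, ignore them, or reject them.

```python
# Deprecated keys from the table: two exact names plus the zookeeper.* prefix.
DEPRECATED_EXACT = {"control.plane.listener.name", "broker.id.generation.enable"}
DEPRECATED_PREFIXES = ("zookeeper.",)


def deprecated_keys(props: dict[str, str]) -> list[str]:
    """Return the keys in props that the table lists as deprecated, sorted."""
    return sorted(
        key for key in props
        if key in DEPRECATED_EXACT or key.startswith(DEPRECATED_PREFIXES)
    )


if __name__ == "__main__":
    print(deprecated_keys({"zookeeper.connect": "zk:2181", "broker.id": "0"}))
```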
New Error Codes
DUPLICATE_BROKER_REGISTRATION
There will be a new error code, DUPLICATE_BROKER_REGISTRATION, that the active controller will return when a broker tries to register with an ID that is currently in use.
INVALID_CLUSTER_ID
There will be a new error code, INVALID_CLUSTER_ID, that the controller will return if the broker tries to register with the wrong cluster ID.
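Both error codes above arise while the active controller validates a registration. In the sketch below:

- The class and field names are hypothetical.
- The 18000 ms lease reuses the broker.registration.timeout.ms default from the configuration table.
- The -1 epoch on failure mirrors the BrokerEpoch default in BrokerRegistrationResponse.

Two further points are assumptions. First, an ID counts as "in use" only while a different incarnation holds an unexpired lease. Second, epochs come from a simple counter.

```python
from dataclasses import dataclass, field
from enum import Enum, auto


class RegistrationError(Enum):
    NONE = auto()
    INVALID_CLUSTER_ID = auto()
    DUPLICATE_BROKER_REGISTRATION = auto()


@dataclass
class Registration:
    incarnation_id: str
    lease_expires_ms: int


@dataclass
class RegistrationTable:
    cluster_id: str
    lease_ms: int = 18000  # broker.registration.timeout.ms default
    registrations: dict[int, Registration] = field(default_factory=dict)
    next_epoch: int = 0  # hypothetical epoch counter

    def register(self, broker_id: int, cluster_id: str, incarnation_id: str,
                 now_ms: int) -> tuple[RegistrationError, int]:
        """Return (error, broker epoch); the epoch is -1 when none is assigned."""
        if cluster_id != self.cluster_id:
            return RegistrationError.INVALID_CLUSTER_ID, -1
        current = self.registrations.get(broker_id)
        # Assumption: an ID is in use while another incarnation holds an unexpired lease.
        if (current is not None and current.incarnation_id != incarnation_id
                and now_ms < current.lease_expires_ms):
            return RegistrationError.DUPLICATE_BROKER_REGISTRATION, -1
        self.registrations[broker_id] = Registration(incarnation_id, now_ms + self.lease_ms)
        epoch = self.next_epoch
        self.next_epoch += 1
        return RegistrationError.NONE, epoch
```

Allowing the same incarnation to re-register keeps a restarted request retry from being rejected as a duplicate. That choice is also part of the assumption.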
RPCs
Obsoleting the Metadata Propagation RPCs
As discussed earlier, the new controller will use FetchRequest to fetch metadata from the active controller. The details of how Raft fetching will work are spelled out in KIP-595: A Raft Protocol for the Metadata Quorum.
Since we propagate the metadata via Raft, we will no longer need to send out LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest. These requests will be sent out only when we're in legacy mode, not when we're in KIP-500 mode. Eventually we will add some support for these requests to the new controller, in order to support rolling upgrade from a pre-KIP-500 release. However, for the purpose of this KIP, the new controller will not use these requests.
Obsoleting the Controlled Shutdown RPC
The broker heartbeat mechanism replaces the controlled shutdown RPC. Therefore, the controller will no longer need to support this RPC, except for compatibility during upgrades, which will be described further in a follow-on KIP.
Topic Identifiers
This KIP builds on top of the work done in KIP-516 to improve how partitions are tracked.
Because each topic is identified by a unique topic UUID, we can implement topic deletion with a single record, RemoveTopicRecord. Upon replaying this record, each broker will delete the associated topic if it is present.
Of course, some brokers may be down when the topic is deleted. In fact, some brokers may never see the RemoveTopicRecord, because this record may get collapsed into one of the periodic metadata snapshots. If this happens, the deletion will be reflected in the snapshot through the absence of a topic record, not the presence of a removal record. Therefore, during the startup process, brokers must compare the log directories that they have with the ones contained in the latest metadata. The appropriate time to do this is at the start of the RECOVERY phase, because at that point the broker has the latest metadata.
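The startup comparison reduces to a set-membership test on topic IDs. The same test also catches a topic that was deleted and recreated under the same name while the broker was down. This sketch assumes the broker can map each local partition directory to the topic ID it recorded for it. The function name and the IDs in the example are hypothetical placeholders.

```python
def stale_partition_dirs(local_dirs: dict[str, str], live_topic_ids: set[str]) -> list[str]:
    """Return local partition directories whose topic ID is missing from the latest metadata.

    local_dirs maps a partition directory name (e.g. "orders-0") to the topic ID
    the broker recorded for it. Because only IDs are compared, a deleted topic and
    a topic deleted and recreated under the same name are both detected.
    """
    return sorted(name for name, topic_id in local_dirs.items() if topic_id not in live_topic_ids)


if __name__ == "__main__":
    local = {"orders-0": "id-old", "orders-1": "id-old", "payments-0": "id-pay"}
    # "orders" was deleted and recreated (now "id-new") while this broker was down.
    live = {"id-new", "id-pay"}
    print(stale_partition_dirs(local, live))  # ['orders-0', 'orders-1']
```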
BrokerRegistration
```
{
  "apiKey": 57,
  "type": "request",
  "name": "BrokerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "ClusterId", "type": "uuid", "versions": "0+",
      "about": "The cluster id of the broker process." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation id of the broker process." },
    { "name": "CurMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The feature name." },
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported feature level." },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported feature level." }
      ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." }
  ]
}

{
  "apiKey": 57,
  "type": "response",
  "name": "BrokerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." }
  ]
}
```
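The Listeners array in BrokerRegistrationRequest is keyed by Name (`mapKey`), so listener names must be unique. The sketch below shows one way a broker might turn its `listeners` configuration into those entries.

Some parts are assumptions:

- `protocol_of`, the mapping from listener name to protocol, is a hypothetical input, since this section does not define where it comes from.
- Which listeners a broker should include is not specified here either.

The numeric IDs are those of Kafka's SecurityProtocol enum.

```python
from dataclasses import dataclass

# Kafka's numeric IDs for its four security protocols.
SECURITY_PROTOCOL_IDS = {"PLAINTEXT": 0, "SSL": 1, "SASL_PLAINTEXT": 2, "SASL_SSL": 3}


@dataclass(frozen=True)
class Listener:
    name: str
    host: str
    port: int
    security_protocol: int


def to_registration_listeners(listeners: str, protocol_of: dict[str, str]) -> list[Listener]:
    """Turn a 'NAME://host:port, ...' listeners value into Listener entries.

    protocol_of is a hypothetical listener-name -> protocol-name mapping.
    """
    result: list[Listener] = []
    seen: set[str] = set()
    for entry in listeners.split(","):
        name, sep, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        if not sep or not port:
            raise ValueError(f"malformed listener: {entry!r}")
        if name in seen:  # Name is the mapKey, so it must be unique
            raise ValueError(f"duplicate listener name: {name}")
        seen.add(name)
        protocol = SECURITY_PROTOCOL_IDS[protocol_of[name]]
        result.append(Listener(name, host, int(port), protocol))
    return result


if __name__ == "__main__":
    print(to_registration_listeners(
        "INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093",
        {"INTERNAL": "PLAINTEXT", "EXTERNAL": "SSL"}))
```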
BrokerHeartbeat
As described earlier, the broker periodically sends out a heartbeat request to the active controller.
```
{
  "apiKey": 58,
  "type": "request",
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch." },
    { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
      "about": "One more than the highest metadata offset which the broker has reached." },
    { "name": "ShouldFence", "type": "bool", "versions": "0+",
      "about": "True if the broker wants to be fenced, false otherwise." },
    { "name": "ShouldShutDown", "type": "bool", "versions": "0+",
      "about": "True if the broker wants to initiate controlled shutdown." }
  ]
}
```
```
{
  "apiKey": 58,
  "type": "response",
  "name": "BrokerHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "IsCaughtUp", "type": "bool", "versions": "0+",
      "about": "True if the broker has approximately caught up with the latest metadata." },
    { "name": "IsFenced", "type": "bool", "versions": "0+",
      "about": "True if the broker is fenced." },
    { "name": "ControlledShutdownOk", "type": "bool", "versions": "0+",
      "about": "True if the broker can execute a controlled shutdown now." }
  ]
}
```
The controller will wait to unfence a broker until it sends a heartbeat where ShouldFence is false and CurrentMetadataOffset is caught up.
If the heartbeat request has ShouldShutDown set, the controller will try to move all the leaders off of the broker.
The controller will set ControlledShutdownOk if the broker is cleared to execute a controlled shutdown, that is, if it no longer leads any partitions.
The controller will return NOT_CONTROLLER if it is not the active controller. Brokers will always return NOT_CONTROLLER for these RPCs.
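The heartbeat rules above can be modelled as a small controller-side state machine. This is a sketch under these assumptions:

- "Caught up" means CurrentMetadataOffset has reached a hypothetical `metadata_end_offset`.
- Moving leaders is reduced to a `leaders_moved` call.
- The error returned for an unknown broker or a stale epoch is left unspecified.
- The 18000 ms lease is the broker.registration.timeout.ms default.

With the default 3000 ms heartbeat interval, that lease spans six heartbeat intervals.

```python
from dataclasses import dataclass, field


@dataclass
class BrokerState:
    epoch: int
    lease_expires_ms: int
    fenced: bool = True
    shutting_down: bool = False
    led_partitions: set[str] = field(default_factory=set)


@dataclass(frozen=True)
class HeartbeatReply:
    is_caught_up: bool
    is_fenced: bool
    controlled_shutdown_ok: bool


class HeartbeatHandler:
    def __init__(self, lease_ms: int = 18000) -> None:
        self.lease_ms = lease_ms  # broker.registration.timeout.ms
        self.brokers: dict[int, BrokerState] = {}
        # Hypothetical: the offset a broker must reach to count as caught up.
        self.metadata_end_offset = 0

    def heartbeat(self, broker_id: int, broker_epoch: int, current_metadata_offset: int,
                  should_fence: bool, should_shut_down: bool, now_ms: int) -> HeartbeatReply:
        state = self.brokers.get(broker_id)
        if state is None or state.epoch != broker_epoch:
            # The error code for this case is not specified in this section.
            raise ValueError(f"unknown broker {broker_id} or stale epoch {broker_epoch}")
        state.lease_expires_ms = now_ms + self.lease_ms  # renew the lease
        caught_up = current_metadata_offset >= self.metadata_end_offset
        if should_fence:
            state.fenced = True
        elif caught_up:
            state.fenced = False  # unfence only once ShouldFence is false and caught up
        if should_shut_down:
            state.shutting_down = True  # leaders should now be moved off this broker
        # ControlledShutdownOk: the broker leads no partitions.
        return HeartbeatReply(caught_up, state.fenced, not state.led_partitions)

    def brokers_to_drain(self) -> list[int]:
        """Shutting-down brokers that still lead partitions."""
        return sorted(b for b, s in self.brokers.items() if s.shutting_down and s.led_partitions)

    def leaders_moved(self, broker_id: int) -> None:
        """Stand-in for electing new leaders for every partition the broker led."""
        self.brokers[broker_id].led_partitions.clear()

    def expire_leases(self, now_ms: int) -> None:
        """Fence every broker whose lease ran out without a heartbeat."""
        for state in self.brokers.values():
            if now_ms >= state.lease_expires_ms:
                state.fenced = True
```

In this model a shutting-down broker gets ControlledShutdownOk only on a later heartbeat, after its leaderships have been moved.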
DecommissionBroker
The DecommissionBrokerRequest asks the controller to unregister a broker from the cluster.
```
{
  "apiKey": 59,
  "type": "request",
  "name": "DecommissionBrokerRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID to decommission." },
    { "name": "Force", "type": "bool", "versions": "0+", "default": "false",
      "about": "True if we should force the decommissioning even if this broker is the only replica for some partitions." }
  ]
}

{
  "apiKey": 59,
  "type": "response",
  "name": "DecommissionBrokerResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}
```
...
Valid response codes are:
- NONE if the decommissioning succeeded
- NOT_CONTROLLER if the node that the request was sent to is not the controller
- UNSUPPORTED_VERSION if KIP-500 mode is not enabled
- BROKER_NOT_AVAILABLE if the given broker ID is not registered.
- BROKER_REQUIRED_FOR_PARTITIONS if the given broker is the last replica for at least one partition, and force was not specified.
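The broker-specific checks behind these response codes can be sketched as follows. The data structures are hypothetical. The NOT_CONTROLLER and UNSUPPORTED_VERSION checks are assumed to happen earlier and are omitted. What should become of a partition left with no replicas after a forced decommission is not covered here.

```python
def decommission(registered: set[int], assignments: dict[str, list[int]],
                 broker_id: int, force: bool = False) -> str:
    """Hypothetical controller-side DecommissionBroker handling; returns an error-code name."""
    if broker_id not in registered:
        return "BROKER_NOT_AVAILABLE"
    # Partitions for which this broker is the last remaining replica.
    sole_replica_of = [p for p, replicas in assignments.items() if replicas == [broker_id]]
    if sole_replica_of and not force:
        return "BROKER_REQUIRED_FOR_PARTITIONS"
    # Remove the broker ID from every replica assignment, then unregister it.
    for replicas in assignments.values():
        if broker_id in replicas:
            replicas.remove(broker_id)
    registered.discard(broker_id)
    return "NONE"


if __name__ == "__main__":
    registered = {1, 2}
    assignments = {"orders-0": [1, 2], "orders-1": [1]}
    print(decommission(registered, assignments, 1))              # BROKER_REQUIRED_FOR_PARTITIONS
    print(decommission(registered, assignments, 1, force=True))  # NONE
```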
Record Formats
RegisterBrokerRecord
...