...
Brokers start up in the fenced state, and can leave this state only by sending a heartbeat to the active controller and getting back a response that tells them they can become active.
Brokers will re-enter the fenced state if they are unable to communicate with the active controller within registration.lease.timeout.ms.
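As a rough illustration of the fencing rule above, the broker-side bookkeeping might look like the following sketch. The class `FenceTimerSketch` and its method names are hypothetical and do not come from this proposal; the `leaseTimeoutMs` argument stands in for registration.lease.timeout.ms, and timestamps are passed in explicitly so the logic is easy to follow.

```java
// Hypothetical helper for illustration only; not part of the proposal.
final class FenceTimerSketch {
    private final long leaseTimeoutMs; // stands in for registration.lease.timeout.ms
    private long lastContactMs;        // time of the last heartbeat response received
    private boolean fenced = true;     // brokers start up in the fenced state

    FenceTimerSketch(long leaseTimeoutMs, long startTimeMs) {
        this.leaseTimeoutMs = leaseTimeoutMs;
        this.lastContactMs = startTimeMs;
    }

    // Record a heartbeat response; mayBeActive mirrors what the controller told us.
    void onHeartbeatResponse(long nowMs, boolean mayBeActive) {
        lastContactMs = nowMs;
        fenced = !mayBeActive;
    }

    // The broker re-enters the fenced state once the controller has been
    // unreachable for at least the lease timeout.
    boolean isFenced(long nowMs) {
        if (nowMs - lastContactMs >= leaseTimeoutMs) {
            fenced = true;
        }
        return fenced;
    }
}
```

With an illustrative timeout of 8000 ms, a broker that last heard from the controller at t = 100 would still be active at t = 200 and fenced again by t = 8200.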
Controlled Shutdown
In the pre-KIP-500 world, brokers triggered a controlled shutdown by making an RPC to the controller. When the controller returned a successful result from this RPC, the broker knew that it could shut down.
...
Configuration Name | Possible Values | Description |
---|---|---|
process.roles | null; broker; controller; broker,controller | If this is null (absent) then we are in legacy mode. Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both. |
controller.listeners | null, or a listener name | A comma-separated list of the names of the listeners used by the KIP-500 controller. This is required if this process is a KIP-500 controller. The legacy controller will not use this configuration. Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291. The "control plane listener" is used on brokers, not on controllers. |
controller.connect | null, or a comma-separated list of controller URIs | A comma-separated list of controller URIs that KIP-500 brokers should connect to on startup. Required for nodes running in KIP-500 mode. |
controller.id | a 32-bit ID | The controller id for this server. This must be set to a non-negative number when running as a KIP-500 controller. Controller IDs should not overlap with broker IDs. |
registration.heartbeat.interval.ms | 1000 | The length of time between broker heartbeats. |
registration.lease.timeout.ms | 8000 | The length of time that a broker lease lasts if no heartbeats are made. |
...
    {
      "apiKey": 50,
      "type": "request",
      "name": "BrokerHeartbeatRequest",
      "validVersions": "0",
      "flexibleVersions": "0+",
      "fields": [
        { "name": "CurrentState", "type": "int8", "versions": "0+",
          "about": "The current state that the broker is in." },
        { "name": "TargetState", "type": "int8", "versions": "0+",
          "about": "The state that the broker wants to reach." },
        { "name": "BrokerId", "type": "int32", "versions": "0+",
          "about": "The broker ID." },
        { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
          "about": "The broker epoch, or -1 if one has not yet been assigned." },
        { "name": "LeaseStartTimeMs", "type": "int64", "versions": "0+",
          "about": "The time which the broker wants the lease to start at in milliseconds." },
        { "name": "CurMetadataOffset", "type": "int64", "versions": "0+",
          "about": "The highest metadata offset which the broker has reached." },
        { "name": "Listeners", "type": "[]Listener",
          "about": "The listeners of this broker", "versions": "0+", "fields": [
          { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
            "about": "The name of the endpoint." },
          { "name": "Host", "type": "string", "versions": "0+",
            "about": "The hostname." },
          { "name": "Port", "type": "int16", "versions": "0+",
            "about": "The port." },
          { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
            "about": "The security protocol." }
        ] }
      ]
    }

    {
      "apiKey": 50,
      "type": "response",
      "name": "BrokerHeartbeatResponse",
      "validVersions": "0",
      "flexibleVersions": "0+",
      "fields": [
        { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
          "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ActiveControllerId", "type": "int32", "versions": "0+",
          "about": "The ID of the active controller, or -1 if the controller doesn't know." },
        { "name": "NextState", "type": "int8", "versions": "0+",
          "about": "The state to which the broker should transition." },
        { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
          "about": "The broker's assigned epoch, or -1 if none was assigned." },
        { "name": "LeaseEndTimeMs", "type": "int64", "versions": "0+",
          "about": "The time in milliseconds at which the lease should end. This is based on the start time that was passed, not the controller's local clock." }
      ]
    }

    enum BrokerState {
        UNKNOWN(0),
        INITIAL(1),
        FENCED(2),
        ACTIVE(3),
        SHUTDOWN(4);
    }
The LeaseStartTimeMs is expected to be the broker's 'System.currentTimeMillis()' at the point of the request. The active controller will add its lease period to this in order to compute the LeaseEndTimeMs.
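The lease arithmetic described here can be sketched as below. `LeaseSketch` and both method names are hypothetical, chosen only for illustration, and the lease period is passed in as a plain parameter rather than read from any configuration. Because the end time is derived from the broker-supplied start time, the broker can compare it directly against its own clock.

```java
// Hypothetical helper for illustration only; not part of the proposal.
final class LeaseSketch {
    // Controller side: the lease end is the broker-supplied start time plus
    // the controller's lease period. The controller's own clock is not used.
    static long leaseEndTimeMs(long leaseStartTimeMs, long leasePeriodMs) {
        return Math.addExact(leaseStartTimeMs, leasePeriodMs);
    }

    // Broker side: compare the returned end time against the broker's clock,
    // for example System.currentTimeMillis().
    static boolean leaseExpired(long brokerNowMs, long leaseEndTimeMs) {
        return brokerNowMs >= leaseEndTimeMs;
    }
}
```

For example, a request carrying a LeaseStartTimeMs of 1000 and an illustrative lease period of 8000 ms would yield a LeaseEndTimeMs of 9000.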
The controller will wait to unfence a broker until that broker has sent at least one heartbeat in which its currentState is active. So a typical transition will look like this:
- broker sends a BrokerHeartbeatRequest with currentState = fenced, targetState = active. controller responds with nextState = active
- broker sends a BrokerHeartbeatRequest with currentState = active, targetState = active. controller responds with nextState = active
- controller unfences the broker
The second step informs the controller that the broker has received its response to the first step. In order to speed this process along, the broker will send the second heartbeat immediately after receiving the response to the first, rather than waiting for the next heartbeat interval to elapse.
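The two-heartbeat handshake can be modeled on the controller side roughly as follows. This is a minimal sketch under stated assumptions: `UnfenceSketch`, its nested `State` enum, and `onHeartbeat` are hypothetical names, only the fenced and active states are modeled, and echoing back the current state when the target is not active is an assumption made purely to keep the example total.

```java
// Hypothetical controller-side model for illustration only.
final class UnfenceSketch {
    enum State { FENCED, ACTIVE }

    private boolean fenced = true; // every broker starts out fenced

    // Handles one heartbeat and returns the nextState for the response.
    State onHeartbeat(State currentState, State targetState) {
        if (targetState != State.ACTIVE) {
            return currentState; // assumption: nothing to do on this path
        }
        if (currentState == State.ACTIVE) {
            // The broker has confirmed it saw the earlier response,
            // so the controller can now unfence it.
            fenced = false;
        }
        return State.ACTIVE;
    }

    boolean isFenced() {
        return fenced;
    }
}
```

In this model, the first heartbeat (fenced, active) returns nextState = active while the broker stays fenced, and only the second heartbeat (active, active) clears the fence.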
As always with enums, the UNKNOWN state is used only to translate values that our software is too old to understand.
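One way to realize this UNKNOWN convention is a decoding helper that falls back to UNKNOWN for any unrecognized wire value. The enum name `BrokerStateSketch` and the `fromCode` method are hypothetical; only the five states and their numeric values come from the schema above.

```java
// Hypothetical rendering of the BrokerState enum for illustration only.
enum BrokerStateSketch {
    UNKNOWN((byte) 0),
    INITIAL((byte) 1),
    FENCED((byte) 2),
    ACTIVE((byte) 3),
    SHUTDOWN((byte) 4);

    private final byte code;

    BrokerStateSketch(byte code) {
        this.code = code;
    }

    byte code() {
        return code;
    }

    // Values that this version of the software does not recognize are
    // translated to UNKNOWN rather than causing a failure.
    static BrokerStateSketch fromCode(byte code) {
        for (BrokerStateSketch state : values()) {
            if (state.code == code) {
                return state;
            }
        }
        return UNKNOWN;
    }
}
```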
...