
...

Brokers start up in the fenced state, and can leave this state only by sending a heartbeat to the active controller and getting back a response that tells them they can become active.

Brokers will re-enter the fenced state if they are unable to communicate with the active controller within registration.lease.timeout.ms.
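The fencing rule can be sketched from the broker's point of view. This is a minimal sketch, assuming the broker measures its lease from the time it sent the last heartbeat that the controller answered, using its own clock. BrokerFencingTracker, onHeartbeatResponse and isFenced are hypothetical names, not part of the KIP.

```java
// Hypothetical broker-side view of fencing; not Kafka code.
// The broker starts fenced, becomes active only when a heartbeat response
// says it may, and falls back to fenced once registration.lease.timeout.ms
// has elapsed since the start of its last acknowledged lease.
class BrokerFencingTracker {
    private final long leaseTimeoutMs;      // value of registration.lease.timeout.ms
    private boolean activeGranted = false;  // did the last response say "active"?
    private long lastLeaseStartMs = 0L;     // only read once activeGranted is true

    BrokerFencingTracker(long leaseTimeoutMs) {
        this.leaseTimeoutMs = leaseTimeoutMs;
    }

    // Called when a heartbeat sent at leaseStartMs gets a successful response.
    // grantedActive is true if that response's NextState is ACTIVE.
    void onHeartbeatResponse(long leaseStartMs, boolean grantedActive) {
        lastLeaseStartMs = leaseStartMs;
        activeGranted = grantedActive;
    }

    // True if the broker must treat itself as fenced at time nowMs.
    boolean isFenced(long nowMs) {
        if (!activeGranted) {
            return true; // never told (or no longer told) that it may be active
        }
        return nowMs - lastLeaseStartMs >= leaseTimeoutMs; // lease has run out
    }
}
```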

Controlled Shutdown

In the pre-KIP-500 world, brokers triggered a controlled shutdown by making an RPC to the controller. When the controller returned a successful result from this RPC, the broker knew that it could shut down.

...

| Configuration Name | Possible Values | Description |
| --- | --- | --- |
| process.roles | null, broker, controller, or broker,controller | If this is null (absent), then we are in legacy mode. Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both. |
| controller.listeners | null, or a comma-separated list of listener names | A comma-separated list of the names of the listeners used by the KIP-500 controller. This is required if this process is a KIP-500 controller. The legacy controller will not use this configuration. Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291. The "control plane listener" is used on brokers, not on controllers. |
| controller.connect | null, or a comma-separated list of controller URIs | A comma-separated list of controller URIs that KIP-500 brokers should connect to on startup. Required for nodes running in KIP-500 mode. |
| controller.id | a 32-bit ID | The controller ID for this server. This must be set to a non-negative number when running as a KIP-500 controller. Controller IDs should not overlap with broker IDs. |
| registration.heartbeat.interval.ms | 1000 | The length of time, in milliseconds, between broker heartbeats. |
| registration.lease.timeout.ms | 8000 | The length of time, in milliseconds, that a broker lease lasts if no heartbeats are made. |
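The "required" rules in the table can be sketched as a small validator. This is a minimal illustration, assuming the configuration arrives as java.util.Properties and that role names are matched case-insensitively. ProcessRolesConfig, parseRoles and validate are hypothetical names, and the check that controller IDs do not overlap with broker IDs is left out because it needs cluster-wide information.

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper; class and method names are illustrative, not Kafka APIs.
class ProcessRolesConfig {
    enum Role { BROKER, CONTROLLER }

    // Empty set = legacy mode (process.roles absent); otherwise the KIP-500 roles.
    static Set<Role> parseRoles(Properties props) {
        Set<Role> roles = EnumSet.noneOf(Role.class);
        String raw = props.getProperty("process.roles");
        if (raw == null) {
            return roles;
        }
        for (String part : raw.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                // Role.valueOf throws IllegalArgumentException for unknown roles.
                roles.add(Role.valueOf(name.toUpperCase(Locale.ROOT)));
            }
        }
        if (roles.isEmpty()) {
            throw new IllegalArgumentException("process.roles is set but lists no roles");
        }
        return roles;
    }

    // Applies the "required" rules from the configuration table.
    static void validate(Properties props) {
        Set<Role> roles = parseRoles(props);
        if (roles.isEmpty()) {
            return; // legacy mode: the KIP-500 settings are not consulted
        }
        require(props, "controller.connect");
        if (roles.contains(Role.CONTROLLER)) {
            require(props, "controller.listeners");
            // parseInt rejects values outside the 32-bit range.
            int controllerId = Integer.parseInt(require(props, "controller.id").trim());
            if (controllerId < 0) {
                throw new IllegalArgumentException("controller.id must be non-negative");
            }
        }
    }

    private static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException(key + " is required for this process.roles setting");
        }
        return value;
    }
}
```

For example, process.roles=broker,controller with controller.connect, controller.listeners and controller.id all set passes validation, while removing controller.id makes validate throw.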

...

{
  "apiKey": 50,
  "type": "request",
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "CurrentState", "type": "int8", "versions": "0+",
      "about": "The current state that the broker is in." },
    { "name": "TargetState", "type": "int8", "versions": "0+",
      "about": "The state that the broker wants to reach." },
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch, or -1 if one has not yet been assigned." },
    { "name": "LeaseStartTimeMs", "type": "int64", "versions": "0+",
      "about": "The time, in milliseconds, at which the broker wants the lease to start." },
    { "name": "CurMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker.", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    }
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "BrokerHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ActiveControllerId", "type": "int32", "versions": "0+",
      "about": "The ID of the active controller, or -1 if the controller doesn't know." },
    { "name": "NextState", "type": "int8", "versions": "0+",
      "about": "The state to which the broker should transition." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." },
    { "name": "LeaseEndTimeMs", "type": "int64", "versions": "0+",
      "about": "The time in milliseconds at which the lease should end. This is based on the start time that was passed, not the controller's local clock." }
  ]
}

enum BrokerState {
    UNKNOWN(0),
    INITIAL(1),
    FENCED(2),
    ACTIVE(3),
    SHUTDOWN(4);
}

The LeaseStartTimeMs is expected to be the broker's 'System.currentTimeMillis()' at the point of the request. The active controller will add its lease period to this in order to compute the LeaseEndTimeMs.
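The lease arithmetic is a single addition; what matters is that it is anchored to the broker-supplied start time rather than the controller's clock. This is a minimal sketch, assuming the controller's lease period equals registration.lease.timeout.ms. LeaseMath and leaseEndTimeMs are hypothetical names.

```java
// Hypothetical controller-side helper; not Kafka code.
// LeaseEndTimeMs is derived from the broker's LeaseStartTimeMs, never from the
// controller's own clock, so the broker can compare it against its own
// System.currentTimeMillis().
final class LeaseMath {
    private LeaseMath() {}

    static long leaseEndTimeMs(long leaseStartTimeMs, long leasePeriodMs) {
        if (leasePeriodMs <= 0) {
            throw new IllegalArgumentException("lease period must be positive");
        }
        // addExact throws ArithmeticException instead of silently wrapping.
        return Math.addExact(leaseStartTimeMs, leasePeriodMs);
    }
}
```

With LeaseStartTimeMs = 1,000,000 and an 8,000 ms lease period, the response would carry LeaseEndTimeMs = 1,008,000, whatever the controller's own clock reads at that moment.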

The controller will wait to unfence a broker until that broker has sent at least one heartbeat whose currentState is active. So a typical transition looks like this:

  • The broker sends a BrokerHeartbeatRequest with currentState = fenced, targetState = active. The controller responds with nextState = active.
  • The broker sends a BrokerHeartbeatRequest with currentState = active, targetState = active. The controller responds with nextState = active.
  • The controller unfences the broker.

The second step tells the controller that the broker has received the response to the first step. To speed this process along, the broker sends the second heartbeat immediately after receiving the first response, rather than waiting for the next heartbeat interval (registration.heartbeat.interval.ms).
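To make the ordering concrete, here is a minimal sketch of both sides of the handshake, assuming the controller keeps an in-memory fenced flag per broker ID and that only the fenced-to-active path matters. UnfencingHandshake, onHeartbeat, isFenced and nextHeartbeatDelayMs are hypothetical names, and the nested State enum reuses the BrokerState constant names without their wire values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two-step unfencing handshake; not Kafka code.
class UnfencingHandshake {
    enum State { UNKNOWN, INITIAL, FENCED, ACTIVE, SHUTDOWN }

    // true = fenced. Brokers the controller has not seen yet count as fenced.
    private final Map<Integer, Boolean> fenced = new HashMap<>();

    // Controller side: returns the NextState for the BrokerHeartbeatResponse.
    State onHeartbeat(int brokerId, State currentState, State targetState) {
        if (targetState != State.ACTIVE) {
            throw new UnsupportedOperationException("only the fenced -> active path is modeled");
        }
        if (currentState == State.ACTIVE) {
            // Step 2: the broker reports it is active, so it must have seen the
            // earlier nextState = ACTIVE response. Only now is it unfenced.
            fenced.put(brokerId, false);
        } else {
            // Step 1: grant the transition but keep the broker fenced for now.
            fenced.put(brokerId, true);
        }
        return State.ACTIVE;
    }

    boolean isFenced(int brokerId) {
        return fenced.getOrDefault(brokerId, true);
    }

    // Broker side: how long to wait before sending the next heartbeat.
    static long nextHeartbeatDelayMs(State sentCurrentState, State nextState, long intervalMs) {
        // Just told to become active: confirm right away instead of waiting a
        // full registration.heartbeat.interval.ms.
        boolean justGranted = sentCurrentState != State.ACTIVE && nextState == State.ACTIVE;
        return justGranted ? 0L : intervalMs;
    }
}
```

Keeping the broker fenced after step 1 means a lost first response cannot leave the controller believing a broker is active when the broker itself does not know it.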

As always with enums, the UNKNOWN state is used only to translate values that our software is too old to understand.
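In practice, the UNKNOWN rule amounts to a lenient decoder for the int8 state fields. This is a minimal sketch, assuming those fields carry the numeric values listed in the BrokerState enum; the value() and fromValue() methods are hypothetical additions, not part of the KIP.

```java
// Sketch of decoding the int8 CurrentState / TargetState / NextState fields.
// Any value this code does not recognize (for example, one added by newer
// software) maps to UNKNOWN instead of causing an error.
enum BrokerState {
    UNKNOWN((byte) 0),
    INITIAL((byte) 1),
    FENCED((byte) 2),
    ACTIVE((byte) 3),
    SHUTDOWN((byte) 4);

    private final byte value;

    BrokerState(byte value) {
        this.value = value;
    }

    byte value() {
        return value;
    }

    static BrokerState fromValue(byte value) {
        for (BrokerState state : values()) {
            if (state.value == value) {
                return state;
            }
        }
        return UNKNOWN; // a state this software is too old to understand
    }
}
```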

...