Status

Current state: Accepted

Discussion thread

JIRA: KAFKA-15230

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

After starting up, each Kafka controller will register itself with the active controller. The registration will include the endpoints the controller exposes, whether it is ready to perform ZK migration, and a randomly generated UUID that uniquely identifies this incarnation of the controller. The registration will be persisted in the metadata log as hard state.

Each controller will check that the controller registration for its ID is as expected by examining the registration information found in the metadata log. If it is not, it will re-register. This check is necessary to fix cases where a delayed message from an older incarnation of controller N somehow arrives later than a more recent registration for controller N. Although we expect this to be quite rare, it is possible.
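The check described above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class RegistrationCheck and the method needsReRegistration are hypothetical names, and the sketch assumes that "as expected" means the incarnation ID in the metadata log matches the one generated by the running process.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical helper for illustration; not part of Kafka.
final class RegistrationCheck {
    private RegistrationCheck() {}

    /**
     * Returns true if this controller should send a new registration.
     *
     * @param loggedIncarnationId incarnation ID found in the metadata log for this
     *                            controller ID, or empty if no registration exists
     * @param myIncarnationId     incarnation ID generated when this process started
     */
    static boolean needsReRegistration(Optional<UUID> loggedIncarnationId, UUID myIncarnationId) {
        // Re-register when nothing is logged, or when the logged registration
        // belongs to a different (for example, older) incarnation.
        return loggedIncarnationId.map(id -> !id.equals(myIncarnationId)).orElse(true);
    }
}
```

With this shape, a delayed registration from an older incarnation shows up as a mismatched ID, and the current incarnation registers again.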

...

It is an error to set both bootstrap.controllers and bootstrap.servers; only one can be set at a time. It is also an error to include broker endpoints in --bootstrap-controller. If we contact a broker via this mechanism, the command will fail.
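The mutual-exclusion rule above could be enforced with a check along these lines. This is a minimal sketch, not the actual client code: BootstrapConfigCheck is a hypothetical class, and throwing IllegalArgumentException is an assumption made for the example (the real client may use a different exception type).

```java
import java.util.Map;

// Hypothetical validator for illustration; not part of Kafka.
final class BootstrapConfigCheck {
    static final String SERVERS = "bootstrap.servers";
    static final String CONTROLLERS = "bootstrap.controllers";

    private BootstrapConfigCheck() {}

    // Rejects a configuration in which both bootstrap settings are present.
    static void validate(Map<String, String> config) {
        boolean hasServers = isSet(config.get(SERVERS));
        boolean hasControllers = isSet(config.get(CONTROLLERS));
        if (hasServers && hasControllers) {
            throw new IllegalArgumentException(
                "Only one of " + SERVERS + " and " + CONTROLLERS + " can be set.");
        }
    }

    // Treats null and blank values as "not set".
    private static boolean isSet(String value) {
        return value != null && !value.trim().isEmpty();
    }
}
```

Note that this sketch covers only the configuration check. Detecting a broker endpoint listed among the controllers requires contacting the endpoint.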

...

The following command-line tools will get a new --bootstrap-controller argument:

  • kafka-acls.sh
  • kafka-cluster.sh
  • kafka-configs.sh
  • kafka-delegation-tokens.sh
  • kafka-features.sh
  • kafka-metadata-quorum.sh
  • kafka-metadata-shell.sh

  • kafka-reassign-partitions.sh

When the --bootstrap-controller argument is used, --bootstrap-server must not be specified.

The --bootstrap-controller flag will set the bootstrap.controllers configuration described above. It will also clear the bootstrap.servers configuration if that has been set in some other way (for example, via a configuration file provided to the command-line tool).
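The flag behaviour described above might look roughly like this. This is a sketch under assumptions: BootstrapFlagHandler and applyBootstrapController are hypothetical names, and the tool is assumed to hold its configuration in a java.util.Properties object loaded from the file.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka.
final class BootstrapFlagHandler {
    private BootstrapFlagHandler() {}

    // Returns a copy of the file-provided properties with the flag applied.
    static Properties applyBootstrapController(Properties fileProps, String controllers) {
        Properties result = new Properties();
        result.putAll(fileProps);
        // Clear bootstrap.servers even if the configuration file set it.
        result.remove("bootstrap.servers");
        result.setProperty("bootstrap.controllers", controllers);
        return result;
    }
}
```

Working on a copy leaves the properties loaded from the file untouched, which keeps the override easy to reason about.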

The --bootstrap-controller flag will be documented as follows:

Code Block
   --bootstrap-controller CONTROLLERS
                         A comma-separated list of bootstrap.controllers that can be supplied instead of bootstrap-servers.
                         This is useful for administrators who wish to bypass the brokers.

Note that it is not necessary to specify the controller IDs when using --bootstrap-controller.

Here is an example of --bootstrap-controller usage:

Code Block
./bin/kafka-cluster.sh cluster-id --bootstrap-controller example.com:9090,example2.com:9090,example3.com:9090

...

Code Block
The Apache Kafka metadata tool

positional arguments:
  command                The command to run.

optional arguments:
  -h, --help             show this help message and exit
  --directory DIRECTORY, -d DIRECTORY
                         The __cluster_metadata-0 directory to read.
  --bootstrap-controller CONTROLLERS, -q CONTROLLERS
                         The bootstrap.controllers, used to communicate directly with the metadata quorum.
  --config CONFIG        Path to a property file containing a Kafka configuration

...

Code Block
UNSUPPORTED_ENDPOINT_TYPE([next], "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new),

There will also be a new UNKNOWN_CONTROLLER_ID error.

Code Block
UNKNOWN_CONTROLLER_ID([next], "This controller ID is not known.", UnknownControllerIdException::new),

DescribeCluster Changes

When bootstrap.controllers is set, the AdminClient will use DescribeClusterRequest rather than MetadataRequest to obtain the cluster topology.

...

If the MetadataVersion is too old to support controller registrations, and EndpointType was passed as "controllers", the controller will return UNSUPPORTED_ENDPOINT_TYPE. This reflects the fact that it doesn't have metadata about the controller endpoints in these older MetadataVersions.

ControllerRegistrationRequest / Response

There will be a new ControllerRegistrationRequest. All controllers will send this to the active controller.

Code Block
{
  "apiKey": ...,
  "type": "request",
  "name": "ControllerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+",
      "about": "The ID of the controller to register." },
    { "name": "ActiveControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The epoch of the current active controller." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The controller incarnation ID, which is unique to each process run." },
    { "name": "ZkMigrationReady", "type": "bool", "versions": "0+",
      "about": "Set if the required configurations for ZK migration are present." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this controller", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname." },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port." },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol." }
    ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this controller", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The feature name." },
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported feature level." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported feature level." }
    ]
    }
  ]
}

...

Code Block
{
  "apiKey": ...,
  "type": "response",
  "name": "ControllerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The response error code." },
    { "name": "ErrorMessage", "type": "string", "nullableVersions": "0+", "versions": "0+",
      "about": "The response error message, or null if there was no error." }
  ]
}

The active controller will persist all registrations that are sent in with the correct permissions (CLUSTER_ACTION on CLUSTER). If the controller is not active, we'll send back a NOT_CONTROLLER error.

The active controller may also return STALE_CONTROLLER_EPOCH if the wrong epoch was passed.
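The error handling described above could be sketched as follows. This is an illustration only: RegistrationHandler and its RegistrationError enum are hypothetical, the order of the two checks is an assumption, and the authorization check is left out.

```java
// Hypothetical request validation for illustration; not part of Kafka.
final class RegistrationHandler {
    // Outcomes named after the error codes mentioned in the text.
    enum RegistrationError { NONE, NOT_CONTROLLER, STALE_CONTROLLER_EPOCH }

    private RegistrationHandler() {}

    static RegistrationError validate(boolean isActiveController,
                                      int currentControllerEpoch,
                                      int requestControllerEpoch) {
        // Only the active controller may persist registrations.
        if (!isActiveController) {
            return RegistrationError.NOT_CONTROLLER;
        }
        // Assumption: "the wrong epoch" means any epoch other than the current one.
        if (requestControllerEpoch != currentControllerEpoch) {
            return RegistrationError.STALE_CONTROLLER_EPOCH;
        }
        return RegistrationError.NONE;
    }
}
```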

ApiVersionsResponse

The ZkMigrationReady field in ApiVersionsResponse is now deprecated, and won't be filled out.

Code Block
diff --git a/clients/src/main/resources/common/message/ApiVersionsResponse.json b/clients/src/main/resources/common/message/ApiVersionsResponse.json
index 9fda953e10e..eb449f07c54 100644
--- a/clients/src/main/resources/common/message/ApiVersionsResponse.json
+++ b/clients/src/main/resources/common/message/ApiVersionsResponse.json
@@ -70,8 +70,6 @@
           "about": "The cluster-wide finalized min version level for the feature."}
       ]
     },
-    { "name":  "ZkMigrationReady", "type": "bool", "versions": "3+", "taggedVersions": "3+",
-      "tag": 3, "ignorable": true, "default": "false",
-      "about": "Set by a KRaft controller if the required configurations for ZK migration are present" }
+    { "name":  "ZkMigrationReady", "type": "deprecated", "versions": "3+", "taggedVersions": "3+", "tag": 3 }
   ]
 }

RegisterControllerRecord

The data from the registration request will be written to a new RegisterControllerRecord.

Code Block
{
  "apiKey": ...,
  "type": "metadata",
  "name": "RegisterControllerRecord",
  "validVersions": "0+",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+",
      "about": "The controller id." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation ID of the controller process" },
    { "name": "ZkMigrationReady", "type": "bool", "versions": "0+",
      "about": "Set if the required configurations for ZK migration are present." },
    { "name": "EndPoints", "type": "[]ControllerEndpoint", "versions": "0+",
      "about": "The endpoints that can be used to communicate with this controller.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
    ]},
    { "name": "Features", "type": "[]ControllerFeature",
      "about": "The features on this controller", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The feature name." },
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported feature level." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported feature level." }
    ]}
  ]
}
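To illustrate how such records might be consumed, here is a small in-memory sketch. It is not Kafka's implementation: ControllerRegistry and its nested Registration class are hypothetical, only a subset of the record fields is modelled, and the sketch assumes that a newer record for the same controller ID replaces the older one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical in-memory view of controller registrations; not part of Kafka.
final class ControllerRegistry {
    // Subset of the RegisterControllerRecord fields, enough for the illustration.
    static final class Registration {
        final int controllerId;
        final UUID incarnationId;
        final boolean zkMigrationReady;

        Registration(int controllerId, UUID incarnationId, boolean zkMigrationReady) {
            this.controllerId = controllerId;
            this.incarnationId = incarnationId;
            this.zkMigrationReady = zkMigrationReady;
        }
    }

    private final Map<Integer, Registration> byControllerId = new HashMap<>();

    // Assumption: the latest record for a controller ID wins.
    void replay(Registration registration) {
        byControllerId.put(registration.controllerId, registration);
    }

    Optional<Registration> lookup(int controllerId) {
        return Optional.ofNullable(byControllerId.get(controllerId));
    }
}
```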

...

As described above, AdminClient will now support bootstrap.controllers in addition to bootstrap.servers. The following APIs will be supported:

| API | Notes |
| --- | --- |
| alterConfigs | |
| createAcls | |
| deleteAcls | |
| describeCluster | |
| describeAcls | |
| describeConfigs | |
| describeClientQuotas | |
| alterClientQuotas | |
| incrementalAlterConfigs | This can be used to alter log4j settings on inactive controllers. That is the one case where we don't send the RPC to the active controller. |
| describeDelegationToken | |
| electLeaders | |
| alterPartitionReassignments | |
| listPartitionReassignments | |
| describeUserScramCredentials | |
| describeFeatures | |
| updateFeatures | |
| describeMetadataQuorum | |
| unregisterBroker | This affects brokers, not controllers. |

When an unsupported API is used with bootstrap.controllers, UnsupportedEndpointType is returned as an error.

Controller Changes

Fix compatibility gates

Previously, the controller used ApiVersionsResponse messages obtained from the other controllers to determine:

...

With the introduction of controller registrations, the active controller will simply use this information instead. This will be more accurate than the old system.

New APIs

The following RPCs will now be supported on the controller:

| API | Notes |
| --- | --- |
| DESCRIBE_CONFIGS | Even on inactive controllers, this can be used to alter the log4j settings dynamically. |
| DESCRIBE_CLUSTER | As described above, DESCRIBE_CLUSTER will be used by AdminClient for bootstrapping. |
| REGISTER_CONTROLLER | |

Compatibility, Deprecation, and Migration Plan

Client Compatibility Matrix


| | pre-KIP-919 broker | post-KIP-919 broker | pre-KIP-919 controller | post-KIP-919 controller |
| --- | --- | --- | --- | --- |
| bootstrap.servers | success | success | Fails because METADATA request is not supported. | Fails because METADATA request is not supported. |
| bootstrap.controllers | Fails because DESCRIBE_CLUSTER returns MISMATCHED_ENDPOINT | Fails because DESCRIBE_CLUSTER v1 is not supported. | Fails because DESCRIBE_CLUSTER v1 is not supported. | If the metadata version is too old to support controller registrations, UNSUPPORTED_ENDPOINT_TYPE. Otherwise, success. |

Controller Registration MetadataVersion Gate

...

Rather than having a bootstrap.controllers configuration, we could have a separate configuration like direct.to.controller and put the controller servers into bootstrap.servers. Similarly, we could reuse --bootstrap-server rather than adding --bootstrap-controller.

We decided to go with the scheme proposed above to make it clearer when a tool was going directly to the controller. This also makes it clearer which command-line tools have this capability and which do not.

For example, kafka-console-consumer.sh does not have the capability to go direct to the controller, since the controller does not handle produces. Therefore, it's intuitive that kafka-console-consumer.sh lacks the --bootstrap-controller flag.

Another issue is that in the future, we may want to support using the controllers as bootstrap servers for the brokers. The scheme above leaves the door open for this, whereas a scheme that reused existing configurations would not. 

...