Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-12931

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
{
  "apiKey": 0,
  "type": "metadata",
  "name": "RegisterBrokerRecord",
  "validVersions": "0",
+ "flexibleVersions": "0+",
  "fields": [
-   { "name": "BrokerId", "type": "int32", "versions": "0+",
+   { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker id." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation ID of the broker process" },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch assigned by the controller." },
    { "name": "EndPoints", "type": "[]BrokerEndpoint", "versions": "0+",
      "about": "The endpoints that can be used to communicate with this broker.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
    ]},
    { "name": "Features", "type": "[]BrokerFeature",
      "about": "The features on this broker", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The feature name." },
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported feature level." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported feature level." }
    ]},
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
-     "about": "The broker rack." }
+     "about": "The broker rack." },
+   { "name": "Fenced", "type": "bool", "versions": "0+", "default": "true",
+     "about": "True if the broker is fenced." }
  ]
}

TopicRecord

Make the record flexible, as intended.

Code Block
 {
   "apiKey": 2,
   "type": "metadata",
   "name": "TopicRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
       "about": "The topic name." },
     { "name": "TopicId", "type": "uuid", "versions": "0+",
       "about": "The unique ID of this topic." }
   ]
 }

...

Code Block
 {
   "apiKey": 3,
   "type": "metadata",
   "name": "PartitionRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The partition id." },
     { "name": "TopicId", "type": "uuid", "versions": "0+",
       "about": "The unique ID of this topic." },
-    { "name": "Replicas", "type":  "[]int32", "versions":  "0+",
+    { "name": "Replicas", "type":  "[]int32", "versions":  "0+", "entityType": "brokerId",
       "about": "The replicas of this partition, sorted by preferred order." },
     { "name": "Isr", "type":  "[]int32", "versions":  "0+",
       "about": "The in-sync replicas of this partition" },
-    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+",
+    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+", "entityType": "brokerId",
       "about": "The replicas that we are in the process of removing." },
-    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+",
+    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+", "entityType": "brokerId",
       "about": "The replicas that we are in the process of adding." },
-    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
+    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
       "about": "The lead replica, or -1 if there is no leader." },
     { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The epoch of the partition leader." },
     { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
       "about": "An epoch that gets incremented each time we change anything in the partition." }
   ]
 }

...

Code Block
 {
   "apiKey": 4,
   "type": "metadata",
   "name": "ConfigRecord",
   "validVersions": "0",
-  "flexibleVersions": "none",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "ResourceType", "type": "int8", "versions": "0+",
       "about": "The type of resource this configuration applies to." },
     { "name": "ResourceName", "type": "string", "versions": "0+",
       "about": "The name of the resource this configuration applies to." },
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The name of the configuration key." },
-    { "name": "Value", "type": "string", "versions": "0+",
+    { "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+",
       "about": "The value of the configuration." }
   ]
 }

...

Add removingReplicas, addingReplicas, and replicas to PartitionChangeRecord. These are needed to implement reassignment. No version bump is needed since the new fields are tagged (optional) fields.
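
To see why tagged fields need no version bump, here is a minimal Python sketch of how a flexible-version record carries them: a count, then a tag, a size, and a payload per field, all using unsigned varints. The helper names and the tag numbers are hypothetical, and this is an illustration rather than Kafka's generated serialization code.

```python
import io


def write_uvarint(out, value):
    """Write a non-negative int as an unsigned varint (7 data bits per byte)."""
    while value >= 0x80:
        out.write(bytes([(value & 0x7F) | 0x80]))
        value >>= 7
    out.write(bytes([value]))


def read_uvarint(inp):
    """Read an unsigned varint produced by write_uvarint."""
    result, shift = 0, 0
    while True:
        byte = inp.read(1)[0]
        result |= (byte & 0x7F) << shift
        if byte < 0x80:
            return result
        shift += 7


def write_tagged_fields(out, fields):
    """fields maps tag number -> raw payload bytes; tags are written in ascending order."""
    write_uvarint(out, len(fields))
    for tag in sorted(fields):
        write_uvarint(out, tag)
        write_uvarint(out, len(fields[tag]))
        out.write(fields[tag])


def read_tagged_fields(inp, known_tags):
    """Return payloads for known tags; unknown tags are skipped using their size."""
    found = {}
    for _ in range(read_uvarint(inp)):
        tag = read_uvarint(inp)
        size = read_uvarint(inp)
        payload = inp.read(size)
        if tag in known_tags:
            found[tag] = payload
    return found


# Hypothetical tags: 0 is a field both readers know, 2 is a newly added field.
buf = io.BytesIO()
write_tagged_fields(buf, {0: b"\x01", 2: b"\x00\x00\x00\x05"})

old_reader = read_tagged_fields(io.BytesIO(buf.getvalue()), known_tags={0})
new_reader = read_tagged_fields(io.BytesIO(buf.getvalue()), known_tags={0, 2})
```

Because every tagged field carries its own size, a reader that predates the new field can skip it without misparsing the rest of the record.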

Add entityType: brokerId where appropriate.

...

Code Block
{
  "apiKey": 7,
  "type": "metadata",
  "name": "FenceBrokerRecord",
- "validVersions": "0",
+ "validVersions": "0-1",
+ "flexibleVersions": "0+",
  "fields": [
-   { "name": "Id", "type": "int32", "versions": "0+",
+   { "name": "Id", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID to fence. It will be removed from all ISRs." },
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to fence." }
  ]
}

...

Code Block
{
  "apiKey": 8,
  "type": "metadata",
  "name": "UnfenceBrokerRecord",
- "validVersions": "0",
+ "validVersions": "0-1",
+ "flexibleVersions": "0+",
  "fields": [
-   { "name": "Id", "type": "int32", "versions": "0+",
+   { "name": "Id", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID to unfence." },
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to unfence." }
  ]
}

RemoveTopicRecord

Make a version of this record that is flexible, as intended.

Code Block
 {
   "apiKey": 9,
   "type": "metadata",
   "name": "RemoveTopicRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "TopicId", "type": "uuid", "versions": "0+",
       "about": "The topic to remove. All associated partitions will be removed as well." }
   ]
 }

...

Make this record flexible, as intended. Since we don't generate this record yet, no need for a new version.

Code Block
 {
   "apiKey": 10,
   "type": "metadata",
   "name": "DelegationTokenRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Owner", "type": "string", "versions": "0+",
       "about": "The delegation token owner." },
     { "name": "Renewers", "type": "[]string", "versions": "0+",
       "about": "The principals which have renewed this token." },
     { "name": "IssueTimestamp", "type": "int64", "versions": "0+",
       "about": "The time at which this token was issued." },
     { "name": "MaxTimestamp", "type": "int64", "versions": "0+",
       "about": "The time at which this token cannot be renewed any more." },
     { "name": "ExpirationTimestamp", "type": "int64", "versions": "0+",
       "about": "The next time at which this token must be renewed." },
     { "name": "TokenId", "type": "string", "versions": "0+",
       "about": "The token id." }
   ]
 }

...

Make this record flexible, as intended. Since we don't generate this record yet, no need for a new record version.

Code Block
 {
   "apiKey": 11,
   "type": "metadata",
   "name": "UserScramCredentialRecord",
   "validVersions": "0",
-  "flexibleVersions": "none",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The user name." },
     { "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+",
       "about": "The mechanism and related information associated with the user's SCRAM credential.", "fields": [
       { "name": "Mechanism", "type": "int8", "versions": "0+",
         "about": "The SCRAM mechanism." },
       { "name": "Salt", "type": "bytes", "versions": "0+",
         "about": "A random salt generated by the client." },
       { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
         "about": "The salted password." },
       { "name": "Iterations", "type": "int32", "versions": "0+",
         "about": "The number of iterations used in the SCRAM credential." }]}
   ]
 }

...

Code Block
 {
   "apiKey": 12,
   "type": "metadata",
   "name": "FeatureLevelRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The feature name." },
     { "name": "FeatureLevel", "type": "int16", "versions": "0+",
       "about": "The current finalized minimum feature level of this feature for the cluster." }
   ]
 }

QuotaRecord

Rename this record to ClientQuotaRecord to avoid confusion. Make it flexible, correcting the mistake that the first version was not.

Code Block
 {
   "apiKey": 14,
   "type": "metadata",
-  "name": "QuotaRecord",
+  "name": "ClientQuotaRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Entity", "type": "[]EntityData", "versions": "0+",
       "about": "The quota entity to alter.", "fields": [
       { "name": "EntityType", "type": "string", "versions": "0+",
         "about": "The entity type." },
       { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
         "about": "The name of the entity, or null if the default." }
     ]},
     { "name": "Key", "type": "string", "versions": "0+",
       "about": "The quota configuration key." },
     { "name": "Value", "type": "float64", "versions": "0+",
       "about": "The value to set, otherwise ignored if the value is to be removed." },
     { "name": "Remove", "type": "bool", "versions": "0+",
       "about": "Whether the quota configuration value should be removed, otherwise set." }
   ]
 }

ProducerIdsRecord

Correct the mistake that flexibleVersions was not set on ProducerIdsRecord.

Code Block
 {
   "apiKey": 15,
   "type": "metadata",
   "name": "ProducerIdsRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
       "about": "The ID of the requesting broker" },
     { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
       "about": "The epoch of the requesting broker" },
     { "name": "ProducerIdsEnd", "type": "int64", "versions": "0+",
       "about": "The highest producer ID that has been generated" }
   ]
 }

BrokerRegistrationChangeRecord

Create a new record type named BrokerRegistrationChangeRecord. This record is similar to FenceBrokerRecord and UnfenceBrokerRecord, but more general. In the future, we will allow broker endpoints to be changed via this record.

Note that if "EndPoints" is present, the supplied endpoints replace the existing endpoints of the broker.

Code Block
 {
   "apiKey": 17,
   "type": "metadata",
   "name": "BrokerRegistrationChangeRecord",
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker id." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch assigned by the controller." },
    { "name": "Fenced", "type": "int8", "versions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "-1 if the broker has been unfenced, 0 if no change, 1 if the broker has been fenced." },
    { "name": "EndPoints", "type": "[]Listener", "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "The listeners of this broker or null if there was no change.",
      "nullableVersions": "0+", "default": "null", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]}
   ]
 }

RemoveFeatureLevelRecord

Create a new record type named RemoveFeatureLevelRecord. This record removes a feature level.

Code Block
 {
   "apiKey": 16,
   "type": "metadata",
   "name": "RemoveFeatureLevelRecord",
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The feature name." }
   ]
 }
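
For BrokerRegistrationChangeRecord, the tri-state Fenced value (-1, 0, 1) and the rule that supplied endpoints replace the existing ones can be illustrated with a short Python sketch. The `Registration` class and `apply_change` function are hypothetical names used only for this illustration; they do not mirror the controller's actual classes.

```python
from dataclasses import dataclass, field, replace
from typing import Dict, Optional


@dataclass(frozen=True)
class Registration:
    """Hypothetical in-memory view of a broker registration."""
    broker_id: int
    broker_epoch: int
    fenced: bool
    # Listener name -> "host:port"; a simplification of the EndPoints schema.
    endpoints: Dict[str, str] = field(default_factory=dict)


def apply_change(reg: Registration, fenced_change: int,
                 endpoints: Optional[Dict[str, str]] = None) -> Registration:
    """Apply a change record: -1 unfences, 0 leaves fencing alone, 1 fences.

    If endpoints is None the existing endpoints are kept; otherwise the
    supplied endpoints replace them entirely (no merging).
    """
    if fenced_change not in (-1, 0, 1):
        raise ValueError(f"unexpected Fenced value: {fenced_change}")
    fenced = reg.fenced if fenced_change == 0 else fenced_change == 1
    new_endpoints = reg.endpoints if endpoints is None else dict(endpoints)
    return replace(reg, fenced=fenced, endpoints=new_endpoints)


reg = Registration(broker_id=1, broker_epoch=100, fenced=True,
                   endpoints={"PLAINTEXT": "a:9092", "SSL": "a:9093"})
unfenced = apply_change(reg, -1)
moved = apply_change(reg, 0, {"PLAINTEXT": "b:9092"})
```

Here `unfenced` keeps both original endpoints, while `moved` stays fenced and ends up with only the single supplied endpoint.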

Compatibility, Deprecation, and Migration Plan

The metadata records used in 3.0 will not be backwards compatible with those in 2.8. This is OK because we don't support upgrade from 2.8 to 3.0 when using KRaft.

We will increment the frame version in 3.0 so that we can easily detect when a 2.8 record is present, and give a useful exception message.
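
A frame version check of this kind might look like the following Python sketch. It assumes the frame version is the first unsigned varint of a serialized record and that the 3.0 value is 1; both the layout and the constant are assumptions made for illustration, as are the function and exception names.

```python
EXPECTED_FRAME_VERSION = 1  # assumed value for the 3.0 frame, for illustration only


class UnsupportedFrameVersionError(Exception):
    """Raised when a record was written with a frame version we cannot read."""


def read_uvarint(data, pos=0):
    """Decode an unsigned varint from data starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if byte < 0x80:
            return result, pos
        shift += 7


def check_frame_version(record_bytes):
    """Validate the leading frame version and return the offset of the remaining bytes."""
    version, pos = read_uvarint(record_bytes)
    if version != EXPECTED_FRAME_VERSION:
        raise UnsupportedFrameVersionError(
            f"Found frame version {version}, expected {EXPECTED_FRAME_VERSION}. "
            "This record may have been written by an older, incompatible release."
        )
    return pos
```

Checking the frame before decoding the body lets the reader fail with a clear message instead of misparsing an old record.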

Rejected Alternatives

We could also have made these changes in a compatible fashion. However, since we don't support upgrade from 2.8, there is no good reason to do that here. This would also be less efficient (more "if" statements in the generated code, etc.).