Status

Current state: Accepted

Discussion thread: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As we have continued to develop the KRaft controller, we have found some shortcomings in the original record schemas introduced in KIP-631. Many record types were introduced without flexible versions, which was not intentional. Some other records lacked necessary fields, such as the fields for KIP-455 reassignment. This KIP addresses those shortcomings by changing the wire protocol in a minimal way.

Public Interfaces

RegisterBrokerRecord

Add a new version of this record that is flexible, as intended.

Add entityType: brokerId annotations where appropriate.

Add a fenced boolean that can be set to false if the broker in question is unfenced.

{
  "apiKey": 0,
  "type": "metadata",
  "name": "RegisterBrokerRecord",
  "validVersions": "0",
+ "flexibleVersions": "0+",
  "fields": [
-   { "name": "BrokerId", "type": "int32", "versions": "0+",
+   { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker id." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation ID of the broker process" },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch assigned by the controller." },
    { "name": "EndPoints", "type": "[]BrokerEndpoint", "versions": "0+",
      "about": "The endpoints that can be used to communicate with this broker.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
    ]},
    { "name": "Features", "type": "[]BrokerFeature",
      "about": "The features on this broker", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The feature name." },
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported feature level." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported feature level." }
    ]},
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
-     "about": "The broker rack." }
+     "about": "The broker rack." },
+   { "name": "Fenced", "type": "bool", "versions": "0+", "default": "true",
+     "about": "True if the broker is fenced." }
  ]
}
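
To illustrate how the Fenced default might be used when replaying the log, here is a minimal Python sketch. The BrokerRegistration class and the apply_register_broker helper are hypothetical names invented for this example and are not part of Kafka; only the field names and the default of true come from the schema above.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class BrokerRegistration:
    """Hypothetical in-memory view of one registered broker."""
    broker_id: int
    broker_epoch: int
    rack: Optional[str] = None
    fenced: bool = True  # mirrors "default": "true" in the schema


def apply_register_broker(brokers: Dict[int, BrokerRegistration], record: dict) -> None:
    """Replay a RegisterBrokerRecord (given here as a plain dict) into the table."""
    brokers[record["BrokerId"]] = BrokerRegistration(
        broker_id=record["BrokerId"],
        broker_epoch=record["BrokerEpoch"],
        rack=record.get("Rack"),
        # A record built without setting Fenced carries the default of true.
        fenced=record.get("Fenced", True),
    )
```

In this sketch a broker starts out fenced unless the record explicitly says otherwise, which matches the default in the schema.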

TopicRecord

Make the record flexible, as intended.

 {
   "apiKey": 2,
   "type": "metadata",
   "name": "TopicRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
       "about": "The topic name." },
     { "name": "TopicId", "type": "uuid", "versions": "0+",
       "about": "The unique ID of this topic." }
   ]
 }
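
For readers less familiar with flexible versions: in the Kafka protocol, a flexible version writes strings in compact form (an unsigned varint holding the length plus one, followed by the bytes) and ends each structure with a tagged-field section. The Python sketch below shows what that could look like for the body of a TopicRecord. It is an illustration only, not Kafka's generated serializer; the function names are invented for this example, and the record frame header is left out.

```python
import uuid


def encode_unsigned_varint(value: int) -> bytes:
    """Encode a non-negative integer as an unsigned varint (7 bits per byte)."""
    if value < 0:
        raise ValueError("value must be non-negative")
    out = bytearray()
    while value >= 0x80:
        out.append((value & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        value >>= 7
    out.append(value)
    return bytes(out)


def encode_compact_string(text: str) -> bytes:
    """Compact string: unsigned varint of (length + 1), then the UTF-8 bytes."""
    data = text.encode("utf-8")
    return encode_unsigned_varint(len(data) + 1) + data


def encode_topic_record_body(name: str, topic_id: uuid.UUID) -> bytes:
    """Encode Name and TopicId, followed by an empty tagged-field section."""
    return (
        encode_compact_string(name)
        + topic_id.bytes               # 16 bytes, big-endian
        + encode_unsigned_varint(0)    # number of tagged fields: none
    )
```

The trailing tagged-field count is what lets later versions add optional fields without breaking older readers.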

PartitionRecord

Create a new version of this record that is flexible, as intended.

Add entityType: brokerId where appropriate.

 {
   "apiKey": 3,
   "type": "metadata",
   "name": "PartitionRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The partition id." },
     { "name": "TopicId", "type": "uuid", "versions": "0+",
       "about": "The unique ID of this topic." },
-    { "name": "Replicas", "type":  "[]int32", "versions":  "0+",
+    { "name": "Replicas", "type":  "[]int32", "versions":  "0+", "entityType": "brokerId",
       "about": "The replicas of this partition, sorted by preferred order." },
     { "name": "Isr", "type":  "[]int32", "versions":  "0+",
       "about": "The in-sync replicas of this partition" },
-    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+",
+    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+", "entityType": "brokerId",
       "about": "The replicas that we are in the process of removing." },
-    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+",
+    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+", "entityType": "brokerId",
       "about": "The replicas that we are in the process of adding." },
-    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
+    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
       "about": "The lead replica, or -1 if there is no leader." },
     { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The epoch of the partition leader." },
     { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
       "about": "An epoch that gets incremented each time we change anything in the partition." }
   ]
 }

ConfigRecord

Correct the mistake of not making this record flexible.

Make Value nullable. If it is set to null, that indicates that we are removing the designated config mapping.

 {
   "apiKey": 4,
   "type": "metadata",
   "name": "ConfigRecord",
   "validVersions": "0",
-  "flexibleVersions": "none",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "ResourceType", "type": "int8", "versions": "0+",
       "about": "The type of resource this configuration applies to." },
     { "name": "ResourceName", "type": "string", "versions": "0+",
       "about": "The name of the resource this configuration applies to." },
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The name of the configuration key." },
-    { "name": "Value", "type": "string", "versions": "0+",
+    { "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+",
       "about": "The value of the configuration." }
   ]
 }
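
The null-value rule can be sketched as follows. This is a minimal Python illustration, assuming configs are held in a nested dictionary keyed by resource; the ConfigTable alias and the apply_config_record helper are hypothetical and not part of Kafka.

```python
from typing import Dict, Optional, Tuple

# Key: (resource type, resource name). Value: config name -> config value.
ConfigTable = Dict[Tuple[int, str], Dict[str, str]]


def apply_config_record(configs: ConfigTable, resource_type: int,
                        resource_name: str, name: str,
                        value: Optional[str]) -> None:
    """Apply one ConfigRecord: a null Value removes the mapping, otherwise set it."""
    key = (resource_type, resource_name)
    if value is None:
        entries = configs.get(key)
        if entries is not None:
            entries.pop(name, None)   # removing a missing key is a no-op
            if not entries:
                del configs[key]      # drop resources with no configs left
    else:
        configs.setdefault(key, {})[name] = value
```

With this approach no separate "remove config" record type is needed, since a single record type covers both setting and deleting a key.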

PartitionChangeRecord

Add removingReplicas, addingReplicas, and replicas to PartitionChangeRecord. These are needed to implement reassignment.

Add entityType: brokerId where appropriate.

 {
   "apiKey": 5,
   "type": "metadata",
   "name": "PartitionChangeRecord",
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The partition id." },
     { "name": "TopicId", "type": "uuid", "versions": "0+",
       "about": "The unique ID of this topic." },
     { "name": "Isr", "type":  "[]int32", "default": "null", "entityType": "brokerId",
       "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
       "about": "null if the ISR didn't change; the new in-sync replicas otherwise." },
     { "name": "Leader", "type": "int32", "default": "-2", "entityType": "brokerId",
       "versions": "0+", "taggedVersions": "0+", "tag": 1,
-      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." }
+      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." },
+    { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 2,
+      "about": "null if the replicas didn't change; the new replicas otherwise." },
+    { "name": "RemovingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 3,
+      "about": "null if the removing replicas didn't change; the new removing replicas otherwise." },
+    { "name": "AddingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 4,
+      "about": "null if the adding replicas didn't change; the new adding replicas otherwise." }
   ]
 }
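
The "null means unchanged" convention for the tagged fields, together with the -2 sentinel for Leader, can be sketched as below. This is a simplified Python illustration: PartitionState and apply_partition_change are hypothetical names, and epoch bookkeeping is deliberately left out.

```python
from dataclasses import dataclass
from typing import List, Optional

NO_LEADER = -1          # there is now no leader
NO_LEADER_CHANGE = -2   # the leader did not change


@dataclass(frozen=True)
class PartitionState:
    """Hypothetical in-memory state of one partition (epochs omitted)."""
    replicas: List[int]
    isr: List[int]
    removing_replicas: List[int]
    adding_replicas: List[int]
    leader: int


def _pick(old: List[int], new: Optional[List[int]]) -> List[int]:
    # None (null on the wire) means the field did not change.
    return old if new is None else list(new)


def apply_partition_change(state: PartitionState, *,
                           isr: Optional[List[int]] = None,
                           leader: int = NO_LEADER_CHANGE,
                           replicas: Optional[List[int]] = None,
                           removing_replicas: Optional[List[int]] = None,
                           adding_replicas: Optional[List[int]] = None) -> PartitionState:
    """Return the partition state after applying one PartitionChangeRecord."""
    return PartitionState(
        replicas=_pick(state.replicas, replicas),
        isr=_pick(state.isr, isr),
        removing_replicas=_pick(state.removing_replicas, removing_replicas),
        adding_replicas=_pick(state.adding_replicas, adding_replicas),
        leader=state.leader if leader == NO_LEADER_CHANGE else leader,
    )
```

For example, starting a reassignment that adds broker 4 would only set replicas and adding_replicas, leaving the ISR and the leader untouched.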

FenceBrokerRecord

Make a version of this record that is flexible, as intended. Add entityType: brokerId to the broker id field.

{
  "apiKey": 7,
  "type": "metadata",
  "name": "FenceBrokerRecord",
  "validVersions": "0",
+ "flexibleVersions": "0+",
  "fields": [
-   { "name": "Id", "type": "int32", "versions": "0+",
+   { "name": "Id", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID to fence. It will be removed from all ISRs." },
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to fence." }
  ]
}

UnfenceBrokerRecord

Make a version of this record that is flexible, as intended. Add entityType: brokerId to the broker id field.

{
  "apiKey": 8,
  "type": "metadata",
  "name": "UnfenceBrokerRecord",
  "validVersions": "0",
+ "flexibleVersions": "0+",
  "fields": [
-   { "name": "Id", "type": "int32", "versions": "0+",
+   { "name": "Id", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID to unfence." },
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to unfence." }
  ]
}

RemoveTopicRecord

Make this record flexible, as intended.

 {
   "apiKey": 9,
   "type": "metadata",
   "name": "RemoveTopicRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "TopicId", "type": "uuid", "versions": "0+",
       "about": "The topic to remove. All associated partitions will be removed as well." }
   ]
 }

DelegationTokenRecord

Make this record flexible, as intended.

 {
   "apiKey": 10,
   "type": "metadata",
   "name": "DelegationTokenRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Owner", "type": "string", "versions": "0+",
       "about": "The delegation token owner." },
     { "name": "Renewers", "type": "[]string", "versions": "0+",
       "about": "The principals which have renewed this token." },
     { "name": "IssueTimestamp", "type": "int64", "versions": "0+",
       "about": "The time at which this token was issued." },
     { "name": "MaxTimestamp", "type": "int64", "versions": "0+",
       "about": "The time at which this token cannot be renewed any more." },
     { "name": "ExpirationTimestamp", "type": "int64", "versions": "0+",
       "about": "The next time at which this token must be renewed." },
     { "name": "TokenId", "type": "string", "versions": "0+",
       "about": "The token id." }
   ]
 }

UserScramCredentialRecord

Make this record flexible, as intended.

 {
   "apiKey": 11,
   "type": "metadata",
   "name": "UserScramCredentialRecord",
   "validVersions": "0",
-  "flexibleVersions": "none",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The user name." },
     { "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+",
       "about": "The mechanism and related information associated with the user's SCRAM credential.", "fields": [
       { "name": "Mechanism", "type": "int8", "versions": "0+",
         "about": "The SCRAM mechanism." },
       { "name": "Salt", "type": "bytes", "versions": "0+",
         "about": "A random salt generated by the client." },
       { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
         "about": "The salted password." },
       { "name": "Iterations", "type": "int32", "versions": "0+",
         "about": "The number of iterations used in the SCRAM credential." }]}
   ]
 }

FeatureLevelRecord

Make this record flexible, as intended. Since we don't generate this record yet, there is no need for a new record version.

 {
   "apiKey": 12,
   "type": "metadata",
   "name": "FeatureLevelRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The feature name." },
     { "name": "FeatureLevel", "type": "int16", "versions": "0+",
       "about": "The current finalized feature level of this feature for the cluster." }
   ]
 }

QuotaRecord

Rename this record to ClientQuotaRecord to avoid confusion. Make it flexible.

 {
   "apiKey": 14,
   "type": "metadata",
-  "name": "QuotaRecord",
+  "name": "ClientQuotaRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Entity", "type": "[]EntityData", "versions": "0+",
       "about": "The quota entity to alter.", "fields": [
       { "name": "EntityType", "type": "string", "versions": "0+",
         "about": "The entity type." },
       { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
         "about": "The name of the entity, or null if the default." }
     ]},
     { "name": "Key", "type": "string", "versions": "0+",
       "about": "The quota configuration key." },
     { "name": "Value", "type": "float64", "versions": "0+",
       "about": "The value to set, otherwise ignored if the value is to be removed." },
     { "name": "Remove", "type": "bool", "versions": "0+",
       "about": "Whether the quota configuration value should be removed, otherwise set." }
   ]
 }
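
As a rough illustration of the Remove flag, the Python sketch below applies a ClientQuotaRecord to an in-memory quota table. The table layout, the EntityKey alias, and the apply_client_quota_record helper are assumptions made for this example and are not taken from Kafka.

```python
from typing import Dict, List, Optional, Tuple

# An entity is a set of (EntityType, EntityName) pairs; a null name means the default.
EntityKey = Tuple[Tuple[str, Optional[str]], ...]


def apply_client_quota_record(
    quotas: Dict[EntityKey, Dict[str, float]],
    entity: List[Tuple[str, Optional[str]]],
    key: str,
    value: float,
    remove: bool,
) -> None:
    """Set or remove one quota configuration value for the given entity."""
    # Sort by entity type so the same entity always maps to the same key.
    entity_key = tuple(sorted(entity, key=lambda part: part[0]))
    if remove:
        # Value is ignored when Remove is true.
        entries = quotas.get(entity_key)
        if entries is not None:
            entries.pop(key, None)
            if not entries:
                del quotas[entity_key]
    else:
        quotas.setdefault(entity_key, {})[key] = value
```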

ProducerIdsRecord

Correct the mistake that flexibleVersions was not set on ProducerIdsRecord.

{
  "apiKey": 15,
  "type": "metadata",
  "name": "ProducerIdsRecord",
  "validVersions": "0",
+ "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the requesting broker" },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The epoch of the requesting broker" },
    { "name": "ProducerIdsEnd", "type": "int64", "versions": "0+",
      "about": "The highest producer ID that has been generated"}
  ]
}

BrokerRegistrationChangeRecord

Create a new record type named BrokerRegistrationChangeRecord. This record is similar to FenceBrokerRecord and UnfenceBrokerRecord, but more general. In the future, we will allow broker endpoints to be changed via this record.

Note that if "EndPoints" is present, the supplied endpoints replace the existing endpoints of the broker.

 {
   "apiKey": 17,
   "type": "metadata",
   "name": "BrokerRegistrationChangeRecord",
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker id." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch assigned by the controller." },
    { "name": "Fenced", "type": "int8", "versions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "-1 if the broker has been unfenced, 0 if no change, 1 if the broker has been fenced." },
    { "name": "EndPoints", "type": "[]Listener", "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "The listeners of this broker or null if there was no change.",
      "nullableVersions": "0+", "default": "null", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]}
   ]
 }
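
The semantics of the Fenced and EndPoints fields can be sketched as below. This is a minimal Python illustration in which a broker is represented as a plain dict; the apply_broker_registration_change helper and the dict layout are hypothetical and not part of Kafka. Broker epoch checks are left out.

```python
from typing import List, Optional


def apply_broker_registration_change(broker: dict,
                                     fenced_change: int = 0,
                                     endpoints: Optional[List[dict]] = None) -> dict:
    """Return a copy of the broker state with the change record applied."""
    if fenced_change not in (-1, 0, 1):
        raise ValueError(f"unexpected Fenced value: {fenced_change}")
    updated = dict(broker)
    if fenced_change == 1:
        updated["fenced"] = True      # the broker has been fenced
    elif fenced_change == -1:
        updated["fenced"] = False     # the broker has been unfenced
    # fenced_change == 0 means no change.
    if endpoints is not None:
        # A non-null EndPoints list replaces the existing endpoints wholesale.
        updated["endpoints"] = {e["Name"]: e for e in endpoints}
    return updated
```

Note that the supplied endpoints are not merged with the old ones: a listener that is missing from the new list is dropped.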

Compatibility, Deprecation, and Migration Plan

The metadata records used in 3.0 will not be backwards compatible with those in 2.8. This is OK because we don't support upgrade from 2.8 to 3.0 when using KRaft.

We will increment the frame version in 3.0 so that we can easily detect when a 2.8 record is present, and give a useful exception message.
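
A frame version check along these lines might look like the following Python sketch. It assumes that each serialized record begins with its frame version encoded as an unsigned varint, and it uses 1 as a placeholder for the 3.0 frame version; this KIP does not state the actual numbers. The function and exception names are invented for this example.

```python
from typing import Tuple

EXPECTED_FRAME_VERSION = 1  # assumed placeholder, not taken from this KIP


class UnsupportedFrameVersionError(Exception):
    """Raised when a record uses a frame version we cannot read."""


def read_unsigned_varint(buf: bytes, pos: int = 0) -> Tuple[int, int]:
    """Decode an unsigned varint; return (value, position after the varint)."""
    value = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, pos
        shift += 7
        if shift > 28:
            raise ValueError("varint is too long")


def check_frame_version(serialized: bytes) -> int:
    """Validate the frame version; return the offset where the rest of the record starts."""
    frame_version, pos = read_unsigned_varint(serialized)
    if frame_version != EXPECTED_FRAME_VERSION:
        raise UnsupportedFrameVersionError(
            f"Found metadata record with frame version {frame_version}, "
            f"expected {EXPECTED_FRAME_VERSION}. Records written by 2.8 "
            "cannot be read by this version."
        )
    return pos
```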

Rejected Alternatives

We could have made these changes in a compatible fashion. However, since we don't support upgrade from 2.8, there is no good reason to do that here. This would also be less efficient (more "if" statements in the generated code, etc.).
