Status

Current state: Under Discussion

Discussion thread

JIRA

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As we have continued to develop the KRaft controller, we have found some shortcomings in the original record schemas introduced in KIP-631. Many record types were unintentionally introduced without flexible versions. Other records lacked necessary fields, such as the fields for KIP-455 reassignment. This KIP addresses those shortcomings while changing the wire protocol as little as possible.

Public Interfaces

RegisterBrokerRecord

Add a new version of this record that is flexible, as intended.

Add entityType: brokerId annotations where appropriate.

Add a Fenced boolean field (default true) that can be set to false if the broker in question is unfenced.

{
  "apiKey": 0,
  "type": "metadata",
  "name": "RegisterBrokerRecord",
- "validVersions": "0",
+ "validVersions": "0-1",
+ "flexibleVersions": "1+",
  "fields": [
-   { "name": "BrokerId", "type": "int32", "versions": "0+",
+   { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker id." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation ID of the broker process" },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch assigned by the controller." },
    { "name": "EndPoints", "type": "[]BrokerEndpoint", "versions": "0+",
      "about": "The endpoints that can be used to communicate with this broker.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
    ]},
    { "name": "Features", "type": "[]BrokerFeature",
      "about": "The features on this broker", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The feature name." },
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported feature level." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported feature level." }
    ]},
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
-     "about": "The broker rack." }
+     "about": "The broker rack." },
+   { "name": "Fenced", "type": "bool", "versions": "1+", "default": "true",
+     "about": "True if the broker is fenced." }
  ]
}
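
To illustrate how the new field behaves across versions, here is a minimal Python sketch, not Kafka's actual Java implementation. The function name effective_fenced and the use of None for "field not present" are assumptions made for this example. It assumes that a reader of a version 0 record, which has no Fenced field, falls back to the schema default of true.

```python
from typing import Optional


def effective_fenced(record_version: int, fenced_field: Optional[bool]) -> bool:
    """Return the fenced state a reader would see for a RegisterBrokerRecord.

    fenced_field is None when the field was not present in the serialized
    record (hypothetical convention for this sketch).
    """
    if record_version < 1 or fenced_field is None:
        # Version 0 has no Fenced field, so the schema default ("true") applies.
        return True
    return fenced_field
```

Under these assumptions, a version 0 registration is always treated as fenced, while a version 1 registration can state explicitly that the broker is unfenced.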

TopicRecord

Make a version of this record that is flexible, as intended.

 {
   "apiKey": 2,
   "type": "metadata",
   "name": "TopicRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
       "about": "The topic name." },
     { "name": "TopicId", "type": "uuid", "versions": "0+",
       "about": "The unique ID of this topic." }
   ]
 }

PartitionRecord

Create a new version of this that is flexible, as intended.

Add entityType: brokerId where appropriate.

 {
   "apiKey": 3,
   "type": "metadata",
   "name": "PartitionRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
     { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The partition id." },
     { "name": "TopicId", "type": "uuid", "versions": "0+",
       "about": "The unique ID of this topic." },
-    { "name": "Replicas", "type":  "[]int32", "versions":  "0+",
+    { "name": "Replicas", "type":  "[]int32", "versions":  "0+", "entityType": "brokerId",
       "about": "The replicas of this partition, sorted by preferred order." },
     { "name": "Isr", "type":  "[]int32", "versions":  "0+",
       "about": "The in-sync replicas of this partition" },
-    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+",
+    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+", "entityType": "brokerId",
       "about": "The replicas that we are in the process of removing." },
-    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+",
+    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+", "entityType": "brokerId",
       "about": "The replicas that we are in the process of adding." },
-    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
+    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
       "about": "The lead replica, or -1 if there is no leader." },
     { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The epoch of the partition leader." },
     { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
       "about": "An epoch that gets incremented each time we change anything in the partition." }
   ]
 }

ConfigRecord

Correct the mistake of not making this record flexible.

Make Value nullable. If it is set to null, that indicates that we are removing the designated config mapping.

 {
   "apiKey": 4,
   "type": "metadata",
   "name": "ConfigRecord",
   "validVersions": "0-1",
-  "flexibleVersions": "none",
+  "flexibleVersions": "1+",
   "fields": [
     { "name": "ResourceType", "type": "int8", "versions": "0+",
       "about": "The type of resource this configuration applies to." },
     { "name": "ResourceName", "type": "string", "versions": "0+",
       "about": "The name of the resource this configuration applies to." },
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The name of the configuration key." },
-    { "name": "Value", "type": "string", "versions": "0+",
+    { "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "1+",
       "about": "The value of the configuration." }
   ]
 }
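
The null-value semantics described above can be sketched as follows. This is an illustrative Python model rather than the controller's real code. The ConfigKey tuple, the apply_config_record function, and the in-memory dictionary are all hypothetical names introduced for this example.

```python
from typing import Dict, Optional, Tuple

# Hypothetical key type: (resource_type, resource_name, config_name).
ConfigKey = Tuple[int, str, str]


def apply_config_record(
    configs: Dict[ConfigKey, str],
    resource_type: int,
    resource_name: str,
    name: str,
    value: Optional[str],
) -> None:
    """Apply one ConfigRecord-like update to an in-memory config map."""
    key = (resource_type, resource_name, name)
    if value is None:
        # A null Value means the designated config mapping is being removed.
        configs.pop(key, None)
    else:
        configs[key] = value
```

For example, applying a record with a value of "1000" and then a record with a null value for the same key leaves the map without that key.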

PartitionChangeRecord

Add removingReplicas, addingReplicas, and replicas to PartitionChangeRecord. These are needed to implement reassignment. No version bump is needed since the new fields are tagged (optional) fields.

Add entityType: brokerId where appropriate.

 {
   "apiKey": 5,
   "type": "metadata",
   "name": "PartitionChangeRecord",
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The partition id." },
     { "name": "TopicId", "type": "uuid", "versions": "0+",
       "about": "The unique ID of this topic." },
     { "name": "Isr", "type":  "[]int32", "default": "null", "entityType": "brokerId",
       "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
       "about": "null if the ISR didn't change; the new in-sync replicas otherwise." },
     { "name": "Leader", "type": "int32", "default": "-2", "entityType": "brokerId",
       "versions": "0+", "taggedVersions": "0+", "tag": 1,
-      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." }
+      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." },
+    { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 2,
+      "about": "null if the replicas didn't change; the new replicas otherwise." },
+    { "name": "RemovingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 3,
+      "about": "null if the removing replicas didn't change; the new removing replicas otherwise." },
+    { "name": "AddingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 4,
+      "about": "null if the adding replicas didn't change; the new adding replicas otherwise." }
   ]
 }
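
As a rough illustration of how the tagged fields in this record could be interpreted, the Python sketch below applies a change to a partition's state. It is a simplified model under stated assumptions, not the controller's implementation: the PartitionState and PartitionChange classes and the apply_change function are hypothetical, and epoch handling is deliberately left out. A null (None) list means "unchanged", and a Leader of -2 means "unchanged".

```python
from dataclasses import dataclass
from typing import List, Optional

NO_LEADER_CHANGE = -2  # default value of Leader in PartitionChangeRecord


@dataclass(frozen=True)
class PartitionState:
    replicas: List[int]
    isr: List[int]
    removing_replicas: List[int]
    adding_replicas: List[int]
    leader: int  # -1 means there is no leader


@dataclass(frozen=True)
class PartitionChange:
    # Each field mirrors a tagged field; the defaults mean "no change".
    isr: Optional[List[int]] = None
    leader: int = NO_LEADER_CHANGE
    replicas: Optional[List[int]] = None
    removing_replicas: Optional[List[int]] = None
    adding_replicas: Optional[List[int]] = None


def _pick(old: List[int], new: Optional[List[int]]) -> List[int]:
    """Keep the old list when the change carries null for this field."""
    return old if new is None else new


def apply_change(state: PartitionState, change: PartitionChange) -> PartitionState:
    """Return the partition state after applying one change record."""
    leader = state.leader if change.leader == NO_LEADER_CHANGE else change.leader
    return PartitionState(
        replicas=_pick(state.replicas, change.replicas),
        isr=_pick(state.isr, change.isr),
        removing_replicas=_pick(state.removing_replicas, change.removing_replicas),
        adding_replicas=_pick(state.adding_replicas, change.adding_replicas),
        leader=leader,
    )
```

In this model, a reassignment that adds broker 4 would set only replicas and adding_replicas, leaving the ISR and leader untouched.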

FenceBrokerRecord

Make a version of this record that is flexible, as intended. Add entityType: brokerId to the broker id field.

{
  "apiKey": 7,
  "type": "metadata",
  "name": "FenceBrokerRecord",
- "validVersions": "0",
+ "validVersions": "0-1",
+ "flexibleVersions": "1+",
  "fields": [
-   { "name": "Id", "type": "int32", "versions": "0+",
+   { "name": "Id", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID to fence. It will be removed from all ISRs." },
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to fence." }
  ]
}

UnfenceBrokerRecord

Make a version of this record that is flexible, as intended. Add entityType: brokerId to the broker id field.

{
  "apiKey": 8,
  "type": "metadata",
  "name": "UnfenceBrokerRecord",
- "validVersions": "0",
+ "validVersions": "0-1",
+ "flexibleVersions": "1+",
  "fields": [
-   { "name": "Id", "type": "int32", "versions": "0+",
+   { "name": "Id", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID to unfence." },
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to unfence." }
  ]
}

RemoveTopicRecord

Make a version of this record that is flexible, as intended.

 {
   "apiKey": 9,
   "type": "metadata",
   "name": "RemoveTopicRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
     { "name": "TopicId", "type": "uuid", "versions": "0+",
       "about": "The topic to remove. All associated partitions will be removed as well." }
   ]
 }

DelegationTokenRecord

Make this record flexible, as intended. Since we don't generate this record yet, there is no need for a new record version.

 {
   "apiKey": 10,
   "type": "metadata",
   "name": "DelegationTokenRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Owner", "type": "string", "versions": "0+",
       "about": "The delegation token owner." },
     { "name": "Renewers", "type": "[]string", "versions": "0+",
       "about": "The principals which have renewed this token." },
     { "name": "IssueTimestamp", "type": "int64", "versions": "0+",
       "about": "The time at which this token was issued." },
     { "name": "MaxTimestamp", "type": "int64", "versions": "0+",
       "about": "The time at which this token cannot be renewed any more." },
     { "name": "ExpirationTimestamp", "type": "int64", "versions": "0+",
       "about": "The next time at which this token must be renewed." },
     { "name": "TokenId", "type": "string", "versions": "0+",
       "about": "The token id." }
   ]
 }

UserScramCredentialRecord

Make this record flexible, as intended. Since we don't generate this record yet, no need for a new record version.

 {
   "apiKey": 11,
   "type": "metadata",
   "name": "UserScramCredentialRecord",
   "validVersions": "0",
-  "flexibleVersions": "none",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The user name." },
     { "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+",
       "about": "The mechanism and related information associated with the user's SCRAM credential.", "fields": [
       { "name": "Mechanism", "type": "int8", "versions": "0+",
         "about": "The SCRAM mechanism." },
       { "name": "Salt", "type": "bytes", "versions": "0+",
         "about": "A random salt generated by the client." },
       { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
         "about": "The salted password." },
       { "name": "Iterations", "type": "int32", "versions": "0+",
         "about": "The number of iterations used in the SCRAM credential." }]}
   ]
 }

FeatureLevelRecord

Make this record flexible, as intended. Since we don't generate this record yet, no need for a new record version.

 {
   "apiKey": 12,
   "type": "metadata",
   "name": "FeatureLevelRecord",
   "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The feature name." },
     { "name": "MinFeatureLevel", "type": "int16", "versions": "0+",
       "about": "The current finalized minimum feature level of this feature for the cluster." },
     { "name": "MaxFeatureLevel", "type": "int16", "versions": "0+",
       "about": "The current finalized maximum feature level of this feature for the cluster." }
   ]
 }

QuotaRecord

Rename this record to ClientQuotaRecord to avoid confusion.

Correct the mistake that the first version of this was not flexible.

 {
   "apiKey": 14,
   "type": "metadata",
-  "name": "QuotaRecord",
+  "name": "ClientQuotaRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
     { "name": "Entity", "type": "[]EntityData", "versions": "0+",
       "about": "The quota entity to alter.", "fields": [
       { "name": "EntityType", "type": "string", "versions": "0+",
         "about": "The entity type." },
       { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
         "about": "The name of the entity, or null if the default." }
     ]},
     { "name": "Key", "type": "string", "versions": "0+",
       "about": "The quota configuration key." },
     { "name": "Value", "type": "float64", "versions": "0+",
       "about": "The value to set, otherwise ignored if the value is to be removed." },
     { "name": "Remove", "type": "bool", "versions": "0+",
       "about": "Whether the quota configuration value should be removed, otherwise set." }
   ]
 }

BrokerRegistrationChangeRecord

Create a new record type named BrokerRegistrationChangeRecord. This record is similar to FenceBrokerRecord and UnfenceBrokerRecord, but more general. In the future, we will allow broker endpoints to be changed via this record.

 {
   "apiKey": 15,
   "type": "metadata",
   "name": "BrokerRegistrationChangeRecord",
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker id." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch assigned by the controller." },
    { "name": "Fenced", "type": "int8", "versions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "-1 if the broker has been unfenced, 0 if no change, 1 if the broker has been fenced." }
   ]
 }
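
The three-valued Fenced field can be read as in the Python sketch below. This is only an illustration of the encoding given in the schema's description. The function name apply_fenced_change is hypothetical, and rejecting other values with an error is an assumption of this example rather than specified behavior.

```python
def apply_fenced_change(currently_fenced: bool, fenced_change: int) -> bool:
    """Return the broker's fenced state after a BrokerRegistrationChangeRecord.

    fenced_change follows the schema: -1 unfenced, 0 no change, 1 fenced.
    """
    if fenced_change == 0:
        return currently_fenced
    if fenced_change == 1:
        return True
    if fenced_change == -1:
        return False
    # Assumption for this sketch: any other value is treated as invalid.
    raise ValueError(f"unexpected Fenced value: {fenced_change}")
```

Because the field is tagged, a record that omits it carries the value 0 and leaves the fencing state as it was.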

RemoveFeatureLevelRecord

Create a new record type named RemoveFeatureLevelRecord. This record removes a feature level.

 {
   "apiKey": 16,
   "type": "metadata",
   "name": "RemoveFeatureLevelRecord",
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The feature name." }
   ]
 }

Compatibility, Deprecation, and Migration Plan

These changes should be compatible with 2.8. New message versions are used where necessary.

Rejected Alternatives

We could also have made these changes in an incompatible fashion, since KRaft was still in Early Access in the 2.8 release. However, since it's easy to make them in a compatible fashion, that seems better.
