Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Sometimes there can be a need to downgrade the cluster-wide availability of one or more broker features from some finalized version X to a version Y, where Y < X. Firstly, we leave it to the cluster operator (i.e. human) to decide whether the downgrade is backwards compatible in the first place i.e. externally ensure the changes introduced/exercised in version Y are no longer relevant in the system ahead of the downgrade. As for the support we provide:

    • The rules are the same as with upgrades → such a downgrade request is rejected by the system, unless all brokers support the downgraded version Y of the feature.

    • We assume downgrades of finalized feature max versions are rare. For safety reasons, we request for the human to specify an explicit "force downgrade" flag to avoid accidental downgrades to feature finalized max versions.
  • A need can arise to deprecate the usage of a certain version of one or more broker feature. We do not provide any special support for this. Instead, the cluster operator (i.e. human) should use external means to establish that it is safe to stop supporting a particular version of broker feature. For example, verify (if needed) that no clients are actively using the version, before deciding to stop supporting it.

...

Code Block
{ 
   "listener_security_protocol_map":{ 
      "INTERNAL":"PLAINTEXT",
      "REPLICATION":"PLAINTEXT",
      "EXTERNAL":"SASL_SSL"
   },
   "endpoints":[ 
      "INTERNAL://kafka-0.kafka.def.cluster.local:9071",
      "REPLICATION://kafka-0.kafka.abc.cluster.local:9072",
      "EXTERNAL://b3-kfc-mnope.us.clusterx:9092"
   ],
   "rack":"0",
   "jmx_port":7203,
   "host":"kafka-0.kafka.def.cluster.local",
   "timestamp":"1579041366882",
   "port":9071,
   "version":4,
   // ----- START: PROPOSED ADDITIONAL METADATA -----
   "features": { 
      "exactly_once_semantics": {  // string -> name of the feature
          "minVersion": 0,   // int64 -> represents the min supported version of this feature
          "maxVersion": 3  // int64 -> represents the max supported version of this feature
      },
      "consumer_offsets_topic_schema": { 
          "minVersion": 1,
          "maxVersion": 4
      }
   }
   // ----- END: PROPOSED ADDITIONAL METADATA -----
}

Persistence of finalized feature max versions

The proposal is that cluster-wide finalized feature max versions will be persisted in a specific common ZK node. The path to the ZK node is proposed as: '/features' . The node content type is JSON (string), and the size is expected to be typically small (in several KBs). Couple high level details:

...

Code Block
{
   "version": 0, // int64 -> Represents the version of the schema for the data stored in the ZK node
   "data": {
        "epoch": 0, // int64 -> Represents the version of the contents of the "data" dictionary.
		"features": {
        	"exactly_once_semantics": {   // string -> name of the feature
            	"version": 3 // int64 -> Represents the cluster-wide finalized max version of this feature
	        },
    	    "consumer_offsets_topic_schema": { 
        	    "version": 4
	        }
		}
   }
}

...

  • The input to the API is a list of FeatureUpdate that need to be applied. Each item specifies an operation (as a FeatureUpdateType) such as:

    • Adding a new Feature with a finalized max version. This happens when the cluster-wide max feature version is being finalized for the first time for a feature.

    • Updating the max finalized feature version of an existing Feature.

    • Deleting an existing Feature.

  • The response (during success) is a list of latest finalized Feature, with an epoch number (monotonically increasing).

...

Code Block
=== DESCRIBE FEATURES ===

# Get cluster-wide finalized features, and features supported by a specific broker.
#  - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
#  - Optionally, provide `--controller` flag directing the tool to issue the query to the
#    controller (while discovering the controller via the bootstrap server).
#    This can be useful for debugging purposes.
$> kafka-features.sh describe --bootstrap-server kafka-broker0.prn1:9071 [--controller]

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "minVersion": 1,
            "maxVersion": 2
        },
        "transaction_coordinator": {
        	"minVersion": 0,
        	"maxVersion": 5
        },
        "consumer_offsets_topic_schema": { 
            "minVersion": 3,
        	"maxVersion": 7
        }
	},
	"finalized_features": {
        "epoch": 0,
        "group_coordinator": {
            "version": 1
        },
        "transaction_coordinator": {
        	"version": 4
        },
        "consumer_offsets_topic_schema": { 
            "version": 7
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== ADD_OR_UPDATE FEATURES ===

# Add or update a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual ADD_OR_UPDATE request is issued.
#  - Use `--features` to provide a comma-separated list of features and their new finalized max versions to ADD_OR_UPDATE.
#  - Optionally, use `--force-downgrade` to force downgrade of a finalized feature version. This should be used only when required.

$> kafka-features.sh update --bootstrap-server kafka-broker0.prn1:9071 --features group_coordinator:2,transaction_coordinator:3 [--force-downgrade]

Please confirm before finalizing the upgrade of the following features:
1. group_coordinator from v1 (existing) to v2 (new)

[Y/n]? Y

Please confirm before downgrading the following features:
1.transaction_coordinator from v4 (existing) to v3 (new)

[Y/n]? Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "minVersion": 1,
            "maxVersion": 2
        },
        "transaction_coordinator": {
        	"minVersion": 0,
        	"maxVersion": 5
        },
        "consumer_offsets_topic_schema": { 
            "minVersion": 3,
        	"maxVersion": 7
        }
	},
	"finalized_features": {
        "epoch": 1,
        "group_coordinator": {
            "version": 2
        },
        "transaction_coordinator": {
        	"version": 5
        },
        "consumer_offsets_topic_schema": { 
            "version": 7
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== DELETE FEATURES ===

# Delete a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual delete request is issued.
#  - Use `--features` to provide a comma-separated list of features to be deleted.

$> kafka-features.sh delete --bootstrap-server kafka-broker0.prn1:9071 --features group_coordinator,transaction_coordinator

Please confirm deletion of the following features:
1. group_coordinator
2. transaction_coordinator

[Y/n] Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "minVersion": 1,
            "maxVersion": 2
        },
        "transaction_coordinator": {
        	"minVersion": 0,
        	"maxVersion": 5
        },
        "consumer_offsets_topic_schema": { 
            "minVersion": 3,
        	"maxVersion": 7
        }
	},
	"finalized_features": {
		"epoch": 2,
        "consumer_offsets_topic_schema": { 
            "version": 7
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

...