Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP aims to enforce offsets.topic.replication.factor upon __consumer_offsets auto topic creation.

Kafka brokers have a config called "offsets.topic.replication.factor" that specifies the replication factor for the "__consumer_offsets" topic. The problem is that this config isn't being enforced upon auto topic creation. If an attempt to auto create the internal topic is made when there are fewer brokers than "offsets.topic.replication.factor", the topic ends up getting created anyway with the current number of live brokers. The current behavior is pretty surprising when you have clients or tooling running as the cluster is getting set up. Even if your cluster ends up being huge, you'll find out much later that __consumer_offsets was set up with no replication.

The cluster not meeting the "offsets.topic.replication.factor" requirement on the internal topic is another way of saying the cluster isn't fully set up yet. The right behavior is for "offsets.topic.replication.factor" to be enforced.
Rationale for the prior behavior can be found in the JIRA linked above.
Public Interfaces
Set the offsets.topic.replication.factor to 1 in config/server.properties to maintain existing single-broker quickstart behavior. Note that this does not change the default offsets.topic.replication.factor value of 3 in KafkaConfig.
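Concretely, the quickstart configuration shipped with Kafka would carry an entry like the one below (a sketch of the proposed config/server.properties change, not the full file):

```properties
# Keep the single-broker quickstart working: allow __consumer_offsets
# to be auto-created with a replication factor of 1.
offsets.topic.replication.factor=1
```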
Proposed Changes
Internal topic creation can happen in six paths:

1. By a broker upon GroupCoordinatorRequest.
2. By a broker from MetadataRequest querying the internal topic, even if "auto.create.topics.enable" is false.
3. By a user when using AdminUtils.
4. By a user when running kafka-topics.sh (which calls AdminUtils).
5. By a broker through AdminManager (which calls AdminUtils) handling CreateTopicsRequest.
6. By a user directly writing to the topic znode in zookeeper.
...
1. will now fail topic creation of the internal topic with GROUP_COORDINATOR_NOT_AVAILABLE until the "offsets.topic.replication.factor" requirement is met.
2. will retain existing behavior of failing topic creation of the internal topic with INVALID_REPLICATION_FACTOR until the "offsets.topic.replication.factor" requirement is met.
3. will retain existing behavior. AdminUtils compares cluster size vs. replication factor and throws an InvalidReplicationFactorException if the manually specified replication factor isn't met. If the replication factor is met, it creates the topic, ignoring the broker's offsets.topic.replication.factor config.
4. Same as 3.
5. Same as 3. A CreateTopicsResponse including an internal topic will return INVALID_REPLICATION_FACTOR if the manually specified replication factor isn't met.
6. is unrelated, as the zookeeper write will not receive any error from kafka.
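The enforcement on the auto-creation path can be sketched roughly as below. This is a hypothetical illustration, not the actual broker code: the object, method, and result names are invented for this sketch, and the real change lives inside the broker's topic-creation paths.

```scala
// Hypothetical sketch of the check the broker would perform before
// auto-creating __consumer_offsets (path 1 above). Names are invented.
object OffsetsTopicCreation {

  // Outcomes modeled after the protocol errors mentioned in this KIP.
  sealed trait Result
  case object Created extends Result
  case object GroupCoordinatorNotAvailable extends Result

  // Defer auto creation upon GroupCoordinatorRequest until the cluster
  // has enough live brokers to satisfy the configured replication factor.
  def tryAutoCreate(aliveBrokers: Int, offsetsTopicReplicationFactor: Int): Result =
    if (aliveBrokers < offsetsTopicReplicationFactor) GroupCoordinatorNotAvailable
    else Created
}
```

With the default offsets.topic.replication.factor of 3, a two-broker cluster would keep answering GROUP_COORDINATOR_NOT_AVAILABLE until a third broker joins, at which point the topic is created fully replicated.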
...
This is a bug fix KIP impacting the setup of new clusters. Users setting up a cluster should keep in mind that the __consumer_offsets topic will not be created until their cluster satisfies the offsets.topic.replication.factor requirement.
Rejected Alternatives
One rejected alternative was to push __consumer_offsets topic creation logic out of the brokers and into the KafkaController. Since the KafkaController can detect broker membership changes through zookeeper, it can create the __consumer_offsets topic as soon as the offsets.topic.replication.factor requirement is met. While doable, it is more complicated, as it would add even more logic to keep track of in the already complicated KafkaController and would additionally require KafkaApis to instead look up cluster readiness based on its MetadataCache when responding to GroupCoordinatorRequests.