Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: add metadata protocol change

...

  • Allows clients in any language to administrate Kafka (Wire protocol is supported by any language)
  • Provides public client for performing admin operations
  • Ensures integration test code in other projects and clients maintains compatibility
  • Prevents users from needing to use the Command classes and work around standard output and system exits
  • Removing the need for admin scripts (kafka-configs.sh, kafka-topics.sh, etc) to talk directly to Zookeeper.
  • Allows ZNodes to be completely locked down via ACLs
  • Further hides the Zookeeper details of Kafka

Public Interfaces

...

Command-line Options

A few extra options will be added to kafka-configs.sh:

...

Code Block
languagescala
titleBootstrap Server Option
val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka servers to connect to, separated by commas, for instance "localhost:9091,localhost:9092". In case of providing this, a direct Zookeeper connection won't be required.")
                      .withRequiredArg
                      .describedAs("server to connect to")
                      .ofType(classOf[String])
val commandConfigOpt = parser.accepts("command-config", "Property file containing connection configs to be passed to the AdminClient. " +
                      "This is used only with --bootstrap-server option.")
                      .withRequiredArg
                      .describedAs("command config property file")
                      .ofType(classOf[String])

Protocol Changes

Currently KafkaAdminClient.describeTopics() and KafkaAdminClient.listTopics() uses the Metadata protocol to acquire topic information. The returned response however won't contain the topics that are under deletion but couldn't complete yet (for instance because of some replicas offline), therefore it is not possible to implement the current client's "marked for deletion" feature. To get around this we can introduce some changes in the Metadata protocol, such as:

  • Cache topics that are under deletion but some of their replicas are offline.
  • Create a new error, called TOPIC__DELETION_IN_PROGRESS
  • Bump the Metadata request version. The format of the protocol won't change, only the fact that there is a new Error type that we're introducing, but that requires bumping the protocol as old clients won't be able to handle it and most probably end up in an UNKNOWN_SERVER_ERROR.
  • Smarten up the KafkaApis.handleTopicMetadataRequest to also return the list of topics under deletion with the above error

Code Block
titleTOPIC_DELETION_IN_PROGRESS
public enum Errors {
    // ...
    TOPIC_DELETION_IN_PROGRESS(74, "Topic deletion is in progress.",
        TopicDeletionInProgressException::new);
    // ...
}


public class TopicDeletionInProgressException extends ApiException {

    private static final long serialVersionUID = 4767321103391338488L;

    public TopicDeletionInProgressException(String message) {
        super(message);
    }
}


Proposed Changes

The change proposed in this KIP is to add an extra option as stated above and to migrate create, delete, list and describe operations to use a broker connection. This would be a backward compatible change, meaning the zookeeper option would be still available and fully working until a further point in time but hopefully would deprecate it as part of this KIP.

...

The existing tests will be run with the --bootstrap-server mode too. Additionally we can refactor some of the kafka-topics.sh usages in the smokes to use the AdminClient mode.

Rejected Alternatives

Protocol Changes

At an early stage of the KIP discussion it occured that there is a need for a protocol that would handle topic partition changes, such as increasing the partition number. It got rejected as a similar api, called CreatePartitions already exists and we don't need a new protocol.

...