You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 5 Next »

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current stateUnder Discussion

Discussion thread: here

JIRA: Not created yet

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka and its upstream applications treat internal topics differently from non-internal topics. For example:

  1. Kafka handles topic creation response errors differently for internal topics
  2. Internal topic partitions cannot be added to a transaction
  3. Internal topic records cannot be deleted
  4. Appending to internal topics might get rejected
  5. ......

Clients and upstream applications may define their own internal topics. For example, Kafka Connect defines `connect-configs`, `connect-offsets`, and `connect-statuses`. Clients are fetching the internal topics by sending the MetadataRequest (ApiKeys.METADATA).

However, clients and upstream application cannot register their own internal topics in servers. As a result, servers have no knowledge about client-defined internal topics. They can only test if a given topic is internal or not simply by checking against a static set of internal topic string, which consists of two internal topic names `__consumer_offsets` and `__transaction_state`. As a result, MetadataRequest cannot provide any information about client created internal topics. 

To solve the pain point, I'm proposing support for clients to register and query their own internal topics. 

Public Interfaces

  1. TopicConfig will have a new topic config `internal`, which indicates if the topic is internal or not.
  2. KafkaZkClient will have a new method getInternalTopics() which returns a set of internal topic name strings.

Proposed Changes

Server-side:

The static internal topic testing is defined in Topic.java. At server-side, it's dependents are:

  1. KafkaApis
  2. MetadataCache
  3. ReplicaManager

The coupling of the KafkaController and KafkaApi is complicated. Let's start from a class diagram indicating existing and new class properties and methods. New method and properties proposed are prefixed by `+`

  1. When a topic gets created, clients will pass the topic config `internal` to servers so Zookeeper will aware of the client-created internal topics.
  2. KafkaZkClient will be able to query all the internal topics.
  3. KafkaController can then utilize the information provided by KafkaZkClient and wrap a boolean field in its UpdateMetadataRequests to all the servers, which indicates if the topic is internal or not.

After this process, all the servers will be aware of the latest set of internal topics and can cache internal topics in MedatadaCache. Thus, that KafkaApi can construct the metadata response with the information of all clients created internal topics by referring MetadataCache. 

Client-side:

To get the internal topic information, instead of using the static internal topic testing or implementing their own logic, clients can utilize KafkaAdminClients and make a MetadataRequest (ApiKey.METADATA). 

Compatibility, Deprecation, and Migration Plan

There are no compatibility concerns in this KIP.

Rejected Alternatives

  1. Specify a naming convention that all internal topic should start with the prefix `_`.
    1. It's hard to make all clients adjust their topic names.
  2. Change several public APIs to make the clients pass a flag indicating if the topic is internal or not when it creates a topic. Add a new ZK path such as `topics/internal`.
    1. May require a new flag in TopicCommand
  • No labels