Status

Current state: Adopted

Discussion thread: http://kafka.markmail.org/thread/glcyw3bvngtvobbs

JIRA: KAFKA-4093

GitHub PR: https://github.com/apache/kafka/pull/1830

Released: 0.10.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • All Kafka clusters should have a cluster id (i.e. it should not be optional) in order to avoid complex logic in downstream tools that require it.

  • No additional work should be required of users when creating or upgrading a Kafka cluster (e.g. no additional mandatory configs should be introduced).
  • The cluster id should be unique and immutable so that external tools can associate data with a particular cluster. If the cluster id is allowed to change, the correlation between messages and clusters breaks (i.e. a single cluster will appear as multiple clusters).

  • It is acceptable for users to be able to set or update the cluster id via expert-level tools, even though that can violate the uniqueness and immutability guarantees. Users are advised not to do this except in exceptional situations.

  • The cluster id should be available via broker metrics so that it's easily exported to monitoring tools with existing metric reporters (e.g. JmxReporter).

  • The cluster id should be exposed to client and broker metric reporters so that they can tag, categorise or namespace metrics based on the cluster id.

  • Client interceptors should have access to the cluster id so that they can associate it with the message metadata being tracked.

  • Client serializers should have access to the cluster id so that they can include it in the message, if desired (e.g. as part of a standard message header).

...

 
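// ClusterResource.java
package org.apache.kafka.common;
// Immutable holder for cluster-level metadata; currently only the cluster id.
class ClusterResource {
  private final String clusterId;
  public ClusterResource(String clusterId) {
    this.clusterId = clusterId;
  }
  public String clusterId() {
    return clusterId;
  }
}
 
// ClusterResourceListener.java
package org.apache.kafka.common;
// Optional callback interface for interceptors, serializers and metric reporters.
interface ClusterResourceListener {
  // Invoked once the cluster id is available.
  void onUpdate(ClusterResource cluster);
}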

...

Client interceptors, serializers and metric reporters that implement the optional ClusterResourceListener interface will receive an instance of ClusterResource once the cluster id is available. For clients, that is when the client receives a metadata response (which means that methods like ProducerInterceptor.onSend may be called before the cluster id is available). For brokers, that is during start-up, shortly after the connection to ZooKeeper is established (the broker case is only relevant for metric reporters).
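As an illustration of the callback ordering described above, here is a minimal sketch of an interceptor-like class that tolerates onSend being called before the cluster id arrives. ClusterResource and ClusterResourceListener are local stand-ins that mirror the proposed types so the sketch compiles without the Kafka client jar. ClusterTaggingInterceptor, its simplified String-based onSend, the "unknown" placeholder and the example cluster id are all hypothetical.

```java
// Local stand-ins mirroring the types proposed in this KIP.
class ClusterResource {
    private final String clusterId;
    ClusterResource(String clusterId) { this.clusterId = clusterId; }
    String clusterId() { return clusterId; }
}

interface ClusterResourceListener {
    void onUpdate(ClusterResource cluster);
}

// Hypothetical interceptor-like class that prefixes each record with the cluster id.
class ClusterTaggingInterceptor implements ClusterResourceListener {
    // Null until onUpdate is called. volatile because we assume onUpdate and
    // onSend may run on different threads.
    private volatile String clusterId;

    @Override
    public void onUpdate(ClusterResource cluster) {
        this.clusterId = cluster.clusterId();
    }

    // Simplified stand-in for ProducerInterceptor.onSend (plain strings, not records).
    String onSend(String record) {
        String id = clusterId;
        // onSend may run before the first metadata response, so handle a missing id.
        return (id == null ? "unknown" : id) + ":" + record;
    }
}

class ClusterTaggingDemo {
    public static void main(String[] args) {
        ClusterTaggingInterceptor interceptor = new ClusterTaggingInterceptor();
        System.out.println(interceptor.onSend("a")); // prints "unknown:a"
        interceptor.onUpdate(new ClusterResource("example-cluster-id"));
        System.out.println(interceptor.onSend("b")); // prints "example-cluster-id:b"
    }
}
```

The point of the sketch is that implementations should not assume the cluster id is set by the time the first record passes through them.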

...

This seems like a good option, but it's not backwards compatible while we still support Java 7. In Java 8, we would have the option to introduce a default method with an empty implementation. This is also specific to interceptors. If we wanted to apply the same approach to metric reporters and serializers, we would probably want a common interface (as in the current proposal).
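For reference, the Java 8 default-method approach mentioned above might look roughly like the following sketch. SketchInterceptor and the onClusterUpdate method name are hypothetical and are not part of any Kafka API. ClusterResource is a local stand-in for the class proposed in this KIP.

```java
// Local stand-in for the ClusterResource class proposed in this KIP.
class ClusterResource {
    private final String clusterId;
    ClusterResource(String clusterId) { this.clusterId = clusterId; }
    String clusterId() { return clusterId; }
}

// Hypothetical interceptor interface; not the real ProducerInterceptor.
interface SketchInterceptor {
    String onSend(String record);

    // Java 8 default method with an empty body: implementations written
    // before this method was added keep compiling unchanged.
    default void onClusterUpdate(ClusterResource cluster) { }
}

// An implementation written before the new method existed; no changes needed.
class LegacyInterceptor implements SketchInterceptor {
    @Override
    public String onSend(String record) { return record; }
}

// An implementation that opts in by overriding the default method.
class ClusterAwareInterceptor implements SketchInterceptor {
    private String clusterId = "";

    @Override
    public String onSend(String record) { return clusterId + "/" + record; }

    @Override
    public void onClusterUpdate(ClusterResource cluster) {
        clusterId = cluster.clusterId();
    }
}
```

Without default methods (Java 7), adding onClusterUpdate to the interface would break LegacyInterceptor at compile time, which is the compatibility problem noted above.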

2. Expose the cluster id in ProducerRecord and ConsumerRecord

This would make it available to some of the client interceptor methods. The cluster id looks out of place in either of these classes as they represent records while the cluster id is a mostly static piece of data.

3. Don't expose the cluster id to client pluggable interfaces

We leave it to the implementations to do the metadata request themselves if they need the data. Since we don't have a public API for this yet, it would require using internal request classes or reimplementing them.

4. Pass the cluster id via the configure method of Configurable classes

In order to make the cluster id available via the configure method, we would need to delay its invocation until after we get the first metadata response for each client.

The configure method of interceptors and serializers would no longer be invoked during the constructor of KafkaConsumer and KafkaProducer. Instead, it would be invoked after the first metadata response is received. This could pose problems for implementations that were relying on the previous behaviour. Furthermore, the first ProducerInterceptor.onSend() call would happen after the first metadata response is received instead of immediately after KafkaProducer.send() is called. 

5. Pass an instance of Cluster to the listener

 

Using Cluster is appealing because it has the information we need and it is already a public class, so we would not need to introduce a new class with a name that can potentially be confusing. There are a few issues, however: 

  • On the broker, we would have to invoke the listener on every topic/partition change instead of just once during broker startup.
  • Cluster only includes one endpoint per broker, which makes sense for the client but not for the broker, where it's not clear which endpoint should be used.
  • It is unclear whether it's useful for serializers, interceptors and metric reporters to know all broker endpoints and topic/partition metadata.

Potential Future Improvements

  1. Expose cluster information including cluster id via AdminClient: this is likely to happen as part of KIP-4.

  2. Add a human-readable cluster name to complement the id. This is useful, but it's worth exploring separately as there are a few options in the design space. One option that looks promising is to allow user-defined tags on resources (cluster, brokers, topics, etc.). There are also simpler (but less general) alternatives like setting the cluster id via a broker config or via a ZK-based cluster config.

  3. Use the cluster id to ensure that brokers are connected to the right cluster: it's useful, but something that can be done later via a separate KIP. One of the discussion points is how the broker knows its cluster id (e.g. via a config or by storing it after the first connection to the cluster).

  4. Use the cluster id to ensure that clients are connected to the right cluster: given that clients could potentially connect to multiple clusters for failover reasons (i.e. bootstrap.servers can point to a VIP), it may make sense for the config to be a list of cluster ids or a cluster id regex.

  5. Expose cluster id as a client metric. Kafka Metrics doesn't support non-numeric metric values at the moment so it would have to be extended to support that.
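To make item 4 in the list above more concrete, a client-side check based on a cluster id regex could be sketched as follows. ClusterIdValidator and the idea of passing the regex to its constructor are purely hypothetical. No such class or config exists in Kafka, and a real design would be settled in a separate KIP.

```java
import java.util.regex.Pattern;

// Hypothetical helper: decides whether a cluster id reported by the cluster
// matches what the client was configured to expect.
class ClusterIdValidator {
    private final Pattern expected;

    // The regex would come from a (hypothetical) client config. A fixed list
    // of ids can be expressed as an alternation, e.g. "id-a|id-b".
    ClusterIdValidator(String expectedClusterIdRegex) {
        this.expected = Pattern.compile(expectedClusterIdRegex);
    }

    // Returns true only if the whole cluster id matches the expected pattern.
    boolean accepts(String clusterId) {
        return clusterId != null && expected.matcher(clusterId).matches();
    }
}
```

With this sketch, a validator built from "prod-.*" would accept "prod-east" and reject "staging-1", which covers the failover case where several clusters sit behind one bootstrap address.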

...