
...

It is increasingly common for organisations to have multiple Kafka clusters and there are a number of situations where it would be useful if the different clusters could be uniquely identified: monitoring, auditing, log aggregation, preventing clients and brokers from connecting to the wrong cluster and more. This KIP focuses on the monitoring and auditing use cases while ensuring that the design can be evolved to satisfy other use cases (this is covered in more detail in the "Potential Future Improvements" section).

...

We will add an id field to the Cluster class, which is populated from MetadataResponse and accessible from the internal Metadata class. We will also introduce the following class and interface:

 

package org.apache.kafka.common;

class ClusterResource {
  private final String clusterId;

  public ClusterResource(String clusterId) {
    this.clusterId = clusterId;
  }

  public String clusterId() {
    return clusterId;
  }
}

package org.apache.kafka.common;

interface ClusterResourceListener {
  void onUpdate(ClusterResource cluster);
}

Client interceptors, serializers and metric reporters that optionally implement ClusterResourceListener will receive an instance of ClusterResource once the cluster id is available. For clients, that is when a client receives a metadata response. For brokers, that is during start-up, shortly after the connection to ZooKeeper is established (the broker case is relevant for metric reporters only). Note that this is optional and existing implementations will not be affected.
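
For illustration, a client-side plugin could receive the cluster id as follows. This is a minimal sketch: LoggingSerializer is a hypothetical name, and only ClusterResource and ClusterResourceListener come from this proposal.

package com.example;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.serialization.Serializer;

// A hypothetical serializer that also implements the proposed
// ClusterResourceListener in order to learn the cluster id.
public class LoggingSerializer implements Serializer<String>, ClusterResourceListener {
  private volatile String clusterId;

  @Override
  public void onUpdate(ClusterResource clusterResource) {
    // Invoked once the client receives its first metadata response.
    this.clusterId = clusterResource.clusterId();
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {}

  @Override
  public byte[] serialize(String topic, String data) {
    // clusterId may still be null if no metadata response has arrived yet.
    return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public void close() {}
}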

ZooKeeper and Tools

We propose the introduction of a new ZooKeeper znode /cluster/id where we store a unique and immutable id automatically generated by the first broker that is able to create the znode. For consistency with other znodes stored by Kafka, we will store the data as JSON. An example follows:

...
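
For illustration only, since the example above is elided: the stored JSON might look like the following, assuming a version field as used by other Kafka znodes (the id shown is merely a sample URL-safe Base64-encoded UUID).

{"version": 1, "id": "p8FOJmQvS2GLPH6LGzPWEw"}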

The implementation of getOrGenerateClusterId() is straightforward. It first tries to get the cluster id from /cluster/id. If the znode does not exist, it generates an id via UUID.randomUUID(), converts it to a String via URL-safe Base64 encoding and tries to create the znode. If the creation succeeds, the generated id is returned. If it fails with a ZkNodeExistsException, then another broker won the race, so we can just retrieve the cluster id from the expected znode path and return it.

The returned cluster id will be logged at info level.
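
A minimal Java sketch of this logic follows, assuming the org.I0Itec.zkclient library that Kafka used at the time; the ClusterIdSketch class name is illustrative and the JSON wrapping described above is omitted for brevity.

package org.apache.kafka.example;

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

// Illustrative helper mirroring the generate-then-create-or-read logic.
public class ClusterIdSketch {
  private static final String CLUSTER_ID_PATH = "/cluster/id";

  public static String getOrGenerateClusterId(ZkClient zkClient) {
    // First, try to read an existing id.
    if (zkClient.exists(CLUSTER_ID_PATH)) {
      String existing = zkClient.readData(CLUSTER_ID_PATH);
      return existing;
    }
    // Generate a candidate id: a random UUID encoded as URL-safe Base64.
    UUID uuid = UUID.randomUUID();
    byte[] bytes = ByteBuffer.allocate(16)
        .putLong(uuid.getMostSignificantBits())
        .putLong(uuid.getLeastSignificantBits())
        .array();
    String id = Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    try {
      zkClient.createPersistent(CLUSTER_ID_PATH, id);
      return id;
    } catch (ZkNodeExistsException e) {
      // Another broker won the race; read the id it stored instead.
      String winner = zkClient.readData(CLUSTER_ID_PATH);
      return winner;
    }
  }
}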

We will introduce a clusterId method to KafkaServer, which will be used by KafkaApis.handleTopicMetadataRequest to populate the cluster_id field in MetadataResponse version 2.

...

A new protocol version will be added for the Metadata protocol message, so it should not affect existing clients.

ClusterResourceListener is a new interface, so existing serializers, client interceptors and metric reporters will not be affected unless they choose to implement it.

Tools that want to take advantage of the cluster id while supporting older brokers will have to handle the fact that the older brokers won't have a cluster id.

Moving a broker to a new ZooKeeper path may cause the broker to fail to start with an error if the cluster id in ZooKeeper is inconsistent with the one stored in meta.properties. An admin would have to either change meta.properties in the broker or update the value in ZooKeeper.

Test Plan

  1. Extend Metadata protocol tests to check that the cluster id is always returned.

  2. Modify the existing upgrade system tests to verify that the cluster id is set after the upgrade.

  3. Test that the cluster id is exposed correctly in client and broker metrics.

  4. Test that serializers, client interceptors and metric reporters that implement ClusterResourceListener receive the cluster id once a metadata response is received.

  5. Test that metric reporters in the broker receive the cluster id once it's available.

  6. Test that cluster id generation works correctly and that the broker fails to start if the locally stored cluster id is inconsistent with the one in ZooKeeper.

Rejected Alternatives

1. Add a method in each interceptor interface that is called on metadata updates to pass the cluster id (and potentially other metadata)

This seems like a good option, but it's not backwards compatible while we still support Java 7. In Java 8, we would have the option to introduce a default method with an empty implementation. This is also specific to interceptors. If we wanted to apply the same approach to metric reporters and serializers, we would probably want a common interface (as in the current proposal).
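
For concreteness, the Java 8 option mentioned above would have looked roughly like this sketch; the interface and method names are hypothetical and this approach was not adopted.

package org.apache.kafka.example;

// Sketch of the rejected approach: a Java 8 default method with an empty
// implementation, so that existing implementations keep compiling.
public interface InterceptorWithClusterId {
  default void onClusterIdUpdate(String clusterId) {
    // no-op by default; implementations may override to receive the id
  }
}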

2. Expose the cluster id in ProducerRecord and ConsumerRecord

This would make it available to some of the client interceptor methods. The cluster id looks out of place in either of these classes as they represent records while the cluster id is a mostly static piece of data.

3. Don't expose the cluster id to client pluggable interfaces

We leave it to the implementations to do the metadata request themselves if they need the data. Since we don't have public API for this yet, it would require using internal request classes or reimplementing them.

4. Pass the cluster id via the configure method of Configurable classes

In order to make the cluster id available via the configure method, we would need to delay its invocation until after we get the first metadata response for each client.

The configure method of interceptors and serializers would no longer be invoked during the constructor of KafkaConsumer and KafkaProducer. Instead, it would be invoked after the first metadata response is received. This could pose problems for implementations that were relying on the previous behaviour. Furthermore, the first ProducerInterceptor.onSend() call would happen after the first metadata response is received instead of immediately after KafkaProducer.send() is called.

5. Pass an instance of Cluster to the listener

Using Cluster is appealing because it has the information we need and it is already a public class, so we would not need to introduce a new class with a name that can potentially be confusing. There are a few issues, however:

  • On the broker, we would have to invoke the listener on every topic/partition change instead of just once during broker startup
  • Cluster only includes one endpoint, which makes sense for the client but not the broker, where it's not clear which endpoint should be used
  • It is unclear if it's useful for serializers, interceptors and metric reporters to know all broker endpoints and topic/partition metadata

Potential Future Improvements

  1. Expose cluster information including cluster id via AdminClient: this is likely to happen as part of KIP-4.

  2. Add a human-readable cluster name to complement the id. This is useful, but it's worth exploring separately as there are a few options in the design space. One option that looks promising is to allow user-defined tags on resources (cluster, brokers, topics, etc.).

  3. Use the cluster id to ensure that brokers are connected to the right cluster: it's useful, but something that can be done later via a separate KIP. One of the discussion points is how the broker knows its cluster id (e.g. via a config or by storing it after the first connection to the cluster).

  4. Use the cluster id to ensure that clients are connected to the right cluster: given that clients could potentially connect to multiple clusters for failover reasons (i.e. bootstrap.servers can point to a VIP), it may make sense for the config to be a list of cluster ids or a cluster id regex.

  5. Expose cluster id as a client metric. Kafka Metrics doesn't support non-numeric metric values at the moment, so it would have to be extended to support that.

...