...

Code Block
package org.apache.kafka.streams;

public class KafkaStreams {

...

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store. 
   */
  public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                  final K key,
                                                  final Serializer<K> keySerializer) {}

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using
   * the supplied partitioner.
   */
  public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                  final K key,
                                                  final StreamPartitioner<? super K, ?> partitioner) {}
...

  /**
   * @deprecated Use {@link #queryMetadataForKey(String, Object, Serializer)} instead.
   */
  @Deprecated
  public <K> StreamsMetadata metadataForKey(final String storeName,
                                            final K key,
                                            final Serializer<K> keySerializer) {}

  /**
   * @deprecated Use {@link #queryMetadataForKey(String, Object, StreamPartitioner)} instead.
   */
  @Deprecated
  public <K> StreamsMetadata metadataForKey(final String storeName,
                                            final K key,
                                            final StreamPartitioner<? super K, ?> partitioner) {}

  /**
   * Returns mapping from store name to another map of partition to offset lag info, for all stores local to this Streams instance. It includes both active and standby store partitions, with active partitions always reporting 0 lag. 
   */
  public Map<String, Map<Integer, Long>> allLocalOffsetLags() {}


...


}

...

  • Adding a StreamsConfig that provides a universal enableStandbyServing flag for the application. An application can contain multiple stores, and we need the flexibility to offer different consistency guarantees for each of them; a single StreamsConfig flag would rule that out.
  • Making the Streams APIs lag-aware, e.g., only returning standbys within a certain time/offset lag. Pushing this into Streams did not add much value. Instead, the KIP keeps Streams agnostic of what the acceptable values for lag are and gives the application/user the flexibility to choose.
  • Propagating lag information using the Rebalance protocol. Although it seemed like a logical thing to do, with KIP-441 in mind, we decided against it for various reasons. Foremost, the lag information is quickly outdated.
  • Making multiple calls to StreamsMetadataState to fetch the active tasks, the standby tasks, and the partition to which the key belongs (the partition is required to fetch lag). We decided against this and introduced a new public class, KeyQueryMetadata, because it makes the code leaner and returns everything in a single fetch.
  • Implementing time-based lag for now. Supporting offsetLag alone is sufficient initially, and adding time-based lag needs broker changes to be implemented efficiently.
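
Because Streams stays agnostic of what an acceptable lag is, that decision is left to the application. The following is a minimal sketch of how an application might apply its own threshold to a map shaped like the return value of allLocalOffsetLags() (store name to partition to offset lag). The class name LagFilter, the method partitionsWithinLag, and the threshold value are hypothetical and not part of the KIP; the sample map in main is made-up data standing in for a real call on a KafkaStreams instance.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical application-side helper; not part of the Kafka Streams API.
final class LagFilter {

    private LagFilter() {}

    /**
     * Returns the partitions of the given store whose offset lag does not exceed
     * maxAcceptableLag. The input has the same shape as the value returned by
     * KafkaStreams#allLocalOffsetLags(): store name -> (partition -> offset lag).
     */
    static Set<Integer> partitionsWithinLag(final Map<String, Map<Integer, Long>> offsetLags,
                                            final String storeName,
                                            final long maxAcceptableLag) {
        // A store that is not hosted locally simply yields no partitions.
        final Map<Integer, Long> lagsByPartition =
            offsetLags.getOrDefault(storeName, Collections.emptyMap());

        final Set<Integer> acceptable = new TreeSet<>();
        for (final Map.Entry<Integer, Long> entry : lagsByPartition.entrySet()) {
            if (entry.getValue() <= maxAcceptableLag) {
                acceptable.add(entry.getKey());
            }
        }
        return acceptable;
    }

    public static void main(final String[] args) {
        // Made-up sample data; a real application would call allLocalOffsetLags().
        final Map<String, Map<Integer, Long>> lags =
            Map.of("word-counts", Map.of(0, 0L, 1, 250L, 2, 40L));

        // The application, not Streams, picks the threshold (100 offsets here).
        System.out.println(partitionsWithinLag(lags, "word-counts", 100L));
    }
}
```

An instance could run a check like this before serving a query from a standby, and reject or forward the request when the partition for the key is lagging too far behind.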

  • The two KafkaStreams#metadataForKey() methods are deprecated; users can migrate their code to the two new KafkaStreams#queryMetadataForKey() methods instead.