...

Code Block
package org.apache.kafka.streams;

public class KafkaStreams {

...

  // Actual javadoc will be comprehensive, notes below are for illustration for the KIP
  /**
   * Returns {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application,
   * that hosts standby state stores 
   */
  public Collection<StreamsMetadata> allStandbyMetadata() {}

  /**
   * Returns {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application,
   * that hosts standby state stores for the given store.
   */
  public Collection<StreamsMetadata> allStandbyMetadataForStore(final String storeName) {}

  /**
   * Returns {@link StreamsMetadata} for all instances that host the given key as a standby
   * for the given store. 
   */
  public <K> Collection<StreamsMetadata> standbyMetadataForKey(final String storeName,
                                                               final K key,
                                                               final Serializer<K> keySerializer) {}

  /**
   * Returns {@link StreamsMetadata} for all instances that host the given key as a standby,
   * determined by the given partitioner, for the given store. 
   */
  public <K> Collection<StreamsMetadata> standbyMetadataForKey(final String storeName,
                                                               final K key,
                                                               final StreamPartitioner<? super K, ?> partitioner) {}

...

}
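
As a rough illustration of how a caller might combine active and standby metadata when routing a key query, the sketch below builds an ordered list of hosts to try. It is only a sketch under stated assumptions: `StandbyRoutingSketch` and `candidateHosts` are hypothetical names, hosts are represented as plain `host:port` strings rather than `StreamsMetadata` objects, and the "active first, then standbys" ordering is one possible policy, not something the KIP prescribes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helper, not part of the proposed API.
final class StandbyRoutingSketch {

    // Returns the hosts to try in order: the active host (if known) first,
    // followed by every standby host that is not the active one.
    static List<String> candidateHosts(String activeHost, Collection<String> standbyHosts) {
        List<String> candidates = new ArrayList<>();
        if (activeHost != null) {
            candidates.add(activeHost);
        }
        for (String standby : standbyHosts) {
            if (!standby.equals(activeHost)) {
                candidates.add(standby);
            }
        }
        return candidates;
    }
}
```

In a real application the standby hosts would presumably come from the result of `standbyMetadataForKey(...)`, and the caller would fall through the list until one host answers.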


Changes to org.apache.kafka.streams.KafkaStreams, to expose lag for the given state store or across all stores hosted by the Streams instance (either as active or standby)


We will introduce a new public API class, `StoreLagInfo` (inspired by `HostInfo`). The lag information would be computed by periodically reading the record at the tail of the changelog to obtain the timestamp of that ConsumerRecord. The read interval would be configurable.


Code Block
package org.apache.kafka.streams;

public class StoreLagInfo {

  // Estimate of the number of messages the store is behind the tail of the changelog
  private long messageLagEstimate;

  // Estimate of the number of milliseconds the store is behind the tail of the changelog
  private long timeLagEstimateMs;

  // Store name 
  private String storeName;

  // Changelog topic partition ??
  private TopicPartition topicPartition;
  
  ...
  // standard getters/setters will be added
}
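
The two estimates above could be derived roughly as follows. This is a minimal sketch, not the KIP's implementation: `LagEstimateSketch` and its parameter names are hypothetical, and it assumes that the message lag is the changelog end offset minus the offset the store has applied so far, and that the time lag is the tail record's timestamp minus the timestamp of the last record applied to the store.

```java
// Hypothetical helper illustrating the arithmetic behind StoreLagInfo's two fields.
final class LagEstimateSketch {

    // Messages between the store's applied offset and the changelog tail (never negative).
    static long messageLagEstimate(long changelogEndOffset, long storeAppliedOffset) {
        return Math.max(0L, changelogEndOffset - storeAppliedOffset);
    }

    // Milliseconds between the tail record's timestamp and the timestamp of the
    // last record applied to the store (never negative).
    static long timeLagEstimateMs(long tailRecordTimestampMs, long lastAppliedTimestampMs) {
        return Math.max(0L, tailRecordTimestampMs - lastAppliedTimestampMs);
    }

    public static void main(String[] args) {
        System.out.println(messageLagEstimate(1_500L, 1_200L));                        // prints 300
        System.out.println(timeLagEstimateMs(1_700_000_060_000L, 1_700_000_000_000L)); // prints 60000
    }
}
```

Clamping at zero is a design choice of this sketch: because the tail record is only read periodically, the cached tail can briefly trail the store's own position, and a negative lag would be meaningless to callers.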

We will introduce two new methods into `KafkaStreams`:


Code Block
package org.apache.kafka.streams;

public class KafkaStreams {

...

  /**
   * Returns lag information for all the stores local to this Kafka Streams instance
   */
  public List<StoreLagInfo> allLagInfo() {}


  /**
   * Returns lag information for all topic partitions for a given store
   */
  public List<StoreLagInfo> lagInfoForStore(String storeName) {}



...
} // end of KafkaStreams 


Proposed Changes

...

With this KIP, the user decides how much lag is acceptable when serving queries. After this KIP, queries can be served from a restoring active task as well as from a running standby task, and each response returns the lag along with the actual value, so the user can either discard the response or return it to the client.
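
The decision described above might look like the following on the caller's side. This is a sketch under assumptions: `LaggedValue`, `acceptIfFresh`, and `maxAcceptableLagMs` are hypothetical names, and the response envelope (a value paired with a time-lag estimate) is only one way an application could carry the lag alongside the value.

```java
import java.util.Optional;

// Hypothetical caller-side policy, not part of the proposed API.
final class LagAwareResponseSketch {

    // Hypothetical response envelope: the queried value plus the time lag
    // of the store that served it.
    static final class LaggedValue<V> {
        final V value;
        final long timeLagEstimateMs;

        LaggedValue(V value, long timeLagEstimateMs) {
            this.value = value;
            this.timeLagEstimateMs = timeLagEstimateMs;
        }
    }

    // Returns the value if the serving store's lag is within the caller's
    // threshold; otherwise discards it by returning an empty Optional.
    static <V> Optional<V> acceptIfFresh(LaggedValue<V> response, long maxAcceptableLagMs) {
        return response.timeLagEstimateMs <= maxAcceptableLagMs
                ? Optional.ofNullable(response.value)
                : Optional.empty();
    }
}
```

A caller that prefers availability over freshness could pass a very large threshold, while a caller that needs near-current data could pass a small one and retry against another host when the result is empty.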


Open items: Navinder Pal Singh Brar FYI

  • How frequently do we fetch the changelog offsets today? Is there a config? We need to introduce a config to control how frequently we fetch the tail record from the changelog.
  • lagInfoForStore(String storeName) is not quite right. We need to be able to use a StreamsMetadata object to find the right StoreLagInfo object in the list.

Compatibility, Deprecation, and Migration Plan

...