...

We will change org.apache.kafka.streams.KafkaStreams to expose instances that host standby state stores. We will introduce a standby equivalent for each of the four current methods that expose active replica host information. In addition, the Javadocs for the existing methods will state clearly that they return active replica information. We also plan to introduce two new methods that expose the lag information local to the Streams instance.

Code Block
package org.apache.kafka.streams;

public class KafkaStreams {

...

  // Actual javadoc will be comprehensive, notes below are for illustration for the KIP
  /**
   * Returns {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application,
   * that hosts standby state stores 
   */
  public Collection<StreamsMetadata> allStandbyMetadata() {}

  /**
   * Returns {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application,
   * that hosts standby state stores for the given store.
   */
  public Collection<StreamsMetadata> allStandbyMetadataForStore(final String storeName) {}

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store. 
   */
  public <K> KeyQueryMetadata allMetadataForKey(final String storeName,
                                                final K key,
                                                final Serializer<K> keySerializer) {}

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using
   * the supplied partitioner
   */
  public <K> KeyQueryMetadata allMetadataForKey(final String storeName,
                                                final K key,
                                                final StreamPartitioner<? super K, ?> partitioner) {}

...
}
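
To illustrate how a caller might use active and standby host information together, here is a minimal sketch that orders the hosts able to serve a key: the active replica first, then the standbys, skipping hosts known to be unreachable. It is not part of the proposal. `StandbyRouting`, `candidates`, and the plain "host:port" strings are hypothetical stand-ins, since the accessors on `KeyQueryMetadata` are not spelled out above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the proposed API: orders the hosts that can
// serve a key so that the active replica is tried first and standbys follow.
final class StandbyRouting {

    // activeHost and standbyHosts stand in for whatever KeyQueryMetadata exposes;
    // hosts are plain "host:port" strings to keep the sketch self-contained.
    static List<String> candidates(String activeHost,
                                   List<String> standbyHosts,
                                   Set<String> unreachableHosts) {
        List<String> ordered = new ArrayList<>();
        // Prefer the active replica when it is known and reachable.
        if (activeHost != null && !unreachableHosts.contains(activeHost)) {
            ordered.add(activeHost);
        }
        // Fall back to standbys in the order given, skipping duplicates.
        for (String standby : standbyHosts) {
            if (!unreachableHosts.contains(standby) && !ordered.contains(standby)) {
                ordered.add(standby);
            }
        }
        return ordered;
    }
}
```

A caller would try the returned hosts in order and stop at the first one that answers. An empty list means no replica of the key is currently reachable.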

We will also change org.apache.kafka.streams.KafkaStreams to expose lag for a given state store, or across all stores hosted by the Streams instance (either as active or standby). We will introduce a new public API class `StoreLagInfo` (inspired by HostInfo). The lag information would be computed by periodically reading the record at the tail of the changelog, at a configurable interval, to obtain the timestamp of that ConsumerRecord.

Code Block
package org.apache.kafka.streams;

public class StoreLagInfo {

  // Estimate of the number of messages the store is behind the tail of the changelog
  private long messageLagEstimate;

  // Estimate of number of ms the store is behind from the tail of the changelog.
  private long timeLagEstimateMs;

  // Store name 
  private String storeName;

  // Changelog topic partition
  private TopicPartition topicPartition;
  
  ...
  // standard getters/setters will be added
}


public class KafkaStreams {

...

  /**
   * Returns lag information for all the stores local to this Kafka Streams instance
   */
  public List<StoreLagInfo> allLagInfo() {}

  /**
   * Returns lag information for all topic partitions for a given store
   */
  public List<StoreLagInfo> lagInfoForStore(String storeName) {}

...
}
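
As an illustration of the two estimates, the following sketch derives them from the changelog tail and the store's current position. It is not the proposed implementation. The class `LagEstimate`, the method `compute`, and its parameters are hypothetical, and the formulas are an assumption: offset difference for message lag, timestamp difference for time lag.

```java
// Hypothetical sketch, not the proposed implementation: derives the two lag
// estimates carried by StoreLagInfo from offsets and record timestamps.
final class LagEstimate {
    final long messageLagEstimate;
    final long timeLagEstimateMs;

    LagEstimate(long messageLagEstimate, long timeLagEstimateMs) {
        this.messageLagEstimate = messageLagEstimate;
        this.timeLagEstimateMs = timeLagEstimateMs;
    }

    /**
     * @param changelogEndOffset     offset just past the record at the tail of the changelog
     * @param tailRecordTimestampMs  timestamp of the record at the tail of the changelog
     * @param storeOffset            offset of the next changelog record the store would apply
     * @param lastAppliedTimestampMs timestamp of the last record applied to the store
     */
    static LagEstimate compute(long changelogEndOffset,
                               long tailRecordTimestampMs,
                               long storeOffset,
                               long lastAppliedTimestampMs) {
        // Clamp at zero: a stale end-offset reading must not yield a negative lag.
        long messageLag = Math.max(0L, changelogEndOffset - storeOffset);
        // A fully caught-up store has no time lag, whatever the timestamps say.
        long timeLag = messageLag == 0L
                ? 0L
                : Math.max(0L, tailRecordTimestampMs - lastAppliedTimestampMs);
        return new LagEstimate(messageLag, timeLag);
    }
}
```

Because the tail of the changelog is only sampled periodically, both values are estimates and can trail the true lag by up to one sampling interval.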




Below is some pseudocode showing sample usage of these two APIs by a real Streams application.

...