Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Add "staleStores" flag for IQ, as discussed on the mailing list.

...

Changes to org.apache.kafka.streams.KafkaStreams, to introduce two methods for querying the metadata specific to a given key/store and one method for fetching offset lag information for all stores hosts locally on that streams instance. In addition, we will add deprecated annotation to the existing `metadataFoKey()` methods and their javadocs will clearly reflect that it returns active replica information.  Also, we introduce a new method to get stores for querying with IQ that includes a flag indicating whether to allow queries over standbys and restoring stores, or only on running active ones. The behavior of the pre-existing "store(name, type)" method is unchanged: it only allows querying running active stores.

Code Block
package org.apache.kafka.streams;

public class KafkaStreams {

...

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store. 
   */
  public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                  final K key,
                                                  final Serializer<K> keySerializer) {}

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using the
   * the supplied partitioner 
   */
  public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                  final K key,
                                                  final StreamPartitioner<? super K, ?> partitioner) {}
...

 /**
   * Deprecating this function to get more users accepted to queryMetadataForKey.
   */
 public <K> StreamsMetadata metadataForKey(final String storeName,
                                              final K key,
                        
Code Block
package org.apache.kafka.streams;

public class KafkaStreams {

...

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store. 
   */
  public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                      final Serializer<K> keySerializer) {}

 /**
   * Deprecating this function to get more users accepted to queryMetadataForKey.
   */
 public <K> StreamsMetadata metadataForKey(final String storeName,
  final K key,
                                          final K key,
      final Serializer<K> keySerializer) {}

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using the
   * the supplied partitioner 
   */
  public <K> KeyQueryMetadata queryMetadataForKey(final StreamPartitioner<? Stringsuper storeName,K, ?> partitioner) {}

  /**
   * Returns mapping from store name to another map of partition to offset lag info, for all stores local to this Streams instance. It includes both active and standby store partitions, with active partitions always reporting 0 lag. 
   */
  public Map<String, Map<Integer, Long>>  final K key,allLocalOffsetLags() {}

  /**
   * Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's
   * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
   * The returned object can be used to query the {@link StateStore} instances.
   *
 final StreamPartitioner<? super* K,@param ?>storeName partitioner) {}
...

 /**
   * Deprecating this function to getname moreof usersthe acceptedstore to queryMetadataForKey.find
   */
 public <K> StreamsMetadata metadataForKey(final String storeName,
        @param queryableStoreType  accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
   * @param includeStaleStores      If false, only permit queries on the active replica for a partition, and only if the
   *               final K key,
           task for that partition is running. I.e., the state store is not a standby replica,
   *                  final Serializer<K> keySerializer) {}

 /**
   * Deprecating this functionand toit getis morenot usersrestoring acceptedfrom tothe queryMetadataForKeychangelog.
   */
   public <K> StreamsMetadata metadataForKey(final String storeName,
                    If true, allow queries on standbys and restoring replicas in addition to active ones.
   * @param <T>        final K key,
       return type
   * @return A facade wrapping the local {@link StateStore} instances
   * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
   * {@code queryableStoreType} doesn't exist
  final StreamPartitioner<?*/
 super K,public ?><T> partitioner) {}

  /**
   * Returns mapping from store name to another map of partition to offset lag info, for all stores local to this Streams instance. It includes both active and standby store partitions, with active partitions always reporting 0 lag. 
   */
  public Map<String, Map<Integer, Long>> allLocalOffsetLags() {}
T store(final String storeName,
                     final QueryableStoreType<T> queryableStoreType,
                     final boolean includeStaleStores) {}

...


}


Below is some pseudo code to showing sample usage of these two APIs by a real Streams application.

...