...

Code Block
package org.apache.kafka.streams;

import java.util.List;

import org.apache.kafka.streams.state.HostInfo;

public class KeyQueryMetadata {
    // Active streams instance for key
    private HostInfo activeHost;
    // Streams instances that host the key as standbys
    private List<HostInfo> standbyHosts;
    // Store partition corresponding to the key.
    private int partition;

    // Standard getters/setters will be added.
    ...

}

...

We will also introduce LagInfo, a class that encapsulates the lag information.

Code Block
package org.apache.kafka.streams;

public class LagInfo {

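    // Offset in the changelog up to which the local store has been written
    private final long currentOffsetPosition;

    // End offset of the store partition's changelog topic on the brokers
    private final long endOffsetPosition;

    // Measured lag between the current and end offset positions
    private final long offsetLag;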
    ...
}
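The third field is derived from the other two. A minimal sketch of such a value class follows, assuming the lag is the changelog end offset minus the current position, clamped at zero so a caught-up store never reports negative lag. The class name `LagInfoSketch` and the factory method `of` are hypothetical and not part of the proposal.

```java
/** Hypothetical stand-in for the proposed LagInfo value class. */
final class LagInfoSketch {
    private final long currentOffsetPosition; // changelog offset reached by the local store
    private final long endOffsetPosition;     // end offset of the changelog partition
    private final long offsetLag;             // derived from the two positions above

    private LagInfoSketch(final long currentOffsetPosition, final long endOffsetPosition) {
        this.currentOffsetPosition = currentOffsetPosition;
        this.endOffsetPosition = endOffsetPosition;
        // Assumption: lag = end - current, never negative
        this.offsetLag = Math.max(0L, endOffsetPosition - currentOffsetPosition);
    }

    static LagInfoSketch of(final long currentOffsetPosition, final long endOffsetPosition) {
        return new LagInfoSketch(currentOffsetPosition, endOffsetPosition);
    }

    long currentOffsetPosition() { return currentOffsetPosition; }

    long endOffsetPosition() { return endOffsetPosition; }

    long offsetLag() { return offsetLag; }
}
```

For example, a standby that has written up to offset 90 of a changelog whose end offset is 100 reports an offsetLag of 10.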


Changes to org.apache.kafka.streams.KafkaStreams: we introduce two methods for querying the metadata specific to a given key/store, and one method for fetching offset lag information for all stores hosted locally on that Streams instance. In addition, we will add the @Deprecated annotation to the existing `metadataForKey()` methods, and their javadocs will clearly state that they return active replica information only. We also introduce a new method to get stores for querying with IQ that takes a flag indicating whether to allow queries over standbys and restoring stores, or only over running active ones. The behavior of the pre-existing `store(name, type)` method is unchanged: it only allows querying running active stores.

Code Block
package org.apache.kafka.streams;

public class KafkaStreams {

...

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store.
   */
  public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                  final K key,
                                                  final Serializer<K> keySerializer) {}

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using
   * the supplied partitioner.
   */
  public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                  final K key,
                                                  final StreamPartitioner<? super K, ?> partitioner) {}

...

  /**
   * @deprecated Returns metadata for the active replica only. Use {@link #queryMetadataForKey} instead,
   * which also reports the standby hosts for the key.
   */
  @Deprecated
  public <K> StreamsMetadata metadataForKey(final String storeName,
                                            final K key,
                                            final Serializer<K> keySerializer) {}

  /**
   * @deprecated Returns metadata for the active replica only. Use {@link #queryMetadataForKey} instead,
   * which also reports the standby hosts for the key.
   */
  @Deprecated
  public <K> StreamsMetadata metadataForKey(final String storeName,
                                            final K key,
                                            final StreamPartitioner<? super K, ?> partitioner) {}

  /**
   * Returns a mapping from store name to another map of partition to offset lag info, for all stores local to this
   * Streams instance. It includes both active and standby store partitions, with active partitions always reporting
   * 0 lag in RUNNING state and the actual lag from the changelog end offset when RESTORING.
   */
  public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags() {}

  /**
   * Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's
   * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
   * The returned object can be used to query the {@link StateStore} instances.
   *
   * @param storeName           name of the store to find
   * @param queryableStoreType  accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
   * @param includeStaleStores  If false, only permit queries on the active replica for a partition, and only if the
   *                            task for that partition is running. I.e., the state store is not a standby replica,
   *                            and it is not restoring from the changelog.
   *                            If true, allow queries on standbys and restoring replicas in addition to active ones.
   * @param <T>                 return type
   * @return A facade wrapping the local {@link StateStore} instances
   * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
   *         {@code queryableStoreType} doesn't exist
   */
  public <T> T store(final String storeName,
                     final QueryableStoreType<T> queryableStoreType,
                     final boolean includeStaleStores) {}

...

}
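The per-replica rules in the javadocs for `store(...)` with `includeStaleStores` and for `allLocalStorePartitionLags()` can be summarized as two small predicates. The sketch below models them with a hypothetical `ReplicaState` enum and `StaleStoreRules` class; these names are illustrations, not Kafka classes. With the flag off, only a running active replica is queryable; with it on, standbys and restoring actives are queryable as well. A running active reports zero lag, and every other replica reports its actual changelog lag.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical model of the states a local store partition replica can be in. */
enum ReplicaState { ACTIVE_RUNNING, ACTIVE_RESTORING, STANDBY }

final class StaleStoreRules {
    private StaleStoreRules() {}

    /** Whether a local replica in the given state may serve an interactive query. */
    static boolean isQueryable(final ReplicaState state, final boolean includeStaleStores) {
        // Flag off: only a running active replica qualifies.
        // Flag on: standbys and restoring actives qualify too.
        return includeStaleStores || state == ReplicaState.ACTIVE_RUNNING;
    }

    /** Lag a replica reports: 0 for a running active, the actual changelog lag otherwise. */
    static long reportedLag(final ReplicaState state, final long actualLag) {
        return state == ReplicaState.ACTIVE_RUNNING ? 0L : actualLag;
    }

    /** All replica states that are queryable under the given flag. */
    static Set<ReplicaState> queryableStates(final boolean includeStaleStores) {
        final Set<ReplicaState> result = EnumSet.noneOf(ReplicaState.class);
        for (final ReplicaState state : ReplicaState.values()) {
            if (isQueryable(state, includeStaleStores)) {
                result.add(state);
            }
        }
        return result;
    }
}
```

Keeping the existing two-argument `store(name, type)` equivalent to passing `false` preserves today's behavior for callers that never opt in to stale reads.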


Below is some pseudocode showing sample usage of these two APIs by a real Streams application.

...

Code Block
// Global map containing the latest lag information across hosts, accumulated from the other Streams instances.
// It is collected by the Streams application instances outside of Streams, relying only on the local lag API in each.
// KafkaStreams#allMetadata() is used by each application instance to discover the other instances to exchange this lag information with.
// The exact communication mechanism is left to the application. Some options: a gossip protocol, or a separate
// single-partition Kafka topic used to broadcast lag.
final Map<HostInfo, Map<String, Map<Integer, LagInfo>>> globalLagInformation;

// Streams instance, key serializer and key to be routed (provided by the application)
final KafkaStreams streams;
final Serializer<K> keySerializer;
final K key;

// Store to be queried
final String storeName;

// Fetch the metadata related to the key
final KeyQueryMetadata queryMetadata = streams.queryMetadataForKey(storeName, key, keySerializer);

// Acceptable lag for the query
final long acceptableOffsetLag = 10000;

// Offset lag reported by a host for the key's store partition; hosts with no report are treated as infinitely behind
final ToLongFunction<HostInfo> lagOf = host -> {
  final LagInfo lagInfo = globalLagInformation
      .getOrDefault(host, Collections.emptyMap())
      .getOrDefault(storeName, Collections.emptyMap())
      .get(queryMetadata.getPartition());
  return lagInfo == null ? Long.MAX_VALUE : lagInfo.offsetLag();
};

// isAlive() and query() are application-defined helpers
if (isAlive(queryMetadata.getActiveHost())) {
  // Always route to the active replica if it is alive
  query(storeName, key, queryMetadata.getActiveHost());
} else {
  // Keep only the standbys whose lag is below the acceptable lag, sorted from most to least in-sync
  final List<HostInfo> inSyncStandbys = queryMetadata.getStandbyHosts().stream()
      .filter(standbyHost -> lagOf.applyAsLong(standbyHost) < acceptableOffsetLag)
      .sorted(Comparator.comparingLong(lagOf))
      .collect(Collectors.toList());

  // Query standbys from most in-sync to least in-sync, stopping at the first one that answers
  for (final HostInfo standbyHost : inSyncStandbys) {
    try {
      query(storeName, key, standbyHost);
      break;
    } catch (final Exception e) {
      System.err.println("Querying standby " + standbyHost + " failed: " + e.getMessage());
    }
  }
}
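The routing decision can also be exercised in isolation. The following self-contained sketch replaces `HostInfo` with plain host-name strings, the per-standby lag lookup with a `Map<String, Long>`, and the application's `isAlive` and `query` helpers with a `Predicate` and a `Function` that throws on failure. `StandbyRouter` and `route` are hypothetical names. It queries the active host if it is alive; otherwise it tries the standbys whose lag is below the threshold in ascending lag order and stops at the first one that answers.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class StandbyRouter {
    private StandbyRouter() {}

    /**
     * @param lagByStandby offset lag per standby host for the key's store partition
     * @param query        hypothetical remote lookup; assumed to return non-null and to throw on failure
     * @return the first successful answer, or empty if no eligible host answered
     */
    static <V> Optional<V> route(final String activeHost,
                                 final Map<String, Long> lagByStandby,
                                 final long acceptableOffsetLag,
                                 final Predicate<String> isAlive,
                                 final Function<String, V> query) {
        if (isAlive.test(activeHost)) {
            // Always prefer the active replica when it is reachable
            return Optional.of(query.apply(activeHost));
        }
        // Keep standbys within the lag bound, ordered from most to least in-sync
        final List<String> inSyncStandbys = lagByStandby.entrySet().stream()
                .filter(entry -> entry.getValue() < acceptableOffsetLag)
                .sorted(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        for (final String standby : inSyncStandbys) {
            try {
                return Optional.of(query.apply(standby));
            } catch (final RuntimeException e) {
                // Fall through to the next, slightly more stale standby
                System.err.println("Querying standby " + standby + " failed: " + e.getMessage());
            }
        }
        return Optional.empty();
    }
}
```

Filtering before sorting avoids ordering hosts that will be discarded anyway; returning on the first success ensures that a healthy, most in-sync standby is queried only once.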

...

  • AssignmentInfo changes to include Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost, so that each machine knows which machine holds which standby partitions, in addition to the active partitions (which it already knows). Consequently, the AssignmentInfo version will be bumped to version 6.

  • Changing the signatures of setPartitionsByHostState(partitionsByHostState, standbyPartitionsByHost), onChange(), and rebuildMetadata() to add Set<TopicPartition> standbyTopicPartitions to StreamsMetadata, so that standby partitions are included in the metadata.

  • Addition of StreamsMetadataState::getStandbyMetadataListForKey() to return a list of StreamsMetadata containing all the standbys available in the system for the key's partition. With the above changes, StreamsMetadataState.java has access to allMetadata containing both active and standby partitions.

  • Overloading KafkaStreams#store() with a new boolean parameter includeStaleStores; calling the existing two-argument KafkaStreams#store() is equivalent to passing false.
  • Renaming partitionsForHost to activePartitionsForHost in StreamsMetadataState.java and partitionsByHostState to activePartitionsByHostState in StreamsPartitionAssignor.java.
  • We might also need changes to make the offset lag information tracked in memory accessible to the lag APIs.
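A minimal sketch of the lookup that getStandbyMetadataListForKey() needs follows: given the standbyPartitionsByHost map carried in AssignmentInfo, it collects the hosts that hold the key's partition as a standby. Hosts are plain strings here, and `TopicPartitionRef` and `StandbyLookup` are hypothetical stand-ins (the former for Kafka's `TopicPartition`); the real method would return StreamsMetadata objects rather than host names.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical stand-in for org.apache.kafka.common.TopicPartition. */
final class TopicPartitionRef {
    final String topic;
    final int partition;

    TopicPartitionRef(final String topic, final int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicPartitionRef)) return false;
        final TopicPartitionRef other = (TopicPartitionRef) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

final class StandbyLookup {
    private StandbyLookup() {}

    /** Hosts holding the given partition as a standby, sorted by name for stable output. */
    static List<String> standbyHostsFor(final Map<String, Set<TopicPartitionRef>> standbyPartitionsByHost,
                                        final TopicPartitionRef partition) {
        return standbyPartitionsByHost.entrySet().stream()
                .filter(entry -> entry.getValue().contains(partition))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

Because every instance receives the same map in its assignment, any instance can answer this lookup locally without contacting the others.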

...