Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
package org.apache.kafka.streams;

/**
 * Lag information for one state store, relative to the tail of its changelog topic partition.
 */
public class StoreLagInfo {

  // Estimate of the number of messages (offsets) the store is behind the tail of the changelog.
  private long offsetLagEstimate;

  // Estimate of the number of milliseconds the store is behind the tail of the changelog.
  private long timeLagEstimateMs;

  // Store name
  private String storeName;

  // Changelog topic partition
  private TopicPartition topicPartition;
  
  ...
  // standard getters/setters will be added
}

...

Code Block
// Global map containing the latest lag information across hosts. It is collected by the Streams application
// instances themselves (outside of Streams), relying only on the local lag APIs of each instance.
// The exact communication mechanism is left to the application. Some options: a gossip protocol, or a
// separate single-partition Kafka topic used to broadcast lag.
final Map<HostInfo, List<StoreLagInfo>> globalLagInformation;

// Key which needs to be routed
final K key;

// Store to be queried
final String storeName;

// Fetch the metadata related to the key
// NOTE(review): `serializer` is not declared in this snippet; presumably the key serializer - confirm.
KeyQueryMetadata queryMetadata = allMetadataForKey(storeName, key, serializer);

// Acceptable lag for the query
final long acceptableOffsetLag = 10000;

// Stream of active + standby metadata
Stream<StreamsMetadata> allHostMetadata = Stream.concat(
    Stream.of(queryMetadata.activeMetadata()),
    queryMetadata.standbyMetadata().stream());

// Filter out all the hosts with more than the acceptable lag for the TopicPartition where the key resides,
// using queryMetadata.topicPartition()
List<StreamsMetadata> inSyncHostMetadata = allHostMetadata.filter(metadata -> {
      List<StoreLagInfo> lagsForHost = globalLagInformation.get(metadata.hostInfo());
      if (lagsForHost == null) {
        // No lag has been reported for this host, so it cannot be considered in sync.
        return false;
      }
      return lagsForHost.stream()
          .filter(lagInfo -> lagInfo.storeName().equals(storeName) && lagInfo.topicPartition().equals(queryMetadata.topicPartition()))
          .findAny()
          .map(lagForHostPartition -> lagForHostPartition.offsetLagEstimate() < acceptableOffsetLag)
          // No lag entry for this store partition on the host: treat the host as not in sync.
          .orElse(false);
}).collect(Collectors.toList());

// Proceed to query, as documented today
query(storeName, key, inSyncHostMetadata);

...