...

Code Block
package org.apache.kafka.streams;

public class KafkaStreams {

...

  /**
   * Returns lag information for all the stores local to this Kafka Streams instance
   */
  public List<StoreLagInfo> allLagInfo() {}


  /**
   * Returns lag information for all topic partitions for a given store
   */
  public List<StoreLagInfo> lagInfoForStore(String storeName) {}



...
} // end of KafkaStreams 


Below is some pseudocode showing how a real IQ application would use these two sets of APIs:


Code Block
Map<HostInfo, List<StoreLagInfo>> globalLagInformation = // collected by gossiping amongst the Streams app instances, using the local lag APIs in each
K key = // key which needs to be routed
Collection<StreamsMetadata> hostMetadata = // active + standby replicas hosting the key, obtained by calling metadataForKey() and standbyMetadataForKey()

// filter out all the hosts with more than the acceptable lag
Collection<StreamsMetadata> inSyncHostMetadata = filter(hostMetadata, globalLagInformation)
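
The filter() step in the pseudocode above is left to the application. One possible shape for it is sketched below in plain Java. This is only an illustration under stated assumptions: StoreLag is a hypothetical stand-in for StoreLagInfo (its field names are not defined by this proposal), hosts are identified by plain strings rather than HostInfo, and maxLag is an application-chosen threshold.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class LagFilterSketch {

    // Hypothetical stand-in for StoreLagInfo; the field names are assumptions.
    static final class StoreLag {
        final String storeName;
        final int partition;
        final long offsetLag;

        StoreLag(String storeName, int partition, long offsetLag) {
            this.storeName = storeName;
            this.partition = partition;
            this.offsetLag = offsetLag;
        }
    }

    // Keeps the candidate hosts whose reported lag for the given store
    // partition is at most maxLag. Hosts that have not reported any lag for
    // that store partition are dropped, which is a conservative choice.
    static List<String> filterInSync(List<String> candidateHosts,
                                     Map<String, List<StoreLag>> globalLag,
                                     String storeName,
                                     int partition,
                                     long maxLag) {
        List<String> inSync = new ArrayList<>();
        for (String host : candidateHosts) {
            List<StoreLag> lags = globalLag.getOrDefault(host, Collections.<StoreLag>emptyList());
            for (StoreLag lag : lags) {
                if (lag.storeName.equals(storeName)
                        && lag.partition == partition
                        && lag.offsetLag <= maxLag) {
                    inSync.add(host);
                    break; // one matching entry per host is enough
                }
            }
        }
        return inSync;
    }
}
```

The candidate order is preserved, so if the caller lists the active host first, it stays first among the in-sync hosts.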





Proposed Changes

In the current code, queries in cases t0 and t1 are served from the active (Running) partition. For case t2, we plan to return a List<StreamsMetadata> of the form <StreamsMetadata(A), StreamsMetadata(B)>, so that if IQ fails on A, the standby on B can serve the data; this requires enabling serving from replicas. This still does not solve cases t3 and t4: B has been promoted to active but is in the Restoring state while it catches up to A's last committed position, and we do not serve from an active in the Restoring state, while the new replica on R is building itself from scratch. Both cases can be solved if we also start serving from the Restoring state of the active, since its state is almost equivalent to that of the previous active.
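
To illustrate the t2 case from the caller's side, the following is a minimal sketch of how an application might walk an ordered list of candidate hosts (active first, then standbys) and fall back when a query fails. It is not part of the proposed API: queryWithFallback, the string host names, and the remoteQuery function are all hypothetical, and a failure is modeled simply as a RuntimeException thrown by the remote call.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class FailoverQuerySketch {

    // Tries each host in order and returns the first successful result.
    // remoteQuery is a hypothetical function that performs the application's
    // own RPC to the given host and throws a RuntimeException on failure.
    // Note: a null result (key not found) is also reported as Optional.empty().
    static <V> Optional<V> queryWithFallback(List<String> orderedHosts,
                                             Function<String, V> remoteQuery) {
        for (String host : orderedHosts) {
            try {
                return Optional.ofNullable(remoteQuery.apply(host));
            } catch (RuntimeException e) {
                // Host is down or its store is not queryable; try the next one.
            }
        }
        return Optional.empty(); // every candidate failed
    }
}
```

With hosts listed as <A, B>, a failure on A leads to the query being retried on B, which matches the fallback behavior described for t2.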

...