...

We will introduce a new class, KeyQueryMetadata, which contains all the metadata corresponding to a queried key. Specifically, it contains the topicPartition value to which the key belongs, so that it can be used to correlate with the lag information.

Code Block
language: java
title: KeyQueryMetadata
package org.apache.kafka.streams;

public class KeyQueryMetadata {

    // Metadata about the Streams instance that hosts the key as the active instance
    private StreamsMetadata activeMetadata;

    // Metadata about the Streams instances that host the key as standbys
    private List<StreamsMetadata> standbyMetadata;

    // Topic partition corresponding to the key
    private TopicPartition topicPartition;

    // Standard getters/setters will be added.
    ...

}



Changes to org.apache.kafka.streams.KafkaStreams, to expose instances that host standby state stores. We will introduce a standby equivalent for each of the four current methods that expose active replica host information. In addition, the Javadocs for the existing methods will clearly state that they return active replica information.

Code Block
package org.apache.kafka.streams;

public class KafkaStreams {

...

  // Actual javadoc will be comprehensive; the notes below are illustrative for the KIP
  /**
   * Returns {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
   * that hosts standby state stores.
   */
  public Collection<StreamsMetadata> allStandbyMetadata() {}

  /**
   * Returns {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
   * that hosts standby state stores for the given store.
   */
  public Collection<StreamsMetadata> allStandbyMetadataForStore(final String storeName) {}

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key
   * for the given store.
   */
  public <K> KeyQueryMetadata allMetadataForKey(final String storeName,
                                                final K key,
                                                final Serializer<K> keySerializer) {}

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key
   * for the given store, using the supplied partitioner.
   */
  public <K> KeyQueryMetadata allMetadataForKey(final String storeName,
                                                final K key,
                                                final StreamPartitioner<? super K, ?> partitioner) {}

...
}
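
For illustration only, here is a minimal (hypothetical) usage of the new standby metadata methods, assuming a running KafkaStreams instance; the printStandbyHosts helper and the store name argument are not part of this KIP:

Code Block
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.StreamsMetadata;

public class StandbyLookupExample {

  // Print every instance that hosts a standby replica of the given store
  static void printStandbyHosts(final KafkaStreams streams, final String storeName) {
    for (final StreamsMetadata standby : streams.allStandbyMetadataForStore(storeName)) {
      System.out.println("standby at " + standby.hostInfo().host() + ":" + standby.hostInfo().port());
    }
  }
}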


Changes to org.apache.kafka.streams.KafkaStreams, to expose lag for a given state store, or across all stores hosted by the Streams instance (either as active or standby). We will introduce a new public API class `StoreLagInfo` (inspired by HostInfo). The lag information would be computed periodically, at a configurable interval, by reading the record at the tail of the changelog to obtain the timestamp of that ConsumerRecord.


Code Block
package org.apache.kafka.streams;

public class StoreLagInfo {

  // Estimate of the number of messages the store is behind, relative to the tail of the changelog
  private long messageLagEstimate;

  // Estimate of the number of milliseconds the store is behind, relative to the tail of the changelog
  private long timeLagEstimateMs;

  // Store name
  private String storeName;

  // Changelog topic partition
  private TopicPartition topicPartition;

  ...
  // standard getters/setters will be added
}

We will introduce two new methods into KafkaStreams to expose this lag information.


Code Block
package org.apache.kafka.streams;

public class KafkaStreams {

...

  /**
   * Returns lag information for all the stores local to this Streams instance
   */
  public List<StoreLagInfo> allLagInfo() {}


  /**
   * Returns lag information for all topic partitions for a given store
   */
  public List<StoreLagInfo> lagInfoForStore(String storeName) {}



...
} // end of KafkaStreams 

Below is some pseudo code showing how a real IQ application would use these two sets of APIs.


Code Block
// Global map containing the latest lag information across hosts. This is collected by the Streams application instances outside of Streams, relying only on the local lag APIs in each.
// The exact communication mechanism is left to the application. Some options: a gossip protocol, or another single-partition Kafka topic used to broadcast lag.
final Map<HostInfo, List<StoreLagInfo>> globalLagInformation;

// Key which needs to be routed
final K key;

// Store to be queried
final String storeName;

// Acceptable lag for the query
final long acceptableMessageLag = 10000;

// Fetch the metadata related to the key
KeyQueryMetadata queryMetadata = allMetadataForKey(storeName, key, keySerializer);

// Stream of active + standby metadata
Stream<StreamsMetadata> allHostMetadata = Stream.concat(
    Stream.of(queryMetadata.activeMetadata()),
    queryMetadata.standbyMetadata().stream());

// Filter out all hosts whose lag is above the acceptable threshold for the TopicPartition where the key resides, using queryMetadata.topicPartition()
List<StreamsMetadata> inSyncHostMetadata = allHostMetadata.filter(metadata -> {
      StoreLagInfo lagForHostPartition = globalLagInformation.get(metadata.hostInfo()).stream()
          .filter(lagInfo -> lagInfo.storeName().equals(storeName)
              && lagInfo.topicPartition().equals(queryMetadata.topicPartition()))
          .findAny().get();
      return lagForHostPartition.messageLagEstimate() < acceptableMessageLag;
}).collect(Collectors.toList());

// Proceed to query, as documented today
query(storeName, key, inSyncHostMetadata);
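
As one hedged sketch of how the globalLagInformation map above could be kept fresh, each instance might periodically call the proposed allLagInfo() and hand the result to an application-defined publisher (for example, a producer writing to a single-partition lag-broadcast topic). The LagPublisher interface and the 10-second interval below are assumptions, not part of this KIP:

Code Block
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreLagInfo; // proposed in this KIP

public class LagBroadcaster {

  // Application-defined transport for lag info (gossip, a Kafka topic, etc.)
  interface LagPublisher {
    void publish(List<StoreLagInfo> localLags);
  }

  // Periodically fetch this instance's local lag and broadcast it
  static ScheduledExecutorService startLagBroadcast(final KafkaStreams streams,
                                                    final LagPublisher publisher) {
    final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> publisher.publish(streams.allLagInfo()),
                                  0, 10, TimeUnit.SECONDS);
    return scheduler;
  }
}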


Proposed Changes

In the current code, queries at t0 and t1 are served from the Active (Running) partition. For case t2, we plan to return a List<StreamsMetadata> containing <StreamsMetadata(A), StreamsMetadata(B)>, so that if IQ fails on A, the standby on B can serve the data, thereby enabling serving from replicas. This still does not solve cases t3 and t4: B has been promoted to active but is in the Restoring state, catching up to A's last committed position (we do not serve from an active in the Restoring state), and the new replica on R is building itself from scratch. Both cases can be solved if we also start serving from the Restoring state of an active, since it is almost equivalent to the previous active.
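
As a sketch of the failover implied here, a client could try the active first and then each standby; the queryRemoteStore RPC and the candidate ordering are hypothetical, application-level concerns, not part of this KIP:

Code Block
import java.util.List;

import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;

public class FailoverExample {

  // Try the active replica first, then each standby, so that the query can
  // still be served while A is down and B (the promoted active) is restoring.
  static <K, V> V queryWithFailover(final List<StreamsMetadata> candidates, // e.g. <StreamsMetadata(A), StreamsMetadata(B)>
                                    final String storeName,
                                    final K key) {
    for (final StreamsMetadata candidate : candidates) {
      try {
        return queryRemoteStore(candidate.hostInfo(), storeName, key);
      } catch (final Exception failedHost) {
        // fall through and try the next replica
      }
    }
    throw new IllegalStateException("No replica could serve key: " + key);
  }

  // Hypothetical application-defined RPC to the remote instance's IQ endpoint
  static <K, V> V queryRemoteStore(final HostInfo host, final String storeName, final K key) {
    throw new UnsupportedOperationException("application-specific RPC");
  }
}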

...