Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeConfluence
titleStreamsMetadata.java
package org.apache.kafka.streams.state;

public class StreamsMetadata {
...
    private final Set<TopicPartition> standbyTopicPartitions;
    private final Set<String> standbyStateStoreNames;

    public Set<TopicPartition> standbyTopicPartitions() {
        return standbyTopicPartitions;
    }

    public Set<String> standbyStateStoreNames() {
        return standbyStateStoreNames;
    }
...
}


We will introduce a new class  KeyQueryMetadata, which contains all the metadata corresponding to a queried key. 

Code Block
languagejava
titleKeyQueryMetadata
package org.apache.kafka.streams;

public class KeyQueryMetadata {

private Key K;
private String StoreName;
private StreamsMetadata activeTaskMetadata;
private List<StreamsMetadata> standbyTaskMetadataList;
private TopicPartition topicPartition;

...

}



Changes to org.apache.kafka.streams.KafkaStreams,  to expose instances that host standby state stores. We will introduce a standby equivalent for each of the four current methods, that expose active replica host information. In addition, Javadocs for existing methods will clearly reflect that it returns active replica information. 

Code Block
package org.apache.kafka.streams;

public class KafkaStreams {

...

  // Actual javadoc will be comprehensive, notes below are for illustration for the KIP
  /**
   * Returns {@link StreamsMetadataKeyQueryMetadata} for each {@code KafkaStreams} instance of this application,
   * that hosts standby state stores 
   */
  public Collection<StreamsMetadata> allStandbyMetadata() {}

  /**
   * Returns {@link StreamsMetadataKeyQueryMetadata} for each {@code KafkaStreams} instance of this application,
   * that hosts standby state stores for the given store.
   */
  public Collection<StreamsMetadata>Collection<KeyQueryMetadata> allStandbyMetadataForStore(final String storeName) {}

  /**
   * Returns {@link StreamsMetadataKeyQueryMetadata} for all instances that hosts the given key as a standby
   * for the given store. 
   */
  public <K> Collection<StreamsMetadata>Collection<KeyQueryMetadata> standbyMetadataForKey(final String storeName,
                                                               final K key,
                                                               final Serializer<K> keySerializer) {}

  /**
   * Returns {@link StreamsMetadataKeyQueryMetadata} for all instances that hosts the given key as a standby,
   * determined by the given partitioner, for the given store. 
   */
  public <K> Collection<StreamsMetadata>Collection<KeyQueryMetadata> standbyMetadataForKey(final String storeName,
                                                               final K key,
                                                               final StreamPartitioner<? super K, ?> partitioner) {}

...
}

...

Code Block
Map<HostInfo, List<StoreLagInfo>> globalLagInformation = // collected by gossiping amongst the Streams App instances, using the local lag APIs in each. 
K key = // key which needs to be routed.
Collection<StreamsMetadata>KeyQueryMetadata hostMetadatametadata = // active + standby replicas hosting the key , obtained by calling metadataForKey() and standbyMetadataForKey()

// filter out all the hosts with more than acceptable lag for the TopicPartition where the key resides using metadata.getTopicPartition()
List<StreamsMetdata> inSyncHostMetadata = hostMetadata.stream().filter(metadata -> {
      List<StoreLagInfo> lagsForHost = globalLagInforation.get(metadata.hostInfo())
      ..
}).collect(Collectors.toList()); 




...