
...

Depending on the Kafka consumer group session/heartbeat timeouts, steps t2 and t3 can last a few seconds (~10 seconds with the default values). Depending on how far behind the standby B was before A was killed, t4 can take anywhere from a few seconds to several minutes. While this behavior favors consistency over availability at all times, the long unavailability window might be undesirable for certain classes of applications (e.g., simple caches or dashboards). These applications might also be able to tolerate eventual consistency.
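To make the timing above concrete, here is a rough back-of-the-envelope model. It is an illustration, not part of this KIP. The window is approximately the failure detection and rebalance time (t2, t3) plus the time B needs to replay the changelog records it is missing (t4). The class and method names are hypothetical, and the lag and restore-throughput inputs are made-up numbers, not measurements.

```java
// Hypothetical back-of-the-envelope estimate of how long interactive queries
// against a store are unavailable when the active fails and a lagging standby
// takes over. Not part of Kafka Streams; inputs are illustrative only.
final class UnavailabilityEstimate {

    private UnavailabilityEstimate() {}

    /**
     * @param detectionAndRebalanceSeconds time spent in t2 + t3 (session timeout and rebalance)
     * @param standbyOffsetLag             number of records the standby B is behind the active A
     * @param restoreRecordsPerSecond      assumed restoration throughput on B
     * @return estimated seconds until B can serve queries as the new active
     */
    static double estimateSeconds(double detectionAndRebalanceSeconds,
                                  long standbyOffsetLag,
                                  double restoreRecordsPerSecond) {
        if (restoreRecordsPerSecond <= 0) {
            throw new IllegalArgumentException("restore throughput must be positive");
        }
        // t4: B must replay the changelog records it is missing before it is RUNNING
        double restoreSeconds = standbyOffsetLag / restoreRecordsPerSecond;
        return detectionAndRebalanceSeconds + restoreSeconds;
    }

    public static void main(String[] args) {
        // Hypothetical inputs: ~10 s for t2 + t3, a standby 600,000 records behind,
        // restoring at 10,000 records per second.
        System.out.println(estimateSeconds(10.0, 600_000L, 10_000.0)); // 70.0
    }
}
```

Because the restore term grows with the standby's lag, t4 can dwarf the detection window when B is far behind, which is the case where serving stale reads from B helps most.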

This KIP aims to also expose information about standby B to R during each rebalance, so that an application can route queries to a standby to serve stale reads, choosing availability over consistency.

Public Interfaces

This KIP proposes to expose additional metadata that would help perform highly available interactive queries against Streams state. Specifically:

  • Expose information about the current lag (time/offset) of the state stores locally at each Kafka Streams instance.
  • During rebalance, have AssignmentInfo also provide host information about standbys for all topic partitions globally.
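The AssignmentInfo change gives every instance a global view of which hosts hold a standby for each partition. The sketch below illustrates that view by inverting a per-host list of standby partitions. It uses plain strings such as "hostB:8080" and "orders-0" as hypothetical stand-ins for the real host and TopicPartition types. It does not reflect how AssignmentInfo actually encodes this data.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical illustration: invert "host -> standby partitions" into
// "partition -> hosts holding a standby", the global view a query router needs.
final class StandbyIndex {

    private StandbyIndex() {}

    static Map<String, Set<String>> standbyHostsByPartition(
            Map<String, Set<String>> standbyPartitionsByHost) {
        Map<String, Set<String>> index = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : standbyPartitionsByHost.entrySet()) {
            for (String partition : entry.getValue()) {
                // Sorted collections keep the printed output deterministic
                index.computeIfAbsent(partition, p -> new TreeSet<>()).add(entry.getKey());
            }
        }
        return index;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> assignment = Map.of(
                "hostB:8080", Set.of("orders-0", "orders-1"),
                "hostC:8080", Set.of("orders-1"));
        System.out.println(standbyHostsByPartition(assignment));
        // {orders-0=[hostB:8080], orders-1=[hostB:8080, hostC:8080]}
    }
}
```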

Thus, an application relying on IQs can:

  • Route queries to standby replicas for better availability if the active is currently down (the t2-t4 window discussed above).
  • Implement a control plane to exchange lag information (e.g., piggybacked on requests or heartbeats) to choose the best standby to route to.
  • Or even choose to fail the query if all the replicas are lagging too far behind (e.g., node S was down for a long time before time t0 and was still catching up when node A went down).
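The routing choices listed above can be sketched as a small decision function. Everything here is hypothetical application code, not a Streams API. The Replica class, its liveness flag, and the maxAcceptableLag threshold stand in for whatever health checks and lag exchange (the control plane mentioned above) an application builds on top of the exposed metadata.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical application-side router: prefer the live active, otherwise the
// least-lagging live standby, and fail (empty result) if every standby lags too much.
final class ReplicaRouter {

    static final class Replica {
        final String host;
        final boolean active;
        final boolean alive;
        final long offsetLag; // as reported through the application's own control plane

        Replica(String host, boolean active, boolean alive, long offsetLag) {
            this.host = host;
            this.active = active;
            this.alive = alive;
            this.offsetLag = offsetLag;
        }
    }

    private ReplicaRouter() {}

    static Optional<String> chooseHost(List<Replica> replicas, long maxAcceptableLag) {
        // 1. Route to the active whenever it is up (consistent reads).
        for (Replica r : replicas) {
            if (r.active && r.alive) {
                return Optional.of(r.host);
            }
        }
        // 2. Otherwise pick the live standby with the smallest acceptable lag;
        //    an empty result means the query should fail.
        return replicas.stream()
                .filter(r -> !r.active && r.alive && r.offsetLag <= maxAcceptableLag)
                .min(Comparator.comparingLong(r -> r.offsetLag))
                .map(r -> r.host);
    }

    public static void main(String[] args) {
        List<Replica> replicas = List.of(
                new Replica("hostA:8080", true, false, 0L),   // active is down (t2-t4)
                new Replica("hostB:8080", false, true, 120L),
                new Replica("hostC:8080", false, true, 5_000L));
        System.out.println(chooseHost(replicas, 1_000L)); // Optional[hostB:8080]
        System.out.println(chooseHost(replicas, 100L));   // Optional.empty
    }
}
```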

It's worth noting that with this change, the Streams API can be used to build stateful applications that offer different tradeoffs between consistency and availability.


A single StreamsMetadata object represents the state of a Kafka Streams instance. It already has methods to obtain the host/port of the instance and the stores/topic partitions it hosts as an active. We will extend this class to also expose the standby stores/topic partitions, which will be filled in from the AssignmentInfo object obtained during rebalance.

Code Block (StreamsMetadata.java)
package org.apache.kafka.streams.state;

import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class StreamsMetadata {
...
    // Topic partitions and state stores this instance hosts as a standby,
    // filled in from the AssignmentInfo received during rebalance
    private final Set<TopicPartition> standbyTopicPartitions;
    private final Set<String> standbyStateStoreNames;

    public Set<TopicPartition> standbyTopicPartitions() {
        return standbyTopicPartitions;
    }

    public Set<String> standbyStateStoreNames() {
        return standbyStateStoreNames;
    }
...
}


Changes to org.apache.kafka.streams.KafkaStreams to expose instances that host standby state stores. We will introduce a standby equivalent for each of the four current methods that expose active replica host information. In addition, the Javadocs for the existing methods will clearly state that they return active replica information.

Code Block
package org.apache.kafka.streams;

import java.util.Collection;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.StreamsMetadata;

public class KafkaStreams {

...

  // Actual javadoc will be comprehensive; the notes below are illustrative for this KIP
  /**
   * Returns {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
   * that hosts standby state stores.
   */
  public Collection<StreamsMetadata> allStandbyMetadata() {}

  /**
   * Returns {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
   * that hosts a standby replica of the given store.
   */
  public Collection<StreamsMetadata> allStandbyMetadataForStore(final String storeName) {}

  /**
   * Returns {@link StreamsMetadata} for all instances that host the given key as a standby
   * for the given store.
   */
  public <K> Collection<StreamsMetadata> standbyMetadataForKey(final String storeName,
                                                               final K key,
                                                               final Serializer<K> keySerializer) {}

  /**
   * Returns {@link StreamsMetadata} for all instances that host the given key as a standby
   * for the given store, with the key's partition determined by the given partitioner.
   */
  public <K> Collection<StreamsMetadata> standbyMetadataForKey(final String storeName,
                                                               final K key,
                                                               final StreamPartitioner<? super K, ?> partitioner) {}

...
}


Changes to org.apache.kafka.streams.KafkaStreams to expose the lag of a given state store, or of all stores hosted by the Streams instance (either as active or standby).


Code Block
<WIP>
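Since the lag interface is still a work in progress, the following is only one possible reading of "offset lag". It is hypothetical application-level code, not the proposed KafkaStreams API. A store partition's lag is taken as the changelog end offset minus the offset the local store has applied, and a store's lag is the worst lag across its locally hosted partitions (active or standby). The PartitionOffsets and StoreLag names are illustrative assumptions.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not the proposed API): offset lag of locally hosted
// store partitions, active or standby, and the worst lag per store.
final class StoreLag {

    static final class PartitionOffsets {
        final String store;
        final int partition;
        final long changelogEndOffset; // latest offset written to the store's changelog
        final long position;           // offset the local store has applied up to

        PartitionOffsets(String store, int partition, long changelogEndOffset, long position) {
            this.store = store;
            this.partition = partition;
            this.changelogEndOffset = changelogEndOffset;
            this.position = position;
        }

        long offsetLag() {
            // Clamp at zero so a momentarily stale end offset never yields negative lag
            return Math.max(0L, changelogEndOffset - position);
        }
    }

    private StoreLag() {}

    static Map<String, Long> maxLagPerStore(List<PartitionOffsets> local) {
        Map<String, Long> result = new TreeMap<>(); // sorted for stable output
        for (PartitionOffsets p : local) {
            result.merge(p.store, p.offsetLag(), Math::max);
        }
        return result;
    }

    public static void main(String[] args) {
        List<PartitionOffsets> local = List.of(
                new PartitionOffsets("counts", 0, 1_000L, 1_000L),  // caught up
                new PartitionOffsets("counts", 1, 2_500L, 2_100L),  // 400 records behind
                new PartitionOffsets("sessions", 0, 300L, 250L));   // 50 records behind
        System.out.println(maxLagPerStore(local)); // {counts=400, sessions=50}
    }
}
```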


Proposed Changes

In the current code, at t0 and t1 queries are served from the active (Running) partition. For case t2, we plan to return a List<StreamsMetadata> containing <StreamsMetadata(A), StreamsMetadata(B)>, so that if an IQ fails on A, the standby on B can serve the data once serving from replicas is enabled. This still does not solve cases t3 and t4. B has been promoted to active but is in the Restoring state, catching up to A's last committed position, and we don't serve from an active in the Restoring state; meanwhile, the new replica on R is building itself from scratch. Both cases can be solved if we also start serving from an active in the Restoring state, since it is almost equivalent to the previous active.
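The proposed behavior can be sketched as follows. The ReplicaState enum, the allowStale flag, and the Querier interface are hypothetical names for illustration only, not Kafka Streams types. canServe encodes the rule described above: a Running active always serves, while a standby (t2) or a Restoring active (t3, t4) serves only when the caller accepts stale reads. queryWithFallback shows a caller trying the returned metadata list in order, A first, then B.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the t2-t4 serving rule plus a client-side fallback.
final class StaleReadFallback {

    enum ReplicaState { RUNNING, RESTORING, STANDBY, DOWN }

    /** Stand-in for an application's remote interactive query to one instance. */
    interface Querier {
        Optional<String> query(String host, String key);
    }

    private StaleReadFallback() {}

    /** Whether an instance in the given state may answer a query. */
    static boolean canServe(ReplicaState state, boolean allowStale) {
        switch (state) {
            case RUNNING:
                return true;        // today's behavior: only a running active serves
            case RESTORING:         // promoted active still catching up (t3, t4)
            case STANDBY:           // standby replica, e.g. B at t2
                return allowStale;
            default:
                return false;       // instance is down
        }
    }

    /** Tries hosts in order (active first, then standbys) until one returns a value. */
    static Optional<String> queryWithFallback(List<String> hostsInOrder, String key, Querier querier) {
        for (String host : hostsInOrder) {
            Optional<String> value = querier.query(host, key);
            if (value.isPresent()) {
                return value;
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Simulated cluster at t2: A has failed, B holds a standby copy.
        Map<String, ReplicaState> states = Map.of(
                "hostA:8080", ReplicaState.DOWN,
                "hostB:8080", ReplicaState.STANDBY);
        Querier querier = (host, key) -> canServe(states.get(host), true)
                ? Optional.of("value-from-" + host)
                : Optional.empty();
        System.out.println(queryWithFallback(List.of("hostA:8080", "hostB:8080"), "k1", querier));
        // Optional[value-from-hostB:8080]
    }
}
```

Keeping the stale-read decision behind an explicit flag lets applications that need consistency keep today's behavior, while caches or dashboards opt in to availability.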

...