You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 27 Next »


JIRA:  Unable to render Jira issues macro, execution error.

Discussion: here

Motivation

Currently Streams interactive queries (IQ) fail during the time period where there is a rebalance in progress. 

Consider the following scenario in a three-node Streams cluster with node A, node S and node R, executing a stateful sub-topology/topic group with 1 partition and `num.standby.replicas=1`  

  • t0: A is the active instance owning the partition, B is the standby that keeps replicating the A's state into its local disk, R just routes streams IQs to active instance using StreamsMetadata
  • t1: IQs pick node R as router, R forwards query to A, A responds back to R which reverse forwards back the results.
  • t2: Active A instance is killed and rebalance begins. IQs start failing to A
  • t3: Rebalance assignment happens and standby B is now promoted as active instance. IQs continue to fail
  • t4: B fully catches up to changelog tail and rewinds offsets to A's last commit position, IQs continue to fail
  • t5: IQs to R, get routed to B, which is now ready to serve results. IQs start succeeding again


Depending on Kafka consumer group session/heartbeat timeouts, step t2,t3 can last a few seconds (~10 seconds based on defaults values). Depending on how laggy the standby B was prior to A being killed, t4 can take few seconds-minutes. While this behavior favors consistency over availability at all times, the long unavailability window might be undesirable for certain classes of applications (e.g simple caches or dashboards). These applications might also be able tolerate eventual consistency.

This KIP aims to also expose additional metadata, that would help performing highly available interactive queries against Streams state. Specifically

  • Exposes information about the current lag (time/offset) of the state stores, at each Kafka Streams instance locally.
  • During rebalance, AssignmentInfo also provides host information about standbys for all topic partitions globally

Thus an application relying on IQs can 

  • Route queries to standby replicas for better availability, if active is currently down (window t2-t4 discussed above) 
  • Implement a control plane to exchange lag information (e.g piggybacked on requests or heartbeats) to choose the best standby to route to 
  • Or even choose to fail the query if all the replicas are lagging behind very much (e.g node S was down for a long time before time t0, and was still catching up when node A went down.)

It's worth noting that with this change, Streams API can be used to build stateful application that offer different tradeoffs for consistency and availability. 

Public Interfaces

A single StreamsMetadata object represents the state of a Kafka Streams instance. It already has methods to obtain host/port of the instance, stores/topic partitions it hosts as an active. We will extend this class to also add standby stores/topic partitions, which will be filled in based on the AssignmentInfo object obtained during rebalance. 

StreamsMetadata.java
package org.apache.kafka.streams.state;

public class StreamsMetadata {
...
    private final Set<TopicPartition> standbyTopicPartitions;
    private final Set<String> standbyStateStoreNames;

    public Set<TopicPartition> standbyTopicPartitions() {
        return standbyTopicPartitions;
    }

    public Set<String> standbyStateStoreNames() {
        return standbyStateStoreNames;
    }
...
}


We will introduce a new class  KeyQueryMetadata, which contains all the metadata corresponding to a queried key, specifically it contains the topicPartition value to which the key belongs, so that it can used to correlate with the lag information

KeyQueryMetadata
package org.apache.kafka.streams;

public class KeyQueryMetadata {
    // Metadata about Streams instance that hosts the key as the active instance
    private StreamsMetadata activeMetadata;
    // Metadata about Streams instances that host the key as standbys
    private List<StreamsMetadata> standbyMetadata;
    // Topic Partition corresponding to the key.
    private TopicPartition topicPartition;

    // Standard getters/setters will be added.
    ...

}

Changes to org.apache.kafka.streams.KafkaStreams,  to expose instances that host standby state stores. We will introduce a standby equivalent for each of the four current methods, that expose active replica host information. In addition, Javadocs for existing methods will clearly reflect that it returns active replica information. We also plan to introduce two new methods, to expose lag for the given state store or across all stores hosted by the Streams instance (either as active or standby). 

package org.apache.kafka.streams;

public class KafkaStreams {

...

  // Actual javadoc will be comprehensive, notes below are for illustration for the KIP
  /**
   * Returns {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application,
   * that hosts standby state stores 
   */
  public Collection<StreamsMetadata> allStandbyMetadata() {}

  /**
   * Returns {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application,
   * that hosts standby state stores for the given store.
   */
  public Collection<StreamsMetadata> allStandbyMetadataForStore(final String storeName) {}

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store. 
   */
  public <K> KeyQueryMetadata allMetadataForKey(final String storeName,
                                                final K key,
                                                final Serializer<K> keySerializer) {}

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using the
   * the supplied partitioner 
   */
  public <K> KeyQueryMetadata allMetadataForKey(final String storeName,
                                                final K key,
                                                final StreamPartitioner<? super K, ?> partitioner) {}

...

  /**
   * Returns lag information for all the stores local to this Kafka instance
   */
  public List<StoreLagInfo> allLagInfo() {}


  /**
   * Returns lag information for all topic partitions for a given store
   */
  public List<StoreLagInfo> lagInfoForStore(String storeName) {}

...


}

We will introduce a new Public API class `StoreLagInfo` (inspired by HostInfo). The lag information would be computed by periodically (configurable) reading the record at the tail of the changelog, to obtain the timestamp for the ConsumerRecord. 

package org.apache.kafka.streams;

public class StoreLagInfo {

  // Estimate of number messages the store is behind by from the tail of the changelog
  private long offsetLagEstimate;

  // Estimate of number of ms the store is behind from the tail of the changelog.
  private long timeLagEstimateMs;

  // Store name 
  private String storeName;

  // Changelog topic partition
  private TopicPartition topicPartition;
  
  ...
  // standard getters/setters will be added
}

Below is some pseudo code to showing sample usage of these two APIs by a real Streams application.

// Global map containing latest lag information across hosts. This is collected by the Streams application instances outside of Streams, just relying on the local lag APIs in each. 
// Exact communication mechanism is left to the application. Some options : a gossip protocol, maintain another single partition Kafka topic to implement lag broadcast
final Map<HostInfo, List<StoreLagInfo>> globalLagInformation;

// Key which needs to be routed
final K key;

// Store to be queried
final String storeName;

// Fetch the metadata related to the key
KeyQueryMetadata queryMetadata = allMetadataForKey(store, key, serializer);

// Acceptable lag for the query
final long acceptableOffsetLag = 10000;

// Stream of active + standby metadata
Stream<StreamsMetadata> allHostMetadata = Stream.concat(
    Stream.of(queryMetadata.activeMetadata()).stream(), 
    queryMetadata.standbyMetadata().stream())

// filter out all the hosts with more than acceptable lag for the TopicPartition where the key resides using queryMetadata.topicPartition()
List<StreamsMetdata> inSyncHostMetadata = hostMetadata.filter(metadata -> {
      StoreLagInfo lagForHostPartition = globalLagInforation.get(metadata.hostInfo()).stream()
          .filter(lagInfo -> lagInfo.storeName().equals(storeName) && lagInfo.topicPartition().equals(queryMetadata.topicPartition()))
          .findAny().get()
      return lagForHostPartition.offsetLagEstimate() < acceptableOffsetLag;
}).collect(Collectors.toList()); 

// Proceed to query, as documented today
query(store, key, inSyncHostMedata);

Proposed Changes

In the current code, t0 and t1 serve queries from Active(Running) partition. For case t2, we are planning to return List<StreamsMetadata> such that it returns <StreamsMetadata(A), StreamsMetadata(B)> so that if IQ fails on A, the standby on B can serve the data by enabling serving from replicas. This still does not solve case t3 and t4 since B has been promoted to active but it is in Restoring state to catchup till A’s last committed position as we don’t serve from Restoring state in active and new replica on R is building itself from scratch. Both these cases can be solved if we start serving from Restoring state of active as well, since it is almost equivalent to previous active.

The new plan is to enhance the serving query capabilities to include standbys as well and have minimal changes in the core streams code. Initially, when a query comes a user can request for streams Metadata holding the active task(where the key resides) and then query for the key on that host which will return back the response along with the record lag(0 in case of Active/Running tasks) and time lag for the specific store being queried. If such a query fails due to Rebalancing/Node-unavailable user can decide to query streams Metadata holding standby tasks for the partition(where the key resides). This will just return a list of all available standby’s in the system and the user can make IQ query any of those nodes which will return the response, and the record lag and time lag. Based on which user can decide if the user wants to return the response back or call another standby.

  • AssignmentInfo changes to include Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost;  so that each machine knows which machine holds which standby partitions along with the active partitions(which they currently do). Consequently AssignmentInfo version will be bumped up to VERSION 6

  • Changing signature of setPartitionsByHostState(partitionsByHostState, standbyPartitionsByHost) and to onChange() and further rebuildMetadata() to add Set<TopicPartition> standbyTopicPartitions in StreamsMetadata. This will add standby partitions in the metadata.

  • Addition of StreamsMetadataState::getStandbyMetadataListForKey() to returns a list of StreamsMetadata which contains all the standbys available in the system for the partition. We would have access to allMetadata containing activePartitions as well as standby partitions in the StreamsMetadataState.java with the above changes.

  • Renaming partitionsForHost to activePartitionsForHost in StreamsMetadataState.java and partitionsByHostState to activePartitionsByHostState in StreamsPartitionAssignor.java
  • We will introduce a new config in Streams, to control how often the lag information is refreshed by reading the tail record of the change-log topic partition 
public static final String LAG_REFRESH_MS_CONFIG = "lag.refresh.ms"; // default : 3 seconds
private static final String LAG_REFRESH_DOC = "The amount of time in milliseconds to wait before fetching latest lag for a state store against its change-log";


With this KIP, the onus is left on a user of how much lag they are okay to serve queries with. Since, after this KIP there is the capability of serving from a restoring active as well as running standby task and each response returns the lag along with the actual value, so a user can either decide to discard it or return it back to the client. 

Compatibility, Deprecation, and Migration Plan

This KIP affects StreamsMetadata and AssignmentInfo. Currently, StreamsMetadata is not following any versioning so we might have to upgrade the version for it. Also, since this would include AssignmentInfo changes to add replicaPartitionsByHost, we would need to upgrade AssignmentInfo as well.

Rejected Alternatives

  • Adding a StreamsConfig to have a universal enableStandbyServing flag for the application. This would restrict the functionality as there could be multiple stores in an application and we need to have the flexibility to extend different consistency guarantees in such cases, which would be restricted by having a StreamsConfig.
  • Making the Streams APIs lag aware e.g only return standbys within a certain time/offset lag. It did not add a lot of value to push this into Streams. Instead, the KIP keeps Streams agnostic of what acceptable values for lag is and provides the application/user flexibility to choose.
  • Propagating lag information using the Rebalance protocol. Although it seemed like logical thing to do, with KIP-441 in mind, we decided against it due to various reasons. Foremost, the lag information is quickly outdated


  • No labels