...

  • Exposes information about the current lag (time/offset) of the state stores, at each Kafka Streams instance locally.
  • During rebalance, AssignmentInfo also provides host information about standbys for all topic partitions globally

...

We will introduce a new class KeyQueryMetadata, which contains all the routing information corresponding to a queried key. Notably, it contains the partition value to which the key belongs, so that it can be used to correlate with the lag information.

Code Block
languagejava
titleKeyQueryMetadata
package org.apache.kafka.streams;

public class KeyQueryMetadata {
    // Active streams instance for the key
    private HostInfo activeHost;
    // Metadata about Streams instances that host the key as standbys
    private List<HostInfo> standbyHosts;
    // Store partition corresponding to the key.
    private int partition;

    // Standard getters/setters will be added.
    ...

}

We will also introduce a LagInfo class that encapsulates the lag information.

Code Block
package org.apache.kafka.streams;

public class LagInfo {

...

    private final long currentOffsetPosition;

    private final long endOffsetPosition;

    private final long offsetLag;
    ...
}
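
As a rough illustration of how the three fields relate, the sketch below derives the offset lag from the two positions. The class name LagInfoSketch, its constructor, its accessor names, and the clamping at zero are assumptions made for this example; the KIP itself only lists the fields.

```java
// Hypothetical stand-in for the proposed LagInfo; not the actual Kafka Streams class.
final class LagInfoSketch {
    private final long currentOffsetPosition;
    private final long endOffsetPosition;
    private final long offsetLag;

    LagInfoSketch(final long currentOffsetPosition, final long endOffsetPosition) {
        this.currentOffsetPosition = currentOffsetPosition;
        this.endOffsetPosition = endOffsetPosition;
        // Assumption: lag is the distance to the changelog end offset, clamped at zero.
        this.offsetLag = Math.max(0L, endOffsetPosition - currentOffsetPosition);
    }

    long currentOffsetPosition() {
        return currentOffsetPosition;
    }

    long endOffsetPosition() {
        return endOffsetPosition;
    }

    long offsetLag() {
        return offsetLag;
    }

    public static void main(final String[] args) {
        // A standby that has applied the changelog up to offset 9500 out of 10000.
        final LagInfoSketch standby = new LagInfoSketch(9_500L, 10_000L);
        System.out.println("offset lag = " + standby.offsetLag());
    }
}
```

Under this assumption, an active store in RUNNING state would report a current position equal to the end offset, and therefore a lag of 0.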


Changes to org.apache.kafka.streams.KafkaStreams, to introduce two methods for querying the metadata specific to a given key/store and one method for fetching offset lag information for all stores hosted locally on that Streams instance. In addition, we will add a deprecated annotation to the existing `metadataForKey()` methods, and their Javadocs will clearly state that they return active replica information. We also introduce a new method to get stores for querying with IQ, which includes a flag indicating whether to allow queries over standbys and restoring stores, or only on running active ones. The behavior of the pre-existing "store(name, type)" method is unchanged: it only allows querying running active stores.

Code Block
package org.apache.kafka.streams;

public class KafkaStreams {

...

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store. 
   */
  public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                  final K key,
                                                  final Serializer<K> keySerializer) {}

  /**
   * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using
   * the supplied partitioner.
   */
  public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                  final K key,
                                                  final StreamPartitioner<? super K, ?> partitioner) {}

  /**
   * Deprecated to encourage users to migrate to queryMetadataForKey.
   */
  @Deprecated
  public <K> StreamsMetadata metadataForKey(final String storeName,
                                            final K key,
                                            final Serializer<K> keySerializer) {}

  /**
   * Deprecated to encourage users to migrate to queryMetadataForKey.
   */
  @Deprecated
  public <K> StreamsMetadata metadataForKey(final String storeName,
                                            final K key,
                                            final StreamPartitioner<? super K, ?> partitioner) {}

  /**
   * Returns mapping from store name to another map of partition to offset lag info, for all stores local to this Streams instance. It includes both active and standby store partitions, with active partitions always reporting 0 lag in RUNNING state and actual lag from changelog end offset when RESTORING. 
   */
  public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags() {}

  /**
   * Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's
   * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
   * The returned object can be used to query the {@link StateStore} instances.
   *
   * @param storeName           name of the store to find
   * @param queryableStoreType  accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
   * @param includeStaleStores      If false, only permit queries on the active replica for a partition, and only if the
   *                            task for that partition is running. I.e., the state store is not a standby replica,
   *                            and it is not restoring from the changelog.
   *                            If true, allow queries on standbys and restoring replicas in addition to active ones.
   * @param <T>                 return type
   * @return A facade wrapping the local {@link StateStore} instances
   * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
   * {@code queryableStoreType} doesn't exist
   */
  public <T> T store(final String storeName,
                     final QueryableStoreType<T> queryableStoreType,
                     final boolean includeStaleStores) {}

...


}
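
To make the shape of the local lag map more concrete, here is a minimal sketch of how an application might flatten a store → partition → lag map into plain offset lags before sharing them with other instances. The class LocalLagSnapshotSketch, the nested Lag stand-in, and the method toOffsetLags are hypothetical names for this example only; the sketch does not call any Kafka Streams API.

```java
import java.util.Map;
import java.util.TreeMap;

final class LocalLagSnapshotSketch {

    // Hypothetical stand-in for the proposed LagInfo; only the offset lag is modeled here.
    static final class Lag {
        final long offsetLag;

        Lag(final long offsetLag) {
            this.offsetLag = offsetLag;
        }
    }

    // Flattens store -> partition -> Lag into store -> partition -> offset lag,
    // a shape that is simple to serialize and send to other instances.
    static Map<String, Map<Integer, Long>> toOffsetLags(final Map<String, Map<Integer, Lag>> localLags) {
        final Map<String, Map<Integer, Long>> snapshot = new TreeMap<>();
        localLags.forEach((store, lagByPartition) -> {
            final Map<Integer, Long> offsets = new TreeMap<>();
            lagByPartition.forEach((partition, lag) -> offsets.put(partition, lag.offsetLag));
            snapshot.put(store, offsets);
        });
        return snapshot;
    }

    public static void main(final String[] args) {
        // Example input: one store with an active partition (lag 0) and a standby partition (lag 42).
        final Map<String, Map<Integer, Lag>> local = Map.of(
            "word-counts", Map.of(0, new Lag(0L), 1, new Lag(42L)));
        System.out.println(toOffsetLags(local));
    }
}
```

How the resulting snapshot is exchanged between instances (gossip, a dedicated topic, or something else) is left to the application, as the KIP states.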


Below is some pseudocode showing sample usage of these APIs by a real Streams application.


Code Block
// Global map containing latest lag information across hosts. This is collected by the Streams application instances outside of Streams, just relying on the local lag APIs in each.
// KafkaStreams#allMetadata() is used by each application instance to discover other application instances to exchange this lag information. 
// Exact communication mechanism is left to the application. Some options : a gossip protocol, maintain another single partition Kafka topic to implement lag broadcast
// Accumulated lag information from other streams instances.
final Map<HostInfo, Map<String, Map<Integer, Long>>> globalLagInformation; 

// Key which needs to be routed
final K key;

// Store to be queried
final String storeName;

// Fetch the metadata related to the key
KeyQueryMetadata queryMetadata = queryMetadataForKey(storeName, key, serializer);

// Acceptable lag for the query
final long acceptableOffsetLag = 10000;


if (isAlive(queryMetadata.getActiveHost())) {
  // always route to active if alive
  query(storeName, key, queryMetadata.getActiveHost());
} else {
  // filter out all the standbys with more than the acceptable lag & obtain a list of standbys that are in-sync
  List<HostInfo> inSyncStandbys = queryMetadata.getStandbyHosts().stream()
      // get the offset lag at each standby host for the key's store partition
      .map(standbyHostInfo -> new Pair<>(standbyHostInfo, globalLagInformation.get(standbyHostInfo).get(storeName).get(queryMetadata.partition())))
      // Sort by offset lag, i.e smallest lag first
      .sorted(Comparator.comparing(Pair::getRight))
      .filter(standbyHostLagPair -> standbyHostLagPair.getRight() < acceptableOffsetLag)
      .map(standbyHostLagPair -> standbyHostLagPair.getLeft())
      .collect(Collectors.toList());

  // Query standbys most in-sync to least in-sync, stopping at the first one that answers
  for (HostInfo standbyHost : inSyncStandbys) {
    try {
      query(storeName, key, standbyHost);
      break;
    } catch (Exception e) {
      System.err.println("Querying standby failed");
    }
  }
}

Proposed Changes

In the current code, t0 and t1 serve queries from the Active (Running) partition. For case t2, we are planning to return List<StreamsMetadata> containing <StreamsMetadata(A), StreamsMetadata(B)>, so that if IQ fails on A, the standby on B can serve the data once serving from replicas is enabled. This still does not solve cases t3 and t4: B has been promoted to active but is in the Restoring state, catching up to A’s last committed position, and we do not serve from an active in the Restoring state; meanwhile, the new replica on R is building itself from scratch. Both cases can be solved if we also serve from the Restoring state of the active, since it is almost equivalent to the previous active.

The new plan is to enhance the query-serving capabilities to include standbys as well, and to expose local lag information to the Streams application, such that query routing can happen in the following way:

  1. queries get routed to any random streams instance in the cluster ("router" here on)
  2. router then uses Streams metadata to pick active/standby instances for that key's store/partition
  3. router instance also maintains global lag information for all stores and all their partitions, by a gossip/broadcast/heartbeat protocol (done outside of Streams framework), but using KafkaStreams#allMetadata() for streams instance discovery.
  4. router then uses information in 2 & 3 to determine which instance to send the query to  : always picks active instance if alive or the most in-sync live standby otherwise. 
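
The routing decision in step 4 can be sketched in plain Java as follows. This is only an illustration under stated assumptions: hosts are modeled as strings rather than HostInfo, liveness is a precomputed set, the lag map already holds the offset lag of each standby for the key's store partition, and the names LagAwareRouterSketch and chooseHost are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class LagAwareRouterSketch {

    // Picks the host to query: the active if it is alive, otherwise the live standby
    // with the smallest offset lag below the acceptable threshold (empty if none qualifies).
    static Optional<String> chooseHost(final String activeHost,
                                       final List<String> standbyHosts,
                                       final Set<String> liveHosts,
                                       final Map<String, Long> offsetLagByHost,
                                       final long acceptableOffsetLag) {
        if (liveHosts.contains(activeHost)) {
            return Optional.of(activeHost);
        }
        return standbyHosts.stream()
            .filter(liveHosts::contains)
            // Standbys with no known lag are skipped rather than guessed at.
            .filter(offsetLagByHost::containsKey)
            .filter(host -> offsetLagByHost.get(host) < acceptableOffsetLag)
            .min(Comparator.comparingLong((String host) -> offsetLagByHost.get(host)));
    }

    public static void main(final String[] args) {
        // The active (host-a) is down; host-c is the most in-sync live standby.
        final Map<String, Long> lags = Map.of("host-b", 50L, "host-c", 5L);
        final Optional<String> target = chooseHost(
            "host-a",
            List.of("host-b", "host-c"),
            Set.of("host-b", "host-c"),
            lags,
            100L);
        System.out.println(target.orElse("no in-sync replica available"));
    }
}
```

Returning an empty result when no replica qualifies leaves it to the caller to decide whether to fail the query or relax the lag threshold.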

With this KIP, the onus is left on a user to decide how much lag the queries are willing to tolerate, since there is the capability of serving from a restoring active as well as running standby task.

Implementation will incur the following changes

  • AssignmentInfo changes to include Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost;  so that each machine knows which machine holds which standby partitions along with the active partitions(which they currently do). Consequently AssignmentInfo version will be bumped up to VERSION 6

  • Changing signature of setPartitionsByHostState(partitionsByHostState, standbyPartitionsByHost) and to onChange() and further rebuildMetadata() to add Set<TopicPartition> standbyTopicPartitions in StreamsMetadata. This will add standby partitions in the metadata.

  • Addition of StreamsMetadataState::getStandbyMetadataListForKey() to return a list of StreamsMetadata which contains all the standbys available in the system for the partition. With the above changes, we would have access to allMetadata containing active partitions as well as standby partitions in StreamsMetadataState.java.

  • Overloading KafkaStreams#store() to add a new boolean parameter includeStaleStores which will be false by default if current KafkaStreams#store() is called.
  • Renaming partitionsForHost to activePartitionsForHost in StreamsMetadataState.java and partitionsByHostState to activePartitionsByHostState in StreamsPartitionAssignor.java

  • We also might need to make changes to make the offset lag information tracked in-memory accessible for the lag APIs.


Compatibility, Deprecation, and Migration Plan

  • This KIP affects StreamsMetadata and AssignmentInfo. Our changes to StreamsMetadata will be compatible with existing Streams apps. The AssignmentInfo changes will add a standbyPartitionsByHost map, so we would need to bump the AssignmentInfo version; we expect the existing version probing mechanism to handle the migration path.
  • The two KafkaStreams#metadataForKey() methods are deprecated, and users can migrate their code to the two new KafkaStreams#queryMetadataForKey() methods instead.

Rejected Alternatives

  • Adding a StreamsConfig to have a universal enableStandbyServing flag for the application. This would restrict the functionality as there could be multiple stores in an application and we need to have the flexibility to extend different consistency guarantees in such cases, which would be restricted by having a StreamsConfig.
  • Making the Streams APIs lag aware e.g only return standbys within a certain time/offset lag. It did not add a lot of value to push this into Streams. Instead, the KIP keeps Streams agnostic of what acceptable values for lag is and provides the application/user flexibility to choose.
  • Propagating lag information using the Rebalance protocol. Although it seemed like a logical thing to do, with KIP-441 in mind, we decided against it for various reasons. Foremost, the lag information is quickly outdated.
  • Making multiple calls to StreamsMetadataState to fetch the active tasks, the standby tasks, and the partition to which the key belongs (required to fetch lag). Instead, we introduce a new public class KeyQueryMetadata, as it makes the code leaner and returns everything in a single fetch.
  • Since it's sufficient to just support offsetLag initially without needing time-based lag information right away and adding time-based lag needs broker changes to be implemented efficiently, we decided against implementing time-based offsets for now. 
