


Discussion: here

Motivation

Currently, Streams interactive queries (IQ) fail while a rebalance is in progress.

Consider the following scenario in a three-node Streams cluster with nodes A, B and R, executing a stateful sub-topology/topic group with 1 partition and `num.standby.replicas=1`:

  • t0: A is the active instance owning the partition, B is the standby that keeps replicating A's state onto its local disk, and R just routes Streams IQs to the active instance using StreamsMetadata.
  • t1: IQs pick node R as the router. R forwards the query to A, and A responds to R, which forwards the results back to the caller.
  • t2: The active instance A is killed and a rebalance begins. IQs routed to A start failing.
  • t3: The rebalance assignment happens and standby B is promoted to active instance. IQs continue to fail.
  • t4: B fully catches up to the changelog tail and rewinds offsets to A's last commit position. IQs continue to fail.
  • t5: IQs to R get routed to B, which is now ready to serve results. IQs start succeeding again.


Depending on the Kafka consumer group session/heartbeat timeouts, steps t2 and t3 can take a few seconds (~10 seconds based on default values). Depending on how far standby B lagged behind before A was killed, t4 can take anywhere from a few seconds to minutes.

While this behavior favors consistency over availability at all times, the long unavailability window might be undesirable for certain classes of applications (e.g. simple caches or dashboards).

This proposal aims to also expose information about standby B to R during each rebalance, so that an application can route queries to a standby and serve stale reads, choosing availability over consistency.

Public Interfaces

  • StreamsMetadata
  • AssignmentInfo

Proposed Changes

  • AssignmentInfo changes to include Map<HostInfo, Set<TopicPartition>> replicaPartitionsByHost, so that each machine knows which machine holds which replica partitions, in addition to the active partitions (which it already knows).

  • Change the signature to setPartitionsByHostState(partitionsByHostState, replicaPartitionsByHost), and propagate the change through onChange() and rebuildMetadata(), so that Set<TopicPartition> replicaTopicPartitions is added to StreamsMetadata. This adds replica partitions to the metadata.

  • To serve queries from replicas, we will add a flag to hostWithPartitionForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer) that lets users state whether they are okay with serving from replicas. The default would be false. Then it is just a matter of getting the host via getHostWithPartitionForKey(), where we would have access to allMetadata containing activePartitions as well as replicaPartitions. If a user is okay with serving from replicas, the choice between the replica and the active can be made randomly to share the load across machines (let me know if you have better thoughts).
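
The routing decision described above can be sketched as follows. This is a minimal illustration, not the actual Streams implementation: hosts and partitions are plain strings standing in for HostInfo and TopicPartition, and the class and method names (ReplicaRoutingSketch, candidateHosts, pickHost) are hypothetical. Only replicaPartitionsByHost and enableReplicaServing come from the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical sketch: strings stand in for HostInfo ("host:port") and TopicPartition ("topic-partition").
final class ReplicaRoutingSketch {

    // Collects every host allowed to serve the partition: active owners always,
    // replica (standby) holders only when the caller opted in.
    static List<String> candidateHosts(Map<String, Set<String>> partitionsByHost,
                                       Map<String, Set<String>> replicaPartitionsByHost,
                                       String partition,
                                       boolean enableReplicaServing) {
        List<String> hosts = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : partitionsByHost.entrySet()) {
            if (e.getValue().contains(partition)) {
                hosts.add(e.getKey());
            }
        }
        if (enableReplicaServing) {
            for (Map.Entry<String, Set<String>> e : replicaPartitionsByHost.entrySet()) {
                if (e.getValue().contains(partition)) {
                    hosts.add(e.getKey());
                }
            }
        }
        return hosts;
    }

    // Picks one candidate uniformly at random to spread load; returns null if no host holds the partition.
    static String pickHost(Map<String, Set<String>> partitionsByHost,
                           Map<String, Set<String>> replicaPartitionsByHost,
                           String partition,
                           boolean enableReplicaServing,
                           Random random) {
        List<String> hosts = candidateHosts(
            partitionsByHost, replicaPartitionsByHost, partition, enableReplicaServing);
        return hosts.isEmpty() ? null : hosts.get(random.nextInt(hosts.size()));
    }

    public static void main(String[] args) {
        // Scenario from the motivation: A is active, B is the standby for the single partition.
        Map<String, Set<String>> active = Map.of("A:8080", Set.of("input-0"));
        Map<String, Set<String>> replicas = Map.of("B:8080", Set.of("input-0"));

        System.out.println(candidateHosts(active, replicas, "input-0", false));
        System.out.println(pickHost(active, replicas, "input-0", true, new Random()));
    }
}
```

With the flag left at false, only the active host is a candidate, which matches today's behavior. A uniform random pick is just one possible policy; an application could equally prefer the active host and fall back to a replica only when the active is unreachable.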

Open Questions

  • What if num.standby.replicas=0 or >1?

Compatibility, Deprecation, and Migration Plan

This KIP affects StreamsMetadata and AssignmentInfo. Currently, StreamsMetadata does not follow any versioning scheme, so we might have to introduce a version for it. Also, since this change adds replicaPartitionsByHost to AssignmentInfo, we would need to bump the AssignmentInfo version as well.
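
One way a version bump can keep old and new instances interoperable is to gate the new field on the encoded version. The sketch below only illustrates that idea and is not the real AssignmentInfo wire format: the class name, the version constant, the string stand-ins for hosts and partitions, and the byte layout are all assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of version-gated encoding; not the actual AssignmentInfo format.
final class VersionedAssignmentSketch {

    // Assumed first version that carries replicaPartitionsByHost.
    static final int REPLICA_AWARE_VERSION = 2;

    static final class Decoded {
        final int version;
        final Map<String, Set<String>> active;
        final Map<String, Set<String>> replicas;

        Decoded(int version, Map<String, Set<String>> active, Map<String, Set<String>> replicas) {
            this.version = version;
            this.active = active;
            this.replicas = replicas;
        }
    }

    static byte[] encode(int version,
                         Map<String, Set<String>> active,
                         Map<String, Set<String>> replicas) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(version);
            writeMap(out, active);
            // Older versions never see the new field.
            if (version >= REPLICA_AWARE_VERSION) {
                writeMap(out, replicas);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Decoded decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int version = in.readInt();
            Map<String, Set<String>> active = readMap(in);
            // Payloads from older versions decode to an empty replica map.
            Map<String, Set<String>> replicas =
                version >= REPLICA_AWARE_VERSION ? readMap(in) : new TreeMap<>();
            return new Decoded(version, active, replicas);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeMap(DataOutputStream out, Map<String, Set<String>> map) throws IOException {
        out.writeInt(map.size());
        for (Map.Entry<String, Set<String>> e : map.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeInt(e.getValue().size());
            for (String partition : e.getValue()) {
                out.writeUTF(partition);
            }
        }
    }

    private static Map<String, Set<String>> readMap(DataInputStream in) throws IOException {
        Map<String, Set<String>> map = new TreeMap<>();
        int hosts = in.readInt();
        for (int i = 0; i < hosts; i++) {
            String host = in.readUTF();
            Set<String> partitions = new TreeSet<>();
            int count = in.readInt();
            for (int j = 0; j < count; j++) {
                partitions.add(in.readUTF());
            }
            map.put(host, partitions);
        }
        return map;
    }
}
```

Under this layout, a payload written at the older version simply omits the replica map, and a newer decoder treats it as "no replicas known", which falls back to serving from active instances only.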

Rejected Alternatives

  • Serving data from the active machine during the restoration state.

