Discussion: here

Motivation

Currently, Streams interactive queries (IQ) fail while a rebalance is in progress.

Consider the following scenario in a three-node Streams cluster with node A, node B and node R, executing a stateful sub-topology/topic group with 1 partition and `num.standby.replicas=1`:

  • t0: A is the active instance owning the partition, B is the standby that keeps replicating A's state onto its local disk, and R just routes Streams IQs to the active instance using StreamsMetadata.
  • t1: IQs pick node R as the router. R forwards the query to A, and A responds to R, which relays the results back to the caller.
  • t2: The active instance A is killed and a rebalance begins. IQs to A start failing.
  • t3: The rebalance assignment happens and standby B is promoted to active instance. IQs continue to fail.
  • t4: B fully catches up to the changelog tail and rewinds offsets to A's last commit position. IQs continue to fail.
  • t5: IQs to R get routed to B, which is now ready to serve results. IQs start succeeding again.


Depending on the Kafka consumer group session/heartbeat timeouts, steps t2 and t3 can take a few seconds (~10 seconds with default values). Depending on how far standby B lagged before A was killed, t4 can take anywhere from a few seconds to minutes.

While this behavior favors consistency over availability at all times, the long unavailability window might be undesirable for certain classes of applications (e.g. simple caches or dashboards).

This KIP also aims to expose information about standby B to R during each rebalance, so that an application can route queries to a standby and serve stale reads, choosing availability over consistency.

Public Interfaces

  • StreamsMetadata

Proposed Changes

In the current code, t0 and t1 serve queries from the active (Running) partition. For t2, we plan to return a List<StreamsMetadata> containing <StreamsMetadata(A), StreamsMetadata(B)>, so that if an IQ fails on A, the standby on B can serve the data once serving from replicas is enabled. This still does not solve t3 and t4: B has been promoted to active but is in the Restoring state while it catches up to A's last committed position, and we do not serve from an active in the Restoring state; meanwhile, the new replica on R is building itself from scratch. Both cases can be solved if we also serve from the Restoring state of the active, since its state is almost equivalent to that of the previous active.

There could be a case where all standbys of a partition become unavailable, and the active and all replicas of that partition are building themselves from scratch. In this case the state in the active is far behind even though it is in the Restoring state. To make sure we do not serve from such a state, we can either add another state before Restoring or check the difference between the last committed offset and the current position. Only if that difference is within a permissible range (say 10000) will we serve from the Restoring state of the active.
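
The offset-difference check described above could look roughly like the following. This is a minimal sketch, not Kafka Streams code: the class name, method name and constant are hypothetical, and the threshold simply reuses the "say 10000" figure from the text.

```java
// Hypothetical helper, not part of Kafka Streams: decides whether an active
// task that is still in the Restoring state is close enough to the last
// committed offset to serve interactive queries.
final class RestoringLagCheck {
    // Assumed threshold; the proposal only suggests "say 10000".
    static final long MAX_PERMISSIBLE_LAG = 10_000L;

    static boolean canServeWhileRestoring(long lastCommittedOffset, long currentPosition) {
        // Lag is how far the restoring store is behind the last committed offset.
        long lag = Math.max(0L, lastCommittedOffset - currentPosition);
        return lag <= MAX_PERMISSIBLE_LAG;
    }
}
```

With these assumptions, a promoted standby that is 5000 offsets behind would be allowed to serve, while a store rebuilding from offset 0 against a committed offset of 50000 would not.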

  • AssignmentInfo changes to include Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost, so that each instance knows which instance holds which standby partitions, in addition to the active partitions (which they already know).

  • Changing the signature of setPartitionsByHostState(partitionsByHostState, standbyPartitionsByHost), and of onChange() and further rebuildMetadata(), to add Set<TopicPartition> standbyTopicPartitions to StreamsMetadata. This adds the standby partitions to the metadata.

  • To serve queries from standbys, we will add a flag in StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer) that lets users indicate whether they are okay with serving from standbys; the default would be false. This function returns a list of StreamsMetadata containing the active first, followed by all the standbys in the system for the partition. In allMetadataForKey we would have access to allMetadata, which contains the active partitions as well as the standby partitions.
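
To illustrate the intended ordering of the result (active first, then standbys, and standbys only when the caller opts in), here is a simplified sketch. It is not the proposed implementation: hosts are plain strings and partitions plain integers instead of HostInfo and TopicPartition, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not the Kafka Streams API: models the two
// partitions-by-host maps with simple types to show the lookup order.
final class StandbyAwareRouting {
    static List<String> hostsForPartition(
            boolean enableReplicaServing,
            int partition,
            Map<String, Set<Integer>> activePartitionsByHost,
            Map<String, Set<Integer>> standbyPartitionsByHost) {
        List<String> hosts = new ArrayList<>();
        // The host with the active copy of the partition always comes first.
        for (Map.Entry<String, Set<Integer>> entry : activePartitionsByHost.entrySet()) {
            if (entry.getValue().contains(partition)) {
                hosts.add(entry.getKey());
            }
        }
        // Standby hosts are appended only if the caller accepts stale reads.
        if (enableReplicaServing) {
            for (Map.Entry<String, Set<Integer>> entry : standbyPartitionsByHost.entrySet()) {
                if (entry.getValue().contains(partition)) {
                    hosts.add(entry.getKey());
                }
            }
        }
        return hosts;
    }
}
```

A caller such as the router R could then try the hosts in order and fall back to a standby when the active is unreachable.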

We need to distinguish between genuine state restoration (when a replica becomes active) and building state up from scratch, since we can serve data in the former case but not in the latter. Let's discuss this case by case.

  • num.standby.replicas=0: The system should be unavailable for reads even after a single failure, since the partition now has to recreate its whole state from scratch.

  • num.standby.replicas=1: In case of one failure, we can serve from the available replica (if it doesn't lag too much) until that replica is promoted to active; from that point on we can serve from the new active (which will transition from Restoring to Running).

  • num.standby.replicas>1: If the user decides to shut down all 3 instances (e.g. with num.standby.replicas=2), the only outcome should be unavailability until all of them come back or the state is rebuilt on other nodes in the cluster. In normal operation the number of failures f <= 2, and when a failure does happen we can provide a list of all available in-sync standbys that don't lag too much; the user can decide to query any of them.

Future Enhancements

Once KIP-441 is completed, the lag of each standby would be available in the assignment itself. This would enable us to add to the metadata only those standbys whose lag is less than 10000 from the active, and to send back a sorted list that contains the active node first and then the standbys sorted by the freshness of their data.
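
A rough sketch of that filtering and ordering is shown below. It assumes a per-standby lag value is available as a simple map from host to lag, which is a hypothetical stand-in for whatever KIP-441 ends up exposing; the class name, method name and threshold constant are also illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: orders candidate hosts for a partition as
// active first, then standbys with acceptable lag, freshest first.
final class FreshnessOrdering {
    // Assumed threshold, taken from the "less than 10000" figure in the text.
    static final long MAX_STANDBY_LAG = 10_000L;

    static List<String> orderedHosts(String activeHost, Map<String, Long> standbyLagByHost) {
        List<String> result = new ArrayList<>();
        result.add(activeHost);
        standbyLagByHost.entrySet().stream()
                // Drop standbys that lag too far behind the active.
                .filter(entry -> entry.getValue() < MAX_STANDBY_LAG)
                // Smallest lag (freshest data) first.
                .sorted(Map.Entry.comparingByValue())
                .forEach(entry -> result.add(entry.getKey()));
        return result;
    }
}
```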

Compatibility, Deprecation, and Migration Plan

This KIP affects StreamsMetadata and AssignmentInfo. Currently, StreamsMetadata does not follow any versioning, so we might have to upgrade the version for it. Also, since this would include AssignmentInfo changes to add standbyPartitionsByHost, we would need to upgrade the AssignmentInfo version as well.

Rejected Alternatives

  • Adding a StreamsConfig to provide a universal enableReplicaServing flag for the application. This would restrict functionality: an application can have multiple stores, and we need the flexibility to offer different consistency guarantees for each of them, which a single StreamsConfig would not allow.

