...

Rather than building this special case into the code, we can simply have the members report their lag on non-logged stores as zero. Then the task would automatically be considered stateful for assignment, but all instances would be considered equally good assignment candidates. Further, because zero is the additive identity, this strategy works seamlessly when a task has both logged and non-logged stores: the "synthetic" zero lag for the non-logged store is added to the non-zero lag for the logged store, and the total lag we report for the task is sensibly just the lag on the logged store.
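For illustration, here is a minimal sketch (with hypothetical names, not the actual Streams internals) of how summing per-store lags naturally handles a mix of logged and non-logged stores:

```java
import java.util.List;

final class TaskLagReporter {

    /** Lag of a single store: changelog end offset minus restored/checkpointed offset. */
    interface StoreLag {
        boolean isLogged();
        long changelogLag(); // only meaningful when isLogged() is true
    }

    /** The total lag for a task is the sum over its stores. */
    static long taskLag(final List<StoreLag> stores) {
        long total = 0L;
        for (final StoreLag store : stores) {
            // Non-logged stores report zero lag: the task still counts as
            // stateful for assignment, but every instance looks equally
            // caught up on that store. Since zero is the additive identity,
            // mixing logged and non-logged stores simply yields the lag on
            // the logged stores.
            total += store.isLogged() ? store.changelogLag() : 0L;
        }
        return total;
    }
}
```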

Race between assignment and state cleanup

Streams starts a background thread that cleans up (deletes) any state directories that are currently unassigned and have not been updated for at least `state.cleanup.delay.ms` milliseconds.
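For reference, this delay is configurable via StreamsConfig; a minimal sketch (the application id and bootstrap servers below are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CleanupConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // A state directory that has been unassigned and untouched for at
        // least this long becomes eligible for deletion by the cleaner thread.
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 10 * 60 * 1000L);
    }
}
```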

This KIP introduces a race condition, because members now report the lag on all of an instance's stores, not just the assigned ones. A member may report a lag on an unassigned store, and then, while the rebalance is ongoing, the state cleaner could delete that store's directory (invalidating the reported lag). If the leader assigns the task to that instance based on the now-stale lag information, the member would actually have to rebuild the entire state from the start of the changelog.

Currently, this race is prevented because the state cleaner only runs while the Streams instance is not in the REBALANCING state. However, as part of KIP-429, the REBALANCING state would be removed from Streams (as it would no longer be necessary). Thus, we need to prevent the race condition without relying on that state.

It is sufficient to create a mutex that the state cleaner must acquire before running (and hold for the duration of its run). If we also acquire the mutex before we report store lags (as part of the JoinGroup), and hold it until we get an assignment back (when the rebalance completes), then we can ensure the race cannot occur.

Therefore, we propose to create such a mutex, acquire it in org.apache.kafka.clients.consumer.ConsumerPartitionAssignor#subscriptionUserData (where we compute the store lags), and release it in org.apache.kafka.clients.consumer.ConsumerRebalanceListener#onPartitionsAssigned (by which point the state directory locks have already been acquired for any newly assigned tasks). We will also hold the mutex for the entire duration of any call to `org.apache.kafka.streams.processor.internals.StateDirectory#cleanRemovedTasks(long)`.
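A minimal sketch of this locking protocol, assuming a dedicated lock object and hypothetical class/method names (the real change lives inside Streams's internals):

```java
import java.util.concurrent.locks.ReentrantLock;

final class StateCleanupCoordinator {
    private final ReentrantLock cleanupLock = new ReentrantLock();

    /** Invoked from ConsumerPartitionAssignor#subscriptionUserData: take the
     *  lock before computing store lags, so the cleaner cannot delete a state
     *  directory while its reported lag is in flight. */
    void beforeComputingStoreLags() {
        cleanupLock.lock();
    }

    /** Invoked from ConsumerRebalanceListener#onPartitionsAssigned: by now the
     *  state directory locks for newly assigned tasks are held, so the cleaner
     *  can safely run again. */
    void afterAssignmentReceived() {
        if (cleanupLock.isHeldByCurrentThread()) {
            cleanupLock.unlock();
        }
    }

    /** Wraps a call to StateDirectory#cleanRemovedTasks: the cleaner holds the
     *  lock for its entire run, so it can never interleave with the window
     *  between lag reporting and assignment completion. */
    void runCleanup(final Runnable cleanRemovedTasks) {
        cleanupLock.lock();
        try {
            cleanRemovedTasks.run();
        } finally {
            cleanupLock.unlock();
        }
    }
}
```

Note that the acquire and release both happen on the consumer's polling thread, while the cleaner runs on its own thread and simply blocks until the rebalance-side critical section is over.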

This creates an unfortunate three-way dependency among the state cleaner, Streams's ConsumerPartitionAssignor, and Streams's ConsumerRebalanceListener. Any cleaner solution would require changing the Consumer APIs, though.

Example Scenarios

Scaling Out

...