...

Internally, when the consumer handles fetch responses from the brokers, it also receives metadata about the end offset of the partition(s), which it uses to update its metrics, including a metric indicating the current lag. Theoretically, this could supply the information we need, except that we don't know how old those metrics are. To provide the desired semantics, we would like to use the lag information only if it is fresh enough, but the existing metrics may be arbitrarily stale.

To overcome these challenges, we propose to expose this fetched metadata in a new method on ConsumerRecords: ConsumerRecords#metadata(): Map<TopicPartition, Metadata>, where Metadata includes the receivedTime, position, lag, startOffset, and endOffset information.
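As a rough illustration of the shape of this data, the following sketch models the value type with the five fields listed above. It is not the actual Kafka API: the class FetchedMetadataSketch, the example() helper, the sample values, and the use of a plain String key in place of TopicPartition are all assumptions made to keep the sketch self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; these are not the real Kafka classes.
public class FetchedMetadataSketch {

    /** Hypothetical stand-in for the proposed Metadata value type. */
    public static final class Metadata {
        public final long receivedTime; // when the fetch response was received (ms)
        public final long position;     // consumer's current position in the partition
        public final long lag;          // records between position and end offset
        public final long startOffset;  // first offset available on the broker
        public final long endOffset;    // end offset reported by the broker

        public Metadata(long receivedTime, long position, long lag,
                        long startOffset, long endOffset) {
            this.receivedTime = receivedTime;
            this.position = position;
            this.lag = lag;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    /**
     * Builds a one-entry example map. The String key "orders-0" is an
     * assumed stand-in for a TopicPartition (topic "orders", partition 0).
     */
    public static Map<String, Metadata> example() {
        Map<String, Metadata> metadata = new HashMap<>();
        // Position 40 with end offset 42 leaves a lag of 2 records.
        metadata.put("orders-0", new Metadata(1_000L, 40L, 2L, 0L, 42L));
        return metadata;
    }
}
```

A caller would look up a partition's entry and read the lag together with the time it was received, which is exactly the pairing that the existing metrics cannot provide.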

Streams will be able to use this new method by tracking, for each partition, whether it has been fetched, what the lag was at each fetch, and when the fetch took place. Essentially, each time we get a response back from poll(), we will record the receivedTimestamp and lag for each partition. Then, when it comes time to decide whether we are ready to process, if we don't have data buffered for a partition, we can consult this local metadata to see what the lag is. If the lag is missing or too stale, we wait until fresh metadata has been fetched. If the lag is greater than zero, then we know there is data on the brokers, and we should wait to poll it before processing anything. If the lag is zero, then we can apply the config to idle the desired amount of time before enforcing processing.
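The decision procedure described above can be sketched roughly as follows. This is a minimal illustration, not the Streams implementation: the class LagAwareReadiness, the Decision enum, the maxMetadataAgeMs staleness bound, the maxIdleMs parameter (standing in for the idling config), and the String partition keys are all hypothetical names chosen for the example. Where the text leaves behavior open, the sketch assumes that the idle clock starts the first time a zero lag is observed and resets whenever data is buffered or known to exist on the brokers.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; names and thresholds are assumptions.
public class LagAwareReadiness {

    public enum Decision { PROCESS, WAIT_FOR_FETCH, WAIT_FOR_POLL, IDLE }

    /** Lag and receive time remembered from the most recent fetch. */
    private static final class FetchInfo {
        final long receivedTimestampMs;
        final long lag;

        FetchInfo(long receivedTimestampMs, long lag) {
            this.receivedTimestampMs = receivedTimestampMs;
            this.lag = lag;
        }
    }

    private final Map<String, FetchInfo> lastFetch = new HashMap<>();
    private final Map<String, Long> idleStartMs = new HashMap<>();
    private final long maxMetadataAgeMs; // assumed bound on how old lag info may be
    private final long maxIdleMs;        // assumed stand-in for the idling config

    public LagAwareReadiness(long maxMetadataAgeMs, long maxIdleMs) {
        this.maxMetadataAgeMs = maxMetadataAgeMs;
        this.maxIdleMs = maxIdleMs;
    }

    /** Called after each poll() with the metadata returned for a partition. */
    public void recordFetch(String partition, long receivedTimestampMs, long lag) {
        lastFetch.put(partition, new FetchInfo(receivedTimestampMs, lag));
    }

    /** Decides what to do for one partition at time nowMs. */
    public Decision decide(String partition, boolean hasBufferedData, long nowMs) {
        if (hasBufferedData) {
            idleStartMs.remove(partition);
            return Decision.PROCESS;
        }
        FetchInfo info = lastFetch.get(partition);
        // Lag is missing or too stale: wait for a fresh fetch.
        if (info == null || nowMs - info.receivedTimestampMs > maxMetadataAgeMs) {
            return Decision.WAIT_FOR_FETCH;
        }
        // Data exists on the brokers: poll it before processing anything.
        if (info.lag > 0) {
            idleStartMs.remove(partition);
            return Decision.WAIT_FOR_POLL;
        }
        // Caught up: idle for the configured time, then enforce processing.
        Long previousStart = idleStartMs.putIfAbsent(partition, nowMs);
        long idleSince = (previousStart == null) ? nowMs : previousStart;
        return (nowMs - idleSince >= maxIdleMs) ? Decision.PROCESS : Decision.IDLE;
    }
}
```

For example, with maxMetadataAgeMs = 100 and maxIdleMs = 50, a partition whose latest fetch reported a lag of 5 yields WAIT_FOR_POLL, while a partition that has reported zero lag for at least 50 ms yields PROCESS even though nothing is buffered for it.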

...