...

For the majority of use-cases, where a fixed and/or manageable number of topics are processed, fetching topic metadata is a cost that's incurred upon startup but subsequently mitigated by maintaining a metadata cache. However, in the case where a large or variable number of topics are produced to, clients may encounter degraded performance that severely limits processing, or, in extreme cases, degenerate timeout behavior that impedes progress altogether.

There are three primary factors in the producer that hinder client processing when working with a large number of topics:

  1. The number of metadata RPCs generated.
  2. The size of the metadata RPCs.
  3. Throughput constriction while fetching metadata.

For (1), an RPC is generated every time an uncached topic's metadata must be fetched. During periods when a large number of uncached topics are processed (e.g. producer startup), a large number of RPCs may be sent out to the brokers in a short period of time. Generally, if there are n unknown topics, then O(n) metadata RPCs will be sent, regardless of their proximity in time.

For (2), requests for metadata will also ask to refresh metadata about all known topics. As the number of topics grows large, this inflates the response size considerably and requires non-trivial processing. This further exacerbates (1): every subsequent metadata request results in an increasing amount of data being transmitted back to the client.
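
To put rough, purely illustrative numbers on the combined effect: if a producer starts up against 1,000 uncached topics, the i-th metadata fetch returns metadata for roughly the i topics known so far, so on the order of 1,000 × 1,001 / 2 ≈ 500,000 topic metadata entries are transmitted in aggregate before the cache is warm.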

For (3), fetching a topic's metadata is a blocking operation in the producer, which is all the more surprising because it blocks inside a function that's advertised as asynchronous. This means that a pipeline of records destined for various uncached topics will serially block while each individual topic's metadata is fetched.
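
As a minimal illustration of (1) and (3), the sketch below (stock Java producer, placeholder broker address and topic names) produces to many previously-unseen topics. Although send() returns a Future, each call blocks internally until that topic's metadata is available, so the loop degenerates into one serial metadata round-trip per new topic:

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class ManyTopicsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                String topic = "topic-" + i;   // each uncached topic forces its own metadata fetch (1)
                // send() returns a Future, but blocks internally (up to max.block.ms)
                // until this topic's metadata has been fetched (3).
                Future<RecordMetadata> future =
                        producer.send(new ProducerRecord<>(topic, "key-" + i, "value-" + i));
            }
        }
    }
}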

In concert, these three factors amplify one another's negative effects, and improvements should be made in order to alleviate any topic scalability issues.

...

No public interfaces will be modified.

Proposed Changes

The first step in addressing the above issues is to make the fetching of metadata asynchronous within the producer. This directly fixes (3), and opens the path for resolving (1) by enabling metadata requests to be batched together. Since the producer's interface is asynchronous and it inherently batches the sending of records to partitions, subjecting metadata fetching to a subset of the batching delay doesn't change the interaction or expectations of the client. This change alone should be sufficient to bring performance back to an acceptable level (pending verification).

Specifically, KafkaProducer#waitOnMetadata will be made asynchronous in the cases where it currently blocks. For uncached topics, the producer will maintain a queue of outstanding records to ensure proper ordering (in the accumulator and for callback invocations) once topic metadata is resolved. Proper care must be taken to maintain the linger period for metadata fetching, individual record timeouts while queued, and record buffer memory limits.
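
As a rough sketch of the bookkeeping involved (class and method names here are hypothetical, not actual KafkaProducer internals, and timeout enforcement, memory accounting, and synchronization details are elided), records for uncached topics could be held in per-topic queues and drained in arrival order once metadata resolves:

import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch: per-topic queues of records awaiting metadata, preserving
// arrival order for the accumulator and for callback invocation.
final class PendingRecords<K, V> {

    static final class Pending<K, V> {
        final ProducerRecord<K, V> record;
        final Callback callback;
        final long deadlineMs;   // the record's own timeout still applies while queued

        Pending(ProducerRecord<K, V> record, Callback callback, long deadlineMs) {
            this.record = record;
            this.callback = callback;
            this.deadlineMs = deadlineMs;
        }
    }

    private final Map<String, Deque<Pending<K, V>>> byTopic = new ConcurrentHashMap<>();

    // Called from send() when the topic's metadata is not yet cached.
    void enqueue(String topic, Pending<K, V> pending) {
        byTopic.computeIfAbsent(topic, t -> new ConcurrentLinkedDeque<>()).addLast(pending);
    }

    // Called once metadata for 'topic' resolves; records are then appended to the
    // accumulator in their original order.
    Deque<Pending<K, V>> drain(String topic) {
        return byTopic.remove(topic);
    }
}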

The next part of the proposal is to resolve (2), which should reduce the cost of (1) considerably.

To address (2), note that the producer currently maintains two values of interest: an expiry threshold for every topic, which is used to remove a topic from the working set at a future time (currently hard-coded to 5 minutes), and a metadata refresh threshold, which is used to periodically refresh topic metadata (defined by metadata.max.age.ms). While seemingly similar, these two thresholds fundamentally differ: one could imagine a short expiry threshold in cases where records are produced to a topic and then subsequently forgotten, or a long expiry where topics are produced to intermittently over the lifetime of the producer.

Therefore, the producer should add a configuration flag 'metadata.expiry.ms' (default: 5 minutes) to control topic expiry.
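
For illustration, a producer configuration might then look like the following; note that 'metadata.expiry.ms' is the flag proposed here and does not exist in current clients, while 'metadata.max.age.ms' is an existing setting:

import java.util.Properties;

public class ProducerMetadataConfig {
    public static Properties producerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        // Existing config: maximum age of cached topic metadata before a refresh is forced.
        props.put("metadata.max.age.ms", "300000");          // 5 minutes
        // Proposed config (this proposal only; not present in current clients):
        // how long an unused topic remains in the working set before it is expired.
        props.put("metadata.expiry.ms", "300000");           // proposed default of 5 minutes
        return props;
    }
}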

Changes will be made to permit a subset of topics to refresh their metadata. In determining which topics' metadata to refresh, the following algorithm will be used (a code sketch follows the list):

  • Define a target topic fetch RPC size 'TARGET_METADATA_FETCH_SIZE = 25'
  • Maintain a collection T (urgent) of all topics
    • with no cached metadata
    • that were notified of a metadata change (e.g. NOT_LEADER_FOR_PARTITION encountered)
    • with metadata that hasn't been refreshed for at least 'metadata.max.age.ms'
  • Maintain a collection U (non-urgent) of all topics
    • not contained in T
    • with metadata that hasn't been refreshed for at least 'metadata.max.age.ms * 0.5'
  • If T is non-empty
    • fetch metadata for all topics in T
    • fetch metadata for a subset of topics in U, such that '|T + sub(U)| <= TARGET_METADATA_FETCH_SIZE'
  • If '|U| >= TARGET_METADATA_FETCH_SIZE'
    • fetch metadata for TARGET_METADATA_FETCH_SIZE topics in U with the oldest last refresh times
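
The following is a hypothetical sketch of the selection logic above (TopicState, its fields, and the helper method are illustrative, not actual producer internals); it interprets the second condition as applying only when there are no urgent topics, since a single request would otherwise cover both:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the topic-selection algorithm for metadata fetches.
final class MetadataFetchSelector {

    static final int TARGET_METADATA_FETCH_SIZE = 25;

    static final class TopicState {
        final String topic;
        final boolean hasCachedMetadata;
        final boolean changeNotified;   // e.g. NOT_LEADER_FOR_PARTITION was encountered
        final long lastRefreshMs;

        TopicState(String topic, boolean hasCachedMetadata, boolean changeNotified, long lastRefreshMs) {
            this.topic = topic;
            this.hasCachedMetadata = hasCachedMetadata;
            this.changeNotified = changeNotified;
            this.lastRefreshMs = lastRefreshMs;
        }
    }

    // Returns the topics to include in the next metadata request; an empty list means no request is needed.
    static List<String> topicsToFetch(List<TopicState> workingSet, long nowMs, long metadataMaxAgeMs) {
        List<TopicState> urgent = new ArrayList<>();      // T
        List<TopicState> nonUrgent = new ArrayList<>();   // U

        for (TopicState t : workingSet) {
            if (!t.hasCachedMetadata || t.changeNotified || nowMs - t.lastRefreshMs >= metadataMaxAgeMs) {
                urgent.add(t);
            } else if (nowMs - t.lastRefreshMs >= metadataMaxAgeMs / 2) {
                nonUrgent.add(t);
            }
        }

        // Oldest non-urgent topics are the best piggy-back candidates.
        nonUrgent.sort(Comparator.comparingLong((TopicState t) -> t.lastRefreshMs));

        List<String> result = new ArrayList<>();
        if (!urgent.isEmpty()) {
            // Fetch all urgent topics, topping up with non-urgent ones until the target size is reached.
            for (TopicState t : urgent) {
                result.add(t.topic);
            }
            for (TopicState t : nonUrgent) {
                if (result.size() >= TARGET_METADATA_FETCH_SIZE) {
                    break;
                }
                result.add(t.topic);
            }
        } else if (nonUrgent.size() >= TARGET_METADATA_FETCH_SIZE) {
            // Enough non-urgent topics have accumulated: fetch the oldest chunk.
            for (int i = 0; i < TARGET_METADATA_FETCH_SIZE; i++) {
                result.add(nonUrgent.get(i).topic);
            }
        }
        return result;
    }
}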

Following this, urgent metadata is always fetched immediately (when possible), and non-urgent topics are piggy-backed onto the request such that it doesn't exceed the target metadata fetch size. When enough non-urgent topics are candidates for metadata refresh, a request is issued, which has the effect of spacing out metadata fetches over time in appropriately-sized chunks.

Therefore, during conditions like producer startup, only urgent topics' metadata will be fetched, as opposed to all topics in the working set. While this doesn't reduce the number of generated RPCs, it dramatically reduces the response payload in the worst case, and reduces overall processing for both the server and the client.

Note that in the event of request failures (timeouts), there is no plan to change the current behavior, which is to wait 'retry.backoff.ms' before retrying. Separately, the topic expiry logic can be made more intelligent by managing the expiry from when the topic was last used, enabling the expiry duration to be reduced to improve cases where a large number of topics are touched intermittently.

Compatibility, Deprecation, and Migration Plan

...