
Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-8904

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When a Kafka producer sends a record to a topic, it must have metadata about that topic in order to determine the partition to which the record will be delivered. Therefore, an unknown topic's metadata must be fetched whenever it is encountered. In the producer's current incarnation, the client provides no prior knowledge of the working set of topics, so all topic metadata is requested on-demand.

For the majority of use-cases, where a fixed and/or manageable number of topics are processed, fetching topic metadata is a cost that's incurred upon startup but subsequently mitigated by maintaining a metadata cache. However, in the case where a large or variable number of topics are produced to, clients may encounter degraded performance that severely limits processing, or in extreme degenerate cases, behavior which impedes progress altogether.

There are a couple of factors in the producer that hinder client processing when working with a large number of topics:

  1. The number of metadata RPCs generated.
  2. The size of the metadata RPCs.

For (1), an RPC is generated every time an uncached topic's metadata must be fetched. During periods when a large number of uncached topics are processed (e.g. producer startup), a large number of RPCs may be sent out to the brokers in a short period of time. Generally, if there are n unknown topics, then O(n) metadata RPCs will be sent regardless of their proximity in time.

For (2), requests for metadata also ask to refresh metadata about all known topics. As the number of topics becomes large, the response grows quite large and requires non-trivial processing. This further exacerbates (1): every subsequent metadata request results in an increasing amount of data transmitted back to the client.

In concert, these factors amplify the negative effects of each other, and improvements should be made in order to alleviate topic scalability issues.

Public Interfaces

Adds a producer configuration flag metadata.evict.ms (default: 5 minutes) to control the topic eviction duration.

/** <code>metadata.evict.ms</code> */
public static final String METADATA_EVICT_MS_CONFIG = "metadata.evict.ms";
private static final String METADATA_EVICT_MS_DOC =
        "Controls how long the producer will cache metadata for a topic that's not being accessed. " +
        "If the elapsed time since a topic was last produce to exceeds the metadata eviction duration, " +
        "then the topic's metadata is forgotten and the next access to it will force a metadata " +
        "fetch request.";

...
    .define(METADATA_EVICT_MS_CONFIG,
            Type.LONG,
            5 * 60 * 1000,     // default: 5 minutes, matching the current hard-coded eviction threshold
            atLeast(5000),     // minimum: 5 seconds
            Importance.LOW,
            METADATA_EVICT_MS_DOC)
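
For illustration, here is how a client might override the proposed flag — for example, a long-running producer that writes to many topics intermittently and wants a longer eviction window. This is a sketch assuming the configuration is adopted as proposed; the broker address and serializers are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class EvictionConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Proposed flag: keep an idle topic's metadata cached for 30 minutes
        // (instead of the proposed 5-minute default) before evicting it.
        props.put("metadata.evict.ms", 30 * 60 * 1000L);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // ... produce records as usual ...
        }
    }
}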


Proposed Changes

The proposal is to resolve (2), which should reduce the cost of (1) considerably.

The producer has two values of interest: an eviction threshold for topic metadata, which is used to remove an unused topic from the working set at a future time (currently hard-coded to 5 minutes), and a metadata refresh threshold, which is used to periodically refresh topic metadata (defined by metadata.max.age.ms). While seemingly similar, these two thresholds fundamentally differ: a short eviction threshold makes sense where records are produced to a topic that is subsequently forgotten, while a long eviction threshold suits topics that are produced to intermittently over the lifetime of the producer.

Therefore, the producer should add a configuration flag 'metadata.evict.ms' (default: 5 minutes) to control topic eviction.

Changes will be made to permit a subset of topics to refresh their metadata. In determining which topics' metadata to refresh, the following algorithm will be used:

  • Define a target topic fetch RPC size (note this value is only respected by non-urgent topic metadata refreshes):
    • Let metadataRefreshSecs = metadata.max.age.ms / 1000
    • Set topicsPerSec = <number of cached topics> / metadataRefreshSecs
    • Set targetMetadataFetchSize = Math.max(topicsPerSec * 10, 20)
      • Rationale: this sets the target size such that, in steady state, a non-urgent metadata refresh is issued approximately every 10 seconds, while also maintaining a reasonable batch size of '20' for setups with a smaller number of topics. The value '20' has no significance other than being a small-but-appropriate trade-off between RPC metadata response size and necessary RPC frequency. A worked example follows the list.
  • Maintain a collection T (urgent) of all topics
    • with no cached metadata and buffered producer request data
    • that were notified of a metadata change (e.g. NOT_LEADER_FOR_PARTITION encountered)
    • with metadata that hasn't been refreshed for at least 'metadata.max.age.ms'
  • Maintain a collection U (non-urgent) of all topics
    • not contained in T
    • with metadata that hasn't been refreshed for at least 'metadata.max.age.ms * 0.5'
      • Rationale: a multiplier of 0.5 is presumed to be a reasonable point at which refreshing the metadata becomes beneficial, while leaving enough slack before the update becomes urgent.
  • If T is non-empty
    • fetch metadata for all topics in T (even if |T| exceeds targetMetadataFetchSize)
    • fetch metadata for a subset of topics in U, such that '|T + sub(U)| <= targetMetadataFetchSize'
  • Otherwise, if '|U| >= targetMetadataFetchSize'
    • fetch metadata for targetMetadataFetchSize topics in U with the oldest last refresh times
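
As a worked example of the target size calculation (the topic count here is illustrative, not from the proposal): with metadata.max.age.ms = 300,000 (5 minutes) and 6,000 cached topics, metadataRefreshSecs = 300, topicsPerSec = 6,000 / 300 = 20, and targetMetadataFetchSize = max(20 * 10, 20) = 200. A full pass over the working set then takes 6,000 / 200 = 30 non-urgent RPCs, spaced roughly 10 seconds apart across the 5-minute refresh interval. With only 50 cached topics, topicsPerSec * 10 comes out well under 20, so the floor of 20 applies and the whole working set fits in a single refresh request.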

Following this, urgent metadata is always fetched immediately (when possible), and non-urgent topics are piggy-backed onto the request such that the total doesn't exceed the target metadata fetch size. When enough non-urgent topics are candidates for metadata refresh, a request is issued, which has the effect of spacing out metadata fetches over time in appropriately-sized chunks. A minimal sketch of this selection logic appears below.
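
To make the selection logic concrete, the following is a minimal sketch in Java. It is illustrative only: the class MetadataRefreshSelector, the TopicState fields, and the method selectTopicsToRefresh are hypothetical names and not part of the actual producer code; only the thresholds and the target-size formula come from the algorithm above.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the refresh-selection algorithm described above.
class MetadataRefreshSelector {
    private final long metadataMaxAgeMs;  // metadata.max.age.ms

    MetadataRefreshSelector(long metadataMaxAgeMs) {
        this.metadataMaxAgeMs = metadataMaxAgeMs;
    }

    // Per-topic state; fields are illustrative.
    static class TopicState {
        long lastRefreshMs;  // when this topic's metadata was last fetched
        boolean urgent;      // uncached with buffered data, or invalidated (e.g. NOT_LEADER_FOR_PARTITION)
    }

    List<String> selectTopicsToRefresh(Map<String, TopicState> topics, long nowMs) {
        long refreshSecs = Math.max(metadataMaxAgeMs / 1000, 1);
        int targetFetchSize = (int) Math.max(topics.size() * 10L / refreshSecs, 20);

        List<String> urgent = new ArrayList<>();     // collection T
        List<String> nonUrgent = new ArrayList<>();  // collection U
        for (Map.Entry<String, TopicState> e : topics.entrySet()) {
            TopicState s = e.getValue();
            if (s.urgent || nowMs - s.lastRefreshMs >= metadataMaxAgeMs)
                urgent.add(e.getKey());
            else if (nowMs - s.lastRefreshMs >= metadataMaxAgeMs / 2)
                nonUrgent.add(e.getKey());
        }

        // Refresh the topics closest to expiry first.
        nonUrgent.sort(Comparator.comparingLong(t -> topics.get(t).lastRefreshMs));

        if (!urgent.isEmpty()) {
            // Always fetch all urgent topics (even if |T| exceeds the target),
            // then piggy-back non-urgent ones up to the target fetch size.
            int room = Math.max(targetFetchSize - urgent.size(), 0);
            urgent.addAll(nonUrgent.subList(0, Math.min(room, nonUrgent.size())));
            return urgent;
        }
        // Otherwise, only issue a request once enough non-urgent topics accumulate.
        if (nonUrgent.size() >= targetFetchSize)
            return new ArrayList<>(nonUrgent.subList(0, targetFetchSize));
        return new ArrayList<>();  // nothing to refresh yet
    }
}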

Therefore, during conditions like producer startup, only urgent topics' metadata will be fetched, as opposed to all topics in the working set. While this doesn't reduce the number of generated RPCs, it dramatically reduces the worst-case response payload and reduces overall processing for both server and client.

Note that in the event of request failures (e.g. timeouts), there is no plan to change the current behavior, which is to wait 'retry.backoff.ms' before retrying.

Compatibility, Deprecation, and Migration Plan

Impact on clients will be strictly internal performance improvements; no public APIs, protocols, or other external factors are being changed.

Rejected Alternatives

  • Allow the producer to specify interested topics. In doing so, many uncached topics could be fetched in a single request before records are produced to them, e.g. at startup. This would greatly alleviate the problem, but it requires clients to (1) implement a new producer call and (2) know the working set of topics a priori. While it would obviate the need to fetch metadata asynchronously, it still wouldn't resolve the throughput "bubble" that individual producer threads encounter when waiting for new topic metadata to be fetched.