...

  1. The number of metadata RPCs generated.
  2. The size of the metadata RPCs.
  3. Metadata ages out of the cache even though it's expected to be used in the near future.

For (1), an RPC is generated every time an uncached topic's metadata must be fetched. During periods when a large number of uncached topics are processed (e.g. producer startup), a large number of RPCs may be sent out to the broker(s) in a short period of time. Generally, if there are n unknown topics, then O(n) metadata RPCs will be sent regardless of their proximity in time.

For (2), requests for metadata will also ask to refresh metadata about all known topics. As the number of topics becomes large, this will inflate the response size to be quite large and require non-trivial processing. This further exacerbates (1) in that every subsequent metadata request will result in an increasing amount of data transmitted back to the client for every RPC.

For (3), implementing (2) will reduce the cost for these occurrences. However, the duration after which metadata is evicted from the producer's metadata cache is currently a fixed value, and therefore cannot be modified by the client, even if the value is too short. Implementing a way to control the eviction period should remove the need for the metadata RPC in these cases.

In concert, these factors amplify the negative effects of each other, and improvements should be made in order to alleviate topic scalability issues.

Proposed Changes

The proposal is to resolve (2) and (3), which should reduce the cost of (1) considerably.

The producer has two values of interest: an eviction threshold for topic metadata, which is used to remove an unused topic from the working set at a future time (currently hard-coded to 5 minutes), and a metadata refresh threshold, which is used to periodically refresh topic metadata (defined by metadata.max.age.ms). While seemingly similar, these two thresholds fundamentally differ: you could imagine a short eviction threshold in cases where records may be produced to a topic and then subsequently forgotten, or a long eviction threshold where topics are intermittently produced to over the lifetime of the producer.

Therefore, the producer should add configuration flag 'metadata.max.idle.ms' (default: 5 minutes) to control topic eviction.
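
As a rough illustration of how an idle-based eviction threshold behaves, the following minimal sketch tracks when each topic was last produced to and drops topics that have been idle for longer than the configured duration. The class TopicIdleTracker and its methods are hypothetical names chosen for this example. They are not part of the Kafka client, and the real producer may structure this differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the Kafka client: remembers when each
// topic was last produced to and evicts topics idle longer than maxIdleMs.
final class TopicIdleTracker {
    private final long maxIdleMs;
    private final Map<String, Long> lastUsedMs = new HashMap<>();

    TopicIdleTracker(long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
    }

    // Record that a record was produced to the topic at time nowMs.
    void touch(String topic, long nowMs) {
        lastUsedMs.put(topic, nowMs);
    }

    // Remove every topic whose idle time exceeds the eviction threshold.
    void evictIdle(long nowMs) {
        lastUsedMs.entrySet().removeIf(e -> nowMs - e.getValue() > maxIdleMs);
    }

    // True if the topic is still in the working set.
    boolean isCached(String topic) {
        return lastUsedMs.containsKey(topic);
    }
}
```

Note that eviction here depends only on when a topic was last used, whereas the refresh threshold (metadata.max.age.ms) depends on when its metadata was last fetched, which is why the two values are configured separately.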

Changes will be made to permit a subset of topics to refresh their metadata. When determining which topics' metadata to refresh, the following criteria will be used:

  • If a new (uncached) topic is encountered, only fetch metadata for that particular topic. This is new.
  • If a topic was notified of a metadata change (e.g. NOT_LEADER_FOR_PARTITION encountered), then update all topics in the working set.
    • The rationale is that, when such changes are encountered, it's highly probable that other topics' metadata will also need to be refreshed. This is unchanged from how the logic works today.
  • If a topic's metadata hasn't been refreshed for at least 'metadata.max.age.ms', then update all topics in the working set.
    • The rationale is that, when this is encountered, other topics will also be nearing their metadata max age. This is unchanged from how the logic works today.
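
The refresh criteria can be sketched roughly as follows. This is an illustrative sketch under stated assumptions, not the producer's actual implementation: MetadataRefreshPlanner, TopicState, and their fields are hypothetical names, and a topic is treated as stale once its age reaches metadata.max.age.ms.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the refresh criteria; not the Kafka client's code.
final class MetadataRefreshPlanner {

    // Per-topic state; all field names are illustrative assumptions.
    static final class TopicState {
        final boolean cached;          // false for a newly encountered topic
        final boolean changeNotified;  // e.g. NOT_LEADER_FOR_PARTITION was seen
        final long lastRefreshMs;      // time of the last metadata refresh

        TopicState(boolean cached, boolean changeNotified, long lastRefreshMs) {
            this.cached = cached;
            this.changeNotified = changeNotified;
            this.lastRefreshMs = lastRefreshMs;
        }
    }

    // Returns the topics to include in the next metadata request.
    static Set<String> topicsToFetch(Map<String, TopicState> workingSet,
                                     long nowMs, long metadataMaxAgeMs) {
        Set<String> newTopics = new TreeSet<>();
        for (Map.Entry<String, TopicState> e : workingSet.entrySet()) {
            TopicState s = e.getValue();
            if (!s.cached) {
                // New topic: fetch metadata for just this topic.
                newTopics.add(e.getKey());
            } else if (s.changeNotified
                    || nowMs - s.lastRefreshMs >= metadataMaxAgeMs) {
                // Metadata change or max age reached: refresh the whole working set.
                return new TreeSet<>(workingSet.keySet());
            }
        }
        return newTopics;
    }
}
```

With this shape, a burst of new topics at startup yields requests containing only those new topics, while a leadership change or an aged-out entry still triggers a full refresh as it does today.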

Therefore, during conditions like producer startup, only new topics' metadata will be fetched, as opposed to all topics in the working set. While it doesn't reduce the number of generated RPCs, it dramatically reduces the response payload in the worst case, and reduces overall processing by both server and client.

Note that in the event of request failures (timeouts), there is no plan to change the current behavior, which is to wait 'retry.backoff.ms' before retrying.

Public Interfaces

Adds producer configuration flag metadata.max.idle.ms (default: 5 minutes) to control topic eviction duration.

Code Block
languagejava
/** <code>metadata.max.idle.ms</code> */
public static final String METADATA_MAX_IDLE_CONFIG = "metadata.max.idle.ms";
private static final String METADATA_MAX_IDLE_DOC =
        "Controls how long the producer will cache metadata for a topic that's idle. If the elapsed " +
        "time since a topic was last produced to exceeds the metadata idle duration, then the topic's " +
        "metadata is removed from the cache and the next access to it will force a metadata fetch request.";

...
    .define(METADATA_MAX_IDLE_CONFIG,
            Type.LONG,
            5 * 60 * 1000,
            atLeast(5000),
            Importance.LOW,
            METADATA_MAX_IDLE_DOC)
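
For illustration, a client could set the proposed flag as sketched below. The helper class ProducerIdleConfigExample is hypothetical, the broker address is a placeholder, and the key is written as a plain string so that the sketch does not depend on the Kafka client library.

```java
import java.util.Properties;

// Hypothetical example class; not part of the Kafka client.
final class ProducerIdleConfigExample {

    // Builds producer properties with a custom metadata idle timeout.
    static Properties producerProps(long maxIdleMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        // Keep idle topics cached for maxIdleMs before their metadata is evicted.
        props.put("metadata.max.idle.ms", Long.toString(maxIdleMs));
        return props;
    }
}
```

A producer that writes to topics only intermittently might pass a larger value, such as 10 minutes (600000), to avoid repeated metadata fetches for the same topics.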


Compatibility, Deprecation, and Migration Plan

...