...

Adds producer configuration flag metadata.evict.ms (default: 5 minutes) to control the topic metadata eviction duration.

```java
/** <code>metadata.evict.ms</code> */
public static final String METADATA_EVICT_MS_CONFIG = "metadata.evict.ms";
private static final String METADATA_EVICT_MS_DOC =
        "Controls how long the producer will cache metadata for a topic that's not being accessed. " +
        "If the elapsed time since a topic was last produced to exceeds the metadata eviction duration, " +
        "then the topic's metadata is forgotten and the next access to it will force a metadata " +
        "fetch request.";

// ...
    .define(METADATA_EVICT_MS_CONFIG,
            Type.LONG,
            5 * 60 * 1000,    // default: 5 minutes
            atLeast(1000),    // minimum: 1 second
            Importance.LOW,
            METADATA_EVICT_MS_DOC)
```
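The eviction rule described by the doc string can be sketched as a small idle-topic cache. This is a minimal illustration under stated assumptions, not the producer's actual metadata code: `IdleTopicCache`, `recordProduce`, `evictIdle`, and `needsFetch` are hypothetical names, and the current time is passed in explicitly (in milliseconds) so the behavior is easy to follow.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the eviction rule described by METADATA_EVICT_MS_DOC.
// Not the producer's real metadata cache; names are illustrative only.
final class IdleTopicCache {
    private final long evictMs;                                       // value of metadata.evict.ms
    private final Map<String, Long> lastProduceMs = new HashMap<>();  // topic -> last produce time (ms)

    IdleTopicCache(long evictMs) {
        // Mirrors the atLeast(1000) validator in the config definition.
        if (evictMs < 1000) {
            throw new IllegalArgumentException("metadata.evict.ms must be at least 1000");
        }
        this.evictMs = evictMs;
    }

    // Called whenever a record is produced to the topic; (re)starts its idle clock.
    void recordProduce(String topic, long nowMs) {
        lastProduceMs.put(topic, nowMs);
    }

    // Forgets every topic idle for longer than evictMs and returns the evicted names.
    Set<String> evictIdle(long nowMs) {
        Set<String> evicted = new HashSet<>();
        Iterator<Map.Entry<String, Long>> it = lastProduceMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > evictMs) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }

    // An evicted (or never-seen) topic forces a metadata fetch on its next access.
    boolean needsFetch(String topic) {
        return !lastProduceMs.containsKey(topic);
    }

    public static void main(String[] args) {
        IdleTopicCache cache = new IdleTopicCache(5 * 60 * 1000L);  // proposed 5-minute default
        cache.recordProduce("orders", 0L);
        cache.recordProduce("clicks", 200_000L);
        // At t = 301 s, "orders" has been idle for 301 s (> 300 s); "clicks" only for 101 s.
        System.out.println(cache.evictIdle(301_000L));   // prints [orders]
        System.out.println(cache.needsFetch("orders"));  // prints true
        System.out.println(cache.needsFetch("clicks"));  // prints false
    }
}
```

The sketch treats "exceeds" strictly: a topic idle for exactly metadata.evict.ms is still kept.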


Proposed Changes

The proposal is to resolve (2), which should reduce the cost of (1) considerably.

The producer has two values of interest: an eviction threshold for topic metadata, which is used to remove an unused topic from the working set at a future time (currently hard-coded to 5 minutes), and a metadata refresh threshold, which is used to periodically refresh topic metadata (defined by metadata.max.age.ms). While seemingly similar, these two thresholds fundamentally differ: you could imagine a short eviction threshold in cases where records are produced to a topic that is then no longer used, or a long eviction threshold where topics are produced to intermittently over the lifetime of the producer.

Therefore, the producer should add the configuration flag 'metadata.evict.ms' (default: 5 minutes) to control topic eviction.
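The difference between the two thresholds can be sketched as two independent checks on per-topic timestamps. `TopicTimes`, `isEvictable`, and `isRefreshDue` are hypothetical names; the example assumes the proposed 5-minute default for metadata.evict.ms and Kafka's default metadata.max.age.ms of 300000 ms (5 minutes).

```java
// Hypothetical per-topic bookkeeping showing that eviction and refresh are separate decisions.
final class TopicTimes {
    final long lastProduceMs;  // last time a record was produced to the topic
    final long lastRefreshMs;  // last time the topic's metadata was fetched

    TopicTimes(long lastProduceMs, long lastRefreshMs) {
        this.lastProduceMs = lastProduceMs;
        this.lastRefreshMs = lastRefreshMs;
    }

    // Eviction depends only on how long the topic has gone unused (metadata.evict.ms).
    boolean isEvictable(long nowMs, long evictMs) {
        return nowMs - lastProduceMs > evictMs;
    }

    // Refresh depends only on the age of the cached metadata (metadata.max.age.ms).
    boolean isRefreshDue(long nowMs, long maxAgeMs) {
        return nowMs - lastRefreshMs >= maxAgeMs;
    }

    public static void main(String[] args) {
        long evictMs = 5 * 60 * 1000L;   // proposed default for metadata.evict.ms
        long maxAgeMs = 5 * 60 * 1000L;  // Kafka's default metadata.max.age.ms
        long now = 600_000L;

        // Actively used topic with stale metadata: refresh it, but keep it cached.
        TopicTimes busy = new TopicTimes(590_000L, 200_000L);
        System.out.println(busy.isEvictable(now, evictMs) + " " + busy.isRefreshDue(now, maxAgeMs)); // prints "false true"

        // Idle topic with recently fetched metadata: evict it, no refresh needed.
        TopicTimes idle = new TopicTimes(100_000L, 500_000L);
        System.out.println(idle.isEvictable(now, evictMs) + " " + idle.isRefreshDue(now, maxAgeMs)); // prints "true false"
    }
}
```

Because each check reads a different timestamp, a heavily used topic can be due for refresh without ever becoming evictable, and an idle topic can be evictable even though its metadata is fresh.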

Changes will be made to permit a subset of topics to refresh their metadata. In determining which topics' metadata to refresh, the following algorithm will be used:

  • Define a target topic fetch RPC size:
    • Let metadataEvictionSecs = metadata.evict.ms / 1000
    • Set topicsPerSec = <number of cached topics> / metadataEvictionSecs
    • Set targetMetadataFetchSize = Math.max(topicsPerSec * 10, 20)
      • Rationale: this sets the target size to approximate a metadata refresh at least every 10 seconds, while also maintaining a reasonable batch size of '20' for setups with a lower number of topics. '20' has no significance other than being a small-but-appropriate trade-off between metadata RPC response size and necessary RPC frequency.
  • Maintain a collection T (urgent) of all topics
    • with no cached metadata and buffered producer request data
    • that were notified of a metadata change (e.g. NOT_LEADER_FOR_PARTITION encountered)
    • with metadata last refreshed at least 'metadata.max.age.ms' ago
  • Maintain a collection U (non-urgent) of all topics
    • not contained in T
    • with metadata last refreshed at least 'metadata.max.age.ms * 0.5' ago
      • Rationale: a multiplier of 0.5 is presumed to be a reasonable point at which refreshing the metadata could be beneficial, while leaving enough slack before the metadata update becomes necessary.
  • If T is non-empty
    • fetch metadata for all topics in T
    • fetch metadata for a subset of topics in U, such that '|T| + |sub(U)| <= targetMetadataFetchSize'
  • If '|U| >= targetMetadataFetchSize'
    • fetch metadata for targetMetadataFetchSize topics in U with the oldest last refresh times
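The selection algorithm above can be sketched in Java as follows. This is an illustrative reading of the proposal, not the producer's implementation. `RefreshSelector`, `TopicState`, and its fields are hypothetical. Several details are assumptions: the target size is computed with floating-point division and rounded up; topicsPerSec is multiplied by 10, following the stated rationale of roughly one metadata fetch every 10 seconds; the subset of U fetched alongside T is taken oldest-refresh-first; and the "|U| >= target" rule is treated as applying only when T is empty. The record requires Java 16 or later.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the subset-refresh selection; all names are illustrative.
final class RefreshSelector {

    // Hypothetical per-topic state tracked by the producer.
    record TopicState(String name, long lastRefreshMs, boolean hasMetadata,
                      boolean hasBufferedData, boolean metadataChangeNotified) {}

    // targetMetadataFetchSize = max(topicsPerSec * 10, 20), rounded up (assumption).
    static int targetFetchSize(int cachedTopics, long evictMs) {
        double evictionSecs = evictMs / 1000.0;
        double topicsPerSec = cachedTopics / evictionSecs;
        return (int) Math.max(Math.ceil(topicsPerSec * 10), 20);
    }

    static List<String> select(List<TopicState> topics, long nowMs, long maxAgeMs, long evictMs) {
        int target = targetFetchSize(topics.size(), evictMs);
        List<TopicState> urgent = new ArrayList<>();     // collection T
        List<TopicState> nonUrgent = new ArrayList<>();  // collection U
        for (TopicState t : topics) {
            long age = nowMs - t.lastRefreshMs();
            boolean isUrgent = (!t.hasMetadata() && t.hasBufferedData())  // no metadata, data waiting
                    || t.metadataChangeNotified()                        // e.g. NOT_LEADER_FOR_PARTITION
                    || (t.hasMetadata() && age >= maxAgeMs);             // past metadata.max.age.ms
            if (isUrgent) {
                urgent.add(t);
            } else if (t.hasMetadata() && age >= maxAgeMs / 2) {         // past half of max age
                nonUrgent.add(t);
            }
        }
        nonUrgent.sort(Comparator.comparingLong(TopicState::lastRefreshMs));  // oldest refresh first

        List<String> fetch = new ArrayList<>();
        if (!urgent.isEmpty()) {
            // All of T, topped up from U until the target size is reached.
            for (TopicState t : urgent) fetch.add(t.name());
            for (TopicState t : nonUrgent) {
                if (fetch.size() >= target) break;
                fetch.add(t.name());
            }
        } else if (nonUrgent.size() >= target) {
            // Only U: fetch exactly `target` topics with the oldest refresh times.
            for (int i = 0; i < target; i++) fetch.add(nonUrgent.get(i).name());
        }
        return fetch;
    }

    public static void main(String[] args) {
        long maxAgeMs = 300_000L, evictMs = 300_000L, now = 1_000_000L;
        List<TopicState> topics = List.of(
                new TopicState("new-topic", 0L, false, true, false),    // no metadata, data buffered -> T
                new TopicState("moved", 990_000L, true, false, true),   // metadata change notified -> T
                new TopicState("stale", 600_000L, true, false, false),  // age 400 s >= max age -> T
                new TopicState("aging", 800_000L, true, false, false),  // age 200 s >= max age / 2 -> U
                new TopicState("fresh", 950_000L, true, false, false)); // age 50 s -> neither
        System.out.println(targetFetchSize(topics.size(), evictMs)); // prints 20
        System.out.println(select(topics, now, maxAgeMs, evictMs));  // prints [new-topic, moved, stale, aging]
    }
}
```

With five topics the target size falls back to the floor of 20, so every urgent topic plus the one non-urgent topic fits into a single fetch; with 6,000 cached topics and the 5-minute default, the target grows to 200.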

...