Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Define a target topic fetch RPC size:
    • Let metadataExpirySecs = metadata.expiry.ms / 1000
    • Set topicsPerSec = <number of cached topics> / metadataExpirySecs
    • Set targetMetadataFetchSize = Math.max(topicsPerSec / 10, 20)
      • Rationale: this sets the target size to be approximate a metadata refresh at least every 10 seconds, while also maintaining a reasonable batch size of '20' for setups with a lower number of topics. '20' has no significance other than it's a small-but-appropriate trade-off between RPC metadata response size and necessary RPC frequency.
  • Maintain a collection T (urgent) of all topics
    • with no cached metadata and buffered producer request data
    • that were notified of a metadata change (e.g. NOT_LEADER_FOR_PARTITION encountered)
    • with metadata that hasn't been refreshed at least 'metadata.max.age.ms' ago
  • Maintain a collection U (non-urgent) of all topics
    • not contained in T
    • with metadata that hasn't been refreshed for at least 'metadata.max.age.ms * 0.5' ago
      • Rationale: A multiplier of 0.5 is presumed to be a reasonable time at which refreshing the metadata could be beneficial, with enough duration to allow for slack between when the metadata update is necessary.
  • If T is non-empty
    • fetch metadata for all topics in T
    • fetch metadata for a subset of topics in U, such that '|T + sub(U)| <= TARGET_METADATA_FETCH_SIZE'
  • If '|U| >= TARGET_METADATA_FETCH_SIZE'
    • fetch metadata for TARGET_METADATA_FETCH_SIZE topics in U with the oldest last refresh times

...