...

The producer has two values of interest: an eviction threshold for topic metadata, which is used to remove an unused topic from the working set at a future time (currently hard-coded to 5 minutes), and a metadata refresh threshold, which is used to periodically refresh topic metadata (defined by metadata.max.age.ms). While seemingly similar, these two thresholds differ fundamentally: a short eviction threshold suits cases where records are produced to a topic and then forgotten, while a long eviction threshold suits topics that are produced to intermittently over the lifetime of the producer.

...

  • Define a target topic fetch RPC size (note this value is only respected by non-urgent topic metadata refreshes):
    • Let metadataEvictionSecs = metadata.evict.ms / 1000
    • Set topicsPerSec = <number of cached topics> / metadataEvictionSecs
    • Set targetMetadataFetchSize = Math.max(topicsPerSec * 10, 20)
      • Rationale: this sets the target size so that it approximates a metadata refresh at least every 10 seconds, while also maintaining a reasonable batch size of '20' for setups with a lower number of topics. '20' has no significance other than being a small-but-appropriate trade-off between RPC metadata response size and necessary RPC frequency.
  • Maintain a collection T (urgent) of all topics
    • with no cached metadata and buffered producer request data
    • that were notified of a metadata change (e.g. NOT_LEADER_FOR_PARTITION encountered)
    • with metadata that hasn't been refreshed for at least 'metadata.max.age.ms'
  • Maintain a collection U (non-urgent) of all topics
    • not contained in T
    • with metadata that hasn't been refreshed for at least 'metadata.max.age.ms * 0.5'
      • Rationale: A multiplier of 0.5 is presumed to be a reasonable point at which refreshing the metadata could be beneficial, while leaving enough slack before the metadata update becomes necessary.
  • If T is non-empty
    • fetch metadata for all topics in T (even if |T| exceeds targetMetadataFetchSize)
    • fetch metadata for a subset of topics in U, such that '|T + sub(U)| <= targetMetadataFetchSize'
  • If '|U| >= targetMetadataFetchSize'
    • fetch metadata for targetMetadataFetchSize topics in U with the oldest last refresh times
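The selection steps above can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not the producer's actual implementation: the class MetadataFetchPlanner, the TopicState type, and its fields are hypothetical names introduced here. The sketch also assumes that metadata.evict.ms is positive, that a topic with no cached metadata and no buffered data belongs to neither T nor U, and that the 'If |U| >= targetMetadataFetchSize' step applies only when T is empty.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class MetadataFetchPlanner {

    /** Hypothetical per-topic bookkeeping; field names are illustrative only. */
    static final class TopicState {
        final String name;
        final long lastRefreshMs;      // negative means no cached metadata
        final boolean hasBufferedData; // producer has buffered records for the topic
        final boolean changeNotified;  // e.g. NOT_LEADER_FOR_PARTITION encountered

        TopicState(String name, long lastRefreshMs,
                   boolean hasBufferedData, boolean changeNotified) {
            this.name = name;
            this.lastRefreshMs = lastRefreshMs;
            this.hasBufferedData = hasBufferedData;
            this.changeNotified = changeNotified;
        }
    }

    /** targetMetadataFetchSize = max(topicsPerSec * 10, 20). */
    static int targetMetadataFetchSize(int cachedTopics, long metadataEvictMs) {
        double metadataEvictionSecs = metadataEvictMs / 1000.0;
        double topicsPerSec = cachedTopics / metadataEvictionSecs;
        return (int) Math.max(topicsPerSec * 10, 20);
    }

    /** Returns the topic names to include in the next metadata fetch. */
    static List<String> topicsToFetch(List<TopicState> topics, long nowMs,
                                      long maxAgeMs, int targetSize) {
        List<TopicState> urgent = new ArrayList<>();    // collection T
        List<TopicState> nonUrgent = new ArrayList<>(); // collection U
        for (TopicState t : topics) {
            boolean cached = t.lastRefreshMs >= 0;
            long ageMs = cached ? nowMs - t.lastRefreshMs : 0L;
            if ((!cached && t.hasBufferedData) || t.changeNotified
                    || (cached && ageMs >= maxAgeMs)) {
                urgent.add(t);
            } else if (cached && ageMs >= maxAgeMs / 2) {
                nonUrgent.add(t);
            }
        }
        // Oldest last refresh time first.
        nonUrgent.sort(Comparator.comparingLong((TopicState t) -> t.lastRefreshMs));

        List<String> result = new ArrayList<>();
        if (!urgent.isEmpty()) {
            // All of T is fetched, even if |T| exceeds the target size.
            for (TopicState t : urgent) {
                result.add(t.name);
            }
            // Fill any remaining room with the oldest topics in U.
            int room = Math.max(0, targetSize - urgent.size());
            for (TopicState t : nonUrgent.subList(0, Math.min(room, nonUrgent.size()))) {
                result.add(t.name);
            }
        } else if (nonUrgent.size() >= targetSize) {
            for (TopicState t : nonUrgent.subList(0, targetSize)) {
                result.add(t.name);
            }
        }
        return result;
    }
}
```

With the current 5 minute eviction threshold (300000 ms), 100 cached topics give a target size of 20 (the floor applies), while 1200 cached topics give 4 topics per second and therefore a target size of 40.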

...