...
The producer has two values of interest: an eviction threshold for topic metadata, which is used to remove an unused topic from the working set at a future time (currently hard-coded to 5 minutes), and a metadata refresh threshold, which is used to periodically refresh topic metadata (defined by metadata.max.age.ms
). While seemingly similar, these two thresholds fundamentally differ: you could imagine a short evcition eviction threshold in cases where records may be produced to a topic and then subsequently forgotten, or a long eviction where topics are intermittently produced to over the lifetime of the producer.
...
- Define a target topic fetch RPC size (note this value is only respected by non-urgent topic metadata refreshes):
- Let
metadataEvictionSecs = metadata.evict.ms / 1000
- Set
topicsPerSec = <number of cached topics> / metadataEvictionSecs
- Set
targetMetadataFetchSize = Math.max(topicsPerSec * 10, 20)
- Rationale: this sets the target size to be approximate a metadata refresh at least every 10 seconds, while also maintaining a reasonable batch size of '20' for setups with a lower number of topics. '20' has no significance other than it's a small-but-appropriate trade-off between RPC metadata response size and necessary RPC frequency.
- Let
- Maintain a collection T (urgent) of all topics
- with no cached metadata and buffered producer request data
- that were notified of a metadata change (e.g. NOT_LEADER_FOR_PARTITION encountered)
- with metadata that hasn't been refreshed at least
'metadata.max.age.ms'
ago
- Maintain a collection U (non-urgent) of all topics
- not contained in T
- with metadata that hasn't been refreshed for at least
'metadata.max.age.ms * 0.5'
ago- Rationale: A multiplier of 0.5 is presumed to be a reasonable time at which refreshing the metadata could be beneficial, with enough duration to allow for slack between when the metadata update is necessary.
- If T is non-empty
- fetch metadata for all topics in T (even if |
T|
exceedstargetMetadataFetchSize
) - fetch metadata for a subset of topics in U, such that '|
T + sub(U)| <= TARGET_METADATA_FETCH_SIZEtargetMetadataFetchSize'
- fetch metadata for all topics in T (even if |
- If
'|U| >= TARGET_METADATA_FETCH_SIZEtargetMetadataFetchSize'
- fetch metadata for
TARGET_METADATA_FETCH_SIZE
targetMetadataFetchSize
topics in U with the oldest last refresh times
- fetch metadata for
...