THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- Define a target topic fetch RPC size:
- Let
metadataExpirySecs = metadata.expiry.ms / 1000
- Set
topicsPerSec = <number of cached topics> / metadataExpirySecs
- Set
targetMetadataFetchSize = Math.max(topicsPerSec / 10, 20)
- Rationale: this sets the target size to be approximate a metadata refresh at least every 10 seconds, while also maintaining a reasonable batch size of '20' for setups with a lower number of topics. '20' has no significance other than it's a small-but-appropriate trade-off between RPC metadata response size and necessary RPC frequency.
- Let
- Maintain a collection T (urgent) of all topics
- with no cached metadata and buffered producer request data
- that were notified of a metadata change (e.g. NOT_LEADER_FOR_PARTITION encountered)
- with metadata that hasn't been refreshed at least
'metadata.max.age.ms'
ago
- Maintain a collection U (non-urgent) of all topics
- not contained in T
- with metadata that hasn't been refreshed for at least
'metadata.max.age.ms * 0.5'
ago- Rationale: A multiplier of 0.5 is presumed to be a reasonable time at which refreshing the metadata could be beneficial, with enough duration to allow for slack between when the metadata update is necessary.
- If T is non-empty
- fetch metadata for all topics in T
- fetch metadata for a subset of topics in U, such that '|
T + sub(U)| <= TARGET_METADATA_FETCH_SIZE'
- If
'|U| >= TARGET_METADATA_FETCH_SIZE'
- fetch metadata for
TARGET_METADATA_FETCH_SIZE
topics in U with the oldest last refresh times
- fetch metadata for
...