...
Adds the producer configuration flag metadata.expiryevict.ms (default: 5 minutes)
to control the topic eviction duration.
```java
/** <code>metadata.expiryevict.ms</code> */
public static final String METADATA_EXPIRYEVICT_MS_CONFIG = "metadata.expiryevict.ms";
private static final String METADATA_EXPIRYEVICT_MS_DOC =
    "Controls how long the producer will cache metadata for a topic that's not being accessed. " +
    "If the elapsed time since a topic was last produced to exceeds the metadata eviction duration, " +
    "then the topic's metadata is forgotten and the next access to it will force a metadata " +
    "fetch request.";
...
.define(METADATA_EXPIRYEVICT_MS_CONFIG,
        Type.LONG,
        5 * 60 * 1000,
        atLeast(1000),
        Importance.LOW,
        METADATA_EXPIRYEVICT_MS_DOC)
```
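To make the default and the lower bound in the definition above concrete, here is a minimal sketch of how a value for the proposed flag might be read and validated. It uses only `java.util.Properties` and does not touch the Kafka client. The class `EvictConfigSketch` and its method `evictMs` are hypothetical names introduced for illustration, and the flag itself is the one proposed here, not an existing producer setting.

```java
import java.util.Properties;

/** Hypothetical sketch: reads the proposed flag with its default and lower bound. */
final class EvictConfigSketch {
    static final String KEY = "metadata.expiryevict.ms"; // proposed flag, as named in this document
    static final long DEFAULT_MS = 5 * 60 * 1000L;       // default: 5 minutes
    static final long MIN_MS = 1000L;                    // mirrors atLeast(1000)

    /** Returns the configured value, or the default when the key is absent. */
    static long evictMs(Properties props) {
        String raw = props.getProperty(KEY);
        long value = (raw == null) ? DEFAULT_MS : Long.parseLong(raw.trim());
        if (value < MIN_MS) {
            throw new IllegalArgumentException(KEY + " must be at least " + MIN_MS + ", got " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(KEY, "60000"); // evict topics that have been idle for more than one minute
        System.out.println(evictMs(props));
    }
}
```

A producer that writes to short-lived topics might choose a small value such as the one-minute setting in `main`, while the default keeps the current 5-minute behaviour.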
Proposed Changes
The proposal is to resolve (2), which should reduce the cost of (1) considerably.
The producer has two values of interest: an eviction threshold for topic metadata, which is used to remove an unused topic from the working set at a future time (currently hard-coded to 5 minutes), and a metadata refresh threshold, which is used to periodically refresh topic metadata (defined by metadata.max.age.ms). While seemingly similar, these two thresholds differ fundamentally: a short eviction threshold suits cases where records are produced to a topic that is then forgotten, while a long eviction threshold suits topics that are produced to intermittently over the lifetime of the producer.
Therefore, the producer should add the configuration flag 'metadata.expiryevict.ms' (default: 5 minutes) to control topic eviction.
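As an illustration of what the eviction threshold governs, the following is a minimal sketch of a working set that remembers when each topic was last produced to and drops topics that have been idle for longer than the threshold. The class `TopicEvictionSketch` and its methods are hypothetical names, not the producer's actual metadata classes, and timestamps are passed in explicitly to keep the example deterministic.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: tracks when each topic was last produced to and evicts idle ones. */
final class TopicEvictionSketch {
    private final long evictMs; // stands in for the value of metadata.expiryevict.ms
    private final Map<String, Long> lastProducedMs = new HashMap<>();

    TopicEvictionSketch(long evictMs) {
        this.evictMs = evictMs;
    }

    /** Records a produce to the topic, adding it to (or keeping it in) the working set. */
    void recordProduce(String topic, long nowMs) {
        lastProducedMs.put(topic, nowMs);
    }

    /** Drops every topic whose idle time exceeds the eviction threshold. */
    void evictIdle(long nowMs) {
        lastProducedMs.values().removeIf(last -> nowMs - last > evictMs);
    }

    /** True if the topic is still part of the working set. */
    boolean isCached(String topic) {
        return lastProducedMs.containsKey(topic);
    }

    public static void main(String[] args) {
        TopicEvictionSketch sketch = new TopicEvictionSketch(5 * 60 * 1000L);
        sketch.recordProduce("orders", 0L);
        sketch.recordProduce("audit", 200_000L);
        sketch.evictIdle(310_000L); // "orders" has been idle 310 s, "audit" only 110 s
        System.out.println(sketch.isCached("orders") + " " + sketch.isCached("audit"));
    }
}
```

Periodic refresh under metadata.max.age.ms is deliberately left out of this sketch: it decides when cached metadata is re-fetched, whereas eviction decides whether a topic is tracked at all.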
Changes will be made to permit a subset of topics to refresh their metadata. In determining which topics' metadata to refresh, the following algorithm will be used:
- Define a target topic fetch RPC size:
  - Let 'metadataEvictionSecs = metadata.expiryevict.ms / 1000'
  - Set 'topicsPerSec = <number of cached topics> / metadataEvictionSecs'
  - Set 'targetMetadataFetchSize = Math.max(topicsPerSec / 10, 20)'
    - Rationale: this sets the target size to approximate a metadata refresh at least every 10 seconds, while also maintaining a reasonable batch size of '20' for setups with a lower number of topics. '20' has no significance other than being a small-but-appropriate trade-off between RPC metadata response size and necessary RPC frequency.
- Maintain a collection T (urgent) of all topics
  - with no cached metadata and buffered producer request data
  - that were notified of a metadata change (e.g. NOT_LEADER_FOR_PARTITION encountered)
  - with metadata that hasn't been refreshed for at least 'metadata.max.age.ms'
- Maintain a collection U (non-urgent) of all topics
  - not contained in T
  - with metadata that hasn't been refreshed for at least 'metadata.max.age.ms * 0.5'
    - Rationale: a multiplier of 0.5 is presumed to be a reasonable point at which refreshing the metadata could be beneficial, leaving enough slack before the metadata update becomes necessary.
- If T is non-empty
  - fetch metadata for all topics in T
  - fetch metadata for a subset of topics in U, such that '|T + sub(U)| <= targetMetadataFetchSize'
- If '|U| >= targetMetadataFetchSize'
  - fetch metadata for 'targetMetadataFetchSize' topics in U with the oldest last refresh times
- fetch metadata for
...
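The selection steps stated above can be sketched as follows. This is a minimal illustration, not the producer's implementation: the class `MetadataFetchSketch` and its methods are hypothetical, the target size is computed with floating-point division (the text does not say which kind of division is meant), the subset of U that accompanies T is assumed to be the least recently refreshed topics, and the second condition is assumed to apply only when T is empty. The case that the text elides is left unimplemented and selects nothing.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the topic selection steps described in the text. */
final class MetadataFetchSketch {

    /** Target fetch size, following the formula in the text: max(topicsPerSec / 10, 20). */
    static int targetFetchSize(int cachedTopics, long evictMs) {
        double evictionSecs = evictMs / 1000.0;            // assumption: floating-point division
        double topicsPerSec = cachedTopics / evictionSecs;
        return (int) Math.max(topicsPerSec / 10, 20);
    }

    /**
     * Picks the topics for the next metadata fetch.
     *
     * @param urgent                 collection T
     * @param nonUrgentLastRefreshMs collection U, mapped to each topic's last refresh time
     * @param targetSize             the target fetch size
     */
    static List<String> selectTopics(Set<String> urgent,
                                     Map<String, Long> nonUrgentLastRefreshMs,
                                     int targetSize) {
        // Order U by last refresh time, oldest first.
        List<String> oldestFirst = new ArrayList<>(nonUrgentLastRefreshMs.keySet());
        oldestFirst.sort(Comparator.comparing(nonUrgentLastRefreshMs::get));

        List<String> selected = new ArrayList<>();
        if (!urgent.isEmpty()) {
            // All of T is fetched, even when |T| alone exceeds the target.
            selected.addAll(urgent);
            // Assumption: fill the remaining room with the oldest topics in U.
            int room = Math.max(0, targetSize - urgent.size());
            selected.addAll(oldestFirst.subList(0, Math.min(room, oldestFirst.size())));
        } else if (oldestFirst.size() >= targetSize) {
            selected.addAll(oldestFirst.subList(0, targetSize));
        }
        // The remaining case is elided in the text; this sketch selects nothing for it.
        return selected;
    }

    public static void main(String[] args) {
        Set<String> urgent = new HashSet<>();
        urgent.add("payments");
        Map<String, Long> nonUrgent = new HashMap<>();
        nonUrgent.put("logs", 100L);
        nonUrgent.put("metrics", 50L);
        nonUrgent.put("audit", 200L);
        System.out.println(targetFetchSize(90_000, 5 * 60 * 1000L));
        System.out.println(selectTopics(urgent, nonUrgent, 3));
    }
}
```

Taking the oldest topics in U first means that non-urgent refreshes piggyback on urgent fetches in least-recently-refreshed order, which is one plausible reading of the subset rule.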