Status
Current state: Under Discussion
Discussion thread: here
JIRA: TODO
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-42 added the ConsumerInterceptor interface to support the introspection, and potentially the modification, of messages before they are consumed. Many uses of message interception, such as metrics collection and monitoring (a primary motivator for KIP-42), require the interceptor to keep some per-partition state. Unfortunately, the ConsumerInterceptor interface does not provide a way for the interceptor to be made aware of consumer rebalances, which makes maintaining such state difficult. To remedy this, we propose having ConsumerInterceptor extend ConsumerRebalanceListener.
Public Interfaces
We propose augmenting ConsumerInterceptor as follows:

    public interface ConsumerInterceptor<K, V> extends Configurable, ConsumerRebalanceListener, AutoCloseable {

        // Existing methods elided

        @Override
        default void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

        @Override
        default void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
    }
Note that the methods inherited from ConsumerRebalanceListener are given no-op default implementations for backwards compatibility. Note also that ConsumerRebalanceListener has one additional method, onPartitionsLost(), which is already a default method and so needs no override here.
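As an illustration of the per-partition state this change is meant to support, the following is a minimal sketch rather than a reference implementation. To stay dependency-free it uses stand-ins: `TopicPartition` here is a local substitute for the Kafka class of the same name, `RebalanceCallbacks` mimics only the two default methods shown above, and `CountingInterceptor` with its `recordConsumed` and `snapshot` methods is hypothetical. A real interceptor would implement ConsumerInterceptor and update its state from onConsume.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for org.apache.kafka.common.TopicPartition (assumption: only topic and partition matter here).
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Stand-in for the two callbacks the proposal adds to ConsumerInterceptor as no-op defaults.
interface RebalanceCallbacks {
    default void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
    default void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
}

// Hypothetical interceptor that keeps a record count per currently assigned partition.
class CountingInterceptor implements RebalanceCallbacks {
    private final Map<TopicPartition, Long> counts = new ConcurrentHashMap<>();

    // Simplified substitute for onConsume(): invoked once per consumed record.
    void recordConsumed(TopicPartition tp) {
        counts.merge(tp, 1L, Long::sum);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Start tracking newly assigned partitions at zero.
        for (TopicPartition tp : partitions) {
            counts.putIfAbsent(tp, 0L);
        }
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Drop state for partitions this consumer no longer owns.
        for (TopicPartition tp : partitions) {
            counts.remove(tp);
        }
    }

    // Immutable copy of the current per-partition counts.
    Map<TopicPartition, Long> snapshot() {
        return Map.copyOf(counts);
    }
}
```

Without the revocation callback, the map above would keep entries for partitions that have moved to another consumer, which is the stale-state problem described in the Motivation.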
Proposed Changes
We propose modifying ConsumerInterceptor as detailed above and wiring the new callbacks up to be called from ConsumerCoordinator, similarly to the existing interceptor methods.
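One way the wiring could look is sketched below. This is not the actual ConsumerCoordinator code: `RebalanceFanOut`, its nested `Callbacks` interface, and the `failures` list are hypothetical names, partitions are represented as plain strings, and the sketch assumes that an exception thrown by one interceptor should be caught so that the remaining interceptors are still notified.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helper, not part of Kafka: forwards rebalance events to every interceptor.
class RebalanceFanOut {

    // Stand-in for the rebalance callbacks an interceptor would expose under this proposal.
    interface Callbacks {
        default void onPartitionsAssigned(Collection<String> partitions) { }
        default void onPartitionsRevoked(Collection<String> partitions) { }
    }

    private final List<Callbacks> interceptors;
    // Collected instead of logged, so the sketch has no logging dependency.
    private final List<String> failures = new ArrayList<>();

    RebalanceFanOut(List<Callbacks> interceptors) {
        this.interceptors = List.copyOf(interceptors);
    }

    void partitionsAssigned(Collection<String> partitions) {
        for (Callbacks interceptor : interceptors) {
            try {
                interceptor.onPartitionsAssigned(partitions);
            } catch (RuntimeException e) {
                // Assumption: a failing interceptor must not block the others.
                failures.add("onPartitionsAssigned: " + e.getMessage());
            }
        }
    }

    void partitionsRevoked(Collection<String> partitions) {
        for (Callbacks interceptor : interceptors) {
            try {
                interceptor.onPartitionsRevoked(partitions);
            } catch (RuntimeException e) {
                failures.add("onPartitionsRevoked: " + e.getMessage());
            }
        }
    }

    // Immutable copy of the failure messages recorded so far.
    List<String> failures() {
        return List.copyOf(failures);
    }
}
```

Because the callbacks have no-op defaults, interceptors that ignore rebalances pass through this loop unchanged.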
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
  - None. Existing ConsumerInterceptor implementations remain fully compatible because the additional methods have default implementations.
- If we are changing behavior how will we phase out the older behavior?
  - No need. Empty default methods are equivalent to not receiving callbacks at all.
- If we need special migration tools, describe them here.
  - N/A
- When will we remove the existing behavior?
  - N/A
Rejected Alternatives
We could forgo adding these methods and instead try to infer partition assignment from which partitions are being committed, which is obtainable through the existing onCommit method. But there is no guarantee when, or even if, consumers commit offsets. Even if they do, it is not reasonable to assume that a partition must have been revoked just because it was not committed.
Instead of having ConsumerInterceptor extend the existing ConsumerRebalanceListener interface, we could just add similarly named methods to it. However, that would allow the two interfaces to diverge in the future, and it is not clear that we would ever want rebalance listeners to have additional callbacks without also providing them to interceptors.