Status
Current state: Under Discussion
Discussion thread: [DISCUSS] KIP-446: Add changelog topic configuration to KTable suppress
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
When suppressing record updates in a KTable, an internal changelog topic is created. The suppress operator sends a record to this topic when it is received from upstream, and sends a tombstone after the record is emitted. Thus, data in the buffer is much shorter lived than data in other KTables, and the buffer changelog may also be much more compactable than KTable changelogs. Currently, the configuration for this topic cannot be set using the Streams DSL; instead, a user has to run an external tool to override the defaults.
Other parts of the Streams API that create internal topics do allow for a user to set their configuration. When aggregating a stream, for example, an instance of Materialized is passed to the aggregate method which holds a topicConfig of type Map<String, String>.
Public Interfaces
We add two methods to Suppressed.BufferConfig:
- withLoggingEnabled: (default) enables logging and allows for configuration of the changelog topic
- withLoggingDisabled: disables logging to the changelog topic
When none of the above methods is called, the behavior will be no different from the current implementation, i.e., records will be written to a changelog topic with the default settings.
Public Interfaces
The public interface of BufferConfig will change to include the previously mentioned methods.
public interface Suppressed<K> {

    interface BufferConfig<BC extends BufferConfig<BC>> {

        /**
         * Disable change logging for the currently suppressed records.
         */
        BC withLoggingDisabled();

        /**
         * Indicates that a changelog topic should be created containing the currently suppressed
         * records. Due to the short-lived nature of records in this topic it is likely more
         * compactable than changelog topics for KTables.
         *
         * @param config Configs that should be applied to the changelog. Note: Any unrecognized
         *               configs will be ignored.
         * @return itself
         */
        BC withLoggingEnabled(final Map<String, String> config);
    }
}
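To illustrate the intended semantics, the following is a minimal, self-contained sketch that only models the configuration state described above. SketchBufferConfig and its accessors are hypothetical stand-ins invented for this example; they are not the Kafka Streams implementation. The factory method unbounded() is assumed here as the starting point, and the topic settings min.cleanable.dirty.ratio and segment.ms are example values, not recommendations.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a BufferConfig implementation. It models only the
// logging-related state proposed in this KIP, nothing else.
final class SketchBufferConfig {
    private final boolean loggingEnabled;
    private final Map<String, String> logConfig;

    private SketchBufferConfig(final boolean loggingEnabled, final Map<String, String> logConfig) {
        this.loggingEnabled = loggingEnabled;
        // Defensive copy so later changes to the caller's map have no effect.
        this.logConfig = Collections.unmodifiableMap(new HashMap<>(logConfig));
    }

    // Default state: logging enabled, changelog topic uses default settings.
    static SketchBufferConfig unbounded() {
        return new SketchBufferConfig(true, Collections.emptyMap());
    }

    // Keeps logging on and records the topic-level overrides to apply.
    SketchBufferConfig withLoggingEnabled(final Map<String, String> config) {
        return new SketchBufferConfig(true, config);
    }

    // Turns the changelog off; any previously supplied overrides are dropped.
    SketchBufferConfig withLoggingDisabled() {
        return new SketchBufferConfig(false, Collections.emptyMap());
    }

    boolean isLoggingEnabled() {
        return loggingEnabled;
    }

    Map<String, String> logConfig() {
        return logConfig;
    }
}

final class SuppressBufferSketch {
    public static void main(final String[] args) {
        // Example overrides (assumed values) that favor more aggressive compaction.
        final Map<String, String> overrides = new HashMap<>();
        overrides.put("min.cleanable.dirty.ratio", "0.1");
        overrides.put("segment.ms", "600000");

        final SketchBufferConfig defaults = SketchBufferConfig.unbounded();
        final SketchBufferConfig tuned = defaults.withLoggingEnabled(overrides);
        final SketchBufferConfig unlogged = defaults.withLoggingDisabled();

        System.out.println("defaults: " + defaults.isLoggingEnabled() + " " + defaults.logConfig());
        System.out.println("tuned:    " + tuned.isLoggingEnabled() + " " + tuned.logConfig());
        System.out.println("unlogged: " + unlogged.isLoggingEnabled() + " " + unlogged.logConfig());
    }
}
```

In a real topology the resulting config would be passed to a Suppressed factory method rather than inspected directly. The sketch returns a new immutable object from each call, which is one possible design and keeps partially configured instances safe to share.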
Compatibility, Deprecation, and Migration Plan
There is no impact on existing applications, as this change does not remove or alter existing behavior.
Rejected Alternatives
N/A