Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

When suppressing record updates in a KTable, an internal changelog topic is created. The suppress operator sends a record to the topic when it is received from upstream, and sends a tombstone after it is emtited.  Thus, data in the buffer is much shorter lived than data in other KTables, and the buffer changelog also may be much more compactable than KTable changelogs. Currently the configuration for this topic cannot be set using the Streams DSL, instead a user has to run an external tool to override the defaults.

Other parts of the Streams API that create internal topics do allow for a user to set their configuration. When aggregating a stream, for example, an instance of Materialized is passed to the aggregate method which holds a topicConfig of type Map<String, String>.

Public Interfaces

We add two methods  to Suppressed.BufferConfig.

...

Code Block
titleMethods to add to Suppressed.BufferConfig
public interface Suppressed<K> {

    interface BufferConfig<BC extends BufferConfig<BC>> {
        /**
         * Disable change logging for the currently suppressed KTablerecords.
         */
        BC withLoggingDisabled();

        /**
         * Indicates that a changelog topic should be created forcontaining the currently suppressed KTable.
         * The changelog will be created with the provided configs records. Due to the short-lived nature of records in this topic it is likely more
         * compactable than changelog topics for KTables.
         *
         * @param config Configs that should be applied to the changelog. Note: Any unrecognized
         * configs will be ignored.
         * @return itself
         */
        BC withLoggingEnabled(final Map<String, String> config);
    }
}

...