Status

Current state: Under Discussion

Discussion thread: thread

JIRA: KAFKA-8403

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Since 2.1, Kafka Streams provides a way to suppress the intermediate states of a KTable (KIP-328: Ability to suppress updates for KTables). The 'KTable#suppress' operator introduced in KIP-328 controls which updates downstream table and stream operations receive. With this feature, the contents of the upstream table are split into two groups: intermediate states still held in the suppression buffer, and final states already emitted to the downstream table. The user can look up the value associated with a specific key in the downstream table by querying the upstream table (KIP-67: Queryable state for Kafka Streams), since all of the key-value mappings in the downstream table are also stored in the upstream table.

However, there is a limitation. If the user only wants to retrieve the value associated with a specific key (like `ReadOnlyKeyValueStore#get`), this works. But if the user wants an iterator over the suppressed view (like `ReadOnlyKeyValueStore#range` or `ReadOnlyKeyValueStore#all`), there is a problem: the upstream store gives no way to tell which keys have already been flushed out of the suppression buffer and which are still held in it.
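To make the limitation concrete, the following toy model uses plain Java collections to stand in for the upstream store, the suppression buffer, and the downstream table. It is not Kafka Streams code: `SuppressionModel`, `update`, `flush` and `rangeKeys` are hypothetical names, and it is a simplified sketch assuming a buffered key is emitted only when `flush` is called explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model only: plain Java collections standing in for the stores involved in
// KTable#suppress. None of these names are Kafka Streams APIs.
class SuppressionModel {
    // Upstream table store: receives every update (queryable today via KIP-67).
    final NavigableMap<String, Long> upstream = new TreeMap<>();
    // Suppression buffer: latest intermediate value per key, not yet emitted.
    final Map<String, Long> buffer = new TreeMap<>();
    // Downstream table: only the records the buffer has emitted.
    final NavigableMap<String, Long> downstream = new TreeMap<>();

    void update(String key, long value) {
        upstream.put(key, value);
        buffer.put(key, value); // a newer update replaces the buffered one
    }

    // Emit a buffered key downstream (hypothetical trigger, e.g. its window closed).
    void flush(String key) {
        Long value = buffer.remove(key);
        if (value != null) {
            downstream.put(key, value);
        }
    }

    // Keys that a range query over [from, to] returns for the given store.
    static List<String> rangeKeys(NavigableMap<String, Long> store, String from, String to) {
        return new ArrayList<>(store.subMap(from, true, to, true).keySet());
    }

    public static void main(String[] args) {
        SuppressionModel m = new SuppressionModel();
        m.update("a", 1L);
        m.update("b", 2L);
        m.flush("a"); // only "a" has reached the downstream table

        // The upstream range also returns "b", which is still buffered, and nothing
        // in the upstream store records which keys were already emitted.
        System.out.println(rangeKeys(m.upstream, "a", "z"));   // prints [a, b]
        System.out.println(rangeKeys(m.downstream, "a", "z")); // prints [a]
    }
}
```

For a key that has not been updated since it was emitted, a point lookup returns the same value from either store. A range query, however, can only be answered directly by a store that holds the suppressed output itself.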

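One available workaround is to materialize the downstream table explicitly, e.g. `downstreamTable.filter((key, value) -> true, Materialized.as("final-state"))`. However, this approach is cumbersome: it adds a no-op filter to the topology only to obtain a queryable store.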

Proposed Changes

This KIP proposes to add an overload of the suppress method that takes a `Materialized` configuration, 'KTable#suppress(Suppressed, Materialized)', so that the suppressed result can be materialized into a queryable state store.

Public Interfaces

1. Java DSL

public interface KTable<K, V> {

    ...

    /**
     * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
     * <p>
     * This controls what updates downstream table and stream operations will receive.
     *
     * @param suppressed            Configuration object determining what, if any, updates to suppress
     * @param materialized          a {@code Materialized} that describes how the {@code StateStore} for the resulting {@code KTable}
     *                              should be materialized
     * @return A new {@code KTable} with the desired suppression characteristics.
     */
    KTable<K, V> suppress(final Suppressed<? super K> suppressed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    ....

2. Scala DSL

class KTable[K, V](val inner: KTableJ[K, V]) {

    ...

    /**
     * Suppress some updates from this changelog stream, determined by the supplied [[Suppressed]] configuration.
     *
     * This controls what updates downstream table and stream operations will receive.
     *
     * @param suppressed Configuration object determining what, if any, updates to suppress.
     * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
     *                      should be materialized.
     * @return A new KTable with the desired suppression characteristics.
     * @see `org.apache.kafka.streams.kstream.KTable#suppress`
     */
    def suppress(suppressed: org.apache.kafka.streams.kstream.Suppressed[_ >: K],
                 materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
      new KTable(inner.suppress(suppressed, materialized))

    ....

Compatibility, Deprecation, and Migration Plan

None.
