Current state: Under Discussion

Discussion thread: thread

JIRA: KAFKA-8403

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Sometimes, the user may want to query the intermediate state of the suppression. However, as of 2.3, the KTable#suppress operator (added in 2.1) lacks this functionality.

Proposed Changes

This KIP proposes to make the suppression buffer state queryable by adding a variant method that takes a queryableStoreName: KTable#suppress(Suppressed, String). Users can then query the state of the suppression buffer as follows:

Code Block
languagejava
    // A queryable interface to the suppression buffer, named "suppression-buffer".
    final ReadOnlyKeyValueStore<String, String> myMapStore =
        kafkaStreams.store("suppression-buffer", QueryableStoreTypes.suppressionBuffer(Serdes.String(), Serdes.String()));

Public Interfaces

Since 2.1, Kafka Streams provides a way to suppress the intermediate state of a KTable (KIP-328: Ability to suppress updates for KTables). The KTable#suppress operator introduced in KIP-328 controls which updates downstream table and stream operations receive. With this feature, the contents of the upstream table are partitioned into two groups: the intermediate state held in the suppression buffer, and the final states emitted to the downstream table. The user can look up the value associated with a specific key in the downstream table by querying the upstream table (KIP-67: Queryable state for Kafka Streams), since all of the key-value mappings in the downstream table are also stored in the upstream table.

However, this approach has a limitation. It works if the user only wants to retrieve the value associated with a specified key (like `ReadOnlyKeyValueStore#get`). But if the user wants an iterator over the suppressed view (like `ReadOnlyKeyValueStore#range` or `ReadOnlyKeyValueStore#all`), it breaks down, since there is no way to tell beforehand which keys have already been flushed out of the buffer.
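The limitation can be illustrated with a small self-contained sketch that does not use Kafka Streams at all. It models the upstream table as a sorted map and the suppression buffer as the set of keys whose latest update has not been emitted yet. All names here (`SuppressedViewSketch`, `update`, `flush`, `allFromUpstream`, `allEmitted`) are hypothetical and exist only for this illustration; the model also assumes each key is updated once, which is a simplification.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of an upstream table feeding a suppression buffer; not Kafka Streams code. */
class SuppressedViewSketch {
    // Latest value per key, as the upstream table would store it.
    private final TreeMap<String, String> upstream = new TreeMap<>();
    // Keys whose latest update is still held back in the suppression buffer.
    private final Set<String> buffered = new TreeSet<>();

    /** An upstream update lands in the table and is held back by the buffer. */
    void update(final String key, final String value) {
        upstream.put(key, value);
        buffered.add(key);
    }

    /** The buffer emits the key downstream, so it is no longer suppressed. */
    void flush(final String key) {
        buffered.remove(key);
    }

    /** Point lookup, comparable to ReadOnlyKeyValueStore#get on the upstream store. */
    String get(final String key) {
        return upstream.get(key);
    }

    /** Full scan of the upstream store: suppressed and emitted keys are mixed together. */
    Map<String, String> allFromUpstream() {
        return new TreeMap<>(upstream);
    }

    /** View of emitted keys only; building it needs the buffer state, which is not queryable today. */
    Map<String, String> allEmitted() {
        final Map<String, String> emitted = new TreeMap<>();
        for (final Map.Entry<String, String> entry : upstream.entrySet()) {
            if (!buffered.contains(entry.getKey())) {
                emitted.put(entry.getKey(), entry.getValue());
            }
        }
        return emitted;
    }

    public static void main(final String[] args) {
        final SuppressedViewSketch table = new SuppressedViewSketch();
        table.update("a", "1");
        table.update("b", "2");
        table.flush("a"); // only "a" has been emitted downstream

        System.out.println(table.get("a"));          // point lookup on the upstream table
        System.out.println(table.allFromUpstream()); // scan cannot tell "a" and "b" apart
        System.out.println(table.allEmitted());      // requires access to the buffer state
    }
}
```

In this sketch the scan over the upstream store returns both keys, while the view of emitted keys can only be computed with access to `buffered`, which is the state this KIP proposes to expose.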

One available workaround is to materialize the downstream table, e.g. `downstreamTable.filter((key, value) -> true, Materialized.as("final-state"))`. However, this is cumbersome: it adds a no-op filter and an extra materialized store only to make the result queryable.

Proposed Changes

This KIP proposes to add an option to make the suppression state queryable by adding a queryable flag to Suppressed.

Public Interfaces

...

Code Block
languagejava
public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {

    ...

    /**
     * Make the suppression queryable.
     *
     * @return The same configuration with query enabled.
     */
    Suppressed<K> enableQuery();

    /**
     * @return true iff the query is enabled.
     */
    boolean isQueryEnabled();

    ....

Add a new QueryableStoreType, SuppressionBufferType

Code Block
languagejava
public final class QueryableStoreTypes {

    ...

    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore() {
        return new TimestampedKeyValueStoreType<>();
    }

    ...

    public static class SuppressionBufferType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {

        private final Serde<K> keySerde;
        private final Serde<V> valueSerde;

        SuppressionBufferType(final Serde<K> keySerde, final Serde<V> valueSerde) {
            super(new HashSet<>(Arrays.asList(
                    TimeOrderedKeyValueBuffer.class,
                    ReadOnlyKeyValueStore.class)));
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
        }

        @Override
        public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider storeProvider,
                                                  final String storeName) {
            return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
        }

        public Serde<K> keySerde() {
            return keySerde;
        }

        public Serde<V> valueSerde() {
            return valueSerde;
        }

    }
}

Compatibility, Deprecation, and Migration Plan

None.

Rejected Alternatives

Adding KTable#suppress(Suppressed, Materialized)

The user can query the suppressed view under the name given by Suppressed#name, provided that Suppressed#isQueryEnabled returns true.

Calling Suppressed#enableQuery without first specifying a name via Suppressed#withName is not allowed. In this case, an IllegalArgumentException is thrown.
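The naming rule above can be sketched with a minimal, self-contained configuration class. `SuppressedConfigSketch` and its `unnamed` factory are hypothetical stand-ins, not the Kafka Streams `Suppressed` class. The sketch also makes two assumptions the text does not settle: the check happens eagerly inside `enableQuery`, and each call returns a new immutable copy.

```java
import java.util.Objects;

/** Hypothetical stand-in for the proposed Suppressed configuration; not the Kafka Streams class. */
final class SuppressedConfigSketch {
    private final String name;           // null until withName is called
    private final boolean queryEnabled;

    private SuppressedConfigSketch(final String name, final boolean queryEnabled) {
        this.name = name;
        this.queryEnabled = queryEnabled;
    }

    /** Hypothetical factory: a configuration with no name and query disabled. */
    static SuppressedConfigSketch unnamed() {
        return new SuppressedConfigSketch(null, false);
    }

    /** Returns a copy carrying the given name, which doubles as the queryable store name. */
    SuppressedConfigSketch withName(final String name) {
        return new SuppressedConfigSketch(Objects.requireNonNull(name, "name"), queryEnabled);
    }

    /** Assumption: validation is eager, so withName must be called before enableQuery. */
    SuppressedConfigSketch enableQuery() {
        if (name == null) {
            throw new IllegalArgumentException("enableQuery requires a name; call withName first");
        }
        return new SuppressedConfigSketch(name, true);
    }

    boolean isQueryEnabled() {
        return queryEnabled;
    }

    String name() {
        return name;
    }

    public static void main(final String[] args) {
        final SuppressedConfigSketch ok =
            SuppressedConfigSketch.unnamed().withName("suppression-buffer").enableQuery();
        System.out.println(ok.name() + " queryable: " + ok.isQueryEnabled());

        try {
            SuppressedConfigSketch.unnamed().enableQuery(); // no name given
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

An implementation could instead defer the check until the topology is built, which would allow `enableQuery` and `withName` to be called in either order; the sketch picks the eager variant only because it is the shortest to show.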

KTable#suppress(Suppressed, Materialized<K, V, KeyValueStore>)

At first glance, this approach feels more consistent with the existing APIs that have a Materialized variant (e.g., KTable#filter(Predicate) and KTable#filter(Predicate, Materialized)). However, it introduces two notions of a name for the same operation, Suppressed#name and Materialized#name, so it is not feasible.

The current API for the Materialized variant is just a legacy of the nameless operators that predate KIP-307. In this case, we already have the Suppressed class and don't need to stay consistent with the old Materialized variant methods. So rejected.

KTable#suppress(Suppressed, String)

Another alternative is passing the state store name directly. This approach is neither consistent with the existing APIs nor clear in its semantics, since it also introduces two concepts for the same operation. So rejected.

Since KTable#suppress does not change the key and value types of the KTable instance, Materialized#with[KeySerde, ValueSerde] should be ignored. Since the underlying data structure, InMemoryTimeOrderedKeyValueBuffer, does not support caching or logging, the Materialized#withCaching[Enabled, Disabled] and Materialized#withLogging[Enabled, Disabled] options are also ignored. For these reasons, a Materialized instance is too heavyweight as a parameter, unless InMemoryTimeOrderedKeyValueBuffer follows the other StateStore classes.