You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

Status

Current stateUnder Discussion

Discussion thread: thread

JIRA: KAFKA-8403

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Sometimes, the user may want to query the intermediate state of the suppression. However, as of 2.3, the KTable#suppress operator (added 2.1) lacks this functionality.

Proposed Changes

This KIP proposes to make the suppression buffer state queriable, and add a variant method to specify the queryableStoreName: KTable#suppress(Suppressed, String). User can query the state of suppression buffer like the following:

    // A queryable interface to the suppression buffer, named "suppression-buffer".
    final ReadOnlyKeyValueStore<String, String> myMapStore =
        kafkaStreams.store("suppression-buffer", QueryableStoreTypes.suppressionBuffer(Serdes.String(), Serdes.String()));

Public Interfaces

Add a new method, KTable#suppress(Suppressed, String)

public interface KTable<K, V> {

    ...

    /**
     * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
     * <p>
     * This controls what updates downstream table and stream operations will receive.
     *
     * @param suppressed            Configuration object determining what, if any, updates to suppress
     * @param queryableStoreName    A queryableStoreName of suppression buffer
     * @return A new {@code KTable} with the desired suppression characteristics.
     */
    KTable<K, V> suppress(final Suppressed<? super K> suppressed, final String queryableStoreName);

    ....

Add a new QueryableStoreType, SuppressionBufferType

public final class QueryableStoreTypes {

    ...

    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore() {
        return new TimestampedKeyValueStoreType<>();
    }

    ...

    public static class SuppressionBufferType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {

        private final Serde<K> keySerde;
        private final Serde<V> valueSerde;

        SuppressionBufferType(final Serde<K> keySerde, final Serde<V> valueSerde) {
            super(new HashSet<>(Arrays.asList(
                    TimeOrderedKeyValueBuffer.class,
                    ReadOnlyKeyValueStore.class)));
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
        }

        @Override
        public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider storeProvider,
                                                  final String storeName) {
            return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
        }

        public Serde<K> keySerde() {
            return keySerde;
        }

        public Serde<V> valueSerde() {
            return valueSerde;
        }

    }
}

Compatibility, Deprecation, and Migration Plan

None.

Rejected Alternatives

Adding KTable#suppress(Suppressed, Materialized)

Since KTable#suppress does not change the Key, Value type of the KTable instance, Materialized#with[KeySerde, ValueSerde] should be ignored. Since the underlying data structure, InMemoryTimeOrderedKeyValueBuffer, does not support caching or logging, Materialized#withCaching[Enabled, Disabled] and Materialized#withLogging[Enabled, Disabled] options are also ignored. For these reasons, a Materialized instance is too big for parameter, unless InMemoryTimeOrderedKeyValueBuffer follows the other StateStore classes.

  • No labels