...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Sometimes a user may want to query the intermediate state of a suppression, that is, the records currently held in the suppression buffer. However, as of 2.4, the KTable#suppress operator (added in 2.1 by KIP-328) lacks this functionality.

Proposed Changes

This KIP proposes the following changes to make the state of the suppression buffer queryable:

  1. Add the ReadOnlySuppressionBuffer and SuppressionBuffer interfaces.
  2. Add Stores#inMemorySuppressionBuffer and Stores#suppressionBufferBuilder, along with the SuppressionBytesStoreSupplier interface.
  3. Add a new QueryableStoreType, SuppressionBufferType.
  4. Add KTable#suppress(Suppressed, Materialized).
  5. Add TopologyTestDriver#getSuppressionBuffer.

Users can then query the state of a suppression buffer as follows:

Code Block
languagejava
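    // Obtain a read-only view of the suppression buffer named "suppression-buffer".
    final ReadOnlySuppressionBuffer<String, String> buffer =
        kafkaStreams.store(StoreQueryParameters.fromNameAndType("suppression-buffer", QueryableStoreTypes.suppressionBuffer()));
    // ReadOnlySuppressionBuffer extends ReadOnlyKeyValueStore, so the usual lookups apply.
    final String value = buffer.get("some-key");
    final int suppressedCount = buffer.numRecords();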

Public Interfaces

1. Add ReadOnlySuppressionBuffer, SuppressionBuffer

SuppressionBuffer is the writable interface of the suppression buffer. Unlike TimeOrderedKeyValueBuffer (the current, internal implementation), it is a user-facing interface.

Code Block
languagejava
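import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.StateStore;

/**
 * Interface for storing the suppressed records.
 * <p>
 * Implementations are not thread-safe.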
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public interface SuppressionBuffer<K, V> extends StateStore, ReadOnlySuppressionBuffer<K, V> {

    /**
     * A record evicted from the buffer, as passed to the callback of {@link #evictWhile(Supplier, Consumer)}.
     *
     * @param <K> type of the record keys
     * @param <V> type of the record values
     */
    final class Eviction<K, V> {
        private final K key;
        private final Change<V> value;
        private final RecordContext recordContext;

        public Eviction(final K key, final Change<V> value, final RecordContext recordContext) {
            this.key = key;
            this.value = value;
            this.recordContext = recordContext;
        }

        public K key() {
            return key;
        }

        public Change<V> value() {
            return value;
        }

        public RecordContext recordContext() {
            return recordContext;
        }

        @Override
        public String toString() {
            return "Eviction{key=" + key + ", value=" + value + ", recordContext=" + recordContext + '}';
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            final Eviction<?, ?> eviction = (Eviction<?, ?>) o;
            return Objects.equals(key, eviction.key) &&
                    Objects.equals(value, eviction.value) &&
                    Objects.equals(recordContext, eviction.recordContext);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, value, recordContext);
        }
    }

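    /**
     * Buffer a change for the given key.
     *
     * @param time          the timestamp by which the record is ordered in the buffer
     * @param key           the record key
     * @param value         the change (old and new value) to buffer
     * @param recordContext the context of the buffered record
     */
    void put(long time, K key, Change<V> value, RecordContext recordContext);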

    /**
     * Evict buffered records in timestamp order, oldest first, for as long as {@code predicate} returns {@code true}.
     *
     * @param predicate a boolean {@link java.util.function.Supplier}, checked before each eviction
     * @param callback  the callback invoked with each evicted record
     */
    void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback);
}
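The put/evictWhile contract can be modelled without Kafka. The following is a minimal, stdlib-only sketch of an in-memory, time-ordered buffer, not the KIP's implementation: it drops Change<V> and RecordContext in favour of a plain value, and the names ToySuppressionBuffer and Evicted are hypothetical. It also assumes that re-buffering an existing key replaces its value and moves it to the new time, and that evictWhile removes records oldest-first for as long as the predicate returns true.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified model of the SuppressionBuffer contract (not the KIP's implementation).
final class ToySuppressionBuffer<K, V> {

    // Simplified stand-in for SuppressionBuffer.Eviction: no Change<V>, no RecordContext.
    static final class Evicted<K, V> {
        final K key;
        final V value;
        final long time;

        Evicted(final K key, final V value, final long time) {
            this.key = key;
            this.value = value;
            this.time = time;
        }
    }

    private static final class Slot<V> {
        final long time;
        final V value;

        Slot(final long time, final V value) {
            this.time = time;
            this.value = value;
        }
    }

    // Latest buffered value per key.
    private final Map<K, Slot<V>> byKey = new HashMap<>();
    // Buffer time -> keys buffered at that time, in insertion order.
    private final TreeMap<Long, LinkedHashSet<K>> byTime = new TreeMap<>();

    void put(final long time, final K key, final V value) {
        final Slot<V> previous = byKey.put(key, new Slot<>(time, value));
        if (previous != null) {
            // Assumption of this sketch: a re-buffered key moves to its new time.
            unindex(previous.time, key);
        }
        byTime.computeIfAbsent(time, t -> new LinkedHashSet<>()).add(key);
    }

    // Evicts oldest-first while the predicate holds, calling back once per evicted record.
    void evictWhile(final Supplier<Boolean> predicate, final Consumer<Evicted<K, V>> callback) {
        while (!byTime.isEmpty() && predicate.get()) {
            final Map.Entry<Long, LinkedHashSet<K>> oldest = byTime.firstEntry();
            final K key = oldest.getValue().iterator().next();
            unindex(oldest.getKey(), key);
            final Slot<V> slot = byKey.remove(key);
            callback.accept(new Evicted<>(key, slot.value, slot.time));
        }
    }

    V get(final K key) {
        final Slot<V> slot = byKey.get(key);
        return slot == null ? null : slot.value;
    }

    int numRecords() {
        return byKey.size();
    }

    // Long.MAX_VALUE iff empty, as in the proposed ReadOnlySuppressionBuffer#minTimestamp.
    long minTimestamp() {
        return byTime.isEmpty() ? Long.MAX_VALUE : byTime.firstKey();
    }

    private void unindex(final long time, final K key) {
        final LinkedHashSet<K> keys = byTime.get(time);
        keys.remove(key);
        if (keys.isEmpty()) {
            byTime.remove(time);
        }
    }
}
```

A caller would typically pass a predicate that closes over the buffer's own state, for example `() -> buffer.minTimestamp() <= streamTime - gracePeriod` (both names hypothetical), so that eviction stops at the first record still inside its suppression window.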

ReadOnlySuppressionBuffer provides a read-only view of a SuppressionBuffer. Since it extends ReadOnlyKeyValueStore, it can be queried like any other key-value store.

Code Block
languagejava
/**
 * A suppression buffer that only supports read operations.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public interface ReadOnlySuppressionBuffer<K, V> extends ReadOnlyKeyValueStore<K, V> {

    /**
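     * Returns the suppressed view of the value associated with {@code key}, together with its timestamp,
     * if {@code key} is buffered; otherwise returns {@code null}. Note that the returned value itself may be {@code null}.
     *
     * @param key record key
     * @return the suppressed view of the value associated with {@code key}, with its timestamp, or {@code null} if {@code key} is not buffered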
     */
    NullableValueAndTimestamp<V> priorValueForBuffered(K key);

    /**
     * Returns the number of key/value pairs suppressed in this buffer.
     *
     * @return the number of key/value pairs suppressed in this buffer
     */
    int numRecords();

    /**
     * Returns the size of this buffer, in bytes.
     *
     * @return the size of the buffer, in bytes
     */
    long bufferSize();

    /**
     * Returns the timestamp of the oldest record in this buffer, or {@link Long#MAX_VALUE} if and only if the buffer is empty.
     *
     * @return the timestamp of the oldest record in this buffer, or {@link Long#MAX_VALUE} if the buffer is empty
     */
    long minTimestamp();
}
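Because ReadOnlySuppressionBuffer extends ReadOnlyKeyValueStore, a caller can scan the buffered keys with range- and all-style queries instead of only point lookups. The stdlib-only sketch below models such a read-only, key-ordered view; SortedBufferView is a hypothetical name, and taking a snapshot copy is a simplification chosen here, not something the KIP specifies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical read-only, key-ordered view over buffered records.
final class SortedBufferView<K extends Comparable<K>, V> {
    private final NavigableMap<K, V> buffered;

    SortedBufferView(final Map<K, V> snapshot) {
        // Copy and wrap: callers cannot mutate the view, and later buffer writes do not leak in.
        this.buffered = Collections.unmodifiableNavigableMap(new TreeMap<K, V>(snapshot));
    }

    V get(final K key) {
        return buffered.get(key);
    }

    // Inclusive of both bounds, like ReadOnlyKeyValueStore#range(from, to).
    List<Map.Entry<K, V>> range(final K from, final K to) {
        return new ArrayList<>(buffered.subMap(from, true, to, true).entrySet());
    }

    // All buffered entries, in key order.
    List<Map.Entry<K, V>> all() {
        return new ArrayList<>(buffered.entrySet());
    }
}
```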

Unlike ValueAndTimestamp, NullableValueAndTimestamp can hold a null value. It is introduced because the suppression buffer can return a null value together with a timestamp.

Code Block
languagejava
package org.apache.kafka.streams.state;

import java.util.Objects;

public final class NullableValueAndTimestamp<V> {
    private final V value;
    private final long timestamp;

    private NullableValueAndTimestamp(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    /**
     * Create a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance.
     *
     * @param value      the value
     * @param timestamp  the timestamp
     * @param <V> the type of the value
     * @return a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance
     */
    public static <V> NullableValueAndTimestamp<V> make(final V value,
                                                        final long timestamp) {
        return new NullableValueAndTimestamp<>(value, timestamp);
    }

    /**
     * Return the wrapped {@code value} of the given {@code nullableValueAndTimestamp} parameter
     * if the parameter is not {@code null}.
     *
     * @param nullableValueAndTimestamp a {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance; can be {@code null}
     * @param <V> the type of the value
     * @return the wrapped {@code value} of {@code nullableValueAndTimestamp} if not {@code null}; otherwise {@code null}
     */
    public static <V> V getValueOrNull(final NullableValueAndTimestamp<V> nullableValueAndTimestamp) {
        return nullableValueAndTimestamp == null ? null : nullableValueAndTimestamp.value();
    }

    public V value() {
        return value;
    }

    public long timestamp() {
        return timestamp;
    }

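    /**
     * Convert to a {@link ValueAndTimestamp}. Since {@code ValueAndTimestamp} cannot hold a {@code null} value,
     * this returns {@code null} if the wrapped value is {@code null}.
     */
    public ValueAndTimestamp<V> toValueAndTimestamp() {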
        return value == null ? null : ValueAndTimestamp.make(value, timestamp);
    }

    @Override
    public String toString() {
        return "(" + value + "," + timestamp + ")";
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final NullableValueAndTimestamp<?> that = (NullableValueAndTimestamp<?>) o;
        return timestamp == that.timestamp &&
            Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, timestamp);
    }
}
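The need for a nullable container shows up in priorValueForBuffered, which has three outcomes rather than two. The stdlib-only sketch below models them under an assumption drawn from the Javadoc above: a null result means the key is not buffered, while a non-null result may still carry a null value. NullableVT and PriorValueIndex are hypothetical names, and the rule that a key keeps the prior value recorded when it was first buffered is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the proposed NullableValueAndTimestamp: the value may be null.
final class NullableVT<V> {
    final V value;
    final long timestamp;

    NullableVT(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical index answering priorValueForBuffered-style lookups.
final class PriorValueIndex<K, V> {
    private final Map<K, NullableVT<V>> priorByKey = new HashMap<>();

    // Assumption: only the first buffering of a key records its prior value.
    void onBuffered(final K key, final V priorValue, final long timestamp) {
        priorByKey.putIfAbsent(key, new NullableVT<>(priorValue, timestamp));
    }

    void onEvicted(final K key) {
        priorByKey.remove(key);
    }

    // null                    -> key is not buffered
    // non-null, value == null -> key is buffered, but its prior value is null
    NullableVT<V> priorValueForBuffered(final K key) {
        return priorByKey.get(key);
    }

    static String describe(final NullableVT<?> result) {
        if (result == null) {
            return "not buffered";
        }
        return result.value == null
            ? "buffered, null prior value @" + result.timestamp
            : "buffered, prior value " + result.value + " @" + result.timestamp;
    }
}
```

The middle case is exactly what ValueAndTimestamp cannot express, because ValueAndTimestamp.make returns null for a null value and so drops the timestamp.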

2. Add Stores#inMemorySuppressionBuffer and Stores#suppressionBufferBuilder

These methods are the counterparts of existing Stores methods such as Stores#inMemoryKeyValueStore and Stores#keyValueStoreBuilder. With them, users can create a suppression buffer in the lower-level Processor API as well.

Code Block
languagejava
public final class Stores {

    ...

    /**
     * Create an in-memory {@link SuppressionBytesStoreSupplier}.
     *
     * @param name  name of the store (cannot be {@code null})
     * @return an instance of a {@link SuppressionBytesStoreSupplier}
     */
    public static SuppressionBytesStoreSupplier inMemorySuppressionBuffer(final String name);

    ...

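    /**
     * Create a {@link StoreBuilder} that can be used to build a {@link SuppressionBuffer}.
     *
     * @param supplier    a {@link SuppressionBytesStoreSupplier} (cannot be {@code null})
     * @param keySerde    the key serde to use
     * @param valueSerde  the value serde to use
     * @return an instance of a {@link StoreBuilder} that can build a {@link SuppressionBuffer}
     */
    public static <K, V> StoreBuilder<SuppressionBuffer<K, V>> suppressionBufferBuilder(final SuppressionBytesStoreSupplier supplier,
                                                                                        final Serde<K> keySerde,
                                                                                        final Serde<V> valueSerde);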

    ...


}

Here are the public interfaces of SuppressionBytesStoreSupplier and SuppressionBufferBuilder:

Code Block
languagejava
/**
 * A store supplier that can be used to create one or more {@link SuppressionBuffer SuppressionBuffer<Bytes, byte[]>} instances.
 */
public interface SuppressionBytesStoreSupplier extends StoreSupplier<SuppressionBuffer<Bytes, byte[]>> {

    /**
     * Returns a {@link SuppressionBuffer SuppressionBuffer<Bytes, byte[]>} with change logging enabled or disabled.
     *
     * @param loggingEnabled whether the returned buffer should be backed by a changelog
     */
    SuppressionBuffer<Bytes, byte[]> get(boolean loggingEnabled);
}
Code Block
languagejava
public class SuppressionBufferBuilder<K, V> extends AbstractStoreBuilder<K, V, SuppressionBuffer<K, V>> {

    private final SuppressionBytesStoreSupplier storeSupplier;

    public SuppressionBufferBuilder(final SuppressionBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final Time time) {
        super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time);
        this.storeSupplier = storeSupplier;
    }

    @Override
    public SuppressionBuffer<K, V> build() {
        // enableLogging, time, keySerde and valueSerde come from AbstractStoreBuilder.
        return new MeteredSuppressionBuffer<>(
                storeSupplier.get(enableLogging),
                storeSupplier.metricsScope(),
                time,
                keySerde,
                valueSerde);
    }
}
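The split between SuppressionBytesStoreSupplier and SuppressionBufferBuilder follows the usual Kafka Streams store layering: the supplier produces a bytes-level store, and the builder wraps it in a typed outer layer that owns the serdes. The stdlib-only sketch below mirrors that layering. BytesBuffer, ToySerde, TypedBuffer and ToyBufferBuilder are hypothetical stand-ins, not the KIP's classes, and the put counter only stands in for the metrics a metered layer would record.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical serde: a serializer/deserializer pair (stand-in for Kafka's Serde).
final class ToySerde<T> {
    final Function<T, byte[]> serializer;
    final Function<byte[], T> deserializer;

    ToySerde(final Function<T, byte[]> serializer, final Function<byte[], T> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    static ToySerde<String> utf8() {
        return new ToySerde<String>(
            s -> s.getBytes(StandardCharsets.UTF_8),
            b -> new String(b, StandardCharsets.UTF_8));
    }
}

// Bytes-level buffer: the role SuppressionBuffer<Bytes, byte[]> plays in the KIP.
final class BytesBuffer {
    // ByteBuffer compares by content (byte[] does not), so it can serve as a map key.
    private final Map<ByteBuffer, byte[]> data = new HashMap<>();

    void put(final byte[] key, final byte[] value) {
        data.put(ByteBuffer.wrap(key.clone()), value.clone());
    }

    byte[] get(final byte[] key) {
        final byte[] value = data.get(ByteBuffer.wrap(key));
        return value == null ? null : value.clone();
    }

    int size() {
        return data.size();
    }
}

// Typed outer layer: the role MeteredSuppressionBuffer plays in the KIP.
final class TypedBuffer<K, V> {
    private final BytesBuffer inner;
    private final ToySerde<K> keySerde;
    private final ToySerde<V> valueSerde;
    private long putCount; // stand-in for the metrics a metered layer would record

    TypedBuffer(final BytesBuffer inner, final ToySerde<K> keySerde, final ToySerde<V> valueSerde) {
        this.inner = inner;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    void put(final K key, final V value) {
        putCount++;
        inner.put(keySerde.serializer.apply(key), valueSerde.serializer.apply(value));
    }

    V get(final K key) {
        final byte[] raw = inner.get(keySerde.serializer.apply(key));
        return raw == null ? null : valueSerde.deserializer.apply(raw);
    }

    long putCount() {
        return putCount;
    }
}

// Builder: obtains a bytes-level buffer from the supplier and wraps it.
final class ToyBufferBuilder<K, V> {
    private final Supplier<BytesBuffer> supplier;
    private final ToySerde<K> keySerde;
    private final ToySerde<V> valueSerde;

    ToyBufferBuilder(final Supplier<BytesBuffer> supplier, final ToySerde<K> keySerde, final ToySerde<V> valueSerde) {
        this.supplier = supplier;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    TypedBuffer<K, V> build() {
        return new TypedBuffer<>(supplier.get(), keySerde, valueSerde);
    }
}
```

Keeping the inner store untyped is what lets one supplier serve any key/value types: only the outer layer needs to know the serdes.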

3. Add a new QueryableStoreType, SuppressionBufferType

This change makes SuppressionBuffer queryable through KafkaStreams#store.

Code Block
languagejava
public final class QueryableStoreTypes {

    ...

    /**
     * A {@link QueryableStoreType} that accepts {@link ReadOnlySuppressionBuffer}.
     *
     * @param <K> key type of the store
     * @param <V> value type of the store
     * @return {@link QueryableStoreTypes.SuppressionBufferType}
     */
    public static <K, V> QueryableStoreType<ReadOnlySuppressionBuffer<K, V>> suppressionBuffer() {
        return new SuppressionBufferType<>();
    }

    ...

    public static class SuppressionBufferType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySuppressionBuffer<K, V>> {

        SuppressionBufferType() {
            super(Collections.singleton(ReadOnlySuppressionBuffer.class));
        }

        @Override
        public ReadOnlySuppressionBuffer<K, V> create(final StateStoreProvider storeProvider,
                                                      final String storeName) {
            return new CompositeReadOnlySuppressionBuffer<>(storeProvider, this, storeName);
        }
    }
}
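SuppressionBufferType#create returns a CompositeReadOnlySuppressionBuffer, which presumably, like the existing composite key-value store, fans a query out over every local instance of the store (one per task). The stdlib-only sketch below shows one plausible set of aggregation rules: first non-null hit for get, sum for numRecords, and minimum for minTimestamp. The interface and class names are hypothetical, and the KIP does not spell these rules out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical read-only view of one local buffer instance.
interface ReadOnlyToyBuffer<K, V> {
    V get(K key);

    int numRecords();

    long minTimestamp(); // Long.MAX_VALUE when empty
}

// Hypothetical map-backed instance used to populate the example.
// Assumes each key is put once, so the running minimum stays exact.
final class MapToyBuffer<K, V> implements ReadOnlyToyBuffer<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private long minTimestamp = Long.MAX_VALUE;

    void put(final K key, final V value, final long timestamp) {
        values.put(key, value);
        minTimestamp = Math.min(minTimestamp, timestamp);
    }

    @Override
    public V get(final K key) {
        return values.get(key);
    }

    @Override
    public int numRecords() {
        return values.size();
    }

    @Override
    public long minTimestamp() {
        return minTimestamp;
    }
}

// Fans each query out over all local instances registered under one store name.
final class CompositeToyBuffer<K, V> implements ReadOnlyToyBuffer<K, V> {
    private final List<ReadOnlyToyBuffer<K, V>> stores;

    CompositeToyBuffer(final List<? extends ReadOnlyToyBuffer<K, V>> stores) {
        this.stores = new ArrayList<>(stores);
    }

    // Assuming each key lives in a single partition, the first non-null hit wins.
    @Override
    public V get(final K key) {
        for (final ReadOnlyToyBuffer<K, V> store : stores) {
            final V value = store.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    @Override
    public int numRecords() {
        int total = 0;
        for (final ReadOnlyToyBuffer<K, V> store : stores) {
            total += store.numRecords();
        }
        return total;
    }

    @Override
    public long minTimestamp() {
        long min = Long.MAX_VALUE;
        for (final ReadOnlyToyBuffer<K, V> store : stores) {
            min = Math.min(min, store.minTimestamp());
        }
        return min;
    }
}
```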

4. Add a new method, KTable#suppress(Suppressed, Materialized)

With this method, users can specify the queryable name of the suppression buffer through Materialized.

Code Block
languagejava
public interface KTable<K, V> {

    ...

    /**
     * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
     * <p>
     * This controls what updates downstream table and stream operations will receive.
     *
     * @param suppressed    Configuration object determining what, if any, updates to suppress
     * @param materialized  a {@link Materialized} that describes how the suppression buffer should be materialized,
     *                      e.g. the name under which it can be queried
     * @return A new {@code KTable} with the desired suppression characteristics.
     */
    KTable<K, V> suppress(final Suppressed<? super K> suppressed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    ....

5. Add TopologyTestDriver#getSuppressionBuffer

This method gives tests access to a suppression buffer, just as TopologyTestDriver#getKeyValueStore does for key-value stores.

Code Block
languagejava
public class TopologyTestDriver implements Closeable {

    ...

    public <K, V> SuppressionBuffer<K, V> getSuppressionBuffer(final String name);

    ....

Compatibility, Deprecation, and Migration Plan

...