
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Sometimes, the user may want to query the intermediate state of the suppression. However, as of 2.4, the KTable#suppress operator (added in 2.1) lacks this functionality.

Proposed Changes

This KIP proposes the following changes to make the suppression buffer state queryable:

  1. Add ReadOnlySuppressionBuffer, SuppressionBuffer.
  2. Add Stores#[inMemorySuppressionBuffer, suppressionBufferBuilder].
  3. Add a new QueryableStoreType, SuppressionBufferType.
  4. Add KTable#suppress(Suppressed, Materialized).
  5. Add TopologyTestDriver#getSuppressionBuffer.

Users can query the state of the suppression buffer as follows:

Code Block
languagejava
    // A queryable interface to the suppression buffer, named "suppression-buffer".
    final ReadOnlySuppressionBuffer<String, String> myMapStore =
        kafkaStreams.store(StoreQueryParameters.fromNameAndType("suppression-buffer", QueryableStoreTypes.suppressionBuffer()));

Public Interfaces

1. Add ReadOnlySuppressionBuffer, SuppressionBuffer

SuppressionBuffer is the writable interface of the suppression buffer; unlike TimeOrderedKeyValueStore (the current implementation), it is a user-facing interface.

Code Block
languagejava
/**
 * Interface for storing the suppressed records.
 * <p>
 * This class is not thread-safe.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public interface SuppressionBuffer<K, V> extends StateStore, ReadOnlySuppressionBuffer<K, V> {

    /**
     * An evicted record.
     *
     * @param <K> type of the record keys
     * @param <V> type of the record values
     * @see
     */
    final class Eviction<K, V> {
        private final K key;
        private final Change<V> value;
        private final RecordContext recordContext;

        public Eviction(final K key, final Change<V> value, final RecordContext recordContext) {
            this.key = key;
            this.value = value;
            this.recordContext = recordContext;
        }

        public K key() {
            return key;
        }

        public Change<V> value() {
            return value;
        }

        public RecordContext recordContext() {
            return recordContext;
        }

        @Override
        public String toString() {
            return "Eviction{key=" + key + ", value=" + value + ", recordContext=" + recordContext + '}';
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            final Eviction<?, ?> eviction = (Eviction<?, ?>) o;
            return Objects.equals(key, eviction.key) &&
                    Objects.equals(value, eviction.value) &&
                    Objects.equals(recordContext, eviction.recordContext);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, value, recordContext);
        }
    }

    /**
     * Add a value associated with this key.
     *
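     * @param time          the time under which the record is buffered
     * @param key           the record key
     * @param value         the record value, as a {@link Change}
     * @param recordContext the context of the record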
     */
    void put(long time, K key, Change<V> value, RecordContext recordContext);

    /**
     * Evict stored records which satisfy {@code predicate}.
     *
     * @param predicate a boolean {@link java.util.function.Supplier}
     * @param callback  the callback invoked after the eviction
     */
    void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback);
}

ReadOnlySuppressionBuffer provides a read-only view of SuppressionBuffer. It extends ReadOnlyKeyValueStore, so it can be treated like a key-value mapping.
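
As a rough illustration of the put/evictWhile contract declared in SuppressionBuffer, here is a minimal, self-contained sketch of a time-ordered buffer. It is not the Kafka Streams implementation: `SketchBuffer` is a hypothetical name, plain `String` values stand in for `Change<V>`, `RecordContext` is omitted, and the way an update to an already buffered key is handled is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a suppression buffer; not the Kafka Streams implementation.
class SketchBuffer {
    // Buffered records ordered by buffer time, then by key within the same time.
    private final TreeMap<Long, TreeMap<String, String>> byTime = new TreeMap<>();
    // Buffer time under which each key is currently held.
    private final Map<String, Long> timeOf = new HashMap<>();

    // Buffer a value for the key.
    void put(final long time, final String key, final String value) {
        // Assumption of this sketch: an update keeps the time of the first buffered record for the key.
        final long bufferTime = timeOf.computeIfAbsent(key, k -> time);
        byTime.computeIfAbsent(bufferTime, t -> new TreeMap<>()).put(key, value);
    }

    // Evict records, oldest first, for as long as the predicate returns true.
    void evictWhile(final Supplier<Boolean> predicate,
                    final Consumer<Map.Entry<String, String>> callback) {
        while (!byTime.isEmpty() && predicate.get()) {
            final long oldest = byTime.firstKey();
            final Map.Entry<String, String> evicted = byTime.get(oldest).firstEntry();
            timeOf.remove(evicted.getKey());
            removeFromBucket(oldest, evicted.getKey());
            callback.accept(evicted);
        }
    }

    // Number of keys currently buffered.
    int numRecords() {
        return timeOf.size();
    }

    // Buffer time of the oldest record, or Long.MAX_VALUE if the buffer is empty.
    long minTimestamp() {
        return byTime.isEmpty() ? Long.MAX_VALUE : byTime.firstKey();
    }

    private void removeFromBucket(final long time, final String key) {
        final TreeMap<String, String> bucket = byTime.get(time);
        bucket.remove(key);
        if (bucket.isEmpty()) {
            byTime.remove(time);
        }
    }
}
```

A caller would typically pass a predicate that inspects the buffer itself, for example `() -> buffer.minTimestamp() <= limit`, so that eviction stops as soon as the oldest remaining record is young enough.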

From 2.1, Kafka Streams provides a way to suppress the intermediate state of a KTable (KIP-328: Ability to suppress updates for KTables). The 'KTable#suppress' operator introduced in KIP-328 controls which updates downstream table and stream operations receive. With this feature, the contents of the upstream table are divided into two disjoint groups: the intermediate state held in the suppression buffer and the final states emitted to the downstream table. The user can query the value associated with a specific key in the downstream table by querying the upstream table (KIP-67: Queryable state for Kafka Streams), since all of the key-value mappings in the downstream table are also stored in the upstream table.

However, there is a limitation. Retrieving the value associated with a specific key (like `ReadOnlyKeyValueStore#get`) works fine. But if the user wants an iterator over the suppressed view (like `ReadOnlyKeyValueStore#range` or `ReadOnlyKeyValueStore#all`), we are stuck, since there is no way to identify beforehand which keys have been flushed out.

One available workaround is to materialize the downstream table, for example `downstreamTable.filter((k, v) -> true, Materialized.as("final-state"))`. However, this approach is cumbersome.
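
The iteration limitation can be illustrated with a small, self-contained model. The sketch below is a simplification, not Kafka Streams code: the upstream table and the set of still-buffered keys are plain Java collections, `SuppressedViewSketch` and `emittedView` are hypothetical names, and every key is assumed to be either buffered or emitted, never both.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of a suppressed table; not part of Kafka Streams.
class SuppressedViewSketch {
    // Returns the records already emitted downstream: every upstream record
    // whose key is not currently held back in the suppression buffer.
    static Map<String, Long> emittedView(final Map<String, Long> upstream,
                                         final Set<String> bufferedKeys) {
        final Map<String, Long> emitted = new TreeMap<>();
        for (final Map.Entry<String, Long> entry : upstream.entrySet()) {
            if (!bufferedKeys.contains(entry.getKey())) {
                emitted.put(entry.getKey(), entry.getValue());
            }
        }
        return emitted;
    }
}
```

In this model, iterating over the emitted records needs `bufferedKeys` as an input. The upstream table alone does not carry that information, which is the gap the queryable suppression state is meant to close.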

Proposed Changes

This KIP proposes to add an option that makes the suppression state queryable, by adding a queryable flag to Suppressed.

Public Interfaces

Code Block
languagejava
public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {

    ...

    /**
     * Make the suppression queryable.
     *
     * @return The same configuration with query enabled.
     */
    Suppressed<K> enableQuery();

    ....

Code Block
languagejava
/**
 * A suppression buffer that only supports read operations.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public interface ReadOnlySuppressionBuffer<K, V> extends ReadOnlyKeyValueStore<K, V> {

    /**
     * Returns suppressed view of the value associated with {@code key} with timestamp, if exists.
     * If not, returns null.
     *
     * @param key record key
     * @return Suppressed view of the value associated with given key, with timestamp (if exists)
     */
    NullableValueAndTimestamp<V> priorValueForBuffered(K key);

    /**
     * Returns the number of key/value pairs suppressed in this buffer.
     *
     * @return the number of key/value pairs suppressed in this buffer
     */
    int numRecords();

    /**
     * Returns the size of this buffer, in bytes.
     *
     * @return the size of the buffer, in bytes
     */
    long bufferSize();

    /**
     * Returns the timestamp of the oldest record in this buffer. {@link Long#MAX_VALUE} iff the buffer is empty.
     *
     * @return the timestamp of the oldest record in this buffer
     */
    long minTimestamp();
}

Unlike ValueAndTimestamp, NullableValueAndTimestamp can hold null as its value; it is introduced because the suppression buffer can return a null value with a timestamp.

Code Block
languagejava
public final class NullableValueAndTimestamp<V> {
    private final V value;
    private final long timestamp;

    private NullableValueAndTimestamp(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    /**
     * Create a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance.
     *
     * @param value      the value
     * @param timestamp  the timestamp
     * @param <V> the type of the value
     * @return a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance
     */
    public static <V> NullableValueAndTimestamp<V> make(final V value,
                                                        final long timestamp) {
        return new NullableValueAndTimestamp<>(value, timestamp);
    }

    /**
     * Return the wrapped {@code value} of the given {@code nullableValueAndTimestamp} parameter
     * if the parameter is not {@code null}.
     *
     * @param nullableValueAndTimestamp a {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance; can be {@code null}
     * @param <V> the type of the value
     * @return the wrapped {@code value} of {@code nullableValueAndTimestamp} if not {@code null}; otherwise {@code null}
     */
    public static <V> V getValueOrNull(final NullableValueAndTimestamp<V> nullableValueAndTimestamp) {
        return nullableValueAndTimestamp == null ? null : nullableValueAndTimestamp.value();
    }

    public V value() {
        return value;
    }

    public long timestamp() {
        return timestamp;
    }

    public ValueAndTimestamp<V> toValueAndTimestamp() {
        return value == null ? null : ValueAndTimestamp.make(value, timestamp);
    }

    @Override
    public String toString() {
        return "(" + value + "," + timestamp + ")";
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final NullableValueAndTimestamp<?> that = (NullableValueAndTimestamp<?>) o;
        return timestamp == that.timestamp &&
            Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, timestamp);
    }
}
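
The difference from ValueAndTimestamp can be sketched with two simplified stand-ins. `PlainPair` mimics the way `ValueAndTimestamp.make` returns null when the value is null, while `NullablePair` mirrors the class proposed in this section. Both names are hypothetical, and the value type is fixed to `String` to keep the illustration short.

```java
// Hypothetical stand-in that mimics ValueAndTimestamp: a null value yields no pair at all.
final class PlainPair {
    final String value;
    final long timestamp;

    private PlainPair(final String value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    static PlainPair make(final String value, final long timestamp) {
        return value == null ? null : new PlainPair(value, timestamp);
    }
}

// Hypothetical stand-in for the proposed NullableValueAndTimestamp:
// the timestamp survives even when the value is null.
final class NullablePair {
    final String value;
    final long timestamp;

    private NullablePair(final String value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    static NullablePair make(final String value, final long timestamp) {
        return new NullablePair(value, timestamp);
    }

    // Converting back loses the timestamp of a null value, as toValueAndTimestamp() does.
    PlainPair toPlainPair() {
        return PlainPair.make(value, timestamp);
    }
}
```

With these stand-ins, a null value buffered at some timestamp can still report that timestamp through `NullablePair`, whereas `PlainPair.make` would collapse it to a bare null.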

2. Add Stores#[inMemorySuppressionBuffer, suppressionBufferBuilder]

These methods are counterparts of other Stores methods, like Stores#[inMemoryKeyValueStore, keyValueStoreBuilder]. Using these methods, users can instantiate a suppression buffer even in the lower-level API.

Code Block
languagejava
public final class Stores {

    ...

    /**
     * Create an in-memory {@link SuppressionBytesStoreSupplier}.
     *
     * @param name  name of the store (cannot be {@code null})
     * @return an instance of a {@link SuppressionBytesStoreSupplier}
     */
    public static SuppressionBytesStoreSupplier inMemorySuppressionBuffer(final String name);

    ...

    public static <K, V> StoreBuilder<SuppressionBuffer<K, V>> suppressionBufferBuilder(final SuppressionBytesStoreSupplier supplier,
                                                                                        final Serde<K> keySerde,
                                                                                        final Serde<V> valueSerde);

    ...


}

Here are the public interfaces of SuppressionBytesStoreSupplier and SuppressionBufferBuilder:

Code Block
languagejava
/**
 * A store supplier that can be used to create one or more {@link SuppressionBuffer SuppressionBuffer<Bytes, byte[]>} instances.
 */
public interface SuppressionBytesStoreSupplier extends StoreSupplier<SuppressionBuffer<Bytes, byte[]>> {

    /**
     * Returns {@link SuppressionBuffer SuppressionBuffer<Bytes, byte[]>} with logging enabled/disabled.
     */
    SuppressionBuffer<Bytes, byte[]> get(boolean loggingEnabled);
}
Code Block
languagejava
public class SuppressionBufferBuilder<K, V> extends AbstractStoreBuilder<K, V, SuppressionBuffer<K, V>> {

    private final SuppressionBytesStoreSupplier storeSupplier;

    public SuppressionBufferBuilder(final SuppressionBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final Time time) {
        super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time);
        this.storeSupplier = storeSupplier;
    }

    @Override
    public SuppressionBuffer<K, V> build() {
        return new MeteredSuppressionBuffer<>(
                storeSupplier.get(enableLogging),
                storeSupplier.metricsScope(),
                time,
                keySerde,
                valueSerde);
    }
}

3. Add a new QueryableStoreType, SuppressionBufferType

This change makes SuppressionBuffer queryable.

Code Block
languagejava
public final class QueryableStoreTypes {

    ...

    /**
     * A {@link QueryableStoreType} that accepts {@link ReadOnlySuppressionBuffer}.
     *
     * @param <K> key type of the store
     * @param <V> value type of the store
     * @return {@link QueryableStoreTypes.SuppressionBufferType}
     */
    public static <K, V> QueryableStoreType<ReadOnlySuppressionBuffer<K, V>> suppressionBuffer() {
        return new SuppressionBufferType<>();
    }

    ...

    
    public static class SuppressionBufferType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySuppressionBuffer<K, V>> {

        SuppressionBufferType() {
            super(Collections.singleton(ReadOnlySuppressionBuffer.class));
        }

        @Override
        public ReadOnlySuppressionBuffer<K, V> create(final StateStoreProvider storeProvider,
                                                      final String storeName) {
            return new CompositeReadOnlySuppressionBuffer<>(storeProvider, this, storeName);
        }
    }
}

4. Add a new method, KTable#suppress(Suppressed, Materialized)

Using this method, users can specify the queryable name of the suppression buffer.

Code Block
languagejava
public interface KTable<K, V> {

    ...

    /**
     * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
     * <p>
     * This controls what updates downstream table and stream operations will receive.
     *
     * @param suppressed            Configuration object determining what, if any, updates to suppress
     * @param materialized          A {@link Materialized} that specifies the queryable store name of the suppression buffer
     * @return A new {@code KTable} with the desired suppression characteristics.
     */
    KTable<K, V> suppress(final Suppressed<? super K> suppressed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    ....

5. Add TopologyTestDriver#getSuppressionBuffer

This method provides access to the suppression buffer for testing.

Code Block
languagejava
public class TopologyTestDriver implements Closeable {

    ...

    public <K, V> SuppressionBuffer<K, V> getSuppressionBuffer(final String name) {

    ....

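Code Block
languagejava
public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {

    ...

    /**
     * Returns true iff the query is enabled.
     */
    boolean isQueryEnabled();

    ....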

If Suppressed#isQueryEnabled is true, the user can query the suppressed view using the name given by Suppressed#name.

Calling Suppressed#enableQuery without first specifying a name with Suppressed#withName is not allowed; in this case, an IllegalArgumentException is thrown.
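
The naming rule can be sketched as follows. `SuppressedSketch` and its `create` factory are hypothetical, self-contained stand-ins for the Suppressed configuration, written only to show the check described here; the actual implementation may differ.

```java
// Hypothetical stand-in for the Suppressed configuration; not the Kafka Streams class.
final class SuppressedSketch {
    private final String name;
    private final boolean queryEnabled;

    private SuppressedSketch(final String name, final boolean queryEnabled) {
        this.name = name;
        this.queryEnabled = queryEnabled;
    }

    // Starts from an unnamed configuration with queries disabled.
    static SuppressedSketch create() {
        return new SuppressedSketch(null, false);
    }

    // Returns a copy of this configuration with the given name.
    SuppressedSketch withName(final String name) {
        return new SuppressedSketch(name, queryEnabled);
    }

    // Returns a copy with queries enabled; rejects configurations that have no name yet.
    SuppressedSketch enableQuery() {
        if (name == null) {
            throw new IllegalArgumentException("A name must be set with withName before enabling queries");
        }
        return new SuppressedSketch(name, true);
    }

    String name() {
        return name;
    }

    boolean isQueryEnabled() {
        return queryEnabled;
    }
}
```

Under this sketch, `SuppressedSketch.create().withName("suppression-buffer").enableQuery()` succeeds, while `SuppressedSketch.create().enableQuery()` throws IllegalArgumentException because there is no name to query the store by.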

Compatibility, Deprecation, and Migration Plan

None.

Rejected Alternatives

KTable#suppress(Suppressed, Materialized<K, V, KeyValueStore>)

At first glance, this approach looks more consistent with the existing APIs that have a Materialized variant (e.g., KTable#filter(Predicate) and KTable#filter(Predicate, Materialized)). However, it introduces two names for the same operation, Suppressed#name and Materialized#name, which makes it impractical.

The current Materialized variants are just a legacy of the nameless operators that predate KIP-307. In this case, we already have the Suppressed class and do not need to stay consistent with the old Materialized variant methods. So rejected.

KTable#suppress(Suppressed, String)

Another alternative is passing the state store name directly. This approach is neither consistent with the existing APIs nor clear in its semantics, since it also introduces two names for the same operation. So rejected.