Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Sometimes, the user may want to query the intermediate state of the suppression. However, as of 2.34, the KTable#suppress operator (added 2.1) lacks this functionality.

Proposed Changes

This KIP proposes the following to make the suppression buffer state queriable, and add a variant method to specify the queryableStoreName: :

  1. Add ReadOnlySuppressionBuffer, SuppressionBuffer.
  2. Add Stores#[SuppressionBytesStoreSupplier, suppressionBufferBuilder].
  3. Add a new QueryableStoreType, SuppressionBufferType.
  4. Add KTable#suppress(Suppressed,

...

  1. Materialized).
  2. Add TopologyTestDriver#getSuppressionBuffer.

User can query the state of suppression buffer like the following:

Code Block
languagejava
    // A queryable interface to the suppression buffer, named "suppression-buffer".
    final ReadOnlyKeyValueStore<StringReadOnlySuppressionBuffer<String, String> myMapStore =
        kafkaStreams.store(StoreQueryParameters.fromNameAndType("suppression-buffer", QueryableStoreTypes.suppressionBuffer(Serdes.String(), Serdes.String()));

Public Interfaces

...

1. Add ReadOnlySuppressionBuffer, SuppressionBuffer

SuppressionBuffer is a writable interface of suppression buffer; dislike to TimeOrderedKeyValueStore (current implementation), this is user-face interface.

Code Block
languagejava
/**
 * Interface for storing the suppressed records.
 * <p>
 * This class is not thread-safe.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public interface KTable<KSuppressionBuffer<K, V> extends StateStore, ReadOnlySuppressionBuffer<K, V> {

    /**
     * An ...
evicted record.
     *
     * @param <K> type of the record keys
    /** * @param <V> type of the record values
     * @see
     * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
     * <p>
     * This controls what updates downstream table and stream operations will receive./
    final class Eviction<K, V> {
        private final K key;
        private final Change<V> value;
        private final RecordContext recordContext;

        public Eviction(final K key, final Change<V> value, final RecordContext recordContext) {
            this.key = key;
            this.value = value;
            this.recordContext = recordContext;
        }

        public K key() {
            return key;
        }

        public Change<V> value() {
            return value;
        }

        public RecordContext recordContext() {
            return recordContext;
        }

        @Override
        public String toString() {
            return "Eviction{key=" + key + ", value=" + value + ", recordContext=" + recordContext + '}';
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            final Eviction<?, ?> eviction = (Eviction<?, ?>) o;
            return Objects.equals(key, eviction.key) &&
                    Objects.equals(value, eviction.value) &&
                    Objects.equals(recordContext, eviction.recordContext);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, value, recordContext);
        }
    }

    /**
     * Add a value associated with this key.
     *
     * @param time
     * @param key
     * @param suppressedvalue
     * @param recordContext
     Configuration object determining what, if any, updates to suppress*/
    void put(long time, K key, Change<V> value, RecordContext recordContext);

    /**
     * Evict stored records which satisfy {@code predicate}.
     *
     * @param predicate a boolean {@link java.util.function.Supplier}
     * @param callback  the callback invoked after the eviction
     */
    void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback);

ReadOnlySuppressionBuffer provides read-only view of SuppressionBuffer. It extends ReadOnlyKeyValueStore so can be treated like a key-value mappings.

Code Block
languagejava
/**
 * A suppression buffer that only supports read operations.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public interface ReadOnlySuppressionBuffer<K, V> extends ReadOnlyKeyValueStore<K, V> {

    /** queryableStoreName    A queryableStoreName of suppression buffer
     * @return A new Returns suppressed view of the value associated with {@code KTablekey} with the desired suppression characteristics. timestamp, if exists.
     * If not, returns null.
     *
     * @param key record key
     * @return Suppressed view of the value associated with given key, with timestamp (if exists)
     */
    NullableValueAndTimestamp<V> priorValueForBuffered(K key);

    /**
     * Returns the number of key/value pairs suppressed in this buffer.
     *
     * @return the number of key/value pairs suppressed in this buffer
     */
    KTable<K, V> suppress(final Suppressed<? super K> suppressed, final String queryableStoreName);

    ....

...

 int numRecords();

    /**
     * Returns the size of this buffer, in bytes.
     *
     * @return the size of the buffer, in bytes
     */
    long bufferSize();

    /**
     * Returns the timestamp of the oldest record in this buffer. {@link Long#MAX_VALUE} iff the buffer is empty.
     *
     * @return the timestamp of the oldest record in this buffer
     */
    long minTimestamp();
}

2. Add Stores#[SuppressionBytesStoreSupplier, suppressionBufferBuilder]

These methods are counterparts of other Stores methods, like Stores#[inMemoryKeyValueStore, keyValueStoreBuilder]. Using this methods, users can instantiate suppression buffer even in the lower level api.

Code Block
languagejava
public final class QueryableStoreTypes { Stores {

    ...

    /**
     * Create an in-memory {@link SuppressionBytesStoreSupplier}.
     *
     * @param name  name of the store (cannot be {@code null})
     * @return an instance of a {@link SuppressionBytesStoreSupplier}
     */
    public static SuppressionBytesStoreSupplier inMemorySuppressionBuffer(final String name);

    ...

    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<KStoreBuilder<SuppressionBuffer<K, ValueAndTimestamp<V>>>V>> timestampedKeyValueStore() {
suppressionBufferBuilder(final SuppressionBytesStoreSupplier supplier,
                                                                                        final Serde<K> keySerde,
           return new TimestampedKeyValueStoreType<>();
    }

    ...

    public static class SuppressionBufferType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {

                                                                             final Serde<V> valueSerde);

    ...


}

Here are the public interface of SuppressionBytesStoreSupplier, SuppressionBufferBuilder:

Code Block
languagejava
/**
 * A store supplier that can be used to create one or more {@link SuppressionBuffer SuppressionBuffer<Byte, byte[]>} instances.
 */
public interface SuppressionBytesStoreSupplier extends StoreSupplier<SuppressionBuffer<Bytes, byte[]>> {

    /**
     * Returns {@link SuppressionBuffer SuppressionBuffer<Byte, byte[]>} with logging enabled/disabled.
     */
    SuppressionBuffer<Bytes, byte[]> get(boolean loggingEnabled);
}
Code Block
languagejava
public class SuppressionBufferBuilder<K, V> extends AbstractStoreBuilder<K, V, SuppressionBuffer<K, V>> {

    private final SuppressionBytesStoreSupplier storeSupplier;

    public SuppressionBufferBuilder(final SuppressionBytesStoreSupplier storeSupplier,
                      private              final Serde<K> keySerde;,
            private                        final Serde<V> valueSerde;
,
                                    final Time time) {
        SuppressionBufferType(final Serde<K> keySerde, final Serde<V> valueSerde) {
super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time);
        this.storeSupplier = storeSupplier;
    }

    @Override
    public SuppressionBuffer<K, V> build() {
        return new MeteredSuppressionBuffer<>(
             super(new HashSet<>(Arrays.asList(   storeSupplier.get(enableLogging),
                storeSupplier.metricsScope(),
                time,
                keySerde,
                valueSerde);
    TimeOrderedKeyValueBuffer.class,}
}

Add a new QueryableStoreType, SuppressionBufferType

This change makes SuppressionBuffer queriable.

Code Block
languagejava
public final class QueryableStoreTypes {

    ...

    /**
     * A {@link QueryableStoreType} that accepts {@link ReadOnlySuppressionBuffer}.
     *
     * @param <K> key type of the store
     * @param <V> value type of the store
     * @return {@link ReadOnlyKeyValueStore.class)))QueryableStoreTypes.SuppressionBufferType}
     */
    public static <K, V> QueryableStoreType<ReadOnlySuppressionBuffer<K, V>> suppressionBuffer() {
        return new SuppressionBufferType<>();
    }

    ...

    
public static class SuppressionBufferType<K, V> extends this.keySerde = keySerde;
QueryableStoreTypeMatcher<ReadOnlySuppressionBuffer<K, V>> {

        SuppressionBufferType() {
        this.valueSerde   = valueSerde super(Collections.singleton(ReadOnlySuppressionBuffer.class));
        }

        @Override
        public ReadOnlyKeyValueStore<KReadOnlySuppressionBuffer<K, V> create(final StateStoreProvider storeProvider,
                                                      final String storeName) {
            return new CompositeReadOnlyKeyValueStore<>CompositeReadOnlySuppressionBuffer<>(storeProvider, this, storeName);
        }
    }
}

4. Add a new method, KTable#suppress(Suppressed, Materialized)

Using thid method, users can specify the queriable name of suppression buffer.

Code Block
languagejava
public interface KTable<K, V> {

    ...

    /**
     * Suppress some updates  public Serde<K> keySerde() {from this changelog stream, determined by the supplied {@link Suppressed} configuration.
     * <p>
     * This returncontrols keySerde;
what updates downstream table and stream operations will }

receive.
     *
     * @param suppressed            Configuration object determining  public Serde<V> valueSerde() {
what, if any, updates to suppress
     * @param queryableStoreName    A queryableStoreName of returnsuppression valueSerde;buffer
     * @return A new {@code KTable} with the desired suppression characteristics.
     */
    KTable<K,  }
}

Compatibility, Deprecation, and Migration Plan

None.

Rejected Alternatives

Adding KTable#suppress(Suppressed, Materialized)

V> suppress(final Suppressed<? super K> suppressed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    ....

5. Add TopologyTestDriver#getSuppressionBuffer

This method provides a testing functionality on suppression buffer.

Code Block
languagejava
public class TopologyTestDriver implements Closeable {

    ...

    public <K, V> SuppressionBuffer<K, V> getSuppressionBuffer(final String name) {

    ....

Compatibility, Deprecation, and Migration Plan

NoneSince KTable#suppress does not change the Key, Value type of the KTable instance, Materialized#with[KeySerde, ValueSerde] should be ignored. Since the underlying data structure, InMemoryTimeOrderedKeyValueBuffer, does not support caching or logging, Materialized#withCaching[Enabled, Disabled] and Materialized#withLogging[Enabled, Disabled] options are also ignored. For these reasons, a Materialized instance is too big for parameter, unless InMemoryTimeOrderedKeyValueBuffer follows the other StateStore classes.