
Status

Current state: Under Discussion

Discussion thread: thread

JIRA: KAFKA-8403

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Users may sometimes want to query the intermediate state of a suppression, that is, the records currently held in its buffer. However, as of 2.4, the KTable#suppress operator (added in 2.1) provides no way to do so.

Proposed Changes

This KIP proposes the following changes to make the suppression buffer state queryable:

  1. Add ReadOnlySuppressionBuffer, SuppressionBuffer.
  2. Add SuppressionBytesStoreSupplier and Stores#[inMemorySuppressionBuffer, suppressionBufferBuilder].
  3. Add a new QueryableStoreType, SuppressionBufferType.
  4. Add KTable#suppress(Suppressed, Materialized).
  5. Add TopologyTestDriver#getSuppressionBuffer.

Users can then query the state of a suppression buffer as follows:

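    // A read-only view of the suppression buffer named "suppression-buffer".
    final ReadOnlySuppressionBuffer<String, String> suppressionBuffer =
        kafkaStreams.store(StoreQueryParameters.fromNameAndType("suppression-buffer", QueryableStoreTypes.suppressionBuffer()));
    // It can be read like any ReadOnlyKeyValueStore, plus buffer-specific metadata.
    final String value = suppressionBuffer.get("some-key");
    final int bufferedRecords = suppressionBuffer.numRecords();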

Public Interfaces

1. Add ReadOnlySuppressionBuffer, SuppressionBuffer

SuppressionBuffer is the writable interface of the suppression buffer. Unlike TimeOrderedKeyValueBuffer (the current, internal implementation), it is a user-facing interface.

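import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.StateStore;

/**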
 * Interface for storing the suppressed records.
 * <p>
 * This class is not thread-safe.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public interface SuppressionBuffer<K, V> extends StateStore, ReadOnlySuppressionBuffer<K, V> {

    /**
     * A record evicted from the buffer, together with its buffered {@link Change} and its record context.
     *
     * @param <K> type of the record keys
     * @param <V> type of the record values
     */
    final class Eviction<K, V> {
        private final K key;
        private final Change<V> value;
        private final RecordContext recordContext;

        public Eviction(final K key, final Change<V> value, final RecordContext recordContext) {
            this.key = key;
            this.value = value;
            this.recordContext = recordContext;
        }

        public K key() {
            return key;
        }

        public Change<V> value() {
            return value;
        }

        public RecordContext recordContext() {
            return recordContext;
        }

        @Override
        public String toString() {
            return "Eviction{key=" + key + ", value=" + value + ", recordContext=" + recordContext + '}';
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            final Eviction<?, ?> eviction = (Eviction<?, ?>) o;
            return Objects.equals(key, eviction.key) &&
                    Objects.equals(value, eviction.value) &&
                    Objects.equals(recordContext, eviction.recordContext);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, value, recordContext);
        }
    }

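    /**
     * Buffer {@code value} for {@code key}.
     *
     * @param time          the buffer time of the record, which determines its position in eviction order
     * @param key           the record key
     * @param value         the change (old and new value) to buffer
     * @param recordContext the context (topic, partition, offset, timestamp, headers) of the record
     */
    void put(long time, K key, Change<V> value, RecordContext recordContext);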

    /**
     * Evict buffered records, oldest first, for as long as {@code predicate} returns {@code true}.
     *
     * @param predicate a boolean {@link java.util.function.Supplier}, re-evaluated before each eviction
     * @param callback  the callback invoked with each evicted record
     */
    void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback);
}
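To make the put/evictWhile contract concrete, here is a minimal, stdlib-only model of a time-ordered buffer. It is a hypothetical sketch, not the proposed implementation: ToyTimeOrderedBuffer is an invented name, values are plain V instead of Change<V>, and there is no RecordContext or StateStore lifecycle. It also assumes that re-buffering a key that is already buffered keeps the key's original position and only replaces its value.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for SuppressionBuffer: plain values instead of Change<V>,
// no RecordContext and no StateStore lifecycle.
final class ToyTimeOrderedBuffer<K, V> {

    // Eviction position of a record: buffer time first, then arrival order as a tie-breaker.
    private static final class Slot implements Comparable<Slot> {
        final long time;
        final long seq;

        Slot(final long time, final long seq) {
            this.time = time;
            this.seq = seq;
        }

        @Override
        public int compareTo(final Slot other) {
            final int byTime = Long.compare(time, other.time);
            return byTime != 0 ? byTime : Long.compare(seq, other.seq);
        }
    }

    private final TreeMap<Slot, Map.Entry<K, V>> sorted = new TreeMap<>();
    private final Map<K, Slot> index = new HashMap<>();
    private long nextSeq = 0L;

    // Assumption: an already-buffered key keeps its original slot; only its value is replaced.
    void put(final long time, final K key, final V value) {
        Slot slot = index.get(key);
        if (slot == null) {
            slot = new Slot(time, nextSeq++);
            index.put(key, slot);
        }
        sorted.put(slot, new AbstractMap.SimpleImmutableEntry<>(key, value));
    }

    // Evicts oldest-first while the predicate holds; the predicate is re-checked before each eviction.
    void evictWhile(final Supplier<Boolean> predicate, final Consumer<Map.Entry<K, V>> callback) {
        final Iterator<Map.Entry<Slot, Map.Entry<K, V>>> it = sorted.entrySet().iterator();
        while (it.hasNext() && predicate.get()) {
            final Map.Entry<K, V> record = it.next().getValue();
            it.remove();
            index.remove(record.getKey());
            callback.accept(record);
        }
    }

    int numRecords() {
        return index.size();
    }

    // Long.MAX_VALUE when empty, matching the documented minTimestamp() contract.
    long minTimestamp() {
        return sorted.isEmpty() ? Long.MAX_VALUE : sorted.firstKey().time;
    }
}
```

Because the predicate is a no-argument Supplier, it cannot inspect individual records. It typically inspects the buffer itself, for example `() -> buffer.minTimestamp() <= streamTime - timeLimit`, where the variable names are illustrative.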

ReadOnlySuppressionBuffer provides a read-only view of a SuppressionBuffer. Because it extends ReadOnlyKeyValueStore, it can be queried like any other key-value store.

/**
 * A suppression buffer that only supports read operations.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public interface ReadOnlySuppressionBuffer<K, V> extends ReadOnlyKeyValueStore<K, V> {

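    /**
     * Returns the suppressed view of the value for {@code key}, that is, the value that was associated with
     * {@code key} before its currently buffered update, together with its timestamp. The wrapped value may
     * itself be {@code null}. If {@code key} is not buffered, returns {@code null}.
     *
     * @param key record key
     * @return the suppressed view of the value and its timestamp, or {@code null} if {@code key} is not buffered
     */
    NullableValueAndTimestamp<V> priorValueForBuffered(K key);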

    /**
     * Returns the number of key/value pairs suppressed in this buffer.
     *
     * @return the number of key/value pairs suppressed in this buffer
     */
    int numRecords();

    /**
     * Returns the size of this buffer, in bytes.
     *
     * @return the size of the buffer, in bytes
     */
    long bufferSize();

    /**
     * Returns the timestamp of the oldest record in this buffer, or {@link Long#MAX_VALUE} if the buffer is empty.
     *
     * @return the timestamp of the oldest record in this buffer, or {@link Long#MAX_VALUE} if it is empty
     */
    long minTimestamp();
}
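The read-only accessors can be illustrated with a small stdlib-only model. It shows a writable buffer that hands out a delegating read-only view, and it reports numRecords, bufferSize and minTimestamp (Long.MAX_VALUE when empty). All names are hypothetical. The size rule (UTF-8 bytes of keys plus values) is an assumption chosen for illustration, not how Kafka accounts for buffer size.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical read-only view mirroring a subset of ReadOnlySuppressionBuffer's accessors.
interface ToyReadOnlyBuffer {
    String get(String key);
    int numRecords();
    long bufferSize();
    long minTimestamp();
}

// Hypothetical writable buffer. Assumption: bufferSize() counts UTF-8 bytes of keys and values only,
// and values are never null.
final class ToyStringBuffer implements ToyReadOnlyBuffer {
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> times = new HashMap<>();
    private long bytes = 0L;

    void put(final long time, final String key, final String value) {
        final String previous = values.put(key, value);
        if (previous == null) {
            bytes += utf8Length(key);       // a key is counted once, when it first enters the buffer
        } else {
            bytes -= utf8Length(previous);  // a replaced value no longer occupies space
        }
        bytes += utf8Length(value);
        times.put(key, time);
    }

    @Override public String get(final String key) { return values.get(key); }
    @Override public int numRecords() { return values.size(); }
    @Override public long bufferSize() { return bytes; }

    @Override
    public long minTimestamp() {
        long min = Long.MAX_VALUE;          // the documented result for an empty buffer
        for (final long t : times.values()) {
            min = Math.min(min, t);
        }
        return min;
    }

    // A view that delegates reads but cannot be cast back to the writable buffer.
    ToyReadOnlyBuffer readOnlyView() {
        final ToyStringBuffer self = this;  // avoids the anonymous class calling its own methods
        return new ToyReadOnlyBuffer() {
            @Override public String get(final String key) { return self.get(key); }
            @Override public int numRecords() { return self.numRecords(); }
            @Override public long bufferSize() { return self.bufferSize(); }
            @Override public long minTimestamp() { return self.minTimestamp(); }
        };
    }

    private static int utf8Length(final String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }
}
```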

Unlike ValueAndTimestamp, NullableValueAndTimestamp can hold a null value. It is introduced because a suppression buffer can return a null value together with a timestamp.

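package org.apache.kafka.streams.state;

import java.util.Objects;

/**
 * Combines a value, which may be {@code null}, with a timestamp.
 *
 * @param <V> the type of the value
 */
public final class NullableValueAndTimestamp<V> {
    private final V value;
    private final long timestamp;

    private NullableValueAndTimestamp(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }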

    /**
     * Create a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance.
     *
     * @param value      the value
     * @param timestamp  the timestamp
     * @param <V> the type of the value
     * @return a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance
     */
    public static <V> NullableValueAndTimestamp<V> make(final V value,
                                                        final long timestamp) {
        return new NullableValueAndTimestamp<>(value, timestamp);
    }

    /**
     * Return the wrapped {@code value} of the given {@code nullableValueAndTimestamp} parameter
     * if the parameter is not {@code null}.
     *
     * @param nullableValueAndTimestamp a {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance; can be {@code null}
     * @param <V> the type of the value
     * @return the wrapped {@code value} of {@code nullableValueAndTimestamp} if not {@code null}; otherwise {@code null}
     */
    public static <V> V getValueOrNull(final NullableValueAndTimestamp<V> nullableValueAndTimestamp) {
        return nullableValueAndTimestamp == null ? null : nullableValueAndTimestamp.value();
    }

    public V value() {
        return value;
    }

    public long timestamp() {
        return timestamp;
    }

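    /**
     * Convert this instance to a {@link ValueAndTimestamp}.
     *
     * @return an equivalent {@link ValueAndTimestamp}, or {@code null} if the wrapped value is {@code null}
     */
    public ValueAndTimestamp<V> toValueAndTimestamp() {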
        return value == null ? null : ValueAndTimestamp.make(value, timestamp);
    }

    @Override
    public String toString() {
        return "(" + value + "," + timestamp + ")";
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final NullableValueAndTimestamp<?> that = (NullableValueAndTimestamp<?>) o;
        return timestamp == that.timestamp &&
            Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, timestamp);
    }
}
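The reason for a nullable wrapper is that a lookup like priorValueForBuffered has three outcomes: the key is not buffered, the key is buffered with no prior value, or the key is buffered with a prior value. ValueAndTimestamp#make returns null for a null value, so it cannot express the middle case. Below is a stdlib-only sketch of the distinction. ToyNullableValueAndTimestamp and PriorValueLookup are hypothetical stand-ins, not the proposed classes.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for the proposed NullableValueAndTimestamp: the value may be null.
final class ToyNullableValueAndTimestamp<V> {
    private final V value;
    private final long timestamp;

    private ToyNullableValueAndTimestamp(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    static <V> ToyNullableValueAndTimestamp<V> make(final V value, final long timestamp) {
        return new ToyNullableValueAndTimestamp<>(value, timestamp);
    }

    V value() { return value; }
    long timestamp() { return timestamp; }
}

// Hypothetical lookup showing the three outcomes a caller of priorValueForBuffered() must tell apart.
final class PriorValueLookup {
    private final Map<String, ToyNullableValueAndTimestamp<String>> buffered = new HashMap<>();

    void buffer(final String key, final String priorValue, final long timestamp) {
        buffered.put(key, ToyNullableValueAndTimestamp.make(priorValue, timestamp));
    }

    // null means "key not buffered"; a non-null result may still carry a null value.
    ToyNullableValueAndTimestamp<String> priorValueForBuffered(final String key) {
        return buffered.get(key);
    }

    static String describe(final ToyNullableValueAndTimestamp<String> result) {
        if (result == null) {
            return "not buffered";
        }
        if (result.value() == null) {
            return "buffered, no prior value @" + result.timestamp();
        }
        return "buffered, prior=" + result.value() + " @" + result.timestamp();
    }
}
```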

2. Add SuppressionBytesStoreSupplier and Stores#[inMemorySuppressionBuffer, suppressionBufferBuilder]

These methods are counterparts of existing Stores methods such as Stores#inMemoryKeyValueStore and Stores#keyValueStoreBuilder. With them, users can also create a suppression buffer in the lower-level Processor API.

public final class Stores {

    ...

    /**
     * Create an in-memory {@link SuppressionBytesStoreSupplier}.
     *
     * @param name  name of the store (cannot be {@code null})
     * @return an instance of a {@link SuppressionBytesStoreSupplier}
     */
    public static SuppressionBytesStoreSupplier inMemorySuppressionBuffer(final String name);

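    ...

    /**
     * Create a {@link StoreBuilder} that can be used to build a {@link SuppressionBuffer}.
     *
     * @param supplier    a {@link SuppressionBytesStoreSupplier} (cannot be {@code null})
     * @param keySerde    the key serde to use
     * @param valueSerde  the value serde to use
     * @param <K>         key type
     * @param <V>         value type
     * @return an instance of a {@link StoreBuilder} that can build a {@link SuppressionBuffer}
     */
    public static <K, V> StoreBuilder<SuppressionBuffer<K, V>> suppressionBufferBuilder(final SuppressionBytesStoreSupplier supplier,
                                                                                        final Serde<K> keySerde,
                                                                                        final Serde<V> valueSerde);

    ...
}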

The public interfaces of SuppressionBytesStoreSupplier and SuppressionBufferBuilder are as follows:

/**
 * A store supplier that can be used to create one or more {@link SuppressionBuffer SuppressionBuffer<Bytes, byte[]>} instances.
 */
public interface SuppressionBytesStoreSupplier extends StoreSupplier<SuppressionBuffer<Bytes, byte[]>> {

    /**
     * Returns a {@link SuppressionBuffer SuppressionBuffer<Bytes, byte[]>} with logging enabled or disabled.
     *
     * @param loggingEnabled whether change logging is enabled for the returned buffer
     */
    SuppressionBuffer<Bytes, byte[]> get(boolean loggingEnabled);
}

public class SuppressionBufferBuilder<K, V> extends AbstractStoreBuilder<K, V, SuppressionBuffer<K, V>> {

    private final SuppressionBytesStoreSupplier storeSupplier;

    public SuppressionBufferBuilder(final SuppressionBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final Time time) {
        super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time);
        this.storeSupplier = storeSupplier;
    }

    @Override
    public SuppressionBuffer<K, V> build() {
        return new MeteredSuppressionBuffer<>(
                storeSupplier.get(enableLogging),
                storeSupplier.metricsScope(),
                time,
                keySerde,
                valueSerde);
    }
}
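The builder's job has two layers. It asks the supplier for a bytes-level buffer with logging on or off, then wraps that buffer in a typed layer that (de)serializes keys and values at the boundary. Below is a stdlib-only sketch of that layering. Every class is hypothetical. Plain Functions stand in for Serde, and ToyTypedBuffer plays the role the proposal gives MeteredSuppressionBuffer, without metrics. Keys are wrapped in ByteBuffer because byte[] uses identity equality, much as Kafka wraps keys in Bytes.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical bytes-level buffer (the SuppressionBuffer<Bytes, byte[]> layer).
final class ToyBytesBuffer {
    private final Map<ByteBuffer, byte[]> data = new HashMap<>();
    private final boolean loggingEnabled;

    ToyBytesBuffer(final boolean loggingEnabled) { this.loggingEnabled = loggingEnabled; }

    boolean loggingEnabled() { return loggingEnabled; }

    // Copies guard against callers mutating arrays after they were stored.
    void put(final byte[] key, final byte[] value) { data.put(ByteBuffer.wrap(key.clone()), value.clone()); }

    byte[] get(final byte[] key) { return data.get(ByteBuffer.wrap(key)); }
}

// Hypothetical typed layer: serializes on the way in, deserializes on the way out.
final class ToyTypedBuffer<K, V> {
    private final ToyBytesBuffer inner;
    private final Function<K, byte[]> keySerializer;
    private final Function<V, byte[]> valueSerializer;
    private final Function<byte[], V> valueDeserializer;

    ToyTypedBuffer(final ToyBytesBuffer inner, final Function<K, byte[]> keySerializer,
                   final Function<V, byte[]> valueSerializer, final Function<byte[], V> valueDeserializer) {
        this.inner = inner;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.valueDeserializer = valueDeserializer;
    }

    boolean loggingEnabled() { return inner.loggingEnabled(); }

    void put(final K key, final V value) { inner.put(keySerializer.apply(key), valueSerializer.apply(value)); }

    V get(final K key) {
        final byte[] raw = inner.get(keySerializer.apply(key));
        return raw == null ? null : valueDeserializer.apply(raw);
    }
}

// Hypothetical builder mirroring the shape of SuppressionBufferBuilder#build().
final class ToyBufferBuilder<K, V> {
    private final Function<Boolean, ToyBytesBuffer> supplier; // stands in for SuppressionBytesStoreSupplier#get(boolean)
    private final Function<K, byte[]> keySerializer;
    private final Function<V, byte[]> valueSerializer;
    private final Function<byte[], V> valueDeserializer;
    private boolean enableLogging = true;

    ToyBufferBuilder(final Function<Boolean, ToyBytesBuffer> supplier, final Function<K, byte[]> keySerializer,
                     final Function<V, byte[]> valueSerializer, final Function<byte[], V> valueDeserializer) {
        this.supplier = supplier;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.valueDeserializer = valueDeserializer;
    }

    ToyBufferBuilder<K, V> withLoggingDisabled() {
        enableLogging = false;
        return this;
    }

    ToyTypedBuffer<K, V> build() {
        return new ToyTypedBuffer<>(supplier.apply(enableLogging), keySerializer, valueSerializer, valueDeserializer);
    }
}
```

Keeping the supplier bytes-only means a single store implementation can back buffers of any key and value type. This mirrors how the other Stores suppliers pair with their builders.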

3. Add a new QueryableStoreType, SuppressionBufferType

This change makes SuppressionBuffer queryable through KafkaStreams#store.

public final class QueryableStoreTypes {

    ...

    /**
     * A {@link QueryableStoreType} that accepts {@link ReadOnlySuppressionBuffer}.
     *
     * @param <K> key type of the store
     * @param <V> value type of the store
     * @return {@link QueryableStoreTypes.SuppressionBufferType}
     */
    public static <K, V> QueryableStoreType<ReadOnlySuppressionBuffer<K, V>> suppressionBuffer() {
        return new SuppressionBufferType<>();
    }

    ...

    public static class SuppressionBufferType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySuppressionBuffer<K, V>> {

        SuppressionBufferType() {
            super(Collections.singleton(ReadOnlySuppressionBuffer.class));
        }

        @Override
        public ReadOnlySuppressionBuffer<K, V> create(final StateStoreProvider storeProvider,
                                                      final String storeName) {
            return new CompositeReadOnlySuppressionBuffer<>(storeProvider, this, storeName);
        }
    }
}
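SuppressionBufferType only declares which interface a store must implement. The matching itself happens in the matcher's accepts check. Below is a simplified, stdlib-only model of that check, assuming the rule is "the store must implement every class the type was configured with". The Toy* types are hypothetical, not Kafka classes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical marker interfaces standing in for ReadOnlyKeyValueStore and ReadOnlySuppressionBuffer.
interface ToyReadOnlyKeyValueStore { }
interface ToyReadOnlySuppressionBuffer extends ToyReadOnlyKeyValueStore { }

// Simplified matching rule: accept a store only if it implements every configured class.
final class ToyStoreTypeMatcher {
    private final Set<Class<?>> matchTo;

    ToyStoreTypeMatcher(final Class<?>... matchTo) {
        this.matchTo = new HashSet<>(Arrays.asList(matchTo));
    }

    boolean accepts(final Object store) {
        for (final Class<?> type : matchTo) {
            if (!type.isInstance(store)) {
                return false;
            }
        }
        return true;
    }
}
```

Under this rule, the suppression-buffer type rejects plain key-value stores. Because ReadOnlySuppressionBuffer extends ReadOnlyKeyValueStore, however, a key-value store type would also accept a suppression buffer.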

4. Add a new method, KTable#suppress(Suppressed, Materialized)

With this method, users can specify the queryable name of the suppression buffer.

public interface KTable<K, V> {

    ...

    /**
     * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
     * <p>
     * This controls what updates downstream table and stream operations will receive.
     *
     * @param suppressed    Configuration object determining what, if any, updates to suppress
     * @param materialized  Configuration object specifying the queryable store name of the suppression buffer
     * @return A new {@code KTable} with the desired suppression characteristics.
     */
    KTable<K, V> suppress(final Suppressed<? super K> suppressed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    ...
}

5. Add TopologyTestDriver#getSuppressionBuffer

This method gives tests direct access to a suppression buffer, in the same way that TopologyTestDriver#getKeyValueStore does for key-value stores.

public class TopologyTestDriver implements Closeable {

    ...

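    /**
     * Get the {@link SuppressionBuffer} with the given name.
     *
     * @param name the name of the suppression buffer
     * @return the suppression buffer with the given name
     */
    public <K, V> SuppressionBuffer<K, V> getSuppressionBuffer(final String name);

    ...
}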

Compatibility, Deprecation, and Migration Plan

None.
