...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Sometimes, the user may want to query the intermediate state of the suppression. However, as of 2.4, the KTable#suppress operator (added in 2.1) lacks this functionality.
Proposed Changes
This KIP proposes the following changes to make the suppression buffer state queryable:
- Add ReadOnlySuppressionBuffer, SuppressionBuffer.
- Add Stores#[SuppressionBytesStoreSupplier, suppressionBufferBuilder].
- Add a new QueryableStoreType, SuppressionBufferType.
- Add KTable#suppress(Suppressed, Materialized).
- Add TopologyTestDriver#getSuppressionBuffer.
Users can query the state of the suppression buffer as follows:
Code Block | ||
---|---|---|
| ||
// A queryable interface to the suppression buffer, named "suppression-buffer".
final ReadOnlySuppressionBuffer<String, String> myMapStore =
kafkaStreams.store(StoreQueryParameters.fromNameAndType("suppression-buffer", QueryableStoreTypes.suppressionBuffer())); |
Public Interfaces
1. Add ReadOnlySuppressionBuffer, SuppressionBuffer
SuppressionBuffer is a writable interface to the suppression buffer; unlike TimeOrderedKeyValueStore (the current implementation), it is a user-facing interface.
Code Block | ||
---|---|---|
| ||
/**
* Interface for storing the suppressed records.
* <p>
* This class is not thread-safe.
*
* @param <K> type of the record keys
* @param <V> type of the record values
*/
public interface SuppressionBuffer<K, V> extends StateStore, ReadOnlySuppressionBuffer<K, V> {
/**
* An evicted record.
*
* @param <K> type of the record keys
* @param <V> type of the record values
*/
final class Eviction<K, V> {
private final K key;
private final Change<V> value;
private final RecordContext recordContext;
public Eviction(final K key, final Change<V> value, final RecordContext recordContext) {
this.key = key;
this.value = value;
this.recordContext = recordContext;
}
public K key() {
return key;
}
public Change<V> value() {
return value;
}
public RecordContext recordContext() {
return recordContext;
}
@Override
public String toString() {
return "Eviction{key=" + key + ", value=" + value + ", recordContext=" + recordContext + '}';
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final Eviction<?, ?> eviction = (Eviction<?, ?>) o;
return Objects.equals(key, eviction.key) &&
Objects.equals(value, eviction.value) &&
Objects.equals(recordContext, eviction.recordContext);
}
@Override
public int hashCode() {
return Objects.hash(key, value, recordContext);
}
}
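/**
* Add a value associated with this key.
*
* @param time the timestamp under which the record is buffered
* @param key the record key
* @param value the change (old and new value) to buffer
* @param recordContext the context of the record
*/
void put(long time, K key, Change<V> value, RecordContext recordContext);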
/**
* Evict stored records which satisfy {@code predicate}.
*
* @param predicate a boolean {@link java.util.function.Supplier}
* @param callback the callback invoked after the eviction
*/
void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback);
} |
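To make the intended semantics of `put` and `evictWhile` concrete, here is a minimal in-memory sketch. It is not the proposed implementation: the class name `TimeOrderedBufferSketch` is hypothetical, values are plain `V` rather than `Change<V>`, `RecordContext` is omitted, and it assumes that records are evicted in ascending timestamp order and that a key which is buffered again keeps its original timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for a suppression buffer; not the proposed implementation. */
class TimeOrderedBufferSketch<K extends Comparable<K>, V> {
    // Records ordered by timestamp; the inner TreeMap breaks timestamp ties by key.
    private final TreeMap<Long, TreeMap<K, V>> byTime = new TreeMap<>();
    // Index from key to the timestamp under which it is currently buffered.
    private final Map<K, Long> index = new HashMap<>();

    /** Buffers a value. Assumption: a key that is already buffered keeps its original timestamp. */
    void put(final long time, final K key, final V value) {
        final long slot = index.computeIfAbsent(key, k -> time);
        byTime.computeIfAbsent(slot, t -> new TreeMap<>()).put(key, value);
    }

    /** Evicts the oldest records for as long as the predicate returns true. */
    void evictWhile(final Supplier<Boolean> predicate, final BiConsumer<K, V> callback) {
        while (!byTime.isEmpty() && predicate.get()) {
            final Map.Entry<Long, TreeMap<K, V>> oldest = byTime.firstEntry();
            final Map.Entry<K, V> record = oldest.getValue().pollFirstEntry();
            if (oldest.getValue().isEmpty()) {
                byTime.remove(oldest.getKey());
            }
            index.remove(record.getKey());
            // The callback runs after the record has left the buffer.
            callback.accept(record.getKey(), record.getValue());
        }
    }

    /** Number of key/value pairs currently buffered. */
    int numRecords() {
        return index.size();
    }

    /** Timestamp of the oldest buffered record, or Long.MAX_VALUE if the buffer is empty. */
    long minTimestamp() {
        return byTime.isEmpty() ? Long.MAX_VALUE : byTime.firstKey();
    }

    public static void main(final String[] args) {
        final TimeOrderedBufferSketch<String, String> buffer = new TimeOrderedBufferSketch<>();
        buffer.put(10L, "a", "v1");
        buffer.put(20L, "b", "v2");
        buffer.put(30L, "a", "v3"); // replaces "v1" but stays at timestamp 10

        final List<String> evicted = new ArrayList<>();
        // Evict every record buffered before timestamp 15.
        buffer.evictWhile(() -> buffer.minTimestamp() < 15L, (k, v) -> evicted.add(k + "=" + v));
        System.out.println(evicted + ", remaining=" + buffer.numRecords());
    }
}
```

The predicate is re-evaluated before each eviction, so it can depend on the buffer's own state (here `minTimestamp`), which is how a time-based or size-based limit would typically be expressed.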
ReadOnlySuppressionBuffer provides a read-only view of SuppressionBuffer. It extends ReadOnlyKeyValueStore, so it can be treated like a key-value mapping.
From 2.1, Kafka Streams provides a way to suppress the intermediate state of a KTable (KIP-328: Ability to suppress updates for KTables). The 'KTable#suppress' operator introduced in KIP-328 controls what updates downstream table and stream operations will receive. With this feature, the contents of the upstream table are split into two groups: the intermediate state held in the suppression buffer, and the final states emitted to the downstream table. The user can look up the value associated with a specific key in the downstream table by querying the upstream table (KIP-67: Queryable state for Kafka Streams), since all of the key-value mappings in the downstream table are also stored in the upstream table.
However, there is a limitation. If the user only wants to retrieve the value associated with a specified key (like `ReadOnlyKeyValueStore#get`), this works. But if the user wants an iterator over the suppressed view (like `ReadOnlyKeyValueStore#range` or `ReadOnlyKeyValueStore#all`), we are stuck, since there is no way to identify beforehand which keys have been flushed out.
One available workaround is materializing the downstream table, like `downstreamTable.filter((k, v) -> true, Materialized.as("final-state"))`. However, this approach is cumbersome.
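The limitation can be illustrated with plain collections standing in for the state stores. In the sketch below, every name is hypothetical and nothing is Kafka Streams code: `upstream` plays the role of the upstream table and `bufferedKeys` is the set of keys still held back by the suppression buffer. Computing the suppressed view requires `bufferedKeys`, which is exactly the information that cannot be queried today.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration of the suppressed view; not Kafka Streams code. */
class SuppressedViewSketch {
    /** Returns the upstream entries whose keys are not held back in the suppression buffer. */
    static <K, V> Map<K, V> suppressedView(final Map<K, V> upstream, final Set<K> bufferedKeys) {
        final Map<K, V> view = new LinkedHashMap<>();
        for (final Map.Entry<K, V> entry : upstream.entrySet()) {
            // Without access to bufferedKeys there is no way to make this decision.
            if (!bufferedKeys.contains(entry.getKey())) {
                view.put(entry.getKey(), entry.getValue());
            }
        }
        return view;
    }

    public static void main(final String[] args) {
        final Map<String, Integer> upstream = new LinkedHashMap<>();
        upstream.put("a", 1);
        upstream.put("b", 2);
        upstream.put("c", 3);
        // "b" has been updated but not flushed yet, so it is excluded from the view.
        System.out.println(suppressedView(upstream, Collections.singleton("b")));
    }
}
```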
Proposed Changes
This KIP proposes an option to make the suppression state queryable by adding a queryable flag to Suppressed.
Public Interfaces
Code Block | ||
---|---|---|
| ||
public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
...
/**
* Make the suppression queryable.
*
* @return The same configuration with query enabled.
*/
Suppressed<K> enableQuery();
... |
Code Block | ||
---|---|---|
| ||
/**
* A suppression buffer that only supports read operations.
*
* @param <K> type of the record keys
* @param <V> type of the record values
*/
public interface ReadOnlySuppressionBuffer<K, V> extends ReadOnlyKeyValueStore<K, V> {
/**
* Returns suppressed view of the value associated with {@code key} with timestamp, if exists.
* If not, returns null.
*
* @param key record key
* @return Suppressed view of the value associated with given key, with timestamp (if exists)
*/
NullableValueAndTimestamp<V> priorValueForBuffered(K key);
/**
* Returns the number of key/value pairs suppressed in this buffer.
*
* @return the number of key/value pairs suppressed in this buffer
*/
int numRecords();
/**
* Returns the size of this buffer, in bytes.
*
* @return the size of the buffer, in bytes
*/
long bufferSize();
/**
* Returns the timestamp of the oldest record in this buffer. {@link Long#MAX_VALUE} iff the buffer is empty.
*
* @return the timestamp of the oldest record in this buffer
*/
long minTimestamp();
} |
Unlike ValueAndTimestamp, NullableValueAndTimestamp can contain null as its value; it is introduced because the suppression buffer can return a null value with a timestamp.
Code Block | ||
---|---|---|
| ||
public final class NullableValueAndTimestamp<V> {
protected final V value;
protected final long timestamp;
protected NullableValueAndTimestamp(final V value, final long timestamp) {
this.value = value;
this.timestamp = timestamp;
}
/**
* Create a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance.
*
* @param value the value
* @param timestamp the timestamp
* @param <V> the type of the value
* @return a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance
*/
public static <V> NullableValueAndTimestamp<V> make(final V value,
final long timestamp) {
return new NullableValueAndTimestamp<>(value, timestamp);
}
/**
* Return the wrapped {@code value} of the given {@code nullableValueAndTimestamp} parameter
* if the parameter is not {@code null}.
*
* @param nullableValueAndTimestamp a {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance; can be {@code null}
* @param <V> the type of the value
* @return the wrapped {@code value} of {@code nullableValueAndTimestamp} if not {@code null}; otherwise {@code null}
*/
public static <V> V getValueOrNull(final NullableValueAndTimestamp<V> nullableValueAndTimestamp) {
return nullableValueAndTimestamp == null ? null : nullableValueAndTimestamp.value();
}
public V value() {
return value;
}
public long timestamp() {
return timestamp;
}
public ValueAndTimestamp<V> toValueAndTimestamp() {
return value == null ? null : ValueAndTimestamp.make(value, timestamp);
}
@Override
public String toString() {
return "(" + value + "," + timestamp + ")";
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final NullableValueAndTimestamp<?> that = (NullableValueAndTimestamp<?>) o;
return timestamp == that.timestamp &&
Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(value, timestamp);
}
} |
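The practical consequence of allowing a null value is that a caller has to distinguish two cases: no result at all, and a result whose value is null. The sketch below shows that distinction with a stripped-down stand-in class. `Stamped` and `describe` are hypothetical names used only for illustration, and the wording of the three outcomes is an assumption rather than part of the proposal.

```java
/** Hypothetical illustration of handling a nullable value with a timestamp. */
class NullableValueSketch {
    /** Stripped-down stand-in for NullableValueAndTimestamp: the value may be null. */
    static final class Stamped<V> {
        final V value;
        final long timestamp;

        Stamped(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Describes a lookup result, separating "no result" from "result with a null value". */
    static <V> String describe(final Stamped<V> result) {
        if (result == null) {
            return "not buffered";
        }
        if (result.value == null) {
            return "null value at " + result.timestamp;
        }
        return "value " + result.value + " at " + result.timestamp;
    }

    public static void main(final String[] args) {
        System.out.println(describe(null));
        System.out.println(describe(new Stamped<String>(null, 5L)));
        System.out.println(describe(new Stamped<>("x", 7L)));
    }
}
```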
2. Add Stores#[SuppressionBytesStoreSupplier, suppressionBufferBuilder]
These methods are counterparts of other Stores methods, like Stores#[inMemoryKeyValueStore, keyValueStoreBuilder]. Using these methods, users can instantiate a suppression buffer even in the lower-level API.
Code Block | ||
---|---|---|
| ||
public final class Stores {
...
/**
* Create an in-memory {@link SuppressionBytesStoreSupplier}.
*
* @param name name of the store (cannot be {@code null})
* @return an instance of a {@link SuppressionBytesStoreSupplier}
*/
public static SuppressionBytesStoreSupplier inMemorySuppressionBuffer(final String name);
...
public static <K, V> StoreBuilder<SuppressionBuffer<K, V>> suppressionBufferBuilder(final SuppressionBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde);
...
} |
Here are the public interfaces of SuppressionBytesStoreSupplier and SuppressionBufferBuilder:
Code Block | ||
---|---|---|
| ||
/**
* A store supplier that can be used to create one or more {@link SuppressionBuffer SuppressionBuffer<Bytes, byte[]>} instances.
*/
public interface SuppressionBytesStoreSupplier extends StoreSupplier<SuppressionBuffer<Bytes, byte[]>> {
/**
* Returns {@link SuppressionBuffer SuppressionBuffer<Bytes, byte[]>} with logging enabled/disabled.
*/
SuppressionBuffer<Bytes, byte[]> get(boolean loggingEnabled);
} |
Code Block | ||
---|---|---|
| ||
public class SuppressionBufferBuilder<K, V> extends AbstractStoreBuilder<K, V, SuppressionBuffer<K, V>> {
private final SuppressionBytesStoreSupplier storeSupplier;
public SuppressionBufferBuilder(final SuppressionBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time);
this.storeSupplier = storeSupplier;
}
@Override
public SuppressionBuffer<K, V> build() {
return new MeteredSuppressionBuffer<>(
storeSupplier.get(enableLogging),
storeSupplier.metricsScope(),
time,
keySerde,
valueSerde);
}
} |
3. Add a new QueryableStoreType, SuppressionBufferType
This change makes SuppressionBuffer queryable.
Code Block | ||
---|---|---|
| ||
public final class QueryableStoreTypes {
...
/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlySuppressionBuffer}.
*
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link QueryableStoreTypes.SuppressionBufferType}
*/
public static <K, V> QueryableStoreType<ReadOnlySuppressionBuffer<K, V>> suppressionBuffer() {
return new SuppressionBufferType<>();
}
...
public static class SuppressionBufferType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySuppressionBuffer<K, V>> {
SuppressionBufferType() {
super(Collections.singleton(ReadOnlySuppressionBuffer.class));
}
@Override
public ReadOnlySuppressionBuffer<K, V> create(final StateStoreProvider storeProvider,
final String storeName) {
return new CompositeReadOnlySuppressionBuffer<>(storeProvider, this, storeName);
}
}
} |
4. Add a new method, KTable#suppress(Suppressed, Materialized)
Using this method, users can specify the queryable name of the suppression buffer.
Code Block | ||
---|---|---|
| ||
public interface KTable<K, V> {
...
/**
* Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
* <p>
* This controls what updates downstream table and stream operations will receive.
*
* @param suppressed Configuration object determining what, if any, updates to suppress
* @param materialized A {@link Materialized} that specifies the queryable store name of the suppression buffer
* @return A new {@code KTable} with the desired suppression characteristics.
*/
KTable<K, V> suppress(final Suppressed<? super K> suppressed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
.... |
5. Add TopologyTestDriver#getSuppressionBuffer
This method gives tests access to the suppression buffer.
Code Block | ||
---|---|---|
| ||
public class TopologyTestDriver implements Closeable {
...
public <K, V> SuppressionBuffer<K, V> getSuppressionBuffer(final String name) {
.... |
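Code Block | ||
---|---|---|
| ||
public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
...
/**
* Returns true iff the query is enabled.
*/
boolean isQueryEnabled();
.... |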
The user can query the suppressed view with Suppressed#name if Suppressed#isQueryEnabled is true.
Calling Suppressed#enableQuery without specifying a name with Suppressed#withName is not allowed. In this case, IllegalArgumentException is thrown.
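The naming rule can be sketched with a small immutable configuration class. This is only an illustration: `SuppressedConfigSketch` and `unnamed` are hypothetical names, and the sketch assumes the check happens at the moment `enableQuery` is called, which the text above does not specify.

```java
/** Hypothetical stand-in for a Suppressed configuration; not the Kafka Streams class. */
final class SuppressedConfigSketch {
    private final String name; // null until withName is called
    private final boolean queryEnabled;

    private SuppressedConfigSketch(final String name, final boolean queryEnabled) {
        this.name = name;
        this.queryEnabled = queryEnabled;
    }

    /** Starts from a configuration with no name and querying disabled. */
    static SuppressedConfigSketch unnamed() {
        return new SuppressedConfigSketch(null, false);
    }

    /** Returns a copy of this configuration with the given name. */
    SuppressedConfigSketch withName(final String name) {
        return new SuppressedConfigSketch(name, queryEnabled);
    }

    /** Returns a copy with querying enabled; rejects configurations that have no name. */
    SuppressedConfigSketch enableQuery() {
        if (name == null) {
            throw new IllegalArgumentException("A name is required to enable querying; call withName first");
        }
        return new SuppressedConfigSketch(name, true);
    }

    boolean isQueryEnabled() {
        return queryEnabled;
    }

    String name() {
        return name;
    }

    public static void main(final String[] args) {
        final SuppressedConfigSketch config = unnamed().withName("my-suppression").enableQuery();
        System.out.println(config.name() + " queryable=" + config.isQueryEnabled());
        try {
            unnamed().enableQuery();
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```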
Compatibility, Deprecation, and Migration Plan
None.
Rejected Alternatives
KTable#suppress(Suppressed, Materialized<K, V, KeyValueStore>)
At first glance, this approach feels more consistent with existing APIs that have a Materialized variant (e.g., KTable#filter(Predicate) and KTable#filter(Predicate, Materialized)). However, it introduces two notions of the name for the same operation: Suppressed#name and Materialized#name. This is not feasible.
The current API for the Materialized variant is just a legacy of nameless operators from before KIP-307. In this case, we already have the Suppressed class and don't need to stay consistent with the old Materialized variant methods. So rejected.
KTable#suppress(Suppressed, String)
Another alternative is passing the state store name directly. This approach is neither consistent with the existing APIs nor has clear semantics, since it also introduces two concepts for the same operation. So rejected.