...

Proposed Changes

This KIP proposes adding an option to make the suppression state queryable, by adding a queryable flag to Suppressed.

Public Interfaces

...

Code Block
languagejava
public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {

    ...

    /**
     * Make the suppression queryable.
     * <p>
     * @return The same configuration with query enabled.
     */
    Suppressed<K> enableQuery();

    /**
     * Returns true iff the query is enabled.
     */
    boolean isQueryEnabled();

    ....

When Suppressed#isQueryEnabled is true, the user can query the suppressed view under the name given by Suppressed#name.

Calling Suppressed#enableQuery without specifying a name via Suppressed#withName is not allowed; in that case, an IllegalArgumentException is thrown.
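The naming rule above can be illustrated with a minimal, self-contained sketch. SuppressedSketch and its unnamed() factory are hypothetical stand-ins introduced only for this illustration; they are not the Kafka Streams classes. The sketch also assumes that the check happens eagerly inside enableQuery and that each call returns a new immutable copy. The KIP text does not specify either point.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for the proposed Suppressed configuration.
 * It is NOT the Kafka Streams class; it only models the name and the query flag.
 */
final class SuppressedSketch<K> {
    private final String name;          // null until withName is called
    private final boolean queryEnabled; // false until enableQuery is called

    private SuppressedSketch(final String name, final boolean queryEnabled) {
        this.name = name;
        this.queryEnabled = queryEnabled;
    }

    /** Hypothetical factory for an unnamed, non-queryable configuration. */
    static <K> SuppressedSketch<K> unnamed() {
        return new SuppressedSketch<>(null, false);
    }

    /** Returns a copy of this configuration carrying the given name. */
    SuppressedSketch<K> withName(final String name) {
        return new SuppressedSketch<>(Objects.requireNonNull(name, "name"), queryEnabled);
    }

    /** Assumption: the name check is done eagerly here, not when the topology is built. */
    SuppressedSketch<K> enableQuery() {
        if (name == null) {
            throw new IllegalArgumentException("enableQuery requires a name; call withName first");
        }
        return new SuppressedSketch<>(name, true);
    }

    boolean isQueryEnabled() {
        return queryEnabled;
    }

    String name() {
        return name;
    }

    public static void main(final String[] args) {
        // Named first, then made queryable: accepted.
        final SuppressedSketch<String> ok =
            SuppressedSketch.<String>unnamed().withName("suppressed-view").enableQuery();
        System.out.println(ok.name() + " queryable: " + ok.isQueryEnabled());

        // No name given: rejected with IllegalArgumentException.
        try {
            SuppressedSketch.<String>unnamed().enableQuery();
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this sketch the name doubles as the handle a caller would use to look up the suppressed view, which is why an unnamed configuration cannot be made queryable.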

Compatibility, Deprecation, and Migration Plan

None.

Rejected Alternatives

KTable#suppress(Suppressed, Materialized<K, V, KeyValueStore>)

At first glance, this approach looks more consistent with the existing APIs that have a Materialized variant (e.g., KTable#filter(Predicate) and KTable#filter(Predicate, Materialized)). However, it introduces two notions of a name for the same operation, Suppressed#name and Materialized#name, which makes it infeasible.

The existing Materialized variants are a legacy of the nameless operators that predate KIP-307. Here we already have the Suppressed class, so there is no need to stay consistent with the old Materialized variant methods. This alternative was therefore rejected.

KTable#suppress(Suppressed, String)

Another alternative is to pass the state store name directly. This approach is neither consistent with the existing APIs nor clear in its semantics, since it also introduces two notions of a name for the same operation. This alternative was therefore rejected.