Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-6849

...

The following methods would be added to the Java KTable interface:

New KTable methods

<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                   final String... stateStoreNames);

<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                   final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                   final String... stateStoreNames);

The following methods would be added to the Scala KTable class.

Additional Scala KTable methods

def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
                        stateStoreNames: String*): KTable[K, VR]

def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
                        materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
                        stateStoreNames: String*): KTable[K, VR]

 

Proposed Changes

The new methods on `KTableImpl` will add a new `KTableTransformValues` processor node and attach any state stores. `KTableTransformValues` will be implemented in a similar manner to other processors, instantiating the user-supplied transformer once per task.
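To illustrate what "once per task" means for a stateful transformer, here is a minimal, dependency-free sketch. It is not the Kafka Streams implementation: `KeyedValueTransformer`, `CountingTransformer`, and `Task` are hypothetical stand-ins, and `java.util.function.Supplier` plays the role of `ValueTransformerWithKeySupplier`. The point is that each task asks the supplier for its own instance, so the supplier should return a new object on every call.

```java
import java.util.function.Supplier;

public class PerTaskTransformerSketch {

    // Hypothetical reduced form of ValueTransformerWithKey:
    // the key is read-only input and only a new value is returned.
    interface KeyedValueTransformer<K, V, VR> {
        VR transform(K readOnlyKey, V value);
    }

    // A stateful transformer that counts the records this instance has seen.
    static class CountingTransformer implements KeyedValueTransformer<String, String, String> {
        private int seen = 0;

        @Override
        public String transform(String readOnlyKey, String value) {
            seen++;
            return readOnlyKey + ":" + value + "#" + seen;
        }
    }

    // Hypothetical stand-in for a stream task: it calls the supplier exactly once
    // and keeps the resulting transformer for its whole lifetime.
    static class Task<K, V, VR> {
        private final KeyedValueTransformer<K, V, VR> transformer;

        Task(Supplier<KeyedValueTransformer<K, V, VR>> supplier) {
            this.transformer = supplier.get();
        }

        VR process(K key, V value) {
            return transformer.transform(key, value);
        }
    }

    public static void main(String[] args) {
        Supplier<KeyedValueTransformer<String, String, String>> supplier = CountingTransformer::new;

        Task<String, String, String> task0 = new Task<>(supplier);
        Task<String, String, String> task1 = new Task<>(supplier);

        System.out.println(task0.process("a", "x")); // a:x#1
        System.out.println(task0.process("b", "y")); // b:y#2
        System.out.println(task1.process("c", "z")); // c:z#1 (task1 has its own counter)
    }
}
```

If the supplier returned a shared instance instead, the two tasks would share the counter, which is why a fresh instance per call is the safer design.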

The `ProcessorContext` passed to the `init()` method of transformer implementations will be restricted: any call to any variant of the `forward()` method will throw a `StreamsException`, as otherwise implementations could output values with a new key. Any call to `getStateStore` where the requested store is the one passed in via the `materialized` parameter will also throw a `StreamsException`, as this state store is managed by the streams library.
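The restriction could be realised by wrapping the real context before handing it to the transformer. The sketch below is a dependency-free illustration under stated assumptions, not the actual Kafka Streams code: `SimpleContext`, `BaseContext`, and `RestrictedContext` are hypothetical, heavily reduced stand-ins for `ProcessorContext`, and the nested `StreamsException` only mimics the library's unchecked exception of the same name.

```java
import java.util.HashMap;
import java.util.Map;

public class RestrictedContextSketch {

    // Stand-in for the library's StreamsException (assumed here to be unchecked).
    static class StreamsException extends RuntimeException {
        StreamsException(String message) {
            super(message);
        }
    }

    // Hypothetical reduced view of ProcessorContext: only the two calls discussed above.
    interface SimpleContext {
        void forward(Object key, Object value);

        Object getStateStore(String name);
    }

    // Unrestricted context, as an ordinary processor might see it.
    static class BaseContext implements SimpleContext {
        final Map<String, Object> stores = new HashMap<>();

        @Override
        public void forward(Object key, Object value) {
            // A real context would pass the record to downstream nodes.
        }

        @Override
        public Object getStateStore(String name) {
            return stores.get(name);
        }
    }

    // Wrapper given to the transformer's init(): rejects forward() and
    // lookups of the store named by the materialized parameter.
    static class RestrictedContext implements SimpleContext {
        private final SimpleContext delegate;
        private final String materializedStoreName;

        RestrictedContext(SimpleContext delegate, String materializedStoreName) {
            this.delegate = delegate;
            this.materializedStoreName = materializedStoreName;
        }

        @Override
        public void forward(Object key, Object value) {
            throw new StreamsException("forward() is not allowed: it could emit a record with a new key");
        }

        @Override
        public Object getStateStore(String name) {
            if (name.equals(materializedStoreName)) {
                throw new StreamsException("store '" + name + "' is managed by the streams library");
            }
            return delegate.getStateStore(name);
        }
    }

    public static void main(String[] args) {
        BaseContext base = new BaseContext();
        base.stores.put("user-store", new HashMap<String, String>());
        base.stores.put("result-store", new HashMap<String, String>());

        SimpleContext restricted = new RestrictedContext(base, "result-store");

        // Stores attached via stateStoreNames remain reachable.
        System.out.println(restricted.getStateStore("user-store") != null);

        try {
            restricted.forward("newKey", "value");
        } catch (StreamsException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Delegation keeps every other context call untouched, so only the two operations that could break the table's key or its result store are blocked.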

Compatibility, Deprecation, and Migration Plan

...

Unit tests to cover new classes and methods. Integration or system tests are not required.

Rejected Alternatives

Include overloads that take a `ValueTransformerSupplier`, matching the overloads available on `KStream`. These were not included because omitting them keeps the interface more succinct, users can ignore the key if they do not need it, and these overloads on the `KStream` interface will likely be deprecated in time in favour of the 'WithKey' variants.