Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-6849

...

The following methods would be added to the Java KTable interface:

New KTable methods

<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                   final String... stateStoreNames);

<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                   final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                   final String... stateStoreNames);

The following methods would be added to the Scala KTable class.

Additional Scala KTable methods

def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
                        stateStoreNames: String*): KTable[K, VR]

def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
                        materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
                        stateStoreNames: String*): KTable[K, VR]

 

Proposed Changes

The new methods on `KTableImpl` will add a new `KTableTransformValues` processor node and attach any state stores. `KTableTransformValues` will be implemented in a similar manner to other processors, instantiating the user-supplied transformer once per task.
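To illustrate what "once per task" means in practice, here is a minimal, self-contained sketch. It is not the Kafka Streams implementation: `Transformer`, `CountingTransformer`, and `Task` are hypothetical stand-ins, and `java.util.function.Supplier` takes the place of `ValueTransformerWithKeySupplier` so that the example compiles without Kafka on the classpath.

```java
import java.util.function.Supplier;

public class PerTaskTransformerSketch {

    // Hypothetical stand-in for ValueTransformerWithKey<K, V, VR>.
    interface Transformer<K, V, VR> {
        VR transform(K key, V value);
    }

    // Stateful transformer: appends a running count of the records it has seen.
    static class CountingTransformer implements Transformer<String, String, String> {
        private int seen = 0;

        @Override
        public String transform(String key, String value) {
            seen++;
            return value + "#" + seen;
        }
    }

    // Hypothetical stand-in for a stream task: it asks the supplier for its own
    // transformer instance exactly once, when the task is created.
    static class Task<K, V, VR> {
        private final Transformer<K, V, VR> transformer;

        Task(Supplier<Transformer<K, V, VR>> supplier) {
            this.transformer = supplier.get();
        }

        VR process(K key, V value) {
            // The key is only read here; the transformer returns a new value.
            return transformer.transform(key, value);
        }
    }

    public static void main(String[] args) {
        // The supplier creates a fresh transformer on every call.
        Supplier<Transformer<String, String, String>> supplier = CountingTransformer::new;

        Task<String, String, String> task0 = new Task<>(supplier);
        Task<String, String, String> task1 = new Task<>(supplier);

        System.out.println(task0.process("a", "x"));
        System.out.println(task0.process("b", "y"));
        System.out.println(task1.process("c", "z"));
    }
}
```

In this sketch each task keeps its own counter, because the supplier hands out a new instance on every call. A supplier that returned a shared instance would leak state between tasks.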

The `ProcessorContext` passed to the `init()` method of transformer implementations will be restricted: any call to any variant of the `forward()` method will throw a `StreamsException`. This will stop implementations outputting values with a new key.
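One way to picture this restriction is as a wrapper around the real context that delegates everything except `forward()`. The following is a rough sketch under stated assumptions, not the actual Kafka Streams code: `SimpleContext`, `RecordingContext`, and `ForwardingDisabledContext` are hypothetical, cut-down types, and `IllegalStateException` stands in for `StreamsException` so that the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

public class ForwardingDisabledSketch {

    // Hypothetical, cut-down stand-in for ProcessorContext.
    interface SimpleContext {
        String applicationId();
        void forward(Object key, Object value);
    }

    // Stand-in for the real context: records every forwarded pair.
    static class RecordingContext implements SimpleContext {
        final List<String> forwarded = new ArrayList<>();

        @Override
        public String applicationId() {
            return "demo-app";
        }

        @Override
        public void forward(Object key, Object value) {
            forwarded.add(key + "=" + value);
        }
    }

    // Wrapper handed to the transformer: delegates reads, rejects forward().
    static class ForwardingDisabledContext implements SimpleContext {
        private final SimpleContext delegate;

        ForwardingDisabledContext(SimpleContext delegate) {
            this.delegate = delegate;
        }

        @Override
        public String applicationId() {
            return delegate.applicationId();
        }

        @Override
        public void forward(Object key, Object value) {
            // The KIP specifies StreamsException; IllegalStateException is a stand-in.
            throw new IllegalStateException("forward() is not supported from transformValues()");
        }
    }

    // Returns true if calling forward() on the given context throws.
    static boolean forwardIsRejected(SimpleContext context) {
        try {
            context.forward("newKey", "value");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        RecordingContext real = new RecordingContext();
        SimpleContext restricted = new ForwardingDisabledContext(real);

        System.out.println(restricted.applicationId());    // delegated read still works
        System.out.println(forwardIsRejected(restricted)); // forward() throws
        System.out.println(real.forwarded.isEmpty());      // nothing reached the real context
    }
}
```

Because the wrapper rejects the call before it reaches the delegate, a transformer cannot emit a record under a different key, while read-only context methods keep working.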

Compatibility, Deprecation, and Migration Plan

...

Unit tests will cover the new classes and methods. Integration and system tests are not required.

Rejected Alternatives

Include overloads that take a `ValueTransformerSupplier`, matching the overloads available on `KStream`. These were not included for three reasons: omitting them keeps the interface more succinct, users can ignore the key if they do not need it, and the corresponding overloads on the `KStream` interface will likely be deprecated in time in favour of the 'WithKey' variants.