Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-6849
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The following methods would be added to the Java `KTable` interface:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                   final String... stateStoreNames);

<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                   final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                   final String... stateStoreNames); |
The following methods would be added to the Scala `KTable` class:
Code Block | ||||
---|---|---|---|---|
| ||||
def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
stateStoreNames: String*): KTable[K, VR]
def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
stateStoreNames: String*): KTable[K, VR] |
Proposed Changes
The new methods on `KTableImpl` will add a new `KTableTransformValues` processor node and attach any state stores. The new `KTableTransformValues` will be implemented in a similar manner to other processors, instantiating the user-supplied transformer once per task.
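The "once per task" behaviour can be illustrated with a small sketch that does not depend on Kafka. Every name in it (`CountingTransformer`, `SketchTask`, `PerTaskInstanceDemo`) is hypothetical and only stands in for the real supplier, transformer, and task classes. The point is that each task calls the supplier itself, so transformer state is never shared between tasks.

```java
import java.util.function.Supplier;

// Hypothetical stateful transformer: appends a running count to each value.
final class CountingTransformer {
    private int seen = 0;

    String transform(String key, String value) {
        seen++;
        return value + "#" + seen;
    }
}

// Hypothetical stand-in for a stream task that owns its own transformer.
final class SketchTask {
    private final CountingTransformer transformer;

    SketchTask(Supplier<CountingTransformer> supplier) {
        // The supplier is asked for a fresh instance exactly once per task.
        this.transformer = supplier.get();
    }

    String process(String key, String value) {
        return transformer.transform(key, value);
    }
}

class PerTaskInstanceDemo {
    public static void main(String[] args) {
        Supplier<CountingTransformer> supplier = CountingTransformer::new;
        SketchTask task0 = new SketchTask(supplier);
        SketchTask task1 = new SketchTask(supplier);

        System.out.println(task0.process("k", "a")); // a#1
        System.out.println(task0.process("k", "b")); // b#2
        System.out.println(task1.process("k", "c")); // c#1, task1 has its own counter
    }
}
```

This is why a supplier, rather than a single transformer instance, is passed to `transformValues`: an implementation that held mutable state in a shared instance would otherwise leak it across tasks.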
The `ProcessorContext` passed to the `init()` method of transformer implementations will be restricted: any call to any variant of the `forward()` method will throw a `StreamsException`. This will stop implementations outputting values with a new key.
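One way to picture the restriction is a wrapper that delegates read-only calls and rejects `forward()`. The sketch below is self-contained and deliberately avoids the Kafka classes: `SimpleContext`, `RecordingContext`, and `ForwardingDisabledContext` are hypothetical names, and the local `StreamsException` is only a stand-in for `org.apache.kafka.streams.errors.StreamsException`. It is an illustration of the idea under those assumptions, not the proposed implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the real StreamsException so the sketch compiles without Kafka.
class StreamsException extends RuntimeException {
    StreamsException(String message) {
        super(message);
    }
}

// Hypothetical, heavily reduced view of a processor context.
interface SimpleContext {
    String taskId();

    void forward(Object key, Object value);
}

// An unrestricted context that records whatever is forwarded.
final class RecordingContext implements SimpleContext {
    final List<String> forwarded = new ArrayList<>();

    public String taskId() {
        return "0_0";
    }

    public void forward(Object key, Object value) {
        forwarded.add(key + "=" + value);
    }
}

// Wrapper handed to the transformer: delegates reads, rejects forward().
final class ForwardingDisabledContext implements SimpleContext {
    private final SimpleContext delegate;

    ForwardingDisabledContext(SimpleContext delegate) {
        this.delegate = delegate;
    }

    public String taskId() {
        return delegate.taskId();
    }

    public void forward(Object key, Object value) {
        throw new StreamsException("forward() is not supported from this context");
    }
}

class RestrictedContextDemo {
    // Returns true if calling forward() on the given context is rejected.
    static boolean forwardIsRejected(SimpleContext context) {
        try {
            context.forward("newKey", "value");
            return false;
        } catch (StreamsException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        RecordingContext real = new RecordingContext();
        SimpleContext restricted = new ForwardingDisabledContext(real);

        System.out.println("taskId = " + restricted.taskId());
        System.out.println("forward rejected = " + forwardIsRejected(restricted));
        System.out.println("records forwarded = " + real.forwarded.size());
    }
}
```

Because the wrapper never reaches the underlying `forward()`, a transformer can still read metadata from the context but cannot emit a record under a different key.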
Compatibility, Deprecation, and Migration Plan
...
Unit tests will cover the new classes and methods. Integration or system tests are not required.
Rejected Alternatives
Include overloads that take a `ValueTransformerSupplier`, matching the overloads available on `KStream`. These were not included because omitting them keeps the interface more succinct, users can ignore the key if they do not need it, and these overloads on the `KStream` interface will likely be deprecated in time in favour of the 'WithKey' variants.