Status
Current state: Under Discussion
Discussion thread: here todo
JIRA: here todo
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In Kafka Streams, the KStream interface has several transformValues methods, whereas the KTable interface has none. The related mapValues method is available on both KTable and KStream. The transformValues method is more flexible than mapValues:
- Stateful implementations: transformValues accepts a supplier of transformers, rather than the mapper instance that mapValues accepts. The supplier is used to instantiate a transformer per stream task, meaning the implementation does not need to be thread-safe. Conversely, mapValues shares the passed-in mapper instance across stream tasks, so implementations must be thread-safe.
- State-store access: transformValues allows access to existing state stores, whereas mapValues does not.
- Richer API: the ValueTransformer interface is much richer than ValueMapper, supporting both init() and close() calls.
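The per-task instantiation described in the first bullet can be illustrated with a minimal, self-contained sketch. It does not use the Kafka Streams library: SimpleValueTransformer, CountingTransformer, and runTask are hypothetical stand-ins invented for illustration, and runTask only approximates what a stream task does with a supplier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

final class SupplierPerTaskSketch {

    /** Hypothetical stand-in for a value transformer with init/close lifecycle hooks. */
    interface SimpleValueTransformer<V, VR> {
        void init();
        VR transform(V value);
        void close();
    }

    /** Keeps a running count in a plain field; no synchronization is needed
     *  because each simulated task owns its own instance. */
    static class CountingTransformer implements SimpleValueTransformer<String, String> {
        private int seen;
        private boolean open;

        @Override
        public void init() {
            seen = 0;
            open = true;
        }

        @Override
        public String transform(final String value) {
            if (!open) {
                throw new IllegalStateException("transform called before init or after close");
            }
            seen++;
            return value + "#" + seen;
        }

        @Override
        public void close() {
            open = false;
        }
    }

    /** Simulates one stream task: asks the supplier for a fresh transformer,
     *  initializes it, transforms every value, then closes it. */
    static List<String> runTask(final Supplier<SimpleValueTransformer<String, String>> supplier,
                                final List<String> values) {
        final SimpleValueTransformer<String, String> transformer = supplier.get();
        transformer.init();
        final List<String> out = new ArrayList<>();
        try {
            for (final String v : values) {
                out.add(transformer.transform(v));
            }
        } finally {
            transformer.close();
        }
        return out;
    }

    public static void main(final String[] args) {
        final Supplier<SimpleValueTransformer<String, String>> supplier = CountingTransformer::new;
        // Two simulated tasks share the supplier but not the transformer state.
        System.out.println(runTask(supplier, Arrays.asList("a", "b"))); // prints [a#1, b#2]
        System.out.println(runTask(supplier, Arrays.asList("c")));      // prints [c#1]
    }
}
```

The second task starts counting from 1 again because the supplier handed it a new instance. A single shared mapper holding the same mutable field would need synchronization to be safe across tasks.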
There is no conceptual reason not to support the richer transformValues on the KTable interface.
Public Interfaces
The following methods would be added to the KTable interface:

<VR> KTable<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                   final String... stateStoreNames);

<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                   final String... stateStoreNames);
Proposed Changes
The new methods on `KTableImpl` will be implemented as follows:
public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
    ...
    private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";

    @Override
    public <VR> KTable<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                              final String... stateStoreNames) {
        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
    }

    @Override
    public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                              final String... stateStoreNames) {
        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
    }

    private <VR> KTable<K, VR> transformValues(final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerWithKeySupplier,
                                               final String... stateStoreNames) {
        final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);

        final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>(this, transformerWithKeySupplier);

        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);

        // Connect the new processor to any state stores the transformer needs to access.
        if (stateStoreNames != null && stateStoreNames.length > 0) {
            builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
        }

        return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, queryableStoreName, isQueryable);
    }
}
The new KTableTransformValues processor supplier will be implemented as:
The ForwardingDisabledProcessorContext class passed to the init method of the valueTransformer has been extracted from the KStreamTransformValuesProcessor class, and throws a StreamsException should the transformer implementation call any of the forward methods.
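The idea of a context wrapper that disables forwarding can be sketched as a simple decorator. This is not the Kafka implementation: SimpleContext, RecordingContext, ForwardingDisabledContext, and SketchStreamsException are hypothetical, heavily reduced stand-ins, and the sketch assumes the wrapper's job is to delegate read-only calls while rejecting attempts to forward records.

```java
import java.util.ArrayList;
import java.util.List;

final class ForwardingDisabledSketch {

    /** Hypothetical, heavily reduced stand-in for a processor context. */
    interface SimpleContext {
        String taskId();
        void forward(String key, String value);
    }

    /** Hypothetical stand-in for the exception type raised on misuse. */
    static class SketchStreamsException extends RuntimeException {
        SketchStreamsException(final String message) {
            super(message);
        }
    }

    /** An underlying context that records whatever is forwarded downstream. */
    static class RecordingContext implements SimpleContext {
        final List<String> forwarded = new ArrayList<>();

        @Override
        public String taskId() {
            return "0_0";
        }

        @Override
        public void forward(final String key, final String value) {
            forwarded.add(key + "=" + value);
        }
    }

    /** Decorator: delegates read-only calls to the wrapped context and rejects forward(). */
    static class ForwardingDisabledContext implements SimpleContext {
        private final SimpleContext delegate;

        ForwardingDisabledContext(final SimpleContext delegate) {
            this.delegate = delegate;
        }

        @Override
        public String taskId() {
            return delegate.taskId();
        }

        @Override
        public void forward(final String key, final String value) {
            throw new SketchStreamsException("forward() is not supported in a value transformer");
        }
    }

    public static void main(final String[] args) {
        final RecordingContext real = new RecordingContext();
        final SimpleContext restricted = new ForwardingDisabledContext(real);

        System.out.println(restricted.taskId()); // prints 0_0
        try {
            restricted.forward("k", "v");
        } catch (final SketchStreamsException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(real.forwarded.isEmpty()); // prints true
    }
}
```

Wrapping the context, rather than checking inside each transformer, keeps the restriction in one place: the transformer can still read metadata through the wrapper, but nothing it does reaches the downstream topology except its return value.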
Compatibility, Deprecation, and Migration Plan
Users must upgrade to the new version if they want to use this functionality.
Test Plan
Unit tests will cover the new classes and methods. Integration or system tests are not required.
Rejected Alternatives
None.