Status
Current state: Voting in progress
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-478, a strongly-typed Processor API, brings the option to reconsider the abstractions for custom processing in the Kafka Streams DSL.
Currently, multiple Transformers and a final Processor operation are the options available to implement custom processing with access to the record context, including record metadata (topic, partition, offset), timestamp, and headers, as well as references to state stores.
There have been discussions on how to refactor these APIs to minimize/simplify into a single processor:
- https://issues.apache.org/jira/browse/KAFKA-8396
- https://issues.apache.org/jira/browse/KAFKA-8410
- https://issues.apache.org/jira/browse/KAFKA-10603
Transformers are currently limited to the old ProcessorContext. With the addition of the new Processor API, KStream can be extended to offer access to the new, chainable API that can replace most of the operations transformers offer at the moment, with more open control to forward records.
This KIP aims to extend the current `KStream#process` API to return output values that can be chained across the topology, and to introduce a new KStream#processValues that uses a processor while validating that keys haven't changed, so that repartitioning is not required.
This KIP could be considered a step towards deprecating the Transformer APIs, though that should be discussed in a follow-up KIP.
Public Interfaces
New methods
KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, String... stateStoreNames)
KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, Void, VOut> processorSupplier, Named named, String... stateStoreNames)
Modified methods
KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
from void KStream#process(ProcessorSupplier<K, V, Void, Void> processorSupplier, ...)
KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)
from void KStream#process(ProcessorSupplier<K, V, Void, Void> processorSupplier, ...)
Deprecated methods
KStream#transform
KStream#transformValues
KStream#flatTransform
KStream#flatTransformValues
Proposed Changes
New Processor APIs replacing most Transformers
With the ability to manage forward calls as part of the Processor itself, flatTransform can be replaced by a Processor:
private Topology topology() {
    final var builder = new StreamsBuilder();
    final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));
    input.processValues(() -> new ContextualProcessor<String, String, String, String>() {
        @Override
        public void process(Record<String, String> record) {
            for (var s : record.value().split(",")) {
                context().forward(record.withValue("Hello " + s));
            }
        }
    }, Named.as("test"))
    .to("output", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
}
New ValueProcessorContext to enforce only Value changes
Processors that must preserve the key to avoid repartitioning currently rely on trusting the internal implementation. For that reason, the KStream API added the *Values methods to signal that the key has not changed and that repartitioning can be avoided.
At the moment, record keys can be changed, or records recreated, between the record entering Processor#process(Record) and ProcessorContext#forward(Record).
As one way to enforce the same key, a ProcessorContext wrapper could be implemented that holds the initial key:
class ValueProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward> {
    //...
    private KForward key;

    public void setRecordKey(KForward initialKey) {
        this.key = initialKey;
    }

    public void clearRecordKey() {
        this.key = null;
    }

    @Override
    public <K extends KForward, V extends VForward> void forward(Record<K, V> record) {
        delegate.forward(record.withKey(key));
    }
    //...
}
This ValueProcessorContext is to be used internally at the Processor level, and the method setRecordKey(K) must be called before invoking the user-defined Processor.
This API is not meant to be a public interface. It is only used internally by the processValues DSL operation to enforce at runtime that the key hasn't changed.
class KStreamValueProcessorSupplier<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
    // get ...

    static class KStreamValueProcessor<KIn, VIn, VOut> extends ContextualProcessor<KIn, VIn, KIn, VOut> {
        private final Processor<KIn, VIn, KIn, VOut> processor;
        private ValueProcessorContext<KIn, VOut> processorContext;

        public KStreamValueProcessor(Processor<KIn, VIn, KIn, VOut> transformer) {
            this.processor = transformer;
        }

        @Override
        public void init(final ProcessorContext<KIn, VOut> context) {
            super.init(context);
            this.processorContext = new ValueProcessorContext<>(context);
            processor.init(processorContext);
        }

        @Override
        public void process(final Record<KIn, VIn> record) {
            processorContext.setRecordKey(record.key());
            processor.process(record);
            processorContext.clearRecordKey();
        }
    }
}
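To make the key-pinning behavior concrete without depending on Kafka Streams, the following is a minimal, self-contained sketch of the same idea. The types Rec, Ctx, Proc, KeyPinningCtx and KeyPinningDemo are hypothetical stand-ins invented for this illustration (they are not Kafka classes), and the sketch assumes Java 16 or later for records. It only models the set-key, process, clear-key cycle described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Kafka Streams record (not the real Record class).
record Rec<K, V>(K key, V value) {
    <NK> Rec<NK, V> withKey(NK newKey) { return new Rec<>(newKey, value); }
    <NV> Rec<K, NV> withValue(NV newValue) { return new Rec<>(key, newValue); }
}

// Hypothetical stand-in for a processor context that can forward records.
interface Ctx<K, V> {
    void forward(Rec<? extends K, ? extends V> record);
}

// Hypothetical stand-in for a processor.
interface Proc<KIn, VIn, KOut, VOut> {
    void init(Ctx<KOut, VOut> context);
    void process(Rec<KIn, VIn> record);
}

// Wrapper that replaces the key of every forwarded record with the pinned key.
final class KeyPinningCtx<K, V> implements Ctx<K, V> {
    private final Ctx<K, V> delegate;
    private K key;

    KeyPinningCtx(Ctx<K, V> delegate) { this.delegate = delegate; }

    void setRecordKey(K initialKey) { this.key = initialKey; }
    void clearRecordKey() { this.key = null; }

    @Override
    public void forward(Rec<? extends K, ? extends V> record) {
        // Whatever key the user put on the record is overwritten here.
        delegate.forward(new Rec<K, V>(key, record.value()));
    }
}

final class KeyPinningDemo {
    // Drives a user processor: pin the key, process the record, clear the key.
    static <K, VIn, VOut> List<Rec<K, VOut>> run(Proc<K, VIn, K, VOut> userProcessor,
                                                 List<Rec<K, VIn>> input) {
        List<Rec<K, VOut>> out = new ArrayList<>();
        Ctx<K, VOut> sink = r -> out.add(new Rec<K, VOut>(r.key(), r.value()));
        KeyPinningCtx<K, VOut> ctx = new KeyPinningCtx<>(sink);
        userProcessor.init(ctx);
        for (Rec<K, VIn> record : input) {
            ctx.setRecordKey(record.key());
            userProcessor.process(record);
            ctx.clearRecordKey();
        }
        return out;
    }

    // A user processor that splits the value and (wrongly) tries to change the key.
    static final class Splitter implements Proc<String, String, String, String> {
        private Ctx<String, String> context;

        @Override
        public void init(Ctx<String, String> context) { this.context = context; }

        @Override
        public void process(Rec<String, String> record) {
            for (String s : record.value().split(",")) {
                context.forward(record.withKey("changed").withValue("Hello " + s));
            }
        }
    }

    public static void main(String[] args) {
        List<Rec<String, String>> input = List.of(new Rec<>("k1", "a,b"));
        // Both forwarded records keep the original key "k1".
        System.out.println(run(new Splitter(), input));
    }
}
```

In this sketch the attempted key change is silently overwritten rather than rejected; whether a real implementation should overwrite or throw is a design choice the sketch does not settle.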
Based on the previous example, the output key type will have to be changed to Void to prove that the key hasn't changed within the processor:
private Topology topology() {
    final var builder = new StreamsBuilder();
    final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));
    // The output key type is Void, so the forwarded record must carry a null key.
    input.processValues(() -> new ContextualProcessor<String, String, Void, String>() {
        @Override
        public void process(Record<String, String> record) {
            for (var s : record.value().split(",")) {
                context().forward(record.withKey((Void) null).withValue("Hello " + s));
            }
        }
    }, Named.as("test"))
    .to("output", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
}
NOTE: The key used for validation can be set when processing a record. With punctuations, however, there is no record from which to set the key before forwarding, so it won't be possible to forward from a punctuation. This is similar to how ValueTransformers behave at the moment.
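One possible way to surface this restriction is to fail fast when forward is called while no key is pinned. The sketch below is an assumption for illustration only: GuardedForwarder, KeyedValue and PunctuationGuardDemo are hypothetical names, not Kafka classes, and the KIP does not specify this behavior. A separate flag is used instead of a null check because null is a legitimate record key.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a forwarded record; not a Kafka Streams type.
record KeyedValue<K, V>(K key, V value) {}

// Hypothetical wrapper that only allows forwarding while a record key is pinned.
final class GuardedForwarder<K, V> {
    private final List<KeyedValue<K, V>> downstream = new ArrayList<>();
    private K key;
    private boolean keySet;

    void setRecordKey(K initialKey) {
        key = initialKey;
        keySet = true;
    }

    void clearRecordKey() {
        key = null;
        keySet = false;
    }

    void forward(V value) {
        if (!keySet) {
            // Reached when forward is called outside process(), e.g. from a punctuator.
            throw new IllegalStateException("forward() is only allowed while processing a record");
        }
        downstream.add(new KeyedValue<>(key, value));
    }

    List<KeyedValue<K, V>> downstream() {
        return downstream;
    }
}

final class PunctuationGuardDemo {
    public static void main(String[] args) {
        GuardedForwarder<String, String> ctx = new GuardedForwarder<>();
        ctx.setRecordKey("k1");
        ctx.forward("inside process");       // accepted: a key is pinned
        ctx.clearRecordKey();
        try {
            ctx.forward("from punctuation"); // rejected: no key is pinned
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```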
Compatibility, Deprecation, and Migration Plan
Modified operation KStream#process
Modifications to the KStream#process method are source compatible with the previous version, though not binary compatible. Users will therefore need to recompile their applications against the latest version. They will not need to change their code, however: the current return value is void, and existing processor suppliers already use Void as the output key and value types.
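The source-compatibility claim can be illustrated with a small stand-alone sketch. SketchStream and SourceCompatDemo are hypothetical names used only here, and the method takes a plain java.util.function.Supplier rather than a real ProcessorSupplier. A caller written against a void method still compiles when the method starts returning a value, because Java allows a method call's result to be discarded. The change is not binary compatible because the return type is part of the method descriptor in compiled class files, so callers compiled against the void version would fail to link (NoSuchMethodError) until recompiled.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a DSL stream; only the return type of process(...) matters.
final class SketchStream<K, V> {
    int processorsAdded = 0;

    // Before the change this would have been: void process(Supplier<?> processorSupplier)
    <KOut, VOut> SketchStream<KOut, VOut> process(Supplier<?> processorSupplier) {
        processorsAdded++;
        return new SketchStream<>();
    }
}

final class SourceCompatDemo {
    // Existing caller code, written against the void version: compiles unchanged.
    static int legacyCaller(SketchStream<String, String> stream) {
        stream.process(() -> "terminal processor"); // return value is simply ignored
        return stream.processorsAdded;
    }

    // New caller code: the returned stream can be chained further.
    static SketchStream<String, Integer> chainedCaller(SketchStream<String, String> stream) {
        return stream.<String, Integer>process(() -> "first processor")
                     .<String, Integer>process(() -> "second processor");
    }

    public static void main(String[] args) {
        System.out.println(legacyCaller(new SketchStream<>()));
        System.out.println(chainedCaller(new SketchStream<>()) != null);
    }
}
```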
Transform methods deprecation
Transform APIs won't be deprecated as part of this KIP, but the transform* operations are. This, together with the Transform API's use of the old ProcessorContext, should warn users and drive adoption of the methods proposed in this KIP.
Even though the new Processor API could potentially replace most Transformer implementations, it is not yet possible to deprecate the Transformer APIs:
- The Transform API is broadly adopted for custom processing. Even though most functionality is possible to implement with the new Processor, this migration is not straightforward and will require reimplementing custom processors.
- KTable#transformValues is an interesting use case for ValueTransformerWithKey, as the internal value is Change<V> (an internal type to handle record changes on KTable, not exposed as public API). An approach would be to deprecate this method in favor of .toStream().processValues().toTable().
A new KIP should be proposed to continue the deprecation of the Transformer APIs.
Rejected Alternatives
Create new Transform APIs with the latest ProcessorContext
This would involve extending the KStream/KTable APIs even more with specific Transform variants. If the new Processor is able to support most Transform operations and, in the long term, becomes the adopted option for transforming values, flat transforms, etc., then we should start getting the new Processor properly adopted in the DSL and let usage drive the next steps.
Runtime key validation on KStream#processValues
On the one hand, with the Void type for the output key, we force users to cast to Void and set the key to null. This can be documented on the API, so that users are aware of the peculiarity of forwarding within processValues.
On the other hand, keeping the key type as the output type doesn't require any change of keys, but it could lead to key-checking runtime exceptions.
This KIP proposes the first option and changes the type to Void. This will impose a bit of pain on users in exchange for some type safety and the avoidance of runtime exceptions. We can justify this requirement as a way to prove that the key hasn't changed.
References
- Cleanup old Processor work: https://issues.apache.org/jira/browse/KAFKA-12939
- Reference implementation: https://github.com/jeqo/kafka/tree/kstream-new-process