...
Public Interfaces
New methods
Processors without forcing repartitioning:
KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, String... stateStoreNames)
KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, Named named, String... stateStoreNames)
...
```java
private Topology topology() {
    final var builder = new StreamsBuilder();
    final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));
    // processValues keeps the input key: doesn't flag repartitioning
    input.processValues(() -> new ContextualProcessor<String, String, String, String>() {
        @Override
        public void process(final Record<String, String> record) {
            for (final var s : record.value().split(",")) {
                context().forward(record.withValue("Hello " + s));
            }
        }
    }, Named.as("test-process-values"));
    // process may change the key (here to the value length): flags repartitioning
    input.process(() -> new ContextualProcessor<String, String, Integer, String>() {
        @Override
        public void process(final Record<String, String> record) {
            for (final var s : record.value().split(",")) {
                context().forward(record.withKey(record.value().length()).withValue("Hello " + s));
            }
        }
    }, Named.as("test-process"))
        .to("output", Produced.with(Serdes.Integer(), Serdes.String()));
    return builder.build();
}
```
...
```java
class ValueProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward> {
    //...
    private KForward key;

    public void setRecordKey(final KForward initialKey) {
        this.key = initialKey;
    }

    public void clearRecordKey() {
        this.key = null;
    }

    @Override
    public <K extends KForward, V extends VForward> void forward(final Record<K, V> record) {
        if (key != null) {
            // referential equality is enough to enforce that the key is not updated
            if (record.key() != key) {
                throw new IllegalArgumentException("Key has changed while processing the record and requires enforcing repartitioning.");
            }
        }
        delegate.forward(record.withKey(key));
    }
    //...
}
```
...
This ValueProcessorContext is to be used internally at the Processor level: the method setRecordKey(K) must be called before invoking the user-defined Processor, and clearRecordKey() right after.
NOTE: This API is not meant to be a public interface. It is only used internally by the processValues DSL operation to enforce at runtime that the key hasn't changed.
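The wrapping described above can be sketched without depending on Kafka. The classes below (SimpleRecord, SimpleContext, SimpleProcessor, KeyCheckingContext, ProcessValuesNode) are hypothetical, simplified stand-ins for Record, ProcessorContext, Processor and the internal processValues node; they are not Kafka Streams APIs. Two assumptions go beyond the text: clearRecordKey() is called in a finally block so the key is cleared even if the user processor throws, and a record that passes the check is forwarded as-is rather than through withKey(key).

```java
// Hypothetical, simplified stand-in for Kafka Streams' Record (not the real API).
final class SimpleRecord<K, V> {
    final K key;
    final V value;

    SimpleRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }

    <NewK> SimpleRecord<NewK, V> withKey(NewK newKey) {
        return new SimpleRecord<>(newKey, value);
    }

    <NewV> SimpleRecord<K, NewV> withValue(NewV newValue) {
        return new SimpleRecord<>(key, newValue);
    }
}

// Hypothetical stand-in for ProcessorContext: only forwarding is modeled.
interface SimpleContext<K, V> {
    void forward(SimpleRecord<K, V> record);
}

// Hypothetical stand-in for a user-defined Processor<K, VIn, K, VOut>.
interface SimpleProcessor<K, VIn, VOut> {
    void process(SimpleContext<K, VOut> context, SimpleRecord<K, VIn> record);
}

// Mirrors the idea of ValueProcessorContext: rejects forwarded records whose key is not the input key.
final class KeyCheckingContext<K, V> implements SimpleContext<K, V> {
    private final SimpleContext<K, V> delegate;
    private K key;

    KeyCheckingContext(SimpleContext<K, V> delegate) {
        this.delegate = delegate;
    }

    void setRecordKey(K initialKey) {
        this.key = initialKey;
    }

    void clearRecordKey() {
        this.key = null;
    }

    @Override
    public void forward(SimpleRecord<K, V> record) {
        // Referential check; as in ValueProcessorContext, a null input key is not checked.
        if (key != null && record.key != key) {
            throw new IllegalArgumentException(
                "Key has changed while processing the record and requires enforcing repartitioning.");
        }
        delegate.forward(record);
    }
}

// Hypothetical processValues node: sets the key before the user processor runs and clears it right after.
final class ProcessValuesNode<K, VIn, VOut> {
    private final SimpleProcessor<K, VIn, VOut> userProcessor;
    private final KeyCheckingContext<K, VOut> context;

    ProcessValuesNode(SimpleProcessor<K, VIn, VOut> userProcessor, SimpleContext<K, VOut> downstream) {
        this.userProcessor = userProcessor;
        this.context = new KeyCheckingContext<>(downstream);
    }

    void process(SimpleRecord<K, VIn> record) {
        context.setRecordKey(record.key);
        try {
            userProcessor.process(context, record);
        } finally {
            context.clearRecordKey();
        }
    }
}
```

Because the check is referential, a processor that forwards an equal but distinct key instance (for example new String(record.key)) is rejected as well, not only one that forwards a different key.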
...
Based on the previous example, if the key changes, a runtime exception will be thrown:
```java
private Topology topology() {
    final var builder = new StreamsBuilder();
    final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));
    input.processValues(() -> new ContextualProcessor<String, String, String, String>() {
        @Override
        public void process(final Record<String, String> record) {
            for (final var s : record.value().split(",")) {
                // will throw a runtime exception because the key has changed within processValues
                context().forward(record.withKey("FAIL!").withValue("Hello " + s));
            }
        }
    }, Named.as("test"))
        .to("output", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
}
```
...
The Transform API is broadly adopted for custom processing. Even though most of its functionality can be implemented with the new Processor API, this migration is not straightforward and will require reimplementing custom processors.
KTable#transformValues is an interesting use case for ValueTransformerWithKey, as the internal value is Change<V> (an internal type used to handle record changes on KTable), which is not exposed as public API. One approach would be to deprecate this method in favor of .toStream().processValues().toTable().
...
This would involve extending the KStream/KTable APIs even further with specific Transform variants. If the new Processor can support most Transform use cases in the long term, becoming the adopted option to transform values, flat-transform, etc., then we should start by getting the new Processor properly adopted in the DSL and let usage drive the next steps.
Runtime key validation on KStream#processValues
...
vs. key output Void type
On one hand, with Void as the output key type, we force users to cast to Void and change the key to null; this can be documented in the API so that users are aware of this peculiarity of forwarding within processValues.
On the other hand, keeping the input key type as the output key type doesn't require any key change, but it could lead to runtime exceptions from key checking when users change keys.
This KIP proposes the
...
second option and performs runtime key validation to avoid allocating two Records with withKey() (one to set the key to Void null, and another to restore the previous key). Users will need to be aware that processValues will throw runtime exceptions if they change the key.
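To make the allocation argument concrete, the sketch below counts Record allocations per forwarded record for both options. CountingRecord and KeyForwardingOptions are hypothetical helpers, not Kafka Streams classes, and the split of each method into "user code" and "framework" steps is an assumption about how each option would be wired. With a Void output key, the user's withKey((Void) null) and the framework's withKey(originalKey) add two allocations on top of withValue(...); with runtime validation only the withValue(...) allocation remains.

```java
// Hypothetical record type that counts how many instances are created.
final class CountingRecord<K, V> {
    static int allocations = 0;

    final K key;
    final V value;

    CountingRecord(K key, V value) {
        this.key = key;
        this.value = value;
        allocations++;
    }

    <NewK> CountingRecord<NewK, V> withKey(NewK newKey) {
        return new CountingRecord<>(newKey, value);
    }

    <NewV> CountingRecord<K, NewV> withValue(NewV newValue) {
        return new CountingRecord<>(key, newValue);
    }
}

final class KeyForwardingOptions {
    // Option 1 (Void output key): the user nulls the key, the framework restores the original one.
    static <K, V> CountingRecord<K, V> forwardWithVoidKey(CountingRecord<K, V> input, V newValue) {
        CountingRecord<Void, V> userOutput = input.withKey((Void) null).withValue(newValue); // user code
        return userOutput.withKey(input.key); // framework restores the previous key
    }

    // Option 2 (proposed): the user keeps the key, the framework only checks it by reference.
    static <K, V> CountingRecord<K, V> forwardWithKeyCheck(CountingRecord<K, V> input, V newValue) {
        CountingRecord<K, V> userOutput = input.withValue(newValue); // user code
        if (userOutput.key != input.key) { // framework check, no extra allocation
            throw new IllegalArgumentException("Key has changed while processing the record.");
        }
        return userOutput;
    }
}
```

Forwarding one value costs three allocations with the Void option and one with the runtime check; the trade-off is that the check only fails at runtime, whereas the Void type makes the constraint visible at compile time.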
References
- Cleanup old Processor work: https://issues.apache.org/jira/browse/KAFKA-12939
- Reference implementation: https://github.com/jeqo/kafka/tree/kstream-new-process