Status
Current state: Implemented and merged
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-478, the new strongly-typed Processor API, brings the option to reconsider the abstractions around custom processing in the Kafka Streams DSL.
Currently, multiple Transformer variants
and a final Processor
operation are the options available to implement custom processing with access to the record context, including record metadata (e.g. topic, partition, offset), timestamp, and headers, as well as references to state stores.
There have been discussions on how to refactor these APIs, minimizing and simplifying them into a single processor:
- https://issues.apache.org/jira/browse/KAFKA-8396
- https://issues.apache.org/jira/browse/KAFKA-8410
- https://issues.apache.org/jira/browse/KAFKA-10603
Transformers
are currently limited to the old ProcessorContext
, and there have been multiple extensions to support value processing without repartitioning and one-to-many record processing. With the addition of the new Processor
API, KStream
can be extended to offer access to the new, typed API, effectively replacing most of the operations the *Transform variants
offer at the moment, with more open control over forwarding records.
This KIP aims to extend the current KStream#process API to return output values that can be chained across the topology, and to introduce a new KStream#processValues
that uses a processor while validating that keys haven't changed, so repartitioning is not required. This KIP can be considered a step towards deprecating the Transform APIs
, though that should be discussed in a follow-up KIP.
Public Interfaces
...
Modified methods

KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
from void KStream#process(ProcessorSupplier<K, V, Void, Void> processorSupplier, ...)

KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)
from void KStream#process(ProcessorSupplier<K, V, Void, Void> processorSupplier, ...)

New methods

Processors without forcing repartitioning:

KStream<K,VOut> KStream#processValues(FixedKeyProcessorSupplier<K, V, VOut> processorSupplier, String... stateStoreNames)

KStream<K,VOut> KStream#processValues(FixedKeyProcessorSupplier<K, V, VOut> processorSupplier, Named named, String... stateStoreNames)
Deprecated methods

All transform operations on KStream will be deprecated in favor of the process and processValues operations:
KStream#transform
KStream#transformValues
KStream#flatTransform
KStream#flatTransformValues
Proposed Changes
...
Internal changes
- Infrastructure for Fixed Key Records:
  - FixedKeyRecord: not a Record sub/superclass, to prevent casting it to Record. Private constructor to avoid reconstructing the record and changing the key.
  - FixedKeyProcessor(Supplier) and FixedKeyProcessorContext interfaces for FixedKeyRecord.
  - FixedKeyContextualProcessor abstract class, similar to ContextualProcessor.
KStream#process and KStream#processValues to replace most Transformers

With the ability to manage forward calls as part of the Processor itself, transform, transformValues, flatTransform, and flatTransformValues can be replaced by a Processor via process/processValues:
```java
private Topology topology() {
    final var builder = new StreamsBuilder();
    final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));

    input.processValues(() -> new FixedKeyContextualProcessor<String, String, String>() {
        @Override
        public void process(FixedKeyRecord<String, String> record) {
            for (final var word : record.value().split(",")) {
                context().forward(record.withValue("Hello " + word));
            }
        }
    }, Named.as("process-values-without-repartitioning")); // doesn't flag repartitioning

    input.process(() -> new ContextualProcessor<String, String, String, String>() {
        @Override
        public void process(Record<String, String> record) {
            for (final var word : record.value().split(",")) {
                context().forward(record.withKey(word).withValue("Hello " + word));
            }
        }
    }, Named.as("process-with-partitioning")) // flags repartitioning
        .to("output", Produced.with(Serdes.String(), Serdes.String()));

    return builder.build();
}
```
Enforcing value-only changes

Processors that must keep the record key unchanged, to avoid repartitioning, currently rely on trusting the internal implementation. For this reason, the KStream API added *Values
methods to signal that the key has not changed and repartitioning can be avoided.

At the moment record keys can be changed, or the record recreated, between the record entering Processor#process(Record)
and being forwarded via ProcessorContext#forward(Record)
. Instead of validating keys at runtime, this KIP introduces record and processor types with a fixed key.
Infrastructure for Fixed Key Records
FixedKeyRecord
Record with immutable key.
```java
public final class FixedKeyRecord<K, V> {

    private final K key;
    private final V value;
    private final long timestamp;
    private final Headers headers;

    FixedKeyRecord(final K key, final V value, final long timestamp, final Headers headers) {
        this.key = key;
        this.value = value;
        if (timestamp < 0) {
            throw new StreamsException(
                "Malformed Record",
                new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
            );
        }
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }

    public K key() {
        return key;
    }

    public V value() {
        return value;
    }

    public long timestamp() {
        return timestamp;
    }

    public Headers headers() {
        return headers;
    }

    public <NewV> FixedKeyRecord<K, NewV> withValue(final NewV value) {
        return new FixedKeyRecord<>(key, value, timestamp, headers);
    }

    public FixedKeyRecord<K, V> withTimestamp(final long timestamp) {
        return new FixedKeyRecord<>(key, value, timestamp, headers);
    }

    public FixedKeyRecord<K, V> withHeaders(final Headers headers) {
        return new FixedKeyRecord<>(key, value, timestamp, headers);
    }
}
```
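To illustrate the design intent (the key is final, there is no withKey, and construction is restricted), here is a minimal self-contained sketch using simplified stand-in types; names like internalCreate are illustrative and not the real Kafka Streams internals:

```java
class FixedKeySketch {

    // Stand-in for FixedKeyRecord: the key is final and there is deliberately
    // no withKey(...) method, so every derived record keeps the original key.
    static final class FixedKeyRecord<K, V> {
        private final K key;
        private final V value;

        // Private constructor: user code cannot rebuild the record with a new key.
        private FixedKeyRecord(K key, V value) {
            this.key = key;
            this.value = value;
        }

        // Only the framework side creates fixed-key records (illustrative name).
        static <K, V> FixedKeyRecord<K, V> internalCreate(K key, V value) {
            return new FixedKeyRecord<>(key, value);
        }

        K key() { return key; }
        V value() { return value; }

        <NewV> FixedKeyRecord<K, NewV> withValue(NewV newValue) {
            return new FixedKeyRecord<>(key, newValue); // key always carried over
        }
    }

    public static void main(String[] args) {
        var record = FixedKeyRecord.internalCreate("user-1", "hello");
        // Any chain of withValue(...) calls ends with the original key,
        // which is what lets processValues skip repartitioning.
        var updated = record.withValue(record.value().length());
        System.out.println(updated.key() + "=" + updated.value());
    }
}
```

Because the type system rules out key changes, no per-record runtime check is needed.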
FixedKeyProcessorSupplier
```java
@FunctionalInterface
public interface FixedKeyProcessorSupplier<KIn, VIn, VOut>
    extends ConnectedStoreProvider, Supplier<FixedKeyProcessor<KIn, VIn, VOut>> {

    FixedKeyProcessor<KIn, VIn, VOut> get();
}
```
FixedKeyProcessor
```java
public interface FixedKeyProcessor<KIn, VIn, VOut> {

    default void init(final FixedKeyProcessorContext<KIn, VOut> context) {}

    void process(FixedKeyRecord<KIn, VIn> record);

    default void close() {}
}
```
FixedKeyContextualProcessor
Helper, same as ContextualProcessor
.
```java
public abstract class FixedKeyContextualProcessor<KIn, VIn, VOut> implements FixedKeyProcessor<KIn, VIn, VOut> {

    private FixedKeyProcessorContext<KIn, VOut> context;

    protected FixedKeyContextualProcessor() {}

    @Override
    public void init(final FixedKeyProcessorContext<KIn, VOut> context) {
        this.context = context;
    }

    protected final FixedKeyProcessorContext<KIn, VOut> context() {
        return context;
    }
}
```
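As a quick illustration of this helper pattern (using simplified stand-in types, not the actual Kafka Streams interfaces), the abstract base captures the context during init() so that a concrete processor only has to implement process():

```java
import java.util.ArrayList;
import java.util.List;

class ContextualSketch {

    // Simplified stand-ins for FixedKeyProcessorContext / FixedKeyProcessor;
    // the real Kafka Streams types carry records, stores, and metadata.
    interface Context<K, VOut> {
        void forward(K key, VOut value);
    }

    interface FixedKeyProcessor<KIn, VIn, VOut> {
        default void init(Context<KIn, VOut> context) {}
        void process(KIn key, VIn value);
        default void close() {}
    }

    // The helper stores the context so subclasses can call context() directly,
    // mirroring FixedKeyContextualProcessor / ContextualProcessor.
    abstract static class FixedKeyContextualProcessor<KIn, VIn, VOut>
            implements FixedKeyProcessor<KIn, VIn, VOut> {
        private Context<KIn, VOut> context;

        @Override
        public void init(Context<KIn, VOut> context) {
            this.context = context;
        }

        protected final Context<KIn, VOut> context() {
            return context;
        }
    }

    public static void main(String[] args) {
        List<String> forwarded = new ArrayList<>();

        // A concrete processor only implements process(); the base class
        // provides access to the initialized context.
        FixedKeyProcessor<String, String, Integer> lengths =
            new FixedKeyContextualProcessor<String, String, Integer>() {
                @Override
                public void process(String key, String value) {
                    context().forward(key, value.length());
                }
            };

        lengths.init((key, value) -> forwarded.add(key + "=" + value));
        lengths.process("word", "hello");
        System.out.println(forwarded);
    }
}
```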
ProcessingContext
To be extended by FixedKeyProcessorContext
and ProcessorContext
```java
interface ProcessingContext {

    String applicationId();

    TaskId taskId();

    Optional<RecordMetadata> recordMetadata();

    Serde<?> keySerde();

    Serde<?> valueSerde();

    File stateDir();

    StreamsMetrics metrics();

    <S extends StateStore> S getStateStore(final String name);

    Cancellable schedule(final Duration interval,
                         final PunctuationType type,
                         final Punctuator callback);

    void commit();

    Map<String, Object> appConfigsWithPrefix(final String prefix);
}
```
FixedKeyProcessorContext
```java
public interface FixedKeyProcessorContext<KForward, VForward> extends ProcessingContext {

    <K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> record);

    <K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> record, final String childName);
}
```
Compatibility, Deprecation, and Migration Plan
Modified operation KStream#process

Changing the return type from void
to KStream<KOut, VOut>
is source-compatible, but not binary-compatible. Users will therefore have to recompile their applications with the latest version of the library. They will not need to change their code, though, as the current return value is void, and the input processor supplier types already use Void
as the output key and value types.
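To make the source-vs-binary distinction concrete, here is a self-contained sketch in plain Java (illustrative interfaces, not the Kafka Streams API): a call site that ignores a method's result compiles unchanged whether the method returns void or a value, while the compiled method descriptor changes, which is what breaks binary compatibility.

```java
// Two versions of the same API method: the old one returns void, the new one
// returns a value. A call site written as a statement ("api.process(s);")
// compiles against either version, which is why the change is source-compatible.
// The bytecode method descriptor changes from (Ljava/lang/String;)V to
// (Ljava/lang/String;)Ljava/lang/String;, so code compiled against the old
// descriptor fails to link against the new one without recompilation.
class CompatSketch {
    interface OldApi { void process(String s); }
    interface NewApi { String process(String s); }

    public static void main(String[] args) {
        OldApi oldApi = s -> {};
        NewApi newApi = s -> "Hello " + s;

        // Identical source at the call site:
        oldApi.process("world");
        newApi.process("world"); // the returned value can simply be ignored...

        String chained = newApi.process("world").toUpperCase(); // ...or chained
        System.out.println(chained);
    }
}
```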
KStream#*transform* deprecations

This KIP includes the deprecation of the transform operations on KStream, proposing the latest Processor API operations instead.

The Transform APIs themselves won't be deprecated as part of this KIP, but the KStream#*transform* operations are. This, together with the Transform APIs' use of the old ProcessorContext, should warn users and drive adoption of the methods proposed in this KIP.

Even though the new Processor API could potentially replace most Transformer implementations, it is not yet possible to deprecate the Transformer APIs:

- The Transform API is broadly adopted for custom processing. Even though most of its functionality can be implemented with the new Processor API, the migration is not straightforward and will require reimplementing custom processors.
- KTable#transformValues is an interesting use-case for ValueTransformerWithKey, as the internal value is Change<V> (an internal type to handle record changes on KTable) that is not exposed as public API. An approach would be to deprecate this method in favor of .toStream().processValues().toTable().

A new KIP should be proposed to continue the deprecation of the Transformer APIs.
Rejected Alternatives
...
Migrate Transform APIs to the latest ProcessorContext
This would involve extending the KStream/KTable APIs even more with specific Transform variants. If the new Processor can support most Transform use-cases, in the long term becoming the adopted option to transform values, flat-transform, etc., then we should start getting the new Processor properly adopted in the DSL and let usage drive the next steps.
Runtime key validation on KStream#processValues and changing key output type to Void
On one hand, with a Void type for the output key, we would force users to cast to Void and change the key to null; this could be documented on the API, so users are aware of the peculiarity of forwarding within processValues.

On the other hand, keeping the input key type as the output key type doesn't require any change of keys, but it relies on runtime key validation, which leads to key-checking runtime exceptions when keys are changed when not supposed to. The runtime validation would avoid the allocation of two Records with withKey() (one for setting the key to null, and another for setting the previous key back).

Instead of either option, this KIP adds a new Record type for fixed keys, removing the need for runtime checks and reducing as much as possible the ability to change the key when implementing the processor.

Adding a ValueRecord type for processValues to avoid key changes with immutability

Another alternative to deal with key verification is to introduce a new type with an immutable key. This could remove some of the challenges, but the record (ValueRecord in this case) could still be recreated with a different key, and adding another API would also increase the complexity of the processing layer.
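For reference, here is a minimal self-contained sketch of what the rejected runtime-validation approach would have looked like (simplified stand-in types, not the Kafka Streams internals): a context wrapper pins the key of the record currently being processed and rejects any forward that carries a different key.

```java
import java.util.ArrayList;
import java.util.List;

class RuntimeValidationSketch {

    // Simplified stand-in for Record: value and key can both be rewritten,
    // which is exactly what the runtime check has to guard against.
    static final class SimpleRecord<K, V> {
        private final K key;
        private final V value;
        SimpleRecord(K key, V value) { this.key = key; this.value = value; }
        K key() { return key; }
        V value() { return value; }
        <NewV> SimpleRecord<K, NewV> withValue(NewV v) { return new SimpleRecord<>(key, v); }
        <NewK> SimpleRecord<NewK, V> withKey(NewK k) { return new SimpleRecord<>(k, value); }
    }

    static class ValueProcessorContext<K, V> {
        private final List<SimpleRecord<K, V>> forwarded = new ArrayList<>();
        private K pinnedKey;

        void setRecordKey(K key) { pinnedKey = key; }
        void clearRecordKey() { pinnedKey = null; }

        void forward(SimpleRecord<K, V> record) {
            // Referential equality is enough: a well-behaved processor forwards
            // the same key object it received.
            if (pinnedKey != null && record.key() != pinnedKey) {
                throw new IllegalArgumentException(
                    "Key has changed while processing the record and requires enforcing repartitioning.");
            }
            forwarded.add(record);
        }
    }

    public static void main(String[] args) {
        var context = new ValueProcessorContext<String, String>();
        var input = new SimpleRecord<>("k1", "hello");

        context.setRecordKey(input.key());
        context.forward(input.withValue("hello!")); // same key: accepted
        context.clearRecordKey();

        context.setRecordKey(input.key());
        boolean rejected = false;
        try {
            context.forward(input.withKey("other").withValue("oops"));
        } catch (IllegalArgumentException e) {
            rejected = true; // key change caught only at runtime
        }
        System.out.println("forwarded=" + context.forwarded.size() + " rejected=" + rejected);
    }
}
```

The fixed-key record type adopted by this KIP makes this per-record check unnecessary, because a key change cannot be expressed at all.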
References
- Cleanup old Processor work: https://issues.apache.org/jira/browse/KAFKA-12939
- Reference implementation: https://github.com/jeqo/kafka/tree/kstream-new-process
- Draft implementation of fixed record key: https://github.com/apache/kafka/pull/11854