...

Motivation

KIP-478, which introduced a strongly-typed Processor API, makes it possible to reconsider the abstractions for custom processing in the Kafka Streams DSL.

Currently, the options available to implement custom processing are multiple Transformer operations and a final Processor operation. Both give access to the record context (record metadata such as topic, partition, and offset; timestamp; and headers) and to state stores.

...

This KIP aims to extend the current `KStream#process` API to return output values that can be chained across the topology, and to introduce a new KStream#processValues to use a processor while validating that keys haven't changed and repartitioning is not required.

This KIP could be considered as a step towards deprecating Transforms, though that should be discussed in a follow-up KIP.

Public Interfaces

New methods

...

  • KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, String... stateStoreNames)
  • KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, Named named, String... stateStoreNames)

Modified methods

...

  • KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
    • from void KStream#process(ProcessorSupplier<K, V, Void, Void> processorSupplier, ...)
  • KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)
    • from void KStream#process(ProcessorSupplier<K, V, Void, Void> processorSupplier, ...)

Deprecated methods

...

  • KStream#transform 
  • KStream#transformValues 
  • KStream#flatTransform 
  • KStream#flatTransformValues 

...

At the moment, processors that need to keep the same key to avoid repartitioning rely on trusted internal implementations. As a result, the KStream API added *Values methods to signal when the key has not changed and repartitioning can be avoided.

At the moment, record keys can be changed, or the record recreated, between the record entering Processor#process(Record) and ProcessorContext#forward(Record).

As an alternative for enforcing the same key, a ProcessorContext wrapper could be implemented to define an initial key:

...

Code Block
class ValueProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward> {
//...
    private KForward key;
    
    public void setRecordKey(KForward initialKey) {
        this.key = initialKey;
    }
    
    public void clearRecordKey() {
        this.key = null;
    }

    @Override
    public <K extends KForward, V extends VForward> void forward(Record<K, V> record) {
        if (key != null) {
            if (record.key() != key) { // referential equality is enough to enforce that the key is not updated
                throw new IllegalArgumentException("Key has changed while processing the record and requires enforcing repartitioning.");
            }
        }
        delegate.forward(record);
    }
//...
}

This ValueProcessorContext is to be used internally at the Processor level, and the method setRecordKey(K) must be called before invoking the user-defined Processor.

This API is not meant to be a public interface. It is only used internally by the processValues DSL operation to enforce at runtime that the key hasn't changed.


Code Block
class KStreamValueProcessorSupplier<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
// get ...

    static class KStreamValueProcessor<KIn, VIn, VOut> extends ContextualProcessor<KIn, VIn, KIn, VOut> {

        private final Processor<KIn, VIn, KIn, VOut> processor;

        private ValueProcessorContext<KIn, VOut> processorContext;

        public KStreamValueProcessor(Processor<KIn, VIn, KIn, VOut> transformer) {
            this.processor = transformer;
        }

        @Override
        public void init(final ProcessorContext<KIn, VOut> context) {
            super.init(context);
            this.processorContext = new ValueProcessorContext<>(context);
            processor.init(processorContext);
        }

        @Override
        public void process(final Record<KIn, VIn> record) {
            processorContext.setRecordKey(record.key());
            processor.process(record);
            processorContext.clearRecordKey();
        }
    }

}
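
The key-pinning idea in the two blocks above can be tried out without Kafka on the classpath. The following is a minimal sketch under stated assumptions: `Rec`, `GuardedForwarder`, `splitValues`, and `rejectsKey` are hypothetical stand-ins invented for this illustration and are not Kafka Streams classes. It only mirrors the set key, process, clear key sequence and the reference comparison on forward.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT Kafka Streams classes.
final class KeyGuardSketch {

    // Minimal immutable key/value record with "with" style copy methods.
    static final class Rec<K, V> {
        final K key;
        final V value;
        Rec(K key, V value) { this.key = key; this.value = value; }
        <K2> Rec<K2, V> withKey(K2 newKey) { return new Rec<>(newKey, value); }
        <V2> Rec<K, V2> withValue(V2 newValue) { return new Rec<>(key, newValue); }
    }

    // Collects forwarded records and rejects any whose key is not the pinned instance.
    static final class GuardedForwarder<K, V> {
        private final List<Rec<K, V>> forwarded = new ArrayList<>();
        private K pinnedKey;

        void setRecordKey(K key) { this.pinnedKey = key; }
        void clearRecordKey() { this.pinnedKey = null; }

        void forward(Rec<K, V> rec) {
            // Reference comparison on purpose: the very same key object must come back.
            if (pinnedKey != null && rec.key != pinnedKey) {
                throw new IllegalArgumentException("Key has changed while processing the record.");
            }
            forwarded.add(rec);
        }

        List<Rec<K, V>> forwarded() { return forwarded; }
    }

    // One "process" call that splits the value and forwards each part under the input key.
    static List<String> splitValues(String key, String csv) {
        GuardedForwarder<String, String> ctx = new GuardedForwarder<>();
        Rec<String, String> in = new Rec<>(key, csv);
        ctx.setRecordKey(in.key);
        try {
            for (String s : in.value.split(",")) {
                ctx.forward(in.withValue("Hello " + s));
            }
        } finally {
            ctx.clearRecordKey();
        }
        List<String> out = new ArrayList<>();
        for (Rec<String, String> r : ctx.forwarded()) {
            out.add(r.value);
        }
        return out;
    }

    // Returns true if forwarding a record re-keyed with newKey is rejected.
    static boolean rejectsKey(String newKey) {
        GuardedForwarder<String, String> ctx = new GuardedForwarder<>();
        Rec<String, String> in = new Rec<>("k1", "a");
        ctx.setRecordKey(in.key);
        try {
            ctx.forward(in.withKey(newKey));
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(splitValues("k1", "a,b"));
        System.out.println(rejectsKey("FAIL"));
    }
}
```

One consequence of the reference comparison is worth noting: in this sketch a key that is equal but a distinct object, such as `new String("k1")`, is rejected as well. That is stricter than necessary for partitioning purposes, but it avoids calling `equals` on every forwarded record.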

...

Based on the previous example, if the key is changed then processing will fail:

Code Block
  private Topology topology() {
    final var builder = new StreamsBuilder();
    final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));
    input.processValues(() -> new ContextualProcessor<String, String, String, String>() {
      @Override
      public void process(Record<String, String> record) {
        for (var s : record.value().split(",")) {
          context().forward(record.withKey("FAIL").withValue("Hello " + s));
        }
      }
    }, Named.as("test"))
    .to("output", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
  } 
Code Block
 12:08:19.125 [new-process-c2e58cba-3c47-41a9-b221-adea4cd38c87-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.TaskManager - stream-thread [new-process-c2e58cba-3c47-41a9-b221-adea4cd38c87-StreamThread-1] Failed to process stream task 0_0 due to the following error: 
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=words, partition=0, offset=0, stacktrace=java.lang.IllegalArgumentException: Key has changed while processing the record and requires enforcing repartitioning.


NOTE: The key used for validation can be set when a record is processed. With punctuations, however, there is no input record from which to set the key before forwarding, so it won't be possible to forward from a punctuation. This is similar to how ValueTransformers behave at the moment.

Compatibility, Deprecation, and Migration Plan

Modified operation KStream#process 

Modifications to the method KStream#process are source compatible with the previous version, though not binary compatible. Users will therefore need to recompile their applications with the latest version. They will not need to change their code, however, because the current return value is void and the input processor supplier types will include Void, which is the current type for the output key and value.
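
The difference between source and binary compatibility comes from the JVM linking method calls by descriptor, which includes the return type. The sketch below illustrates this with hypothetical stand-in interfaces (`StreamV1`, `StreamV2`, and the `Runnable` parameter are assumptions made for brevity, not the Kafka Streams signatures): the same call statement compiles against both versions, while the descriptors that compiled callers would reference differ.

```java
import java.lang.invoke.MethodType;

// Illustration only: StreamV1 and StreamV2 are hypothetical stand-ins, not Kafka Streams types.
final class ReturnTypeCompatSketch {

    interface StreamV1 { void process(Runnable supplier); }     // old shape: returns void
    interface StreamV2 { Object process(Runnable supplier); }   // new shape: returns a value

    // JVM method descriptor for process(Runnable) with the given return type.
    static String descriptor(Class<?> returnType) {
        return MethodType.methodType(returnType, Runnable.class).toMethodDescriptorString();
    }

    // The same call statement compiles against either version (source compatible),
    // because a returned value may simply be ignored.
    static void callerV1(StreamV1 s) { s.process(() -> { }); }
    static void callerV2(StreamV2 s) { s.process(() -> { }); }

    public static void main(String[] args) {
        // Bytecode compiled against V1 refers to the first descriptor and would not
        // find a method with the second one at link time without recompilation.
        System.out.println(descriptor(void.class));
        System.out.println(descriptor(Object.class));
    }
}
```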

Transform  methods deprecation

Transform APIs won't be deprecated as part of this KIP, but the transform* operations are. This deprecation, together with the Transform APIs' use of the old ProcessorContext, should warn users and drive adoption of the methods proposed in this KIP.

...

A new KIP should be proposed to continue the deprecation of Transformer  APIs.

Rejected Alternatives

Create new Transform APIs with the latest ProcessorContext

This would involve extending the KStream/KTable APIs even more with specific Transform variants. If the new Processor is able to support most Transform use cases and, in the long term, become the adopted option to transform values, flat transform, etc., then we should start getting the new Processor properly adopted in the DSL and let the usage drive the next steps.

Runtime key validation on KStream#processValues 

On one hand, with Void type for key output, we force the users to cast to Void and change the key to null, though this can be documented on the API, so the users are aware of the peculiarity of forwarding within processValues. 

On the other hand, keeping the key type as the output key type doesn't require any change to the keys, but it could lead to key-checking runtime exceptions.

This KIP proposes the first option and changes the type to Void. This will impose a bit of pain on the users to gain some type-safety and avoid runtime exceptions. We can justify this requirement as a way to prove that the key hasn't changed.
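
To make the trade-off concrete, here is a minimal sketch of the Void-key option using hypothetical stand-in types (`Rec`, `VoidKeyForwarder`, and `greet` are invented for this illustration and are not Kafka Streams classes). It assumes the framework would re-attach the input key after the user forwards a Void-keyed record; that behaviour is an assumption of the sketch, not something this section specifies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT Kafka Streams classes.
final class VoidKeySketch {

    // Minimal immutable key/value record with "with" style copy methods.
    static final class Rec<K, V> {
        final K key;
        final V value;
        Rec(K key, V value) { this.key = key; this.value = value; }
        <K2> Rec<K2, V> withKey(K2 newKey) { return new Rec<>(newKey, value); }
        <V2> Rec<K, V2> withValue(V2 newValue) { return new Rec<>(key, newValue); }
    }

    // Accepts only Void-keyed records and re-attaches the input key (assumed behaviour).
    static final class VoidKeyForwarder<K, V> {
        private final List<Rec<K, V>> downstream = new ArrayList<>();
        private K inputKey;

        void setRecordKey(K key) { this.inputKey = key; }

        void forward(Rec<Void, V> rec) {
            downstream.add(new Rec<>(inputKey, rec.value));
        }

        List<Rec<K, V>> downstream() { return downstream; }
    }

    // Splits the value and forwards each part; the user can never supply a new key.
    static List<Rec<String, String>> greet(String key, String csv) {
        VoidKeyForwarder<String, String> ctx = new VoidKeyForwarder<>();
        Rec<String, String> in = new Rec<>(key, csv);
        ctx.setRecordKey(in.key);
        for (String s : in.value.split(",")) {
            // The cast to Void is the peculiarity users would have to write.
            ctx.forward(in.withKey((Void) null).withValue("Hello " + s));
            // ctx.forward(in.withValue("Hello " + s)); // would not compile: the key type is String, not Void
        }
        return ctx.downstream();
    }

    public static void main(String[] args) {
        for (Rec<String, String> r : greet("k1", "a,b")) {
            System.out.println(r.key + " -> " + r.value);
        }
    }
}
```

In this shape a changed key is a compile-time error rather than a runtime exception, at the cost of the explicit `(Void) null` in user code.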

References