
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-478, the new strongly-typed Processor API, brings the option to reconsider the abstractions around custom processing in the Kafka Streams DSL.

Currently, multiple Transformers and a final Processor operation are the options available to implement custom processing with access to the record context. That context includes record metadata (e.g. topic, partition, offset), the timestamp and headers, and references to state stores.

There have been discussions on how to refactor these APIs: ...

Transformers are currently limited to the old ProcessorContext. There have been multiple extensions to support value processing without repartitioning and one-to-many record processing. With the addition of the new Processor API, KStream can be extended to access the new, chainable API. It can then effectively replace most of the operations the *transform methods offer at the moment, with more open control over forwarding records.

This KIP could be considered a step towards deprecating the Transform APIs, though that should be discussed in a follow-up KIP.

Public Interfaces

Modified methods

  • KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
    • changed from void KStream#process(ProcessorSupplier<K, V, Void, Void> processorSupplier, ...)
  • KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)
    • changed from void KStream#process(ProcessorSupplier<K, V, Void, Void> processorSupplier, ...)

New methods

Processors without forcing repartitioning:

  • KStream<K,VOut> KStream#processValues(FixedKeyProcessorSupplier<K, V, VOut> processorSupplier, String... stateStoreNames)
  • KStream<K,VOut> KStream#processValues(FixedKeyProcessorSupplier<K, V, VOut> processorSupplier, Named named, String... stateStoreNames)

Internal changes

  • Infrastructure for Fixed Key Records:
    • FixedKeyRecord:
      • Not a Record subclass or superclass, to prevent casting to Record.
      • Private constructor, to avoid reconstructing the record and changing the key.
    • FixedKeyProcessor(Supplier) and FixedKeyProcessorContext interfaces for FixedKeyRecord.
      • FixedKeyContextualProcessor abstract class, similar to ContextualProcessor.
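The following dependency-free Java sketch makes the FixedKeyRecord idea more concrete. SimpleFixedKeyRecord, fromInput and FixedKeyRecordDemo are hypothetical names used only for illustration. They are not Kafka Streams classes. The sketch assumes the framework, not user code, creates the record from the incoming one; in a single file the static factory is still reachable. It only shows the shape of the idea: the constructor is private, there is no withKey, and withValue always carries the original key over.

```java
// Hypothetical, simplified model of the FixedKeyRecord idea; this is not the Kafka Streams class.
final class SimpleFixedKeyRecord<K, V> {
    private final K key;
    private final V value;
    private final long timestamp;

    // Private constructor: callers cannot build a record around an arbitrary key.
    private SimpleFixedKeyRecord(final K key, final V value, final long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    // Stand-in for the framework wrapping an incoming record (assumed to be internal-only).
    static <K, V> SimpleFixedKeyRecord<K, V> fromInput(final K key, final V value, final long timestamp) {
        return new SimpleFixedKeyRecord<>(key, value, timestamp);
    }

    K key() { return key; }
    V value() { return value; }
    long timestamp() { return timestamp; }

    // The value type may change, but the key is always copied from this record.
    <NewV> SimpleFixedKeyRecord<K, NewV> withValue(final NewV newValue) {
        return new SimpleFixedKeyRecord<>(key, newValue, timestamp);
    }

    SimpleFixedKeyRecord<K, V> withTimestamp(final long newTimestamp) {
        return new SimpleFixedKeyRecord<>(key, value, newTimestamp);
    }

    // Deliberately no withKey(...): a key change cannot be expressed through this API.
}

class FixedKeyRecordDemo {
    public static void main(final String[] args) {
        final SimpleFixedKeyRecord<String, String> input = SimpleFixedKeyRecord.fromInput("user-1", "a,b", 42L);
        // One-to-many on values: every output keeps the input key.
        for (final String word : input.value().split(",")) {
            final SimpleFixedKeyRecord<String, Integer> output = input.withValue(word.length());
            System.out.println(output.key() + " -> " + output.value());
        }
    }
}
```

Because the key is never a parameter of any method that builds a new record, the compiler rejects key changes, so no runtime check is needed.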

Deprecated methods

  • KStream#transform
  • KStream#flatTransform
  • KStream#transformValues
  • KStream#flatTransformValues

Proposed Changes

KStream#process and KStream#processValues to replace most Transformers

The Processor itself can now manage forward calls. As a result, transform, transformValues, flatTransform, and flatTransformValues can be replaced by a Processor through process/processValues:

Code Block
languagejava
  private Topology topology() {
    final var builder = new StreamsBuilder();
    final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));

    input
        .processValues(() -> new FixedKeyContextualProcessor<String, String, String>() {
          @Override
          public void process(FixedKeyRecord<String, String> record) {
            // one-to-many on values only: the key is carried over unchanged
            for (final var word : record.value().split(",")) {
              context().forward(record.withValue("Hello " + word));
            }
          }
        }, Named.as("process-values-without-repartitioning")) // doesn't flag repartitioning
        .process(() -> new ContextualProcessor<String, String, String, String>() {
          @Override
          public void process(Record<String, String> record) {
            // the key is replaced, so downstream stateful operations may require repartitioning
            for (final var word : record.value().split(",")) {
              context().forward(record.withKey(word).withValue("Hello " + word));
            }
          }
        }, Named.as("process-with-partitioning")) // flags repartitioning
        .to("output", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
  }

New ValueProcessorContext to enforce only value changes

Some processors must keep the key unchanged to avoid repartitioning. At the moment, they rely on trusting the internal implementations. As a result, the KStream API added *Values methods to signal when the key has not changed and repartitioning can be avoided.

At the moment, record keys can be changed, or the record recreated, between the record entering Processor#process(Record) and ProcessorContext#forward(Record).

To enforce that the key stays the same, a ProcessorContext wrapper could be implemented that records the initial key:

Code Block
languagejava
class ValueProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward> {
//...
    private final ProcessorContext<KForward, VForward> delegate;
    private KForward key;

    ValueProcessorContext(final ProcessorContext<KForward, VForward> delegate) {
        this.delegate = delegate;
    }

    public void setRecordKey(KForward initialKey) {
        this.key = initialKey;
    }

    public void clearRecordKey() {
        this.key = null;
    }

    @Override
    public <K extends KForward, V extends VForward> void forward(Record<K, V> record) {
        if (key != null) {
            if (record.key() != key) {  // referential equality is enough to enforce that the key is not updated
                throw new IllegalArgumentException("Key has changed while processing the record and requires enforcing repartitioning.");
            }
        }
        delegate.forward(record.withKey(key));
    }
//...
}

NOTE: This API is not meant to be a public interface. It is only used internally by the processValues DSL operation to enforce at runtime that the key hasn't changed.

Code Block
class KStreamValueProcessorSupplier<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
// get ...

    static class KStreamValueProcessor<KIn, VIn, VOut> extends ContextualProcessor<KIn, VIn, KIn, VOut> {

        private final Processor<KIn, VIn, KIn, VOut> processor;

        private ValueProcessorContext<KIn, VOut> processorContext;

        public KStreamValueProcessor(final Processor<KIn, VIn, KIn, VOut> processor) {
            this.processor = processor;
        }

        @Override
        public void init(final ProcessorContext<KIn, VOut> context) {
            super.init(context);
            this.processorContext = new ValueProcessorContext<>(context);
            processor.init(processorContext);
        }

        @Override
        public void process(final Record<KIn, VIn> record) {
            processorContext.setRecordKey(record.key());
            try {
                processor.process(record);
            } finally {
                // always clear the key, even if the wrapped processor throws
                processorContext.clearRecordKey();
            }
        }
    }

}

Based on the previous example, if the key changes, a runtime exception will be thrown:

Code Block
  private Topology topology() {
    final var builder = new StreamsBuilder();
    final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));
    input.processValues(() -> new ContextualProcessor<String, String, String, String>() {
      @Override
      public void process(Record<String, String> record) {
        for (final var word : record.value().split(",")) {
          // will throw a runtime exception because the key has changed within `processValues`
          context().forward(record.withKey("FAIL!").withValue("Hello " + word));
        }
      }
    }, Named.as("test"))
    .to("output", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
  }

NOTE: The key used for validation can only be set while a message is being processed. A punctuation has no input key to set before forwarding, so it won't be possible to forward from a punctuation. This is similar to how ValueTransformers behave at the moment.
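The runtime check and its interaction with punctuations can be modeled without a Kafka dependency. The sketch below is hypothetical: SimpleRecord, KeyCheckingContext and KeyCheckDemo are illustrative names, and forwarded records go into a list instead of to a downstream processor. It deliberately deviates from the ValueProcessorContext above in one respect. It tracks an explicit `processing` flag instead of treating a null key as "no record in flight". With this flag, records whose key is legitimately null are still validated, and forwarding outside process() (as a punctuation would) is rejected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, minimal record type for this sketch only.
final class SimpleRecord<K, V> {
    final K key;
    final V value;

    SimpleRecord(final K key, final V value) {
        this.key = key;
        this.value = value;
    }

    SimpleRecord<K, V> withKey(final K newKey) { return new SimpleRecord<>(newKey, value); }

    SimpleRecord<K, V> withValue(final V newValue) { return new SimpleRecord<>(key, newValue); }
}

// Hypothetical model of a key-validating context.
final class KeyCheckingContext<K, V> {
    private final List<SimpleRecord<K, V>> forwarded = new ArrayList<>();
    private boolean processing; // true only between setRecordKey and clearRecordKey
    private K key;              // key of the input record currently being processed

    void setRecordKey(final K initialKey) {
        key = initialKey;
        processing = true;
    }

    void clearRecordKey() {
        key = null;
        processing = false;
    }

    void forward(final SimpleRecord<K, V> record) {
        if (!processing) {
            // e.g. a punctuation: there is no input key to validate against
            throw new IllegalStateException("forward() is only allowed while processing a record");
        }
        if (record.key != key) { // referential equality: the input key object must be passed through
            throw new IllegalArgumentException("Key has changed while processing the record");
        }
        forwarded.add(record);
    }

    List<SimpleRecord<K, V>> forwarded() { return forwarded; }
}

class KeyCheckDemo {
    // Wraps a user callback the way KStreamValueProcessor wraps the user processor.
    static <K, V> void process(final KeyCheckingContext<K, V> ctx, final SimpleRecord<K, V> input,
                               final Consumer<SimpleRecord<K, V>> userProcessor) {
        ctx.setRecordKey(input.key);
        try {
            userProcessor.accept(input);
        } finally {
            ctx.clearRecordKey();
        }
    }

    public static void main(final String[] args) {
        final KeyCheckingContext<String, String> ctx = new KeyCheckingContext<>();
        final SimpleRecord<String, String> input = new SimpleRecord<>("k1", "a,b");

        // Value-only changes pass the check.
        process(ctx, input, r -> {
            for (final String s : r.value.split(",")) {
                ctx.forward(r.withValue("Hello " + s));
            }
        });
        System.out.println("forwarded " + ctx.forwarded().size() + " records");

        // Changing the key is rejected at runtime.
        try {
            process(ctx, input, r -> ctx.forward(r.withKey("FAIL!")));
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that the referential check also rejects a key that is equal but a distinct object (for example `new String("k1")`). This is stricter than an equals() comparison, but it is cheap and matches the "key object passed through untouched" intent.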


Compatibility, Deprecation, and Migration Plan

Modified operation KStream#process

KStream#process return type change

Changing the return type from void to KStream<KOut, VOut> is source-compatible, but not binary-compatible. Users will need to recompile their applications to use the latest version of the library.

However, users will not need to change their code. The current return value is void, so no existing code uses it. Existing processor suppliers already declare Void as the output key and value types, which is the current type for both.
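The binary incompatibility comes from how the JVM links method calls. A compiled call site names the method together with its descriptor, and the descriptor includes the return type. The sketch below prints the erased descriptors before and after the change using java.lang.invoke.MethodType. Object.class stands in for the erased ProcessorSupplier and KStream types. This is an assumption made so the example has no Kafka dependency, and the DescriptorDemo class name is hypothetical.

```java
import java.lang.invoke.MethodType;

class DescriptorDemo {
    // Erased shape of: void process(ProcessorSupplier<...> supplier, String... stateStoreNames)
    static String oldDescriptor() {
        return MethodType.methodType(void.class, Object.class, String[].class).toMethodDescriptorString();
    }

    // Erased shape of: KStream<KOut, VOut> process(ProcessorSupplier<...> supplier, String... stateStoreNames)
    static String newDescriptor() {
        return MethodType.methodType(Object.class, Object.class, String[].class).toMethodDescriptorString();
    }

    public static void main(final String[] args) {
        // Bytecode compiled against the old library refers to the old descriptor. After the change,
        // no method with that descriptor exists, so linking fails with NoSuchMethodError.
        System.out.println("old: " + oldDescriptor());
        System.out.println("new: " + newDescriptor());
        System.out.println("same descriptor? " + oldDescriptor().equals(newDescriptor()));
    }
}
```

At the source level, a call such as `stream.process(supplier)` written as a statement compiles against either signature, because Java allows a method's return value to be discarded. This is why recompiling is enough and no code changes are needed.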

Transform methods deprecation

KStream#*transform* deprecations

This KIP includes the deprecation of the transform operations on KStream, in favor of the latest Processor API operations.

The Transform API as a whole is not marked as deprecated yet, as it requires additional considerations:

  • The Transform API is broadly adopted for custom processing. Even though most functionality can be implemented with the new Processor API, this migration is not straightforward.
  • KTable#transformValues is an interesting use case for ValueTransformerWithKey. Its internal value is Change<V>, an internal type that handles record changes on KTable and is not exposed as public API. One approach would be to deprecate this method in favor of .toStream().processValues().toTable().

A new KIP should be proposed to continue the deprecation of the Transformer APIs.

Rejected Alternatives

Migrate Transform APIs to the latest ProcessorContext

This would involve extending the KStream/KTable APIs even further with specific Transform variants. Suppose the new Processor can support most Transforms in the long term and becomes the adopted option for transforming values, flat transforms, etc. In that case, we should start getting the new Processor properly adopted in the DSL and let usage drive the next steps.

Runtime key validation on KStream#processValues and changing key output type to Void

On one hand, with Void as the key output type, we force users to cast to Void and change the key to null. This can be documented in the API so that users are aware of this peculiarity of forwarding within processValues.

On the other hand, keeping the key type as output doesn't require any change of keys, but could lead to key-checking runtime exceptions when keys are changed.

This KIP proposes the second option and does runtime key validation, to avoid allocating two Records with withKey(): one to set the key to a Void null, and another to restore the previous key. Users will need to be aware that processValues will throw runtime exceptions when they change keys that are not supposed to change.

Adding a ValueRecord type for processValues to prevent key changes through immutability

Another alternative for key verification is to introduce a new type with an immutable key. This could remove some of the challenges, but the record (ValueRecord in this case) could still be recreated.

Adding another API would also increase the complexity of the processing layer.

Adding a new Record type for fixed keys removes the need for runtime checks and reduces, as much as possible, the ability to change the key when implementing the processor.

      References