Status

Current state: Implemented and merged

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-478, the new strongly-typed Processor API, brings the option to reconsider the abstractions around custom processing in the Kafka Streams DSL.

Currently, multiple Transformers and a final Processor operation are the options available to implement custom processing with access to the Record context, including record metadata (e.g. topic, partition, offset), timestamp, and headers, and references to state stores.

There have been discussions on how to refactor these APIs to minimize/simplify into a single processor:

Transformers are currently limited to the old ProcessorContext, and there have been multiple extensions to support value processing without repartitioning and one-to-many record processing. With the addition of the new Processor API, KStream can be extended to offer access to the newer, typed API and effectively replace most of the operations *Transforms offer at the moment, with more open control to forward records.

This KIP aims to extend the current `KStream#process` API to return output values that can be chained across the topology, and to introduce a new KStream#processValues to use a processor while guaranteeing that keys haven't changed and repartitioning is not required.

This KIP could be considered as a step towards deprecating Transforms, though that should be discussed in a follow-up KIP.

Public Interfaces

...

Modified methods

  • KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
      • from void KStream#process(ProcessorSupplier<K, V, Void, Void> processorSupplier, ...)
  • KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)
      • from void KStream#process(ProcessorSupplier<K, V, Void, Void> processorSupplier, ...)

    New methods

    Processors without forcing repartitioning:

    • KStream<K,VOut> KStream#processValues(FixedKeyProcessorSupplier<K, V, VOut> processorSupplier, Named named, String... stateStoreNames)

    Internal changes

    • Infrastructure for Fixed Key Records:
      • FixedKeyRecord:
        • Not a subclass or superclass of Record, to prevent casting to Record.
        • Private constructor, to prevent reconstructing the record with a different key.
      • FixedKeyProcessor(Supplier) and FixedKeyProcessorContext interfaces for FixedKeyRecord.
        • FixedKeyContextualProcessor abstract class, similar to ContextualProcessor.

    Deprecated methods

    All transform operations on KStream will be deprecated in favor of the process and processValues operations:

    • KStream#transform
    • KStream#flatTransform
    • KStream#transformValues
    • KStream#flatTransformValues

    Proposed Changes

    KStream#process and KStream#processValues to replace most Transformers

    With the ability to manage forward calls as part of the Processor itself, transform, transformValues, flatTransform, and flatTransformValues can be replaced by process/processValues:

    Code Block
    languagejava
      private Topology topology() {
        final var builder = new StreamsBuilder();
        final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));
        input
            // Only values change; the key is fixed, so no repartitioning is needed.
            .processValues(() -> new FixedKeyContextualProcessor<String, String, String>() {
                @Override
                public void process(FixedKeyRecord<String, String> record) {
                    for (final var word : record.value().split(",")) {
                        context().forward(record.withValue("Hello " + word));
                    }
                }
            }, Named.as("process-values-without-repartitioning"))
            // The key may change here, so later key-based operations may repartition.
            .process(() -> new ContextualProcessor<String, String, String, String>() {
                @Override
                public void process(Record<String, String> record) {
                    for (final var word : record.value().split(",")) {
                        context().forward(record.withKey(word).withValue("Hello " + word));
                    }
                }
            }, Named.as("process-with-partitioning"))
            .to("output", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
      }

    Infrastructure for Fixed Key Records

    Processors that must keep the same key to avoid repartitioning currently rely on trusting their internal implementation. As a result, the KStream API added *Values methods to signal when the key has not changed and repartitioning can be avoided.

    At the moment, record keys can be changed, or records recreated, between the record entering Processor#process(Record) and ProcessorContext#forward(Record).

    To enforce that the key stays the same, this KIP introduces a record type with an immutable key, together with matching processor interfaces:

    FixedKeyRecord

    Record with immutable key.

    Code Block
    public final class FixedKeyRecord<K, V> {

        private final K key;
        private final V value;
        private final long timestamp;
        private final Headers headers;

        // Not public: user code cannot build a record with a different key.
        FixedKeyRecord(final K key, final V value, final long timestamp, final Headers headers) {
            this.key = key;
            this.value = value;
            if (timestamp < 0) {
                throw new StreamsException(
                    "Malformed Record",
                    new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
                );
            }
            this.timestamp = timestamp;
            this.headers = new RecordHeaders(headers);
        }

        public K key() { return key; }

        public V value() { return value; }

        public long timestamp() { return timestamp; }

        public Headers headers() { return headers; }

        public <NewV> FixedKeyRecord<K, NewV> withValue(final NewV value) { return new FixedKeyRecord<>(key, value, timestamp, headers); }

        public FixedKeyRecord<K, V> withTimestamp(final long timestamp) { return new FixedKeyRecord<>(key, value, timestamp, headers); }

        public FixedKeyRecord<K, V> withHeaders(final Headers headers) { return new FixedKeyRecord<>(key, value, timestamp, headers); }
    }


    FixedKeyProcessorSupplier

    Code Block
    @FunctionalInterface
    public interface FixedKeyProcessorSupplier<KIn, VIn, VOut> extends ConnectedStoreProvider, Supplier<FixedKeyProcessor<KIn, VIn, VOut>> {
        FixedKeyProcessor<KIn, VIn, VOut> get();
    }



    FixedKeyProcessor


    Code Block
    public interface FixedKeyProcessor<KIn, VIn, VOut> {
    
        default void init(final FixedKeyProcessorContext<KIn, VOut> context) {}
    
        void process(FixedKeyRecord<KIn, VIn> record);
    
        default void close() {}
    }



    FixedKeyContextualProcessor

    Helper, same as ContextualProcessor.

    Code Block
    public abstract class FixedKeyContextualProcessor<KIn, VIn, VOut> implements FixedKeyProcessor<KIn, VIn, VOut> {

        private FixedKeyProcessorContext<KIn, VOut> context;

        protected FixedKeyContextualProcessor() {}

        @Override
        public void init(final FixedKeyProcessorContext<KIn, VOut> context) {
            this.context = context;
        }

        protected final FixedKeyProcessorContext<KIn, VOut> context() {
            return context;
        }
    }


    ProcessingContext

    To be extended by FixedKeyProcessorContext and ProcessorContext:

    Code Block
    interface ProcessingContext {

        String applicationId();

        TaskId taskId();

        Optional<RecordMetadata> recordMetadata();

        Serde<?> keySerde();

        Serde<?> valueSerde();

        File stateDir();

        StreamsMetrics metrics();

        <S extends StateStore> S getStateStore(final String name);

        Cancellable schedule(final Duration interval,
                             final PunctuationType type,
                             final Punctuator callback);

        void commit();

        Map<String, Object> appConfigsWithPrefix(final String prefix);
    }


    FixedKeyProcessorContext

    Code Block
    public interface FixedKeyProcessorContext<KForward, VForward> extends ProcessingContext {
    
        <K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> record);
    
        <K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> record, final String childName);
    }


    Compatibility, Deprecation, and Migration Plan

    KStream#process return type change

    Changing the return type from void to KStream<KOut, VOut> is source-compatible, but not binary-compatible. Users will have to recompile their applications against the latest version of the library.

    Users will not need to change their code, however: the current return value is void, so no caller uses it, and existing processor suppliers declare Void as the output key and value types, which remain valid for KOut and VOut.
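The binary incompatibility comes from how the JVM links method calls: a compiled call site refers to a method by its descriptor, which includes the return type. The following is a minimal sketch using only the JDK. `ReturnTypeDescriptorDemo` is a hypothetical class, and `Supplier` and `Object` are stand-ins for `ProcessorSupplier` and `KStream`, so the strings shown are not the real Kafka Streams descriptors.

```java
import java.lang.invoke.MethodType;
import java.util.function.Supplier;

// Hypothetical demo class; not part of Kafka Streams.
final class ReturnTypeDescriptorDemo {

    // Descriptor of a method taking one supplier and returning void,
    // standing in for the old `void process(ProcessorSupplier, ...)`.
    static String oldDescriptor() {
        return MethodType.methodType(void.class, Supplier.class).toMethodDescriptorString();
    }

    // Same parameter list, but returning an object, standing in for the new
    // `KStream<KOut, VOut> process(ProcessorSupplier, ...)`.
    static String newDescriptor() {
        return MethodType.methodType(Object.class, Supplier.class).toMethodDescriptorString();
    }

    public static void main(String[] args) {
        System.out.println(oldDescriptor()); // (Ljava/util/function/Supplier;)V
        System.out.println(newDescriptor()); // (Ljava/util/function/Supplier;)Ljava/lang/Object;
    }
}
```

Because the two descriptors differ, bytecode compiled against the old signature would no longer find a matching method at link time. Recompiling regenerates the call sites with the new descriptor, and the source itself needs no change.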

    KStream#*transform* deprecations

    This KIP deprecates the transform operations on KStream in favor of the latest Processor API operations.

    The Transformer APIs themselves won't be deprecated as part of this KIP, but the transform* operations are. This, together with the Transformer APIs' reliance on the old ProcessorContext, should warn users and drive adoption of the methods proposed in this KIP. Even though the new Processor API could potentially replace most Transformer implementations, it is not yet possible to deprecate the Transformer APIs:

    • Transform API is broadly adopted for custom processing. Even though most functionality is possible to implement with new Processor, this migration is not straightforward and will require reimplementing custom processors.
    • KTable#transformValues is an interesting use-case for ValueTransformerWithKey as the internal value is Change<V> — an internal type to handle record changes on KTable — not exposed as public API. An approach would be to deprecate this method in favor of .toStream().processValues().toTable().

    A new KIP should be proposed to continue the deprecation of Transformer  APIs.

    Rejected Alternatives

    ...

    Migrate Transform APIs to the latest ProcessContext

    This would involve extending the KStream/KTable APIs even more with specific Transform variants. If the new Processor can support most Transform use cases and, in the long term, become the adopted option to transform values, flat transform, etc., then we should start getting the new Processor properly adopted in the DSL and let the usage drive the next steps.

    Runtime key validation on KStream#processValues and changing key output type to Void

    On one hand, with a Void type for the key output, users are forced to cast to Void and change the key to null. This can be documented in the API so that users are aware of the peculiarity of forwarding within processValues.

    On the other hand, keeping the key type in the output doesn't require any change of keys, but it could lead to key-checking runtime exceptions when keys are changed.

    This KIP proposes adding a new Record type for Fixed Keys, to remove the need for runtime checks and to reduce as much as possible the ability to change the key when implementing the processor.
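The trade-off between a runtime key check and a fixed-key record type can be illustrated with a small, self-contained sketch. All names here (`Rec`, `FixedKeyRec`, `KeyEnforcementSketch`, `forwardChecked`) are hypothetical, simplified stand-ins and not Kafka Streams classes. They only model the idea that a record type without a key-changing method moves the guarantee from runtime to compile time.

```java
import java.util.Objects;

// Simplified stand-in for a regular record: the key can be replaced.
final class Rec<K, V> {
    final K key;
    final V value;

    Rec(K key, V value) {
        this.key = key;
        this.value = value;
    }

    <NewK> Rec<NewK, V> withKey(NewK newKey) {
        return new Rec<>(newKey, value);
    }
}

// Simplified stand-in for a fixed-key record: no withKey method exists,
// so processor code cannot express a key change at all.
final class FixedKeyRec<K, V> {
    private final K key;
    private final V value;

    FixedKeyRec(K key, V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }

    V value() { return value; }

    <NewV> FixedKeyRec<K, NewV> withValue(NewV newValue) {
        return new FixedKeyRec<>(key, newValue);
    }
}

final class KeyEnforcementSketch {

    // Rejected alternative: compare keys at runtime when a record is forwarded.
    static <K, V> Rec<K, V> forwardChecked(K inputKey, Rec<K, V> output) {
        if (!Objects.equals(inputKey, output.key)) {
            throw new IllegalStateException("Key changed from " + inputKey + " to " + output.key);
        }
        return output;
    }

    public static void main(String[] args) {
        // Fixed-key variant: the value type changes, the key is carried over.
        FixedKeyRec<String, Integer> fixed = new FixedKeyRec<>("k", "a,b").withValue(2);
        System.out.println(fixed.key() + " -> " + fixed.value());

        // Runtime-check variant: the mistake is only detected when forwarding.
        try {
            forwardChecked("k", new Rec<>("k", "v").withKey("other"));
        } catch (IllegalStateException e) {
            System.out.println("Runtime check failed: " + e.getMessage());
        }
    }
}
```

In this sketch the fixed-key variant needs no exception path, because the compiler rejects any attempt to call a key-changing method that does not exist. The runtime-check variant accepts the code and fails only once a record with a different key is forwarded.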

    References