Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-478 introduced a strongly typed Processor API, which opens the opportunity to reconsider the abstractions for custom processing in the Kafka Streams DSL.

Currently, custom processing is implemented with one of several Transformer operations or with a terminal Processor operation. Both give access to the record context (record metadata such as topic, partition, and offset, plus timestamp and headers) and to state stores.

There have been discussions on how to refactor these APIs and consolidate them into a single processor.

Transformers are currently limited to the old ProcessorContext. With the addition of the new Processor API, KStream can be extended to offer access to the new, chainable API, which can replace most of the operations Transformers offer at the moment while giving more flexible control over forwarding records.

This KIP could be considered a step towards deprecating Transformers, though that should be discussed in a follow-up KIP.

Public Interfaces

New methods:

  • KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier)
  • KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier)

Proposed Changes

New Processor APIs replacing most Transformers

With the ability to manage forward calls as part of the Processor itself, flatTransform can be replaced by a Processor:

  private Topology topology() {
    final var builder = new StreamsBuilder();
    final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));
    input.processValues(() -> new ContextualProcessor<String, String, String, String>() {
      @Override
      public void process(Record<String, String> record) {
        // Forward one record per comma-separated token; the key is left unchanged.
        for (var s : record.value().split(",")) {
          context().forward(record.withValue("Hello " + s));
        }
      }
    }, Named.as("test"))
    .to("output", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
  } 


New ValueProcessorContext to enforce only Value changes

Processors that must preserve the record key to avoid repartitioning currently rely on trusted internal implementations. For this reason, the KStream API added *Values methods to signal that the key is not changed and that repartitioning can be avoided.

At the moment, record keys can be changed, or records recreated, between the record entering Processor#process(Record) and the call to ProcessorContext#forward(Record).

As an alternative that enforces the same key, a ProcessorContext wrapper could be implemented that holds the initial key:



public class ValueProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward> {
//...
    // Key of the record currently being processed; null when no key is being enforced.
    private KForward key;
    
    public void setRecordKey(KForward initialKey) {
        this.key = initialKey;
    }
    
    public void clearRecordKey() {
        this.key = null;
    }

    @Override
    public <K extends KForward, V extends VForward> void forward(Record<K, V> record) {
        // Reject any forwarded record whose key differs from the enforced key.
        // Calling equals on the enforced key avoids a NullPointerException when the forwarded key is null.
        if (key != null && !key.equals(record.key())) {
            throw new IllegalArgumentException("Key has changed while processing the record and requires enforcing repartitioning.");
        }
        delegate.forward(record);
    }
//...
}


This ValueProcessorContext could be used internally at the Processor level; setRecordKey(K) must be called before invoking the user-defined Processor.

public class KStreamValueProcessorSupplier<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
// get ...

    public static class KStreamValueProcessor<KIn, VIn, VOut> extends ContextualProcessor<KIn, VIn, KIn, VOut> {

        private final Processor<KIn, VIn, KIn, VOut> processor;

        private ValueProcessorContext<KIn, VOut> processorContext;

        public KStreamValueProcessor(Processor<KIn, VIn, KIn, VOut> processor) {
            this.processor = processor;
        }

        @Override
        public void init(final ProcessorContext<KIn, VOut> context) {
            super.init(context);
            this.processorContext = new ValueProcessorContext<>(context);
            processor.init(processorContext);
        }

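        @Override
        public void process(final Record<KIn, VIn> record) {
            // Pin the incoming key for the duration of the user-defined process call.
            processorContext.setRecordKey(record.key());
            try {
                processor.process(record);
            } finally {
                // Always clear the key, even if the user-defined processor throws.
                processorContext.clearRecordKey();
            }
        }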
    }

}

Based on the previous example, if the key is changed, processing fails:



  private Topology topology() {
    final var builder = new StreamsBuilder();
    final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));
    input.processValues(() -> new ContextualProcessor<String, String, String, String>() {
      @Override
      public void process(Record<String, String> record) {
        for (var s : record.value().split(",")) {
          // Changing the key here is what the key check rejects.
          context().forward(record.withKey("FAIL").withValue("Hello " + s));
        }
      }
    }, Named.as("test"))
    .to("output", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
  } 



 12:08:19.125 [new-process-c2e58cba-3c47-41a9-b221-adea4cd38c87-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.TaskManager - stream-thread [new-process-c2e58cba-3c47-41a9-b221-adea4cd38c87-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=words, partition=0, offset=6, stacktrace=java.lang.IllegalArgumentException: Key has changed while processing the record and requires enforcing repartitioning. 
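
The key check described in this section can also be tried without a running Streams application. The following is a minimal sketch, not the proposed implementation: Rec, Forwarder, UserProcessor, and KeyGuard are hypothetical stand-ins for Record, ProcessorContext, Processor, and ValueProcessorContext, reduced to the parts needed to show the pin, process, and clear sequence. Unlike the wrapper above, the sketch tracks the pinned state with a boolean flag so that null keys are enforced as well; that is a choice of the sketch, not part of this KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class KeyGuardSketch {

    /** Hypothetical stand-in for Record: only key and value are modeled. */
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for the forwarding part of ProcessorContext. */
    interface Forwarder<K, V> {
        void forward(Rec<K, V> record);
    }

    /** Hypothetical stand-in for a user-defined Processor. */
    interface UserProcessor<K, VIn, VOut> {
        void process(Rec<K, VIn> record, Forwarder<K, VOut> context);
    }

    /** Wrapper that rejects forwarded records whose key differs from the pinned key. */
    static final class KeyGuard<K, V> implements Forwarder<K, V> {
        private final Forwarder<K, V> delegate;
        private K pinnedKey;
        private boolean pinned;

        KeyGuard(Forwarder<K, V> delegate) {
            this.delegate = delegate;
        }

        void setRecordKey(K key) {
            pinnedKey = key;
            pinned = true;
        }

        void clearRecordKey() {
            pinnedKey = null;
            pinned = false;
        }

        @Override
        public void forward(Rec<K, V> record) {
            if (pinned && !Objects.equals(pinnedKey, record.key)) {
                throw new IllegalArgumentException("Key has changed while processing the record");
            }
            delegate.forward(record);
        }
    }

    /** Pins the input key, runs the user processor, and always clears the key afterwards. */
    static <K, VIn, VOut> List<Rec<K, VOut>> processValues(
            Rec<K, VIn> input, UserProcessor<K, VIn, VOut> processor) {
        List<Rec<K, VOut>> output = new ArrayList<>();
        KeyGuard<K, VOut> guard = new KeyGuard<>(output::add);
        guard.setRecordKey(input.key);
        try {
            processor.process(input, guard);
        } finally {
            guard.clearRecordKey();
        }
        return output;
    }

    public static void main(String[] args) {
        Rec<String, String> input = new Rec<>("k1", "a,b");

        // Same key on every forward: both records are accepted.
        List<Rec<String, String>> ok = KeyGuardSketch.<String, String, String>processValues(
            input, (rec, ctx) -> {
                for (String s : rec.value.split(",")) {
                    ctx.forward(new Rec<>(rec.key, "Hello " + s));
                }
            });
        System.out.println(ok.size() + " records forwarded");

        // Changed key: the guard throws before anything reaches the delegate.
        try {
            KeyGuardSketch.<String, String, String>processValues(
                input, (rec, ctx) -> ctx.forward(new Rec<>("FAIL", rec.value)));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Clearing the key in a finally block keeps a failed process call from leaving a stale key pinned for the next record.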



Compatibility, Deprecation, and Migration Plan

The modified KStream#process method should remain compatible with the previous version, which currently has a void return type.

Leaving the Transform API on the older ProcessorContext could be considered an incentive to adopt the methods proposed in this KIP.

Even though the new Processor API could potentially replace most Transformer implementations, it is not yet possible to deprecate the Transformer APIs:

  • The Transform API is broadly adopted for custom processing. Even though most of its functionality can be implemented with the new Processor, the migration is not straightforward.
  • KTable#transformValues is an interesting use case for ValueTransformerWithKey, because the internal value is Change<V>, an internal type that handles record changes on a KTable and is not exposed as public API. One approach would be to deprecate this method in favor of .toStream().processValues().toTable().

Rejected Alternatives

Create new Transform APIs with the latest ProcessorContext

This would involve extending the KStream/KTable APIs even further with specific Transform variants. If the new Processor can support most Transform use cases and become, in the long term, the adopted option for transforming values, flat transforms, etc., then we should start by getting the new Processor properly adopted in the DSL and let usage drive the next steps.
