Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

As an alternative to enforce the same key, ProcessorContext wrapper could be implemented to define an initial key:




Code Block
languagejava
public class ValueProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward> {
//...
    private KForward key;
    
    public void setRecordKey(KForward initialKey) {
        this.key = initialKey;
    }
    
    public void clearRecordKey() {
        this.key = null;
    }

    @Override
    public <K extends KForward, V extends VForward> void forward(Record<K, V> record) {
        if (key != null) {
            if (!record.key().equals(key) != key) {  // referencial equility is enough to enforce that key is not updated
                 throw new IllegalArgumentException("Key has changed while processing the record and requires enforcing repartitioning.");
                  }
        }
        delegate.forward(record);
    }
//...
}

...

This ValueProcessorContext could be used internally at the Processor level, and the method setRecordKey(K) must be used before calling user-defined Processor.

Code Block
public class KStreamValueProcessorSupplier<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
// get ...

    public static class KStreamValueProcessor<KIn, VIn, VOut> extends ContextualProcessor<KIn, VIn, KIn, VOut> {

        private final Processor<KIn, VIn, KIn, VOut> processor;

        private ValueProcessorContext<KIn, VOut> processorContext;

        public KStreamValueProcessor(Processor<KIn, VIn, KIn, VOut> transformer) {
            this.processor = transformer;
        }

        @Override
        public void init(final ProcessorContext<KIn, VOut> context) {
            super.init(context);
            this.processorContext = new ValueProcessorContext<>(context);
            processor.init(processorContext);
        }

        @Override
        public void process(final Record<KIn, VIn> record) {
            processorContext.setRecordKey(record.key());
            processor.process(record);
            processorContext.clearRecordKey();
        }
    }

}

...

Code Block
 12:08:19.125 [new-process-c2e58cba-3c47-41a9-b221-adea4cd38c87-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.TaskManager - stream-thread [new-process-c2e58cba-3c47-41a9-b221-adea4cd38c87-StreamThread-1] Failed to process stream task 0_0 due to the following error: 
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=words, partition=0, offset=60, stacktrace=java.lang.IllegalArgumentException: Key has changed while processing the record and requires enforcing repartitioning. 



Compatibility, Deprecation, and Migration Plan

...