...
As an alternative way to enforce that the key is unchanged, a ProcessorContext wrapper could be implemented that records the initial key and checks it on every forward:
Code Block | ||
---|---|---|
| ||
public class ValueProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward> { //... private KForward key; public void setRecordKey(KForward initialKey) { this.key = initialKey; } public void clearRecordKey() { this.key = null; } @Override public <K extends KForward, V extends VForward> void forward(Record<K, V> record) { if (key != null) { if (record.key() != key) { // referential equality is enough to enforce that the key is not updated throw new IllegalArgumentException("Key has changed while processing the record and requires enforcing repartitioning."); } } delegate.forward(record); } //... } |
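One consequence of checking by reference is that a forwarded key which is equal in content but is a different object is also rejected. The following is a minimal sketch of that behavior, independent of Kafka Streams; `KeyIdentityDemo` and `sameKeyInstance` are hypothetical names used only for illustration and are not part of the proposal.

```java
import java.util.Objects;

// Hypothetical demo class; not part of the proposal or of Kafka Streams.
class KeyIdentityDemo {

    // Mirrors the reference check in forward(): true only for the very same key object.
    static boolean sameKeyInstance(Object initialKey, Object forwardedKey) {
        return initialKey == forwardedKey;
    }

    public static void main(String[] args) {
        String initialKey = new String("user-1");
        String copiedKey = new String("user-1"); // equal content, different object

        System.out.println(sameKeyInstance(initialKey, initialKey)); // true
        System.out.println(sameKeyInstance(initialKey, copiedKey));  // false
        System.out.println(Objects.equals(initialKey, copiedKey));   // true
    }
}
```

Under this check, user code would need to forward the key object it received (for example by deriving the output from the input record with `withValue`) rather than rebuilding an equal key.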
...
This ValueProcessorContext could be used internally at the Processor level: setRecordKey(K) must be called before invoking the user-defined Processor, and clearRecordKey() once it returns.
Code Block |
---|
public class KStreamValueProcessorSupplier<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> { // get ... public static class KStreamValueProcessor<KIn, VIn, VOut> extends ContextualProcessor<KIn, VIn, KIn, VOut> { private final Processor<KIn, VIn, KIn, VOut> processor; private ValueProcessorContext<KIn, VOut> processorContext; public KStreamValueProcessor(Processor<KIn, VIn, KIn, VOut> transformer) { this.processor = transformer; } @Override public void init(final ProcessorContext<KIn, VOut> context) { super.init(context); this.processorContext = new ValueProcessorContext<>(context); processor.init(processorContext); } @Override public void process(final Record<KIn, VIn> record) { processorContext.setRecordKey(record.key()); processor.process(record); processorContext.clearRecordKey(); } } } |
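The set, process, clear sequence above can be tried without a Kafka Streams dependency. The sketch below uses hypothetical stand-ins (`Rec`, `GuardedForwarder`, and `process` are not Kafka Streams types) and assumes the key should be cleared even when user code throws, which the original snippet does not state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins; none of these types are Kafka Streams classes.
class KeyGuardDemo {

    // Minimal record holding a key and a value.
    static final class Rec {
        final Object key;
        final Object value;
        Rec(Object key, Object value) { this.key = key; this.value = value; }
    }

    // Plays the role of ValueProcessorContext: remembers the input key and checks it on forward.
    static final class GuardedForwarder {
        private final List<Rec> downstream = new ArrayList<>();
        private Object key;

        void setRecordKey(Object initialKey) { this.key = initialKey; }
        void clearRecordKey() { this.key = null; }

        void forward(Rec record) {
            if (key != null && record.key != key) {
                throw new IllegalArgumentException(
                    "Key has changed while processing the record and requires enforcing repartitioning.");
            }
            downstream.add(record);
        }

        int forwardedCount() { return downstream.size(); }
    }

    // Plays the role of KStreamValueProcessor.process(): set the key, run user code, clear the key.
    static void process(GuardedForwarder context, Rec input, Consumer<Rec> userLogic) {
        context.setRecordKey(input.key);
        try {
            userLogic.accept(input);
        } finally {
            context.clearRecordKey(); // assumption: clear even if user code throws
        }
    }

    public static void main(String[] args) {
        GuardedForwarder context = new GuardedForwarder();
        Rec input = new Rec("k1", "hello");

        // Value-only change: the same key reference is forwarded, so the record passes.
        process(context, input, r -> context.forward(new Rec(r.key, "HELLO")));
        System.out.println("forwarded: " + context.forwardedCount());

        // Key change: the guard rejects the record.
        try {
            process(context, input, r -> context.forward(new Rec("k2", r.value)));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Clearing the key in a `finally` block is a design choice of this sketch: it keeps a stale key from leaking into the next record if the user-defined logic fails.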
...
Code Block |
---|
12:08:19.125 [new-process-c2e58cba-3c47-41a9-b221-adea4cd38c87-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.TaskManager - stream-thread [new-process-c2e58cba-3c47-41a9-b221-adea4cd38c87-StreamThread-1] Failed to process stream task 0_0 due to the following error: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=words, partition=0, offset=60, stacktrace=java.lang.IllegalArgumentException: Key has changed while processing the record and requires enforcing repartitioning. |
Compatibility, Deprecation, and Migration Plan
...