...
```java
public interface StateStoreContext {
    ...

    /**
     * Return the metadata of the current record if available. Processors may be invoked to
     * process a source record from an input topic, to run a scheduled punctuation
     * (see {@link org.apache.kafka.streams.processor.api.ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}),
     * or because a parent processor called {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}.
     * <p>
     * In the case of a punctuation, there is no source record, so this metadata would be
     * undefined. Note that when a punctuator invokes {@link ProcessorContext#forward(Record)},
     * downstream processors will receive the forwarded record as a regular
     * {@link Processor#process(Record)} invocation. In other words, it wouldn't be apparent to
     * downstream processors whether or not the record being processed came from an input topic
     * or punctuation and therefore whether or not this metadata is defined. This is why
     * the return type of this method is {@link Optional}.
     * <p>
     * If there is any possibility of punctuators upstream, any access
     * to this field should consider the case of
     * "<code>recordMetadata().isPresent() == false</code>".
     * Of course, it would be safest to always guard this condition.
     */
    Optional<RecordMetadata> recordMetadata();

    ...
}
```
A proof of concept (POC), including an example of how `recordMetadata()` can be used in state stores, is available at: https://github.com/patrickstuedi/kafka/tree/record-metadata2
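Separately from the linked POC, the guard that the Javadoc recommends can be illustrated with a minimal sketch. It is not taken from the POC and does not depend on Kafka: the nested `RecordMetadata` and `StateStoreContext` interfaces are simplified stand-ins for the real types (only `recordMetadata()` is modeled), and `describeSource` and `metadata` are hypothetical helper names introduced for illustration.

```java
import java.util.Optional;

final class RecordMetadataGuardSketch {

    // Simplified stand-in for org.apache.kafka.streams.processor.api.RecordMetadata.
    interface RecordMetadata {
        String topic();
        int partition();
        long offset();
    }

    // Simplified stand-in for StateStoreContext; only the proposed method is modeled.
    interface StateStoreContext {
        Optional<RecordMetadata> recordMetadata();
    }

    // Hypothetical helper: describes where the current record came from,
    // handling the case where no source record exists (e.g. a punctuation).
    static String describeSource(StateStoreContext context) {
        return context.recordMetadata()
                .map(m -> m.topic() + "-" + m.partition() + "@" + m.offset())
                .orElse("no source record");
    }

    // Hypothetical factory for building stand-in metadata values.
    static RecordMetadata metadata(String topic, int partition, long offset) {
        return new RecordMetadata() {
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
            @Override public long offset() { return offset; }
        };
    }

    public static void main(String[] args) {
        // Record that originated from an input topic: metadata is present.
        StateStoreContext fromTopic = () -> Optional.of(metadata("input", 0, 42L));
        // Record forwarded by a punctuator: metadata is absent.
        StateStoreContext fromPunctuation = () -> Optional.empty();

        System.out.println(describeSource(fromTopic));
        System.out.println(describeSource(fromPunctuation));
    }
}
```

Using `map(...).orElse(...)` rather than calling `get()` directly keeps the absent case explicit, which matches the Javadoc's advice to always guard against `recordMetadata().isPresent() == false`.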
Compatibility, Deprecation, and Migration Plan
...