...

public interface StateStoreContext {
    ...

    /**
     * Return the metadata of the current record if available. Processors may be invoked to
     * process a source record from an input topic, to run a scheduled punctuation
     * (see {@link org.apache.kafka.streams.processor.api.ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}),
     * or because a parent processor called {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}.
     * <p>
     * In the case of a punctuation, there is no source record, so this metadata would be
     * undefined. Note that when a punctuator invokes {@link ProcessorContext#forward(Record)},
     * downstream processors will receive the forwarded record as a regular
     * {@link Processor#process(Record)} invocation. In other words, it wouldn't be apparent to
     * downstream processors whether or not the record being processed came from an input topic
     * or punctuation and therefore whether or not this metadata is defined. This is why
     * the return type of this method is {@link Optional}.
     * <p>
     * If there is any possibility of punctuators upstream, any access
     * to this metadata should handle the case of
     * "<code>recordMetadata().isPresent() == false</code>".
     * The safest approach is to always guard against this condition.
     */
    Optional<RecordMetadata> recordMetadata();

    ...
}
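
The guard described in the Javadoc could look roughly like the sketch below. It is illustrative only and not taken from the POC: the nested RecordMetadata and StateStoreContext interfaces are simplified stand-ins declared locally so the example compiles without Kafka on the classpath, and describeOrigin is a hypothetical helper name.

```java
import java.util.Optional;

public class RecordMetadataGuardSketch {

    // Simplified stand-in (assumption) for
    // org.apache.kafka.streams.processor.api.RecordMetadata.
    interface RecordMetadata {
        String topic();
        int partition();
        long offset();
    }

    // Simplified stand-in (assumption) containing only the method proposed above.
    interface StateStoreContext {
        Optional<RecordMetadata> recordMetadata();
    }

    // Hypothetical helper: describes where the current write originated
    // without assuming that record metadata is present.
    static String describeOrigin(StateStoreContext context) {
        return context.recordMetadata()
                .map(m -> m.topic() + "-" + m.partition() + "@" + m.offset())
                .orElse("no source record (e.g. punctuation)");
    }

    public static void main(String[] args) {
        RecordMetadata metadata = new RecordMetadata() {
            @Override public String topic() { return "orders"; }
            @Override public int partition() { return 2; }
            @Override public long offset() { return 42L; }
        };

        // Context as a store might see it while a source record is processed.
        StateStoreContext fromTopic = () -> Optional.of(metadata);
        // Context as a store might see it when a punctuator triggers the write.
        StateStoreContext fromPunctuation = () -> Optional.empty();

        System.out.println(describeOrigin(fromTopic));       // prints "orders-2@42"
        System.out.println(describeOrigin(fromPunctuation)); // prints "no source record (e.g. punctuation)"
    }
}
```

Using map(...).orElse(...) rather than calling get() directly keeps the empty case explicit, so a store behaves sensibly whether the write came from an input topic or from a punctuation.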


A proof of concept (POC) with an example showing how recordMetadata() can be used in state stores is available here: https://github.com/patrickstuedi/kafka/tree/record-metadata2

Compatibility, Deprecation, and Migration Plan

...