
Status

Current state: Under discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

For state stores to provide stronger consistency guarantees in the future (e.g., read-your-writes (RYW) consistency), they need to be able to collect record metadata (e.g., offset information).

Today, record metadata is already available through AbstractProcessorContext (recordMetadata()), but the call is not exposed through the StateStoreContext interface that state stores use.

The task of this ticket is to expose recordMetadata in the StateStoreContext. 

Public Interfaces

The following interface will change:

  • org.apache.kafka.streams.processor.StateStoreContext - adding method recordMetadata()

Proposed Changes

The code segment below shows the proposed code change together with the corresponding Javadoc:

public interface StateStoreContext {
    ...

    /**
     * Return the metadata of the current record if available. Processors may be invoked to
     * process a source record from an input topic, to run a scheduled punctuation
     * (see {@link org.apache.kafka.streams.processor.api.ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}),
     * or because a parent processor called {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}.
     * <p>
     * In the case of a punctuation, there is no source record, so this metadata would be
     * undefined. Note that when a punctuator invokes {@link ProcessorContext#forward(Record)},
     * downstream processors will receive the forwarded record as a regular
     * {@link Processor#process(Record)} invocation. In other words, it wouldn't be apparent to
     * downstream processors whether or not the record being processed came from an input topic
     * or punctuation and therefore whether or not this metadata is defined. This is why
     * the return type of this method is {@link Optional}.
     * <p>
     * If there is any possibility of punctuators upstream, any access
     * to this field should consider the case of
     * "<code>recordMetadata().isPresent() == false</code>".
     * Of course, it would be safest to always guard this condition.
     */
    Optional<RecordMetadata> recordMetadata();

    ...
}
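To illustrate how a state store might consume the new method, here is a minimal, self-contained sketch. It does not use the Kafka classes directly: RecordMetadata and StateStoreContext below are simplified stand-ins reduced to the methods needed here, and FakeContext, OffsetTrackingStore, and demo() are hypothetical names introduced only for this example. The point it shows is the guard recommended in the Javadoc: the store updates its offset bookkeeping only when metadata is present.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class RecordMetadataSketch {

    /** Simplified stand-in for org.apache.kafka.streams.processor.api.RecordMetadata. */
    interface RecordMetadata {
        String topic();
        int partition();
        long offset();
    }

    /** Simplified stand-in for StateStoreContext, reduced to the proposed method. */
    interface StateStoreContext {
        Optional<RecordMetadata> recordMetadata();
    }

    /** Hypothetical context whose "current record" can be swapped by the caller. */
    static final class FakeContext implements StateStoreContext {
        private Optional<RecordMetadata> current = Optional.empty();

        void setCurrent(String topic, int partition, long offset) {
            current = Optional.of(new RecordMetadata() {
                public String topic() { return topic; }
                public int partition() { return partition; }
                public long offset() { return offset; }
            });
        }

        /** Simulates a punctuation: there is no source record. */
        void clearCurrent() {
            current = Optional.empty();
        }

        @Override
        public Optional<RecordMetadata> recordMetadata() {
            return current;
        }
    }

    /** Hypothetical store that remembers the highest source offset it has written. */
    static final class OffsetTrackingStore {
        private final StateStoreContext context;
        private final Map<String, String> data = new HashMap<>();
        private long lastOffset = -1L; // -1 means "no offset observed yet"

        OffsetTrackingStore(StateStoreContext context) {
            this.context = context;
        }

        void put(String key, String value) {
            data.put(key, value);
            // Metadata may be absent (e.g., writes triggered by a punctuation),
            // so only update the bookkeeping when it is present.
            context.recordMetadata()
                   .ifPresent(m -> lastOffset = Math.max(lastOffset, m.offset()));
        }

        long lastOffset() {
            return lastOffset;
        }
    }

    /** Walks through a write with metadata followed by one without. */
    static long demo() {
        FakeContext context = new FakeContext();
        OffsetTrackingStore store = new OffsetTrackingStore(context);

        context.setCurrent("input-topic", 0, 42L);
        store.put("a", "1");   // metadata present: offset is recorded

        context.clearCurrent();
        store.put("b", "2");   // metadata absent: offset is left unchanged

        return store.lastOffset();
    }
}
```

Using ifPresent keeps the absent-metadata case from ever reaching the offset logic, which matches the Javadoc's advice to always guard against recordMetadata().isPresent() == false.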


Compatibility, Deprecation, and Migration Plan

Changing the StateStoreContext interface does not require changes to any of its implementations: the change merely exposes recordMetadata(), a method that is already implemented in AbstractProcessorContext. We therefore do not expect any compatibility issues. Moreover, recordMetadata() returns an object of type RecordMetadata, which is read-only, so applications cannot use it to tamper with Kafka internals.

Rejected Alternatives

None.
