Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Update to include latest discussed modifications

...

  • Similar to org.apache.kafka.streams.processor.Processor, but adds output generic type parameters
  • Bounds to the forwarding types allowed on the ProcessorContext
  • Add init and close are defaulted to no-op for convenience
  • Javadocs are similar to existing Processor interface
  • updates the process method to use a complex Record type and pass the record metadata to process (only when it's defined)
Code Block
public interface Processor<KIn, VIn, KOut, VOut> {
  default void init(ProcessorContext<KOut, VOut> context) {}
  void process(KIn keyRecord<KIn, VIn> record, VInOptional<RecordMetadata> valuerecordMetadata);
  default void close() {}
}

...

Code Block
public interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
  Processor<KIn, VIn, KOut, VOut> get();
}

(

...

alter class) org.apache.kafka.streams

...

.

...

processor.ProcessorContext

  • Drop the deprecated members of processor.ProcessorContext
  • Alter getStateStore so that callers will no longer have to cast to the concrete store of their choice (although a cast is still done internally)
  • Drop `register(StateStore, StateRestoreCallback)`, which will be moved to StateStoreContext
    • This change is backward compatible
Code Block
public interface ProcessorContext {

- StateStore getStateStore(final String name
Code Block
public interface ProcessorContext<K, V> {
  ...
- <K,            V>            void forward(final K  key, final V  value);
+ <K1<S extends K,StateStore> V1 extends V> void forwardS getStateStore(final K1 key, final V1 value);
. <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final To to);

- StateStore getStateStore(final String name);
+ <S extends StateStore> S getStateStore(final String name);


- void register(StateStore store, StateRestoreCallback stateRestoreCallbackString name);

}

(new class) org.apache.kafka.streams.processor.api.ProcessorContext

  • Copy of processor.ProcessorContext with added generic parameters <K, V>
    • code snippet below shows how the new API compares to processor.ProcessorContext
  • Alter forward to take Record and optional childName
  • Drop the deprecated members of processor.ProcessorContext
  • Alter getStateStore so that callers will no longer have to cast to the concrete store of their choice (although a cast is still done internally)
  • Drop `register(StateStore, StateRestoreCallback)`, which will be moved to StateStoreContext
  • Drop the "record context" methods, which will be moved to Record and RecordMetadata
Code Block
public interface ProcessorContext<K, V> {
  ...
- <KForward,           VForward>           void forward(final K  key, final V  value);
- <KForward,           VForward>           void forward(final K  key, final V  value, final To to);
+ <K extends KForward, V extends VForward> void forward(Record<K, V> record);
+ <K extends KForward, V extends VForward> void forward(Record<K, V> record, String childName);

- StateStore getStateStore(final String name);
+ <S extends StateStore> S getStateStore(final String name);


- void register(StateStore store, StateRestoreCallback stateRestoreCallback);

- String topic();
- int partition();
- long offset();
- Headers headers();
- long timestamp();
}

(new class) org.apache.kafka.streams.processor.api.Record

  • encapsulates all the data attributes of a record for processing: key, value, timestamp, and headers
  • can be used both to receive a record for processing in Processor and to forward a record downstream in ProcessorContext
  • includes a constructor for creating a new Record from scratch as well as builder-style methods for making a shallow copy of a Record with an attribute changed
Code Block
languagejava
titleRecord
public class Record<K, V> {
  public Record(final K key, final V value, final long timestamp, final Headers headers);
  public Record(final K key, final V value, final long timestamp);

  public K key();
  public V value();
  public long timestamp();
  public Headers headers();

  public <NewK> Record<NewK, V> withKey(final NewK key);
  public <NewV> Record<K, NewV> withValue(final NewV value);
  public Record<K, V> withTimestamp(final long timestamp);
  public Record<K, V> withHeaders(final Headers headers);
}

(new class) org.apache.kafka.streams.processor.api.RecordMetadata

  • interface that offers a view onto the "record context"
  • not settable nor forwardable
  • only available when a consumer record is being processed (i.e., it's wrapped with Optional in Processor.process)
Code Block
public interface RecordMetadata {
    String topic();
    int partition();
    long offset();
}

(new class) org.apache.kafka.streams.processor.StateStoreContext

...