THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- Similar to org.apache.kafka.streams.processor.Processor, but adds output generic type parameters
- Bounds to the forwarding types allowed on the ProcessorContext
- Add init and close are defaulted to no-op for convenience
- Javadocs are similar to existing Processor interface
- updates the process method to use a complex Record type and pass the record metadata to process (only when it's defined)
Code Block |
---|
public interface Processor<KIn, VIn, KOut, VOut> { default void init(ProcessorContext<KOut, VOut> context) {} void process(KIn keyRecord<KIn, VIn> record, VInOptional<RecordMetadata> valuerecordMetadata); default void close() {} } |
...
Code Block |
---|
public interface ProcessorSupplier<KIn, VIn, KOut, VOut> { Processor<KIn, VIn, KOut, VOut> get(); } |
(
...
alter class) org.apache.kafka.streams
...
.
...
processor.ProcessorContext
- Drop the deprecated members of processor.ProcessorContext
- Alter getStateStore so that callers will no longer have to cast to the concrete store of their choice (although a cast is still done internally)
- Drop `register(StateStore, StateRestoreCallback)`, which will be moved to StateStoreContext
- This change is backward compatible
Code Block |
---|
public interface ProcessorContext {
- StateStore getStateStore(final String name |
Code Block |
public interface ProcessorContext<K, V> { ... - <K, V> void forward(final K key, final V value); + <K1<S extends K,StateStore> V1 extends V> void forwardS getStateStore(final K1 key, final V1 value); . <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final To to); - StateStore getStateStore(final String name); + <S extends StateStore> S getStateStore(final String name); - void register(StateStore store, StateRestoreCallback stateRestoreCallbackString name); } |
(new class) org.apache.kafka.streams.processor.api.ProcessorContext
- Copy of processor.ProcessorContext with added generic parameters <K, V>
- code snippet below shows how the new API compares to processor.ProcessorContext
- Alter forward to take Record and optional childName
- Drop the deprecated members of processor.ProcessorContext
- Alter getStateStore so that callers will no longer have to cast to the concrete store of their choice (although a cast is still done internally)
- Drop `register(StateStore, StateRestoreCallback)`, which will be moved to StateStoreContext
- Drop the "record context" methods, which will be moved to Record and RecordMetadata
Code Block |
---|
public interface ProcessorContext<K, V> {
...
- <KForward, VForward> void forward(final K key, final V value);
- <KForward, VForward> void forward(final K key, final V value, final To to);
+ <K extends KForward, V extends VForward> void forward(Record<K, V> record);
+ <K extends KForward, V extends VForward> void forward(Record<K, V> record, String childName);
- StateStore getStateStore(final String name);
+ <S extends StateStore> S getStateStore(final String name);
- void register(StateStore store, StateRestoreCallback stateRestoreCallback);
- String topic();
- int partition();
- long offset();
- Headers headers();
- long timestamp();
} |
(new class) org.apache.kafka.streams.processor.api.Record
- encapsulates all the data attributes of a record for processing: key, value, timestamp, and headers
- can be used both to receive a record for processing in Processor and to forward a record downstream in ProcessorContext
- includes a constructor for creating a new Record from scratch as well as builder-style methods for making a shallow copy of a Record with an attribute changed
Code Block | ||||
---|---|---|---|---|
| ||||
public class Record<K, V> {
public Record(final K key, final V value, final long timestamp, final Headers headers);
public Record(final K key, final V value, final long timestamp);
public K key();
public V value();
public long timestamp();
public Headers headers();
public <NewK> Record<NewK, V> withKey(final NewK key);
public <NewV> Record<K, NewV> withValue(final NewV value);
public Record<K, V> withTimestamp(final long timestamp);
public Record<K, V> withHeaders(final Headers headers);
}
|
(new class) org.apache.kafka.streams.processor.api.RecordMetadata
- interface that offers a view onto the "record context"
- not settable nor forwardable
- only available when a consumer record is being processed (i.e., it's wrapped with Optional in Processor.process)
Code Block |
---|
public interface RecordMetadata {
String topic();
int partition();
long offset();
} |
(new class) org.apache.kafka.streams.processor.StateStoreContext
...