Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Updated the proposal to specify changes to KStream#process

...

Code Block
public interface Processor<KIn, VIn, KOut, VOut> {
  default void init(ProcessorContext<KOut, VOut> context) {}
  void process(Record<KIn, VIn> record, Optional<RecordMetadata> recordMetadata);
  default void close() {}
}

...

Code Block
public interface ProcessorContext<K, V> {
  ...
- <KForward,           VForward>           void forward(final K  key, final V  value);
- <KForward,           VForward>           void forward(final K  key, final V  value, final To to);
+ <K extends KForward, V extends VForward> void forward(Record<K, V> record);
+ <K extends KForward, V extends VForward> void forward(Record<K, V> record, String childName);

- StateStore getStateStore(final String name);
+ <S extends StateStore> S getStateStore(final String name);


- void register(StateStore store, StateRestoreCallback stateRestoreCallback);

- String topic();
- int partition();
- long offset();
- Headers headers();
- long timestamp();
+ Optional<RecordMetadata> recordMetadata();
}

(new class) org.apache.kafka.streams.processor.api.Record

...

Code Block
public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(
  final String name,
  final processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
  final String... parentNames
);

public synchronized <KIn, VIn, KOut, VOut> Topology addGlobalStore(
  final StoreBuilder storeBuilder,
  final String sourceName,
  final Deserializer<KIn> keyDeserializer,
  final Deserializer<VIn> valueDeserializer,
  final String topic,
  final String processorName,
  final processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier,
);

public synchronized <KIn, VIn, KOut, VOut> Topology addGlobalStore(
  final StoreBuilder storeBuilder,
  final String sourceName,
  final TimestampExtractor timestampExtractor,
  final Deserializer<KIn> keyDeserializer,
  final Deserializer<VIn> valueDeserializer,
  final String topic,
  final String processorName,
  final processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier,
);


(deprecation and new method) org.apache.kafka.streams.kstream.KStream

...

.process

Note that this API is a candidate for change in the future as a part of

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-10603

In the mean time, we will provide a migration path to the new PAPI. Since the KStreams.process currently does not allow forwarding, we will set the KOut and VOut parameters to Void, Void.

Code Block
// DEPRECATIONS:
/*
...
* @deprecated Since 3.0. Use {@link KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, java.lang.String...)} instead.
*/
@Deprecated
void process(
 org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
 final String... stateStoreNames
);

*/
...
* @deprecated Since 3.0. Use {@link KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, org.apache.kafka.streams.kstream.Named, java.lang.String...)} instead.
*/
@Deprecated
void process(
 org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
 Named named,
 String... stateStoreNames
);

// NEW METHODS:
void process(
 ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
 String... stateStoreNames
);

void process(
 ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
 Named named,
 String... stateStoreNames
);

We will also do the same with the Scala API. Note that we depart from the typical scala-api pattern for suppliers (`()=>Processor`) and take a ProcessorSupplier, because otherwise the new and old methods will clash after type erasure.

Also, we are taking the forwarding type as Void instead of Unit because it is not possible for the scala API implementation to convert a `ProcessorSupplier[K, V, Unit, Unit]` parameter to a `ProcessorSupplier[K, V, Void, Void]` argument to the java API. The only impact of this is that implementers would have to call forward with `forward(null, null)` instead of `forward((),())`. Since the actual intent is for implementers not to call forward at all, this seems like an inconsequential incongruity.

Code Block
// DEPRECATIONS:

@deprecated(since = "3.0", message = "Use process(ProcessorSupplier, String*) instead.")
def process(
 processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
 stateStoreNames: String*
): Unit

@deprecated(since = "3.0", message = "Use process(ProcessorSupplier, String*) instead.")
def process(
 processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
 named: Named,
 stateStoreNames: String*
): Unit

// NEW METHODS
def process(processorSupplier: ProcessorSupplier[K, V, Void, Void], stateStoreNames: String*): Unit

def process(processorSupplier: ProcessorSupplier[K, V, Void, Void], named: Named, stateStoreNames: String*): Unit
  • These changes are fully backward compatible
Code Block
void process(
  final processor.api.ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier, 
  final String... stateStoreNames
)

void process(
  final processor.api.ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
  final Named named,
  final String... stateStoreNames
)

(unchanged) org.apache.kafka.streams.kstream.{Transformer, ValueTransformer, ValueTransformerWithKey}

Just explicitly stating that the Transformer interfaces would not be changed at all. The generics case for Transformer is a little more complicated, and I'd like to give it the consideration it really deserves within the scope of https://issues.apache.org/jira/browse/KAFKA-8396 .

This future work is tracked as

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-10603

(new class) (test-utils) org.apache.kafka.streams.processor.api.MockProcessorContext

...