Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. A new class name (like processor.Processor2 or processor.TypedProcessor)
  2. A new package name (like processor.generic.api.Processor or processorprocessor2.new.Processor)

The current interface is processor.Processor, for reference. The class name seems clunkier, since even after we deprecate and remove Processor, the new names would continue to be visible in source code. This wouldn't be bad if there were some obvious good name for the new interface, but unfortunately Processor seems like the perfect name.

The package name has the drawback that, in Java, you can't import a package, so any references to the new interfaces would need to be fully qualified as long as they co-exist with the old interfaces in source code. On the plus side, once we ultimately remove the old interface, we can then just import org.apache.kafka.streams.processor.genericapi.Processor, and then get back to source code that references only Processor. A relevant precedent would be the nio package in the standard library, which is the "new" version of the io package.

...

  • these classes are deprecated, which would be propagated to any public APIs that reference them.

(new class) org.apache.kafka.streams.processor.

...

api.Processor

  • Similar to org.apache.kafka.streams.processor.Processor, but adds output generic type parameters
  • Bounds to the forwarding types allowed on the ProcessorContext
  • Add init and close are defaulted to no-op for convenience
  • Javadocs are similar to existing Processor interface
Code Block
public interface Processor<KIn, VIn, KOut, VOut> {
  default void init(ProcessorContext<KOut, VOut> context) {}
  void process(KIn key, VIn value);
  default void close() {}
}

(new class) org.apache.kafka.streams.processor.

...

api.ProcessorSupplier

  • Just a Supplier for the new Processor type
  • could potentially be replaced with `Supplier<Processor<KIn, VIn, KOut, VOut>` 

...

Code Block
public synchronized <KIn, VIn, KOut, VOut> StreamsBuilder addGlobalStore(
  final StoreBuilder storeBuilder,
  final String topic,
  final Consumed consumed,
  final processor.genericapi.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier
);

...

Code Block
public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(
  final String name,
  final processor.genericapi.ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
  final String... parentNames
);

public synchronized <KIn, VIn, KOut, VOut> Topology addGlobalStore(
  final StoreBuilder storeBuilder,
  final String sourceName,
  final Deserializer<KIn> keyDeserializer,
  final Deserializer<VIn> valueDeserializer,
  final String topic,
  final String processorName,
  final processor.genericapi.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier,
);

public synchronized <KIn, VIn, KOut, VOut> Topology addGlobalStore(
  final StoreBuilder storeBuilder,
  final String sourceName,
  final TimestampExtractor timestampExtractor,
  final Deserializer<KIn> keyDeserializer,
  final Deserializer<VIn> valueDeserializer,
  final String topic,
  final String processorName,
  final processor.genericapi.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier,
);

...

Code Block
void process(
  final processor.genericapi.ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier, 
  final String... stateStoreNames
)

void process(
  final processor.genericapi.ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
  final Named named,
  final String... stateStoreNames
)

...