Versions Compared

Comment: changed package name

...

  1. A new class name (like processor.Processor2 or processor.TypedProcessor)
  2. A new package name (like processor.generic.Processor or processor.new.Processor)

...

The drawback of a new package name is that, in Java, you can't import a package, so any reference to the new interfaces must be fully qualified for as long as they coexist with the old interfaces in the same source file. On the plus side, once we ultimately remove the old interfaces, we can just import org.apache.kafka.streams.processor.generic.Processor and get back to source code that references only Processor. A relevant precedent is the nio package in the standard library, which is the "new" version of the io package.

...

  • these classes are deprecated, which would be propagated to any public APIs that reference them.

(new class) org.apache.kafka.streams.processor.generic.Processor

  • Similar to org.apache.kafka.streams.processor.Processor, but adds generic type parameters for the output key and value
  • The output type parameters bound the types that may be forwarded through the ProcessorContext
  • init and close default to no-ops for convenience
  • Javadocs are similar to those of the existing Processor interface
Code Block
public interface Processor<KIn, VIn, KOut, VOut> {
  default void init(ProcessorContext<KOut, VOut> context) {}
  void process(KIn key, VIn value);
  default void close() {}
}
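
As a rough illustration of how the output type parameters bound forwarding, the sketch below re-declares the interface next to a minimal stand-in for ProcessorContext. The stand-in's single forward(K, V) method, the WordLengthProcessor class, and the ProcessorSketch wrapper are assumptions made only for this example; the real ProcessorContext has a much larger surface.

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessorSketch {

    // Hypothetical, minimal stand-in for the typed ProcessorContext: only forward() is modeled.
    interface ProcessorContext<K, V> {
        void forward(K key, V value);
    }

    // The proposed interface, as declared above.
    interface Processor<KIn, VIn, KOut, VOut> {
        default void init(ProcessorContext<KOut, VOut> context) {}
        void process(KIn key, VIn value);
        default void close() {}
    }

    // Hypothetical processor: reads <String, String> records, forwards <String, Integer> records.
    static class WordLengthProcessor implements Processor<String, String, String, Integer> {
        private ProcessorContext<String, Integer> context;

        @Override
        public void init(final ProcessorContext<String, Integer> context) {
            this.context = context;
        }

        @Override
        public void process(final String key, final String value) {
            context.forward(key, value.length());
            // context.forward(key, value); // would not compile: a String is not an Integer
        }
    }

    // Runs the processor over one record and returns what it forwarded, rendered as "key=value".
    static List<String> run(final String key, final String value) {
        final List<String> forwarded = new ArrayList<>();
        final WordLengthProcessor processor = new WordLengthProcessor();
        processor.init((k, v) -> forwarded.add(k + "=" + v));
        processor.process(key, value);
        processor.close(); // inherited no-op default
        return forwarded;
    }

    public static void main(final String[] args) {
        System.out.println(run("a", "hello")); // prints [a=5]
    }
}
```

Because close() is defaulted, WordLengthProcessor only overrides the two methods it actually needs.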

(new class) org.apache.kafka.streams.processor.generic.ProcessorSupplier

  • Just a Supplier for the new Processor type
  • Could potentially be replaced with `Supplier<Processor<KIn, VIn, KOut, VOut>>`
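
To make the two options in this list concrete, here is a small sketch comparing a dedicated supplier interface with java.util.function.Supplier. The single get() method on ProcessorSupplier is an assumed shape inferred from "just a Supplier"; CountingProcessor and SupplierSketch are hypothetical names, and the Processor interface is trimmed to its one abstract method.

```java
import java.util.function.Supplier;

public class SupplierSketch {

    // Proposed interface, trimmed to the one abstract method for brevity.
    interface Processor<KIn, VIn, KOut, VOut> {
        void process(KIn key, VIn value);
    }

    // Assumed shape of the dedicated supplier: a single get() method.
    interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
        Processor<KIn, VIn, KOut, VOut> get();
    }

    // Hypothetical processor that counts the records it sees.
    static class CountingProcessor implements Processor<String, String, Void, Void> {
        int seen = 0;

        @Override
        public void process(final String key, final String value) {
            seen++;
        }
    }

    // Returns true when every supplier form hands out a fresh instance on each call.
    static boolean suppliesFreshInstances() {
        final ProcessorSupplier<String, String, Void, Void> dedicated = CountingProcessor::new;
        final Supplier<Processor<String, String, Void, Void>> generic = CountingProcessor::new;
        // A dedicated supplier adapts to the generic form with a method reference.
        final Supplier<Processor<String, String, Void, Void>> adapted = dedicated::get;
        return dedicated.get() != dedicated.get()
            && generic.get() != generic.get()
            && adapted.get() != adapted.get();
    }
}
```

Both forms accept the same constructor reference, so the choice mostly affects how the type reads in method signatures rather than how callers write their code.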

...

Code Block
public synchronized <KIn, VIn, KOut, VOut> StreamsBuilder addGlobalStore(
  final StoreBuilder storeBuilder,
  final String topic,
  final Consumed consumed,
  final processor.generic.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier
);

...

Code Block
public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(
  final String name,
  final processor.generic.ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
  final String... parentNames
);

public synchronized <KIn, VIn, KOut, VOut> Topology addGlobalStore(
  final StoreBuilder storeBuilder,
  final String sourceName,
  final Deserializer<KIn> keyDeserializer,
  final Deserializer<VIn> valueDeserializer,
  final String topic,
  final String processorName,
  final processor.generic.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier
);

public synchronized <KIn, VIn, KOut, VOut> Topology addGlobalStore(
  final StoreBuilder storeBuilder,
  final String sourceName,
  final TimestampExtractor timestampExtractor,
  final Deserializer<KIn> keyDeserializer,
  final Deserializer<VIn> valueDeserializer,
  final String topic,
  final String processorName,
  final processor.generic.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier
);

...

Code Block
void process(
  final processor.generic.ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
  final String... stateStoreNames
);

void process(
  final processor.generic.ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
  final Named named,
  final String... stateStoreNames
);
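
The `? super K, ? super V` bounds in these signatures mean that a supplier written against wider input types can be reused on streams with narrower ones. The sketch below demonstrates only that generics behavior: MiniStream is a hypothetical stand-in for a stream of records (it holds a single record and omits state store names), and the interfaces are trimmed copies with an assumed get() method on the supplier.

```java
import java.util.ArrayList;
import java.util.List;

public class TerminalProcessSketch {

    // Proposed interface, trimmed to the one abstract method for brevity.
    interface Processor<KIn, VIn, KOut, VOut> {
        void process(KIn key, VIn value);
    }

    // Assumed shape of the supplier: a single get() method.
    interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
        Processor<KIn, VIn, KOut, VOut> get();
    }

    // Hypothetical stand-in for a stream of <K, V> records: it holds one record to feed to process().
    static class MiniStream<K, V> {
        private final K key;
        private final V value;

        MiniStream(final K key, final V value) {
            this.key = key;
            this.value = value;
        }

        // Same type bounds as the proposed signature; state store names are omitted.
        void process(final ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier) {
            processorSupplier.get().process(key, value);
        }
    }

    // Feeds two differently typed streams through one supplier and returns what it logged.
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        // Written against Object, so it is accepted for any key and value types.
        final ProcessorSupplier<Object, Object, Void, Void> logger =
            () -> (key, value) -> log.add(key + "->" + value);
        new MiniStream<>("a", 1).process(logger);  // K = String, V = Integer
        new MiniStream<>(2L, "b").process(logger); // K = Long, V = String
        return log;
    }
}
```

Without the wildcards, the same logger would be rejected by both calls, since ProcessorSupplier<Object, Object, Void, Void> is not a subtype of ProcessorSupplier<String, Integer, Void, Void>.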

...