...
- A new class name (like `processor.Processor2` or `processor.TypedProcessor`)
- A new package name (like `processor.generic.api.Processor` or `processorprocessor2.new.Processor`)
The current interface is `processor.Processor`, for reference. The class name seems clunkier, since even after we deprecate and remove `Processor`, the new names would continue to be visible in source code. This wouldn't be bad if there were some obvious good name for the new interface, but unfortunately `Processor` seems like the perfect name.
The package name has the drawback that, in Java, you can't import a package, so any references to the new interfaces would need to be fully qualified as long as they co-exist with the old interfaces in source code. On the plus side, once we ultimately remove the old interface, we can then just import `org.apache.kafka.streams.processor.genericapi.Processor`, and then get back to source code that references only `Processor`. A relevant precedent would be the `nio` package in the standard library, which is the "new" version of the `io` package.
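The co-existence cost described above can be illustrated with two JDK classes that share a simple name. This is only an analogy, not Kafka code: `java.util.Date` and `java.sql.Date` stand in for the old and new `Processor`, and `SameNameDemo` is a hypothetical class name.

```java
import java.util.Date; // only one of the two same-named classes can be imported

final class SameNameDemo {
    // `Date` already refers to java.util.Date in this file, so the other
    // class must be spelled with its fully qualified name everywhere.
    static java.sql.Date toSqlDate(Date utilDate) {
        return new java.sql.Date(utilDate.getTime());
    }

    public static void main(String[] args) {
        java.sql.Date converted = toSqlDate(new Date(42L));
        System.out.println(converted.getTime());
    }
}
```

Once only one of the two classes is referenced in a file, a single import restores the short name, which is the end state the package-name option aims for.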
...
- these classes are deprecated, which would be propagated to any public APIs that reference them.
(new class) org.apache.kafka.streams.processor.
...
api.Processor
- Similar to org.apache.kafka.streams.processor.Processor, but adds output generic type parameters
- Bounds the forwarding types allowed on the ProcessorContext
- init and close default to no-op for convenience
- Javadocs are similar to existing Processor interface
```java
public interface Processor<KIn, VIn, KOut, VOut> {
    default void init(ProcessorContext<KOut, VOut> context) {}
    void process(KIn key, VIn value);
    default void close() {}
}
```
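As a rough sketch of what the output type parameters buy, the following stand-alone example shows the compiler bounding what a processor may forward. The one-method `ProcessorContext` here is a hypothetical stand-in (the real context has many more methods), and `LengthProcessor` and `TypedProcessorSketch` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

final class TypedProcessorSketch {
    // Hypothetical minimal context: only forwarding, typed by the output types.
    interface ProcessorContext<KOut, VOut> {
        void forward(KOut key, VOut value);
    }

    // Same shape as the proposed interface.
    interface Processor<KIn, VIn, KOut, VOut> {
        default void init(ProcessorContext<KOut, VOut> context) {}
        void process(KIn key, VIn value);
        default void close() {}
    }

    // Reads String/String records and forwards String/Integer records.
    static final class LengthProcessor implements Processor<String, String, String, Integer> {
        private ProcessorContext<String, Integer> context;

        @Override
        public void init(ProcessorContext<String, Integer> context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            context.forward(key, value.length());
            // context.forward(key, value); // would not compile: String is not an Integer
        }
    }

    // Drives the processor with a context that records what was forwarded.
    static List<String> run() {
        List<String> forwarded = new ArrayList<>();
        LengthProcessor processor = new LengthProcessor();
        processor.init((k, v) -> forwarded.add(k + "=" + v));
        processor.process("a", "hello");
        processor.close(); // inherited no-op default
        return forwarded;
    }
}
```

Because `close` is defaulted, `LengthProcessor` only overrides the methods it actually needs.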
(new class) org.apache.kafka.streams.processor.
...
api.ProcessorSupplier
- Just a Supplier for the new Processor type
- could potentially be replaced with `Supplier<Processor<KIn, VIn, KOut, VOut>>`
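To make the trade-off concrete, here is a minimal sketch comparing a dedicated supplier interface with a plain `java.util.function.Supplier`. It assumes the new `ProcessorSupplier` exposes a single `get()` method, which the text above does not spell out; `NoOp` and `SupplierSketch` are hypothetical names, and the `Processor` here is trimmed to one method.

```java
import java.util.function.Supplier;

final class SupplierSketch {
    // Trimmed-down processor shape, for illustration only.
    interface Processor<KIn, VIn, KOut, VOut> {
        void process(KIn key, VIn value);
    }

    // Assumed shape of the dedicated supplier: a single get() method.
    interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
        Processor<KIn, VIn, KOut, VOut> get();
    }

    static final class NoOp implements Processor<String, String, Void, Void> {
        @Override
        public void process(String key, String value) {}
    }

    // The same constructor reference satisfies both forms; each get() call
    // in this sketch constructs a new NoOp instance.
    static boolean bothFormsSupplyFreshProcessors() {
        ProcessorSupplier<String, String, Void, Void> dedicated = NoOp::new;
        Supplier<Processor<String, String, Void, Void>> plain = NoOp::new;
        return dedicated.get() != dedicated.get() && plain.get() != plain.get();
    }
}
```

At call sites the two are interchangeable; the dedicated interface mainly yields shorter method signatures than the nested generic `Supplier` form.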
...
```java
public synchronized <KIn, VIn, KOut, VOut> StreamsBuilder addGlobalStore(
    final StoreBuilder storeBuilder,
    final String topic,
    final Consumed consumed,
    final processor.genericapi.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier
);
```
...
```java
public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(
    final String name,
    final processor.genericapi.ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
    final String... parentNames
);

public synchronized <KIn, VIn, KOut, VOut> Topology addGlobalStore(
    final StoreBuilder storeBuilder,
    final String sourceName,
    final Deserializer<KIn> keyDeserializer,
    final Deserializer<VIn> valueDeserializer,
    final String topic,
    final String processorName,
    final processor.genericapi.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier
);

public synchronized <KIn, VIn, KOut, VOut> Topology addGlobalStore(
    final StoreBuilder storeBuilder,
    final String sourceName,
    final TimestampExtractor timestampExtractor,
    final Deserializer<KIn> keyDeserializer,
    final Deserializer<VIn> valueDeserializer,
    final String topic,
    final String processorName,
    final processor.genericapi.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier
);
```
...
```java
void process(
    final processor.genericapi.ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
    final String... stateStoreNames
);

void process(
    final processor.genericapi.ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
    final Named named,
    final String... stateStoreNames
);
```
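The wildcard bounds in these signatures can be tried out in isolation. The sketch below is not the Kafka Streams API: `MiniStream` is a hypothetical one-record stand-in for a stream, and the `Processor` and `ProcessorSupplier` shapes are trimmed-down assumptions. It shows that `? super K, ? super V` lets a supplier written against broader input types be reused on a more specific stream, while the `Void, Void` output types presumably mark the processor as forwarding nothing downstream.

```java
final class TerminalProcessSketch {
    // Trimmed-down shapes, for illustration only.
    interface Processor<KIn, VIn, KOut, VOut> {
        void process(KIn key, VIn value);
    }

    interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
        Processor<KIn, VIn, KOut, VOut> get();
    }

    // Hypothetical stream holding exactly one record.
    static final class MiniStream<K, V> {
        private final K key;
        private final V value;

        MiniStream(K key, V value) {
            this.key = key;
            this.value = value;
        }

        // Same parameter shape as the process() signatures above,
        // minus the state store names.
        void process(ProcessorSupplier<? super K, ? super V, Void, Void> supplier) {
            Processor<? super K, ? super V, Void, Void> processor = supplier.get();
            processor.process(key, value);
        }
    }

    static String run() {
        StringBuilder seen = new StringBuilder();
        // Declared for Object/Object inputs, yet usable on a String/Integer stream.
        ProcessorSupplier<Object, Object, Void, Void> recorder =
            () -> (k, v) -> seen.append(k).append("->").append(v);
        new MiniStream<>("a", 1).process(recorder);
        return seen.toString();
    }
}
```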
...