Comment: Updated the proposal to specify changes to KStream#process

...

Discussion thread: https://lists.apache.org/thread.html/13f306454761ef7318fb9a658b902fb1663a73e3dde542a2c2b29ab4@%3Cdev.kafka.apache.org%3E

Vote Thread: https://lists.apache.org/thread.html/6d3b4f6d286f3f88db3e7f9eebe2f0d361152b84f2021aaf9ba56f8e%40%3Cdev.kafka.apache.org%3E

JIRA: KAFKA-8410

...

  • Similar to org.apache.kafka.streams.processor.Processor, but adds output generic type parameters
  • Bounds to the forwarding types allowed on the ProcessorContext
  • init and close default to no-op for convenience
  • Javadocs are similar to existing Processor interface
  • Updates the process method to take a single Record argument; record metadata is exposed separately, and only when it is defined
Code Block
public interface Processor<KIn, VIn, KOut, VOut> {
  default void init(ProcessorContext<KOut, VOut> context) {}
  void process(Record<KIn, VIn> record);
  default void close() {}
}
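
As a rough illustration of how the output type parameters bound what a processor may forward, here is a minimal, self-contained sketch. The Record, ProcessorContext, and Processor types below are simplified stand-ins that only mirror the shapes proposed here (they are not the Kafka classes, and forward is reduced to a single exact-typed method); WordLengthProcessor and ProcessorSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins that mirror the proposed shapes; NOT the Kafka classes.
final class Record<K, V> {
    private final K key;
    private final V value;

    Record(final K key, final V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }
    V value() { return value; }
}

interface ProcessorContext<KOut, VOut> {
    void forward(Record<KOut, VOut> record);
}

interface Processor<KIn, VIn, KOut, VOut> {
    default void init(final ProcessorContext<KOut, VOut> context) {}
    void process(Record<KIn, VIn> record);
    default void close() {}
}

// Hypothetical processor: consumes <String, String>, forwards <String, Integer>.
final class WordLengthProcessor implements Processor<String, String, String, Integer> {
    private ProcessorContext<String, Integer> context;

    @Override
    public void init(final ProcessorContext<String, Integer> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        // Forwarding a Record<String, String> here would be rejected by the compiler.
        context.forward(new Record<>(record.key(), record.value().length()));
    }
}

final class ProcessorSketch {
    // Runs the processor over one record and collects the forwarded values.
    static List<Integer> run(final String key, final String value) {
        final List<Integer> forwarded = new ArrayList<>();
        final WordLengthProcessor processor = new WordLengthProcessor();
        processor.init(record -> forwarded.add(record.value()));
        processor.process(new Record<>(key, value));
        processor.close(); // no-op by default
        return forwarded;
    }

    public static void main(final String[] args) {
        System.out.println(run("k", "hello")); // prints [5]
    }
}
```

Because close is defaulted, the sketch does not have to override it; only process is mandatory.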

...

Code Block
public interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
  Processor<KIn, VIn, KOut, VOut> get();
}


    (new class) org.apache.kafka.streams.processor.api.ProcessorContext

    • Copy of processor.ProcessorContext with added generic parameters <KForward, VForward>
      • code snippet below shows how the new API compares to processor.ProcessorContext
    • Alter forward to take Record and optional childName
    • Drop the deprecated members of processor.ProcessorContext
    • Alter getStateStore so that callers will no longer have to cast to the concrete store of their choice (although a cast is still done internally)
    • Drop `register(StateStore, StateRestoreCallback)`, which will be moved to StateStoreContext
    • Drop the "record context" methods, which will be moved to Record and RecordMetadata
    Code Block
    public interface ProcessorContext<KForward, VForward> {
      ...
    - <K,                  V>                  void forward(final K  key, final V  value);
    - <K,                  V>                  void forward(final K  key, final V  value, final To to);
    + <K extends KForward, V extends VForward> void forward(Record<K, V> record);
    + <K extends KForward, V extends VForward> void forward(Record<K, V> record, String childName);
    
    - StateStore getStateStore(final String name);
    + <S extends StateStore> S getStateStore(final String name);
    
    
    - void register(StateStore store, StateRestoreCallback stateRestoreCallback);
    
    - String topic();
    - int partition();
    - long offset();
    - Headers headers();
    - long timestamp();
    + Optional<RecordMetadata> recordMetadata();
    }
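
To make the getStateStore change concrete, the following sketch contrasts the old and proposed call sites. It assumes simplified stand-in types (StateStore, CounterStore, and SketchContext are hypothetical and are not the Kafka classes); only the generic signature mirrors the one listed above.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in; NOT the Kafka interface.
interface StateStore {
    String name();
}

// Hypothetical concrete store type used only for this sketch.
final class CounterStore implements StateStore {
    private final String name;
    private long count;

    CounterStore(final String name) { this.name = name; }

    @Override
    public String name() { return name; }

    long increment() { return ++count; }
}

// Hypothetical context holding stores by name.
final class SketchContext {
    private final Map<String, StateStore> stores = new HashMap<>();

    void addStore(final StateStore store) { stores.put(store.name(), store); }

    // Old shape: returns the base type, so every caller has to cast.
    StateStore getStateStoreUntyped(final String name) {
        return stores.get(name);
    }

    // Proposed shape: the (unchecked) cast happens once, inside the context.
    @SuppressWarnings("unchecked")
    <S extends StateStore> S getStateStore(final String name) {
        return (S) stores.get(name);
    }
}

final class GetStateStoreSketch {
    static long run() {
        final SketchContext context = new SketchContext();
        context.addStore(new CounterStore("counts"));

        // Before: explicit cast at the call site.
        final CounterStore before = (CounterStore) context.getStateStoreUntyped("counts");
        before.increment();

        // After: the assignment target drives type inference; no cast in user code.
        final CounterStore after = context.getStateStore("counts");
        return after.increment();
    }

    public static void main(final String[] args) {
        System.out.println(run()); // prints 2
    }
}
```

The cast is only hidden, not eliminated: asking for the wrong store type still fails with a ClassCastException at the call site.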

    (new class) org.apache.kafka.streams.processor.api.Record

    • encapsulates all the data attributes of a record for processing: key, value, timestamp, and headers
    • can be used both to receive a record for processing in Processor and to forward a record downstream in ProcessorContext
    • includes a constructor for creating a new Record from scratch as well as builder-style methods for making a shallow copy of a Record with an attribute changed
    Code Block
    public class Record<K, V> {
      public Record(final K key, final V value, final long timestamp, final Headers headers);
      public Record(final K key, final V value, final long timestamp);
    
      public K key();
      public V value();
      public long timestamp();
      public Headers headers();
    
      public <NewK> Record<NewK, V> withKey(final NewK key);
      public <NewV> Record<K, NewV> withValue(final NewV value);
      public Record<K, V> withTimestamp(final long timestamp);
      public Record<K, V> withHeaders(final Headers headers);
    }
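
The builder-style methods can be sketched as follows. This is a simplified stand-in for the proposed Record (headers are omitted for brevity, and it is not the Kafka class); RecordSketch and its demo method are hypothetical. Each with-method returns a new shallow copy and leaves the receiver untouched.

```java
// Simplified stand-in for the proposed Record (headers omitted); NOT the Kafka class.
final class Record<K, V> {
    private final K key;
    private final V value;
    private final long timestamp;

    Record(final K key, final V value, final long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    K key() { return key; }
    V value() { return value; }
    long timestamp() { return timestamp; }

    // Each method copies the record with exactly one attribute replaced.
    <NewK> Record<NewK, V> withKey(final NewK newKey) {
        return new Record<>(newKey, value, timestamp);
    }

    <NewV> Record<K, NewV> withValue(final NewV newValue) {
        return new Record<>(key, newValue, timestamp);
    }

    Record<K, V> withTimestamp(final long newTimestamp) {
        return new Record<>(key, value, newTimestamp);
    }
}

final class RecordSketch {
    static String describe(final Record<?, ?> record) {
        return record.key() + "=" + record.value() + "@" + record.timestamp();
    }

    // Derives an output record from an input record without mutating the input.
    static String demo() {
        final Record<String, String> input = new Record<>("user-1", "42", 1000L);
        final Record<String, Integer> output =
            input.withValue(Integer.parseInt(input.value())).withTimestamp(2000L);
        return describe(input) + " -> " + describe(output);
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // prints user-1=42@1000 -> user-1=42@2000
    }
}
```

Note how withValue changes the value type parameter, so a processor can turn a Record<String, String> into a Record<String, Integer> in one expression.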
    
    

    (new class) org.apache.kafka.streams.processor.api.RecordMetadata

    • interface that offers a view onto the "record context"
    • neither settable nor forwardable
    • only available when a consumer record is being processed, which is why ProcessorContext#recordMetadata() returns it wrapped in an Optional
    Code Block
    public interface RecordMetadata {
        String topic();
        int partition();
        long offset();
    }
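
Since the metadata may be absent, callers are expected to handle the empty case explicitly. A minimal sketch, assuming a stand-in RecordMetadata interface with the three members listed above (the factory method `of` and the class RecordMetadataSketch are hypothetical, introduced only for illustration):

```java
import java.util.Optional;

// Simplified stand-in mirroring the proposed interface; NOT the Kafka type.
interface RecordMetadata {
    String topic();
    int partition();
    long offset();
}

final class RecordMetadataSketch {
    // Hypothetical factory for this sketch; in Kafka Streams the runtime supplies the metadata.
    static RecordMetadata of(final String topic, final int partition, final long offset) {
        return new RecordMetadata() {
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
            @Override public long offset() { return offset; }
        };
    }

    // Formats the origin of the current record, tolerating absent metadata.
    static String origin(final Optional<RecordMetadata> metadata) {
        return metadata
            .map(m -> m.topic() + "-" + m.partition() + "@" + m.offset())
            .orElse("unknown origin");
    }

    public static void main(final String[] args) {
        System.out.println(origin(Optional.of(of("orders", 3, 42L)))); // prints orders-3@42
        System.out.println(origin(Optional.empty()));                  // prints unknown origin
    }
}
```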

    (new class) org.apache.kafka.streams.processor.StateStoreContext

    • Extraction of only the members of ProcessorContext that need to be provided to state stores (via `StateStore#init`)
    • It includes `register(StateStore, StateRestoreCallback)`, which only needs to be called by stores (so it's dropped from the new ProcessorContext)
    • It includes all the "general context" members (app id, config, etc), which are all still in ProcessorContext as well
    • It does not include anything processor- or record-specific, like `forward()` or any information about the "current" record, which is only well-defined in the context of the Processor. Processors process one record at a time, but state stores may be used to store and fetch many records, so there is no "current record".
    Code Block
    package org.apache.kafka.streams.processor;

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.streams.StreamsMetrics;
    import org.apache.kafka.streams.errors.StreamsException;

    import java.io.File;
    import java.util.Map;

    /**
     * State store context interface.
     */
    public interface StateStoreContext {

        /**
         * Returns the application id.
         *
         * @return the application id
         */
        String applicationId();

        /**
         * Returns the task id.
         *
         * @return the task id
         */
        TaskId taskId();

        /**
         * Returns the default key serde.
         *
         * @return the key serializer
         */
        Serde<?> keySerde();

        /**
         * Returns the default value serde.
         *
         * @return the value serializer
         */
        Serde<?> valueSerde();

        /**
         * Returns the state directory for the partition.
         *
         * @return the state directory
         */
        File stateDir();

        /**
         * Returns Metrics instance.
         *
         * @return StreamsMetrics
         */
        StreamsMetrics metrics();

        /**
         * Registers and possibly restores the specified storage engine.
         *
         * @param store the storage engine
         * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
         *
         * @throws IllegalStateException If store gets registered after initialized is already finished
         * @throws StreamsException if the store's change log does not contain the partition
         */
        void register(final StateStore store,
                      final StateRestoreCallback stateRestoreCallback);

        /**
         * Returns all the application config properties as key/value pairs.
         *
         * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
         * object and associated to the StateStoreContext.
         *
         * <p> The type of the values is dependent on the {@link org.apache.kafka.common.config.ConfigDef.Type type} of the property
         * (e.g. the value of {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG DEFAULT_KEY_SERDE_CLASS_CONFIG}
         * will be of type {@link Class}, even if it was specified as a String to
         * {@link org.apache.kafka.streams.StreamsConfig#StreamsConfig(Map) StreamsConfig(Map)}).
         *
         * @return all the key/values from the StreamsConfig properties
         */
        Map<String, Object> appConfigs();

        /**
         * Returns all the application config properties with the given key prefix, as key/value pairs
         * stripping the prefix.
         *
         * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
         * object and associated to the StateStoreContext.
         *
         * @param prefix the properties prefix
         * @return the key/values matching the given prefix from the StreamsConfig properties.
         */
        Map<String, Object> appConfigsWithPrefix(final String prefix);

    }

    (deprecation and new method) org.apache.kafka.streams.processor.StateStore

    • Deprecate the existing `init(ProcessorContext)` method and replace it with `init(StateStoreContext)`
    • The new method will have a default implementation that calls the old method, preserving backward compatibility
    • In a major-versioned release, we will delete the deprecated init method and strip off the `default` keyword from the new method, resulting in a fully compatible transition to the desired end-state in which we only have `init(StateStoreContext)`. This note about the future is informational only; we would actually propose this move in a separate KIP.
    Code Block

      @Deprecated
      void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);
    
    + /**
    +  * Initializes this state store.
    +  * <p>
    +  * The implementation of this function must register the root store in the context via the
    +  * {@link StateStoreContext#register(StateStore, StateRestoreCallback)} function, where the
    +  * first {@link StateStore} parameter should always be the passed-in {@code root} object, and
    +  * the second parameter should be an object of user's implementation
    +  * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
    +  * <p>
    +  * Note that if the state store engine itself supports bulk writes, users can implement another
    +  * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
    +  * let users implement bulk-load restoration logic instead of restoring one record at a time.
    +  *
    +  * @throws IllegalStateException If store gets registered after initialized is already finished
    +  * @throws StreamsException if the store's change log does not contain the partition
    +  */
    + default void init(final StateStoreContext context, final StateStore root) {
    +     // delegate to init(ProcessorContext, StateStore)
    + }
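
The compatibility argument above (a defaulted new method that calls the deprecated one) can be sketched in isolation. All types below are simplified stand-ins, not the Kafka interfaces: each context is reduced to a single member, and the way the new context is adapted to the old one is a hypothetical choice made for this sketch, not the proposed implementation.

```java
// Simplified stand-ins; NOT the Kafka interfaces. Each context exposes one member for brevity.
interface OldProcessorContext {
    String applicationId();
}

interface StateStoreContext {
    String applicationId();
}

interface StateStore {
    @Deprecated
    void init(OldProcessorContext context, StateStore root);

    // New method with a default body, so existing implementations keep compiling.
    default void init(final StateStoreContext context, final StateStore root) {
        // Hypothetical adapter: view the new context through the old interface.
        final OldProcessorContext adapted = context::applicationId;
        init(adapted, root);
    }
}

// A store written against the old API only; it never mentions StateStoreContext.
final class LegacyStore implements StateStore {
    String initializedFor;

    @Override
    public void init(final OldProcessorContext context, final StateStore root) {
        initializedFor = context.applicationId();
    }
}

final class StateStoreInitSketch {
    static String run() {
        final LegacyStore store = new LegacyStore();
        final StateStoreContext context = () -> "my-app";
        // The caller uses the new method; the default body reaches the old one.
        store.init(context, store);
        return store.initializedFor;
    }

    public static void main(final String[] args) {
        System.out.println(run()); // prints my-app
    }
}
```

A store that overrides the new method directly simply bypasses the default body, which is what makes the later removal of the deprecated method a compatible step.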

    (new method) org.apache.kafka.streams.StreamsBuilder

    • These changes are fully backward compatible
    Code Block
    public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(
      final StoreBuilder storeBuilder,
      final String topic,
      final Consumed<KIn, VIn> consumed,
      final processor.api.ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier
    );

    (new method) org.apache.kafka.streams.Topology

    • These changes are fully backward compatible
    Code Block
    public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(
      final String name,
      final processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
      final String... parentNames
    );
    
    public synchronized <KIn, VIn, KOut, VOut> Topology addGlobalStore(
      final StoreBuilder storeBuilder,
      final String sourceName,
      final Deserializer<KIn> keyDeserializer,
      final Deserializer<VIn> valueDeserializer,
      final String topic,
      final String processorName,
      final processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier
    );
    
    public synchronized <KIn, VIn, KOut, VOut> Topology addGlobalStore(
      final StoreBuilder storeBuilder,
      final String sourceName,
      final TimestampExtractor timestampExtractor,
      final Deserializer<KIn> keyDeserializer,
      final Deserializer<VIn> valueDeserializer,
      final String topic,
      final String processorName,
      final processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier
    );

    (deprecation and new method) org.apache.kafka.streams.kstream.KStream.process

    Note that this API is a candidate for change in the future as a part of KAFKA-10603.

    In the meantime, we will provide a migration path to the new PAPI. Since KStream#process currently does not allow forwarding, we will set both the KOut and VOut parameters to Void.

    Code Block
    // DEPRECATIONS:
    /*
    ...
    * @deprecated Since 3.0. Use {@link KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, java.lang.String...)} instead.
    */
    @Deprecated
    void process(
     org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
     final String... stateStoreNames
    );
    
    /*
    ...
    * @deprecated Since 3.0. Use {@link KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, org.apache.kafka.streams.kstream.Named, java.lang.String...)} instead.
    */
    @Deprecated
    void process(
     org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
     Named named,
     String... stateStoreNames
    );
    
    // NEW METHODS:
    void process(
     ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
     String... stateStoreNames
    );
    
    void process(
     ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
     Named named,
     String... stateStoreNames
    );

    We will also do the same with the Scala API. Note that we depart from the typical Scala API pattern for suppliers (`()=>Processor`) and take a ProcessorSupplier, because otherwise the new and old methods would clash after type erasure.

    Also, we are taking the forwarding type as Void instead of Unit because it is not possible for the Scala API implementation to convert a `ProcessorSupplier[K, V, Unit, Unit]` parameter to a `ProcessorSupplier[K, V, Void, Void]` argument to the Java API. The only impact of this is that implementers would have to call forward with `forward(null, null)` instead of `forward((),())`. Since the actual intent is for implementers not to call forward at all, this seems like an inconsequential incongruity.

    Code Block
    // DEPRECATIONS:
    
    @deprecated(since = "3.0", message = "Use process(ProcessorSupplier, String*) instead.")
    def process(
     processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
     stateStoreNames: String*
    ): Unit
    
    @deprecated(since = "3.0", message = "Use process(ProcessorSupplier, Named, String*) instead.")
    def process(
     processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
     named: Named,
     stateStoreNames: String*
    ): Unit
    
    // NEW METHODS
    def process(processorSupplier: ProcessorSupplier[K, V, Void, Void], stateStoreNames: String*): Unit
    
    def process(processorSupplier: ProcessorSupplier[K, V, Void, Void], named: Named, stateStoreNames: String*): Unit

    (unchanged) org.apache.kafka.streams.kstream.{Transformer, ValueTransformer, ValueTransformerWithKey}

    To be explicit, the Transformer interfaces would not be changed at all. The generics case for Transformer is a little more complicated, and I'd like to give it the consideration it really deserves within the scope of https://issues.apache.org/jira/browse/KAFKA-8396 .

    This future work is tracked as KAFKA-10603.

    (new class) (test-utils) org.apache.kafka.streams.processor.api.MockProcessorContext

    ...