Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents
Status

Current state:  "DISCUSS"Adopted

Discussion thread

JIRA: KAFKA-12313

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

KIP-659 introduced a config windowSize to TimeWindowedDeserializer which discourages the setting of windowSize to Long.MAX_VALUE as the default value. However, there is still a possibility of setting the config only for innerClassSerde or only for windowSize. This KIP aims at standardising the way

...

the serialisers and deserializer object can be created for the usage in console consumer, plain consumer and via the DSL. 

...

Proposed Changes

Here are the changes being proposed in the KIP:

...

  1. StreamsConfig
    1. Deprecate 
    1. default.windowed.key.serde.inner

...

    1.  and default.windowed.

...

    1. value.serde.inner

...

    1. Introduce a new config called windowed.

...

    1. inner.class

...

    1. .serde. This way, the config comes closer to the window.size.ms config introduced in KIP-659 and it also emphasises that the config isn't really a default one.

...

  1. ConsoleConsumer
    1. It would be mandatory to pass windowed.inner.class.serde and window.size.ms config. 
  2. The following table discusses the proposed changes for the Windowed Serdes and proposed changes based on Client Type.



    Windowed Serde TypeClient TypeConstructor InvokedProposed Change
    1

    TimeWindowedDeserializer

    Console ConsumerDefault constructorEnsure both windowSize and Serde class configs are set. Throw error if not.
    Plain ConsumerDefault OR ParameterisedOk to use either as long as the configs supplied don't conflict.
    Kafka StreamsParameterisedUser supplies parameters by constructing a TimeWindowedSerde object. No change
    2TimeWindowedSerializerConsole ProducerDefault constructorEnsure Serde class config is set. Throw error if not.
    Plain ProducerDefault OR ParameterisedOk to use either as long as the configs supplied don't conflict.
    Kafka StreamsParameterisedUser supplies parameters by constructing a TimeWindowedSerde object. No change
    3SessionWindowedDeserializerConsole consumerDefault constructorEnsure Serde class config is set. Throw error if not.
    Plain consumerDefault OR ParameterisedOk to use either as long as the configs supplied don't conflict.
    Kafka StreamsParameterisedUser supplies parameters by constructing a SessionWindowedSerde object. No change
    4SessionWindowedSerializerConsole ProducerDefault constructorEnsure Serde class config is set. Throw error if not.
    Plain ProducerDefault OR ParameterisedOk to use either as long as the configs supplied don't conflict.
    Kafka StreamsParameterisedUser supplies parameters by constructing a SessionWindowedSerde object. No change


Public Interfaces

  • StreamsConfig:

      ...

      Proposed Changes

      Here are the changes being proposed in the KIP:

      • Rename the default.windowed.key.serde.inner to windowed.deserializer.inner.class in StreamConfig.
      • Deprecate default.windowed.value.serde.inner in StreamConfig.
      • If the users want to run Console consumer for testing purposes, then it will invoke the default constructor of TimeWindowedDeserializer. We will ensure that both the configs (windowSize and deserialiser class) are set and if they aren't then an error would be thrown. 
      • For any other plain consumer client, the user can pass them in as configs OR pass the parameters to the TimeWindowedDeserializer constructor, and then pass that object to the consumer. It is ok for the user to pass both as long as there is no conflict between the params passed via constructor and via the configs.
      • For use in Kafka Streams (such as the DSL), the user must supply the parameters by constructing a TimeWindowedSerde and passing that in as a parameter to any relevant DSL operators. This is already supported so no change is needed here.
      • Similar to the checks added in KIP-659 to validate the windownSize config, a check would be added for deserialiserInnerClass. This would ensure that it has only 1 valid value.

      Public Interfaces

      • StreamsConfig:
        • Deprecate 
        • Rename default.windowed.key.serde.innerto  and default.windowed.deserializervalue.serde.inner in StreamConfig.class.Deprecate  
          • Introduce a new config called windowed.inner.class
          default.windowed.value
          • .serde.
          inner.
          •  


            Code Block
            languagejava
            themeMidnight
                public static final String WINDOWED
          _DESERIALISER
          • _INNER_CLASS_SERDE = "windowed
          .deserializer
          • .inner.class.serde";
                private static final String WINDOWED
          _DESERIALISER
          • _INNER_CLASS_SERDE_DOC = " 
          Deserializer
          • Serde for the inner class of a windowed 
          key
          • record. Must implement the " +
                    "<code>org.apache.kafka.common.serialization.
          Deserializer<
          • Serde</code> interface. Note that setting this config in KafkaStreams application would result " + 
            		"in an error as it is meant to be used only from Plain consumer client.";
            
            	@Deprecated
            	public static final String DEFAULT_WINDOWED_
          VALUE
          • KEY_SERDE_INNER_CLASS = "default.windowed.
          value
          • key.serde.inner";
      • ConsoleConsumer
        • It would be mandatory to pass windowed.deserializer.inner.class and window.size.ms config. <Need to check how to do this>
      • TimeWindowedDeserializer
        • Inside configure method, ensure that innerClassDeserialiser passed via constructor and the config are not conflicting. What it means is that:
          • If both inner and windowedDeserialiserInnerClass are set, then they should be the pointing to the same class.
          • Atleast one of inner and windowedDeserialiserInnerClass are set.
          • Code Block
            languagejava
            themeMidnight
                if (innerClassDeserializer != null && configInnerClassDeserializer != null) { 
            		if (innerClassDeserializer != configInnerClassDeserializer)
            			throw new IllegalArgumentException("Inner class deserializer passed via constructor and config windowed.deserializer.inner.class should match"); 
            	} else if (innerClassDeserializer == null && configInnerClassDeserializer == null) {
            		throw new IllegalArgumentException("Inner class deserializer should be passed either via constructor or via "); 
            	}
            
            	@Deprecated
            	public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";


      Compatibility, Deprecation, and Migration Plan

      Since This KIP deprecates following 2 configs in StreamConfig : default.windowed.key.serde.innerconfig is being rename to windowed.deserializer.inner.class in StreamConfig and also  and default.windowed.value.serde.inner is being deprecated, we will have to follow a deprecation path for the same. We can maybe enforce the removal of the deprecated configs and then enforce users?inner

      Rejected Alternatives

      N/A