Table of Contents |
---|
Current state: AcceptedAdopted
Discussion thread:
JIRA: KAFKA-12313
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-659 introduced a config windowSize to TimeWindowedDeserializer which discourages the setting of windowSize to Long.MAX_VALUE as the default value. However, there is still a possibility of setting the config only for innerClassSerde or only for windowSize. This KIP aims at standardising the way the deserializer the serialisers and deserializer object can be created for the usage in console consumer, plain consumer and via the DSL.
Proposed Changes
Here are the changes being proposed in the KIP:
- StreamsConfig
- Deprecate default.windowed.key.serde.inner and default.windowed.value.serde.inner.
- Introduce a new config called windowwindowed.inner.class.deserializerserde. This way, the config comes closer to the window.size.ms config introduced in KIP-659 and it also emphasises that the config isn't really a default one. Also, as per the proposed changes below, this config is to be used really from the console consumer so we will use Deserialiser class here directly instead of a Serde. Note that setting this config from KafkaStreams application would lead to an error being thrown as this config is to be used only from plain Consumer Client.
- ConsoleConsumer
- It would be mandatory to pass windowwindowed.inner.class.deserializerserde and window.size.ms ms config.
- The following table discusses the proposed changes for the Windowed Serdes and proposed changes based on Client Type.
Windowed Serde Type Client Type Constructor Invoked Proposed Change 1 TimeWindowedDeserializer
- If the users want to run Console consumer for testing purposes, then it will invoke the default constructor of TimeWindowedDeserializer. We will ensure that both the configs (windowSize and deserialiser class) are set and if they aren't then an error would be thrown.
- For any other plain consumer client, the user can pass them in as configs OR pass the parameters to the TimeWindowedDeserializer constructor, and then pass that object to the consumer. It is ok for the user to pass both as long as there is no conflict between the params passed via constructor and via the configs.
- For use in Kafka Streams (such as the DSL), the user must supply the parameters by constructing a TimeWindowedSerde and passing that in as a parameter to any relevant DSL operators. This is already supported so no change is needed here.
Console Consumer | Default constructor | Ensure both windowSize and Serde class configs are set. Throw error if not. | ||
Plain Consumer | Default OR Parameterised | Ok to use either as long as the configs supplied don't conflict. | ||
Kafka Streams | Parameterised | User supplies parameters by constructing a TimeWindowedSerde object. No change | ||
2 | TimeWindowedSerializer | Console Producer | Default constructor | Ensure Serde class config is set. Throw error if not. |
Plain Producer | Default OR Parameterised | Ok to use either as long as the configs supplied don't conflict. | ||
Kafka Streams | Parameterised | User supplies parameters by constructing a TimeWindowedSerde object. No change | ||
3 | SessionWindowedDeserializer | Console consumer | Default constructor | Ensure Serde class config is set. Throw error if not. |
Plain consumer | Default OR Parameterised | Ok to use either as long as the configs supplied don't conflict. | ||
Kafka Streams | Parameterised | User supplies parameters by constructing a SessionWindowedSerde object. No change | ||
4 | SessionWindowedSerializer | Console Producer | Default constructor | Ensure Serde class config is set. Throw error if not. |
Plain Producer | Default OR Parameterised | Ok to use either as long as the configs supplied don't conflict. | ||
Kafka Streams | Parameterised | User supplies parameters by constructing a SessionWindowedSerde object. No change |
Public Interfaces
- StreamsConfig:
- Deprecate default.windowed.key.serde.inner and default.windowed.value.serde.inner in StreamConfig.
- Introduce a new config called
- windowed.inner.class.
- serde.
Code Block language java theme Midnight public static final String
WINDOWED_INNER_CLASS_
SERDE = "
windowed.inner.class.
serde"; private static final String
WINDOWED_INNER_CLASS_
SERDE_DOC = "
Serde for the inner class of a windowed record. Must implement the " + "<code>org.apache.kafka.common.serialization.
Serde</code> interface. Note that setting this config in KafkaStreams application would result " + "in
an error as it is meant to be used only from Plain consumer client."; @Deprecated public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner"; @Deprecated public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";
- Deprecate default.windowed.key.serde.inner and default.windowed.value.serde.inner in StreamConfig.
Compatibility, Deprecation, and Migration Plan
This KIP deprecates following 2 configs in StreamConfig : default.windowed.key.serde.inner and default.windowed.value.serde.inner
Rejected Alternatives
N/A