Status
Current state: Under Discussion
Discussion thread: TBD
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Today windowed serdes are kept in internal packages currently, and hence for anyone trying to consume the topics writing by a windowed table, they need to implement their own serde implementations in order to read windowed keys.
In addition, for windowed serde they require inner serdes to deserialize the embedded data type, which is not supported right now.
To help on the user experience, it's better extracting the internal windowed serdes into the public package, and at the same time enable console consumer to set the window deserializers with the inner class.
Public Interfaces
This KIP would propose to add the following new config names into StreamsConfig
and mention in the default serde configs that users should set them for window serdes:
/** * {@code default.windowed.key.serde.inner} */ public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner"; /** * {@code default.windowed.value.serde.inner} */ public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner"; /** {@code default key.serde} */ /** {@code default value.serde} */ ... + "Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via '" + DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "' or '" + DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS + "' as well";
And add the following class into the o.a.k.streams.kstream package :
public class WindowedSerdes { static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>; static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>; /** * Construct a {@code TimeWindowedSerde} object for the specified inner class type. */ static public <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type); /** * Construct a {@code SessionWindowedSerde} object for the specified inner class type. */ static public <T> Serde<Windowed<T>> sessionWindowedSerdeFrom(final Class<T> type); } /** * The inner serde class can be specified by setting the property * {@link StreamsConfig#DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS} or * {@link StreamsConfig#DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS} * if the no-arg constructor is called and hence it is not passed during initialization. */ public class TimeWindowedSerializer<T> implements Serializer<Windowed<T>>; public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>>; public class SessionWindowedSerializer<T> implements Serializer<Windowed<T>>; public class SessionWindowedDeserializer<T> implements Deserializer<Windowed<T>>;
And in ConsoleConsumer
class, add the new StreamsConfigs names into its options:
val innerKeyDeserializerOpt = parser.accepts(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS) .withRequiredArg .describedAs("inner serde for key when windowed deserialier is used; would be ignored otherwise") .ofType(classOf[String]) val innerValueDeserializerOpt = parser.accepts(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS) .withRequiredArg .describedAs("inner serde for value when windowed deserialier is used; would be ignored otherwise") .ofType(classOf[String])
Note that for all windowed serdes, the inner serde can be either passed in during construction, or be configured in the configure
call via the added inner serde classes.
Proposed Changes
As above, no inernal implementations would be affected.
Compatibility, Deprecation, and Migration Plan
This KIP only add new APIs and configs and hence should not affect any existing users.
Rejected Alternatives
None