You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Status

Current stateUnder Discussion

Discussion threadTBD

JIRA

key summary type created updated due assignee reporter priority status resolution

JQL and issue key arguments for this macro require at least one Jira application link to be configured


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Today windowed serdes are kept in internal packages currently, and hence for anyone trying to consume the topics writing by a windowed table, they need to implement their own serde implementations in order to read windowed keys.

In addition, for windowed serde they require inner serdes to deserialize the embedded data type, which is not supported right now.

 

To help on the user experience, it's better extracting the internal windowed serdes into the public package, and at the same time enable console consumer to set the window deserializers with the inner class.

 

Public Interfaces

This KIP would propose to add the following new config names into StreamsConfig and mention in the default serde configs that users should set them for window serdes:

/**
 * {@code default.windowed.key.serde.inner}
 */
public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner";

/**
 * {@code default.windowed.value.serde.inner}
 */
public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";
 
/** {@code default key.serde} */
/** {@code default value.serde} */
...
        + "Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via '"
        + DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "' or '" + DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS + "' as well";

 

And add the following class into the o.a.k.streams.kstream package :

 

public class WindowedSerdes {

    static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>;

    static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>;

    /**
     * Construct a {@code TimeWindowedSerde} object for the specified inner class type.
     */
    static public <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type);

    /**
     * Construct a {@code SessionWindowedSerde} object for the specified inner class type.
     */
    static public <T> Serde<Windowed<T>> sessionWindowedSerdeFrom(final Class<T> type);
}
 
 
/**
 *  The inner serde class can be specified by setting the property
 *  {@link StreamsConfig#DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS} or
 *  {@link StreamsConfig#DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS}
 *  if the no-arg constructor is called and hence it is not passed during initialization.
 */
public class TimeWindowedSerializer<T> implements Serializer<Windowed<T>>;
 
public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>>;


public class SessionWindowedSerializer<T> implements Serializer<Windowed<T>>;

public class SessionWindowedDeserializer<T> implements Deserializer<Windowed<T>>;


And in ConsoleConsumer class, add the new StreamsConfigs names into its options:

 

val innerKeyDeserializerOpt = parser.accepts(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS)
      .withRequiredArg
      .describedAs("inner serde for key when windowed deserialier is used; would be ignored otherwise")
      .ofType(classOf[String])


val innerValueDeserializerOpt = parser.accepts(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS)
      .withRequiredArg
      .describedAs("inner serde for value when windowed deserialier is used; would be ignored otherwise")
      .ofType(classOf[String])


Note that for all windowed serdes, the inner serde can be either passed in during construction, or be configured in the configure call via the added inner serde classes.

 

Proposed Changes

As above, no inernal implementations would be affected.

 

Compatibility, Deprecation, and Migration Plan

This KIP only add new APIs and configs and hence should not affect any existing users.

 

Rejected Alternatives

None

  • No labels