...

As an example of usage, users only need to pass the inner serde in the Consumed parameter. The library can then wrap it in a TimeWindowedSerde together with the window size, and call forChangelog() explicitly if the input is a changelog topic:

Code Block
        // Wrap the user-provided serde with the window serde and the window size.
        // Note that this needs a new constructor and the helper method timeWindowedSerdeFrom defined.
        TimeWindowedSerde windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(consumed.keySerde, timeWindow.size()).forChangelog(true);

        final ConsumedInternal<K, V> consumedInternal =
                new ConsumedInternal<>(Consumed.with(windowedSerde, consumed.valueSerde()));

...

On TimeWindowedSerde, we will add an additional constructor (with isChangelogTopic set to false by default):

Code Block
public TimeWindowedSerde(final Serde<T> inner, final long windowSize) {
        super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer(), windowSize));
}
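
To illustrate why the deserializer needs both the window size and the changelog flag, here is a minimal, self-contained sketch that does not use the Kafka classes. It assumes a simplified key layout: the inner key bytes followed by an 8-byte window start timestamp, with changelog keys carrying an additional trailing 4-byte sequence number. The class WindowedKeySketch and its encode/decode helpers are hypothetical names used only for this illustration; the real byte layout is defined by the Kafka Streams implementation and may differ in detail.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WindowedKeySketch {
    static final int TIMESTAMP_SIZE = 8;
    // Assumption for this sketch: changelog keys end with a 4-byte sequence number.
    static final int SEQNUM_SIZE = 4;

    /** A decoded windowed key: the inner key plus the window bounds [start, end). */
    public static final class WindowedKey {
        public final String key;
        public final long start;
        public final long end;

        WindowedKey(final String key, final long start, final long end) {
            this.key = key;
            this.start = start;
            this.end = end;
        }
    }

    /** Serialize the inner key and window start; only the start timestamp is stored. */
    public static byte[] encode(final String key, final long windowStart, final boolean isChangelogTopic) {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        final int suffix = TIMESTAMP_SIZE + (isChangelogTopic ? SEQNUM_SIZE : 0);
        final ByteBuffer buf = ByteBuffer.allocate(keyBytes.length + suffix);
        buf.put(keyBytes).putLong(windowStart);
        if (isChangelogTopic) {
            buf.putInt(0); // sequence number, unused in this sketch
        }
        return buf.array();
    }

    /** Deserialize; the window end is not stored, so it is rebuilt as start + windowSize. */
    public static WindowedKey decode(final byte[] data, final long windowSize, final boolean isChangelogTopic) {
        final int suffix = TIMESTAMP_SIZE + (isChangelogTopic ? SEQNUM_SIZE : 0);
        final int keyLength = data.length - suffix;
        final String key = new String(data, 0, keyLength, StandardCharsets.UTF_8);
        final long start = ByteBuffer.wrap(data).getLong(keyLength);
        return new WindowedKey(key, start, start + windowSize);
    }

    public static void main(final String[] args) {
        final byte[] changelogKey = encode("user-1", 1000L, true);

        // Correct flag: the inner key and window bounds are recovered.
        final WindowedKey ok = decode(changelogKey, 500L, true);
        System.out.println(ok.key + " [" + ok.start + ", " + ok.end + ")"); // prints "user-1 [1000, 1500)"

        // Wrong flag: the key boundary is misplaced, so key and timestamp are both corrupted.
        final WindowedKey wrong = decode(changelogKey, 500L, false);
        System.out.println(wrong.key.equals("user-1")); // prints "false"
    }
}
```

In this sketch, omitting the window size would leave the window end unknown, and decoding a changelog key without the changelog flag shifts the key boundary by the sequence-number bytes, which is the kind of mismatch the explicit forChangelog() call is meant to avoid.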

...

Code Block
/**
 * Construct a {@code TimeWindowedSerde} object
 * for the specified inner class type and window size.
 */
public static <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type, final long windowSize) {
    return new TimeWindowedSerde<>(Serdes.serdeFrom(type), windowSize, false);
}


Compatibility, Deprecation, and Migration Plan

...