...

Overloading this constructor would allow us to pass in an explicit time window size without changing the existing constructor.


In WindowedSerdes:

Code Block
static public <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type, final long windowSize)



As an example of usage, users pass their serdes in Consumed and only need to supply the inner serde in the consumed parameter. The library then wraps it in a TimeWindowedSerde together with the window size, and calls forChangelog() explicitly if the input is a changelog topic:

Code Block
        // Wrap the user-provided serde with the window serde and the window size,
        // and set the changelog input topic type.
        // Note that this needs a new constructor and the helper method timeWindowedChangelogSerdeFrom defined.
        TimeWindowedSerde windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(consumed.keySerde, timeWindow.size()).forChangelog(true);

        final ConsumedInternal<K, V> consumedInternal =
                new ConsumedInternal<>(Consumed.with(windowedSerde, consumed.valueSerde()));

...

On TimeWindowedSerde, we will add an additional constructor (with isChangelogTopic set to false by default in the deserializer), as well as a helper method to set the isChangelog flag:

Code Block
public TimeWindowedSerde(final Serde<T> inner, final long windowSize) {
        super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer(), windowSize));
}


// Add a helper method for advanced users who want to read from a changelog windowed topic
TimeWindowedSerde forChangelog(final boolean isChangelogTopic);


The changes above address the window size changes necessary for the serde to work properly. However, there is still an inconsistency between how ChangeLoggingWindowBytesStore.java serializes the changelog windowed key and how the TimeWindowedSerde deserializes it. The serde calls from() in WindowKeySchema.java, which does not decode the serialized changelog windowed key properly because it does not take the sequence number into account.
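To make the mismatch concrete, the following is a minimal stand-alone sketch, not Kafka code. It assumes the changelog key is laid out as key bytes, then an 8-byte window start timestamp, then a 4-byte sequence number, and that the plain layout omits the sequence number. The class and method names are hypothetical and exist only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Stand-alone illustration of the two windowed-key layouts; not the Kafka implementation. */
final class WindowKeyLayoutSketch {
    static final int TIMESTAMP_SIZE = 8; // window start timestamp (long)
    static final int SEQNUM_SIZE = 4;    // sequence number (int), assumed changelog-only

    // Assumed changelog layout: key bytes + timestamp + sequence number.
    static byte[] toStoreKeyLayout(final byte[] key, final long windowStart, final int seqnum) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
            .put(key).putLong(windowStart).putInt(seqnum).array();
    }

    // Reader that expects key bytes + timestamp only (ignores the sequence number).
    static byte[] keyAssumingNoSeqnum(final byte[] data) {
        return Arrays.copyOfRange(data, 0, data.length - TIMESTAMP_SIZE);
    }

    static long startAssumingNoSeqnum(final byte[] data) {
        return ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE);
    }

    // Reader that also strips the trailing sequence number.
    static byte[] keyAssumingSeqnum(final byte[] data) {
        return Arrays.copyOfRange(data, 0, data.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    static long startAssumingSeqnum(final byte[] data) {
        return ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(final String[] args) {
        final byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
        final byte[] stored = toStoreKeyLayout(key, 1000L, 0);

        // The seqnum-unaware reader picks up extra bytes as part of the key
        // and reads the timestamp from the wrong offset.
        System.out.println("no-seqnum reader recovers key: "
            + Arrays.equals(key, keyAssumingNoSeqnum(stored)));
        System.out.println("seqnum-aware reader recovers key: "
            + Arrays.equals(key, keyAssumingSeqnum(stored)));
        System.out.println("seqnum-aware window start: " + startAssumingSeqnum(stored));
    }
}
```

Under these assumptions, a reader that ignores the sequence number shifts both the key boundary and the timestamp offset by four bytes, which is why the deserializer needs to know whether it is reading a changelog topic.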

...

One remaining step is to let the TimeWindowedDeserializer be aware of the input topic type by creating a new constructor, and to define a helper method in WindowedSerdes.java that returns the serde to the caller. Below are the proposed constructor and helper method, along with the change that makes the deserialize method aware of the changelog topic boolean flag.

Code Block
public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize, boolean isChangelogTopic) {
      this.inner = inner;
      this.windowSize = windowSize;
      this.isChangelogTopic = isChangelogTopic;
}

@Override
public Windowed<T> deserialize(final String topic, final byte[] data) {
    if (data == null || data.length == 0) {
 	...
       return null;
    }

    // toStoreKeyBinary was used to serialize the data.
    if (this.isChangelogTopic)
        return WindowKeySchema.fromStoreKey(data, windowSize, inner, topic);

    // toBinary was used to serialize the data
    return WindowKeySchema.from(data, windowSize, inner, topic);
}

Code Block
/**
 * Construct a {@code TimeWindowedSerde} object to deserialize changelog topic
 * for the specified inner class type and window size.
 */
static public <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type, final long windowSize) {
    return new TimeWindowedSerde<>(Serdes.serdeFrom(type), windowSize, false);	...
}
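The flag-based dispatch and the chained forChangelog() call can be tried out without a Kafka dependency using a small stand-in. The sketch below is hypothetical: it is not the proposed TimeWindowedDeserializer, it handles only UTF-8 string keys, and it assumes the same byte layouts as the comments in deserialize above (timestamp only for the plain layout, timestamp plus a 4-byte sequence number for the changelog layout).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a changelog-aware windowed deserializer; not the Kafka class. */
final class ChangelogAwareDeserializerSketch {
    private static final int TIMESTAMP_SIZE = 8;
    private static final int SEQNUM_SIZE = 4;

    private final long windowSize;
    private boolean isChangelogTopic = false; // defaults to the plain layout

    ChangelogAwareDeserializerSketch(final long windowSize) {
        this.windowSize = windowSize;
    }

    // Fluent setter so callers can chain it after construction.
    ChangelogAwareDeserializerSketch forChangelog(final boolean isChangelogTopic) {
        this.isChangelogTopic = isChangelogTopic;
        return this;
    }

    /** Returns "key@[start,end)" for a UTF-8 key, or null for empty input. */
    String deserialize(final byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }
        // Pick the suffix length according to the layout the writer is assumed to have used.
        final int suffix = isChangelogTopic ? TIMESTAMP_SIZE + SEQNUM_SIZE : TIMESTAMP_SIZE;
        final int keyLength = data.length - suffix;
        final String key = new String(data, 0, keyLength, StandardCharsets.UTF_8);
        final long start = ByteBuffer.wrap(data).getLong(keyLength);
        return key + "@[" + start + "," + (start + windowSize) + ")";
    }

    public static void main(final String[] args) {
        final byte[] changelogKey = ByteBuffer.allocate(1 + TIMESTAMP_SIZE + SEQNUM_SIZE)
            .put("a".getBytes(StandardCharsets.UTF_8)).putLong(100L).putInt(7).array();
        System.out.println(
            new ChangelogAwareDeserializerSketch(50L).forChangelog(true).deserialize(changelogKey));
    }
}
```

Returning this from forChangelog() keeps the existing construction path unchanged for ordinary topics while letting advanced users opt in to the changelog layout in a single expression.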



Compatibility, Deprecation, and Migration Plan

...