Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The changes above would address the window size changes necessary for the serde to work properly. However, there is still inconsistency on how the ChangeLoggingWindowBytesStore.java serializes the changelog windowed key and how the TimeWindowedSerde deserializes this key. The serde calls from in WindowKeySchema.java, which doesn't decode the serialized changelog windowed key properly since it doesn't take the sequence number into account. 

We only want to encode the sequence number if retainDuplicate is true, which is only used for stream-stream join today, which we should fix in the long run. For now, we should let the window deserializer  deserializer be aware if it's deserializing a repartition topic with windowed key (in which case the toBinary was used to serialize), or if it's deserializing a changelog topic with windowed key (in which case the toStoreBinary was used to serialize).

...

Code Block
public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize, boolean isChangelogTopic) {
      this.inner = inner;
      this.windowSize = windowSize;
      this.isChangelogTopic = isChangelogTopic;
}


...


@Override
public Windowed<T> deserialize(final String topic, final byte[] data) {
    if (data == null || data.length == 0) {
        return null;
    }

    // toStoreKeyBinary was used to serialize the data.
    if (this.isChangelogTopic)
        return WindowKeySchema.fromStoreKey(data, windowSize, inner, topic);

    // toBinary was used to serialize the data
    return WindowKeySchema.from(data, windowSize, inner, topic);
}

...