...
The changes above would address the window size changes necessary for the serde to work properly. However, there is still an inconsistency between how ChangeLoggingWindowBytesStore.java serializes the changelog windowed key and how the TimeWindowedSerde deserializes it. The serde calls from() in WindowKeySchema.java, which does not decode the serialized changelog windowed key properly because it does not take the sequence number into account.
We only want to encode the sequence number if retainDuplicate is true, which today is the case only for stream-stream joins; this should be fixed in the long run. For now, we should make the window deserializer aware of whether it is deserializing a repartition topic with a windowed key (in which case toBinary was used to serialize) or a changelog topic with a windowed key (in which case toStoreKeyBinary was used to serialize).
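To make the mismatch concrete, here is a minimal stand-alone sketch of the two key layouts. It does not use the real WindowKeySchema; the class name, the helper names extractKey and extractWindowStart, and the byte layout (key bytes, then an 8-byte window start, then a 4-byte sequence number for store keys only) are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for the two serialization paths; not the real WindowKeySchema.
final class WindowedKeyLayoutSketch {
    static final int TIMESTAMP_SIZE = 8; // assumed: window start stored as a long
    static final int SEQNUM_SIZE = 4;    // assumed: sequence number stored as an int

    // Repartition-topic layout: [key bytes][window start].
    static byte[] toBinary(final byte[] key, final long windowStart) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE)
            .put(key)
            .putLong(windowStart)
            .array();
    }

    // Changelog/store layout: [key bytes][window start][sequence number].
    static byte[] toStoreKeyBinary(final byte[] key, final long windowStart, final int seqnum) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
            .put(key)
            .putLong(windowStart)
            .putInt(seqnum)
            .array();
    }

    // Number of trailing bytes that are not part of the key in each layout.
    private static int suffixSize(final boolean isChangelogTopic) {
        return isChangelogTopic ? TIMESTAMP_SIZE + SEQNUM_SIZE : TIMESTAMP_SIZE;
    }

    // Extracts the key bytes; the caller has to state which layout was written.
    static byte[] extractKey(final byte[] data, final boolean isChangelogTopic) {
        return Arrays.copyOfRange(data, 0, data.length - suffixSize(isChangelogTopic));
    }

    // Reads the window start from the position implied by the chosen layout.
    static long extractWindowStart(final byte[] data, final boolean isChangelogTopic) {
        return ByteBuffer.wrap(data).getLong(data.length - suffixSize(isChangelogTopic));
    }

    public static void main(final String[] args) {
        final byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
        final byte[] changelogKey = toStoreKeyBinary(key, 1000L, 0);

        // Decoding with the matching layout recovers the key and window start.
        System.out.println(new String(extractKey(changelogKey, true), StandardCharsets.UTF_8));
        System.out.println(extractWindowStart(changelogKey, true));

        // Decoding a changelog key as if toBinary had written it shifts both fields:
        // the key absorbs 4 timestamp bytes and the window start absorbs the sequence number.
        System.out.println(extractKey(changelogKey, false).length);
        System.out.println(extractWindowStart(changelogKey, false));
    }
}
```

Under these assumptions, the reader cannot tell the two layouts apart from the bytes alone, which is why the deserializer needs to be told which kind of topic it is reading.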
...
Code Block |
---|
public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize, final boolean isChangelogTopic) {
    this.inner = inner;
    this.windowSize = windowSize;
    // True when reading a changelog topic, whose keys carry a sequence number.
    this.isChangelogTopic = isChangelogTopic;
}
...
@Override
public Windowed<T> deserialize(final String topic, final byte[] data) {
    if (data == null || data.length == 0) {
        return null;
    }
    // Changelog topic: toStoreKeyBinary was used to serialize the data.
    if (this.isChangelogTopic) {
        return WindowKeySchema.fromStoreKey(data, windowSize, inner, topic);
    }
    // Repartition topic: toBinary was used to serialize the data.
    return WindowKeySchema.from(data, windowSize, inner, topic);
} |
...