
Status

Current state: Accepted

Discussion thread: Not available

...

Currently, the TimeWindowedSerde does not deserialize windowed keys from a changelog topic properly. The TimeWindowedDeserializer makes a few assumptions that prevent changelog windowed keys from being deserialized correctly. There is an inconsistency between how ChangeLoggingWindowBytesStore.java serializes the changelog windowed key and how the TimeWindowedSerde deserializes it: the serde calls from in WindowKeySchema.java, which does not decode the serialized changelog windowed key properly because it does not take the sequence number into account.


1) In the from method of WindowKeySchema (called by deserialize in TimeWindowedDeserializer), we extract the window from the binary key by calling getLong(binaryKey.length - TIMESTAMP_SIZE). However, ChangeLoggingWindowBytesStore logs the windowed key to the changelog as:

...

Code Block
final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);

The extra SEQNUM_SIZE bytes hold the sequence number (used for de-duplication). When we deserialize, we do not account for this trailing sequence number in the changelog windowed key, so the bytes read as the window start are misaligned and the extracted window is garbage.
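To make the misalignment concrete, here is a minimal, Kafka-free Java sketch. It assumes the changelog layout implied by the allocation above: the serialized key, then an 8-byte window start timestamp, then a 4-byte int sequence number. The class and method names (ChangelogKeyLayout, toChangelogKey, naiveStart, storeStart) are hypothetical and only mimic the byte arithmetic.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: mimics the assumed changelog key layout [key][start (8 bytes)][seqnum (4 bytes)].
class ChangelogKeyLayout {
    static final int TIMESTAMP_SIZE = 8; // size of a long
    static final int SEQNUM_SIZE = 4;    // size of an int (assumption)

    // Encode the key the way the changelog store is assumed to: key, then window start, then seqnum.
    static byte[] toChangelogKey(final byte[] serializedKey, final long windowStart, final int seqnum) {
        final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
        buf.put(serializedKey);
        buf.putLong(windowStart);
        buf.putInt(seqnum);
        return buf.array();
    }

    // What from effectively does: assumes the timestamp occupies the last 8 bytes.
    static long naiveStart(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
    }

    // Correct read for the changelog layout: skip the trailing sequence number first.
    static long storeStart(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(final String[] args) {
        final byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        final byte[] changelogKey = toChangelogKey(key, 1_000L, 7);
        System.out.println("naive start:   " + naiveStart(changelogKey));
        System.out.println("correct start: " + storeStart(changelogKey));
    }
}
```

With a window start of 1000 and sequence number 7, naiveStart combines the low four timestamp bytes with the sequence number and returns 4294967296007 (that is, 1000 << 32 | 7) instead of 1000.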

...

Code Block
public static <K> Windowed<K> from(final byte[] binaryKey, final long windowSize, final Deserializer<K> deserializer, final String topic) {
    final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
    System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
    final K key = deserializer.deserialize(topic, bytes);
    final Window window = extractWindow(binaryKey, windowSize);
    return new Windowed<>(key, window);
}

private static Window extractWindow(final byte[] binaryKey, final long windowSize) {
    final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
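    // Assumes the timestamp occupies the last TIMESTAMP_SIZE bytes; not true for changelog keys,
    // which end with a sequence number.
    final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);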
    return timeWindowForSize(start, windowSize);
}

/**
 * Safely construct a time window of the given size,
 * taking care of bounding endMs to Long.MAX_VALUE if necessary
 */
public static TimeWindow timeWindowForSize(final long startMs,
                                           final long windowSize) {
    final long endMs = startMs + windowSize;
    return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs);
}
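The clamping in timeWindowForSize can be checked with a small stand-in that returns {startMs, endMs} as a long array instead of a TimeWindow (WindowBounds is a hypothetical name, used so the arithmetic runs without Kafka). Note that the endMs < 0 check only detects overflow when startMs and windowSize are non-negative.

```java
// Hypothetical stand-in for timeWindowForSize: returns {startMs, endMs} instead of a TimeWindow.
class WindowBounds {
    static long[] timeWindowForSize(final long startMs, final long windowSize) {
        final long endMs = startMs + windowSize; // wraps to a negative value on overflow
        return new long[] {startMs, endMs < 0 ? Long.MAX_VALUE : endMs};
    }

    public static void main(final String[] args) {
        System.out.println(timeWindowForSize(1_000L, 500L)[1]);              // 1500
        System.out.println(timeWindowForSize(Long.MAX_VALUE - 10, 100L)[1]); // clamped to Long.MAX_VALUE
    }
}
```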

Public Interfaces

The current proposed windowed KTable API in KIP-300 looks like the following:

Code Block
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<Windowed<K>, V> consumed, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

...

Proposed Changes

The current TimeWindowedSerde constructor:

Code Block
public TimeWindowedSerde(final Serde<T> inner) {
    super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer()));
}

Overloading this constructor would allow us to pass in an explicit time window size without changing the existing constructor.
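The sketch below illustrates why the overload matters, using a hypothetical stand-in class (SketchWindowedDeserializer) rather than the real TimeWindowedDeserializer. It assumes, without confirmation from this KIP, that the existing constructor without a size falls back to a window size of Long.MAX_VALUE; under that assumption every decoded window end is clamped to Long.MAX_VALUE, whereas the overloaded constructor yields the real end. Keys use the repartition layout (key followed by an 8-byte start).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for TimeWindowedDeserializer; decodes the layout [key][start (8 bytes)].
class SketchWindowedDeserializer {
    static final int TIMESTAMP_SIZE = 8;
    private final long windowSize;

    // Existing-style constructor: no window size known (assumed default of Long.MAX_VALUE).
    SketchWindowedDeserializer() {
        this(Long.MAX_VALUE);
    }

    // Overloaded constructor: the caller supplies the actual window size.
    SketchWindowedDeserializer(final long windowSize) {
        this.windowSize = windowSize;
    }

    // Returns {start, end}; end is clamped to Long.MAX_VALUE on overflow, like timeWindowForSize.
    long[] window(final byte[] binaryKey) {
        final long start = ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
        final long end = start + windowSize;
        return new long[] {start, end < 0 ? Long.MAX_VALUE : end};
    }

    public static void main(final String[] args) {
        final byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        final byte[] binaryKey = ByteBuffer.allocate(key.length + TIMESTAMP_SIZE).put(key).putLong(1_000L).array();
        System.out.println("default size end:  " + new SketchWindowedDeserializer().window(binaryKey)[1]);
        System.out.println("explicit size end: " + new SketchWindowedDeserializer(5_000L).window(binaryKey)[1]);
    }
}
```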

In WindowedSerdes, we add a new method that returns a TimeWindowedSerde with a configurable window size:

Code Block
static public <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type, final long windowSize) {
    // Wrap the default serde for the inner type, passing the explicit window size to the new constructor
    return new TimeWindowedSerde<>(Serdes.serdeFrom(type), windowSize);
}


In TimeWindowedSerde, we will add an additional constructor that accepts the window size, as well as a new public helper method, forChangelog, that sets an internal boolean field, isChangelogTopic (false by default in the deserializer). forChangelog lets users explicitly declare whether the input topic is a changelog topic, so that windowed keys in a changelog topic can be deserialized properly. If users do not call forChangelog on a changelog input topic, the extracted windowed keys will be invalid because of the mismatch in how they were serialized. The additional constructor:

Code Block
public TimeWindowedSerde(final Serde<T> inner, final long windowSize) {
    super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer(), windowSize));
}

The changes above address the window size. However, the serialization mismatch described earlier remains: ChangeLoggingWindowBytesStore.java writes changelog windowed keys with a trailing sequence number, while the serde calls from in WindowKeySchema.java, which does not take the sequence number into account.

The window deserializer should therefore know whether it is deserializing a repartition topic with windowed keys (serialized with toBinary) or a changelog topic with windowed keys (serialized with toStoreKeyBinary).

We introduce a new helper method fromStoreKey in WindowKeySchema to extract the store key:

Code Block
public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey, final long windowSize, final Deserializer<K> deserializer, final String topic) {
    final K key = deserializer.deserialize(topic, extractStoreKeyBytes(binaryKey));
    final Window window = extractStoreWindow(binaryKey, windowSize);
    return new Windowed<>(key, window);
}

extractStoreWindow reads the window start timestamp that precedes the trailing sequence number in the changelog windowed key and returns a TimeWindow of the given size.
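A minimal sketch of what the two store-key helpers need to do, written as a hypothetical Kafka-free re-implementation (StoreKeyDecoder is not the Kafka source). It assumes the changelog layout of serialized key, 8-byte window start, and 4-byte sequence number, and returns the window as a {start, end} pair rather than a TimeWindow.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical re-implementation of the store-key decoding steps; not the Kafka source.
class StoreKeyDecoder {
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    // Key bytes are everything before the trailing timestamp and sequence number.
    static byte[] extractStoreKeyBytes(final byte[] binaryKey) {
        return Arrays.copyOfRange(binaryKey, 0, binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    // The window start sits immediately before the 4-byte sequence number.
    static long[] extractStoreWindow(final byte[] binaryKey, final long windowSize) {
        final long start = ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
        final long end = start + windowSize;
        return new long[] {start, end < 0 ? Long.MAX_VALUE : end};
    }

    public static void main(final String[] args) {
        final byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        final byte[] storeKey = ByteBuffer.allocate(key.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(key).putLong(1_000L).putInt(7).array();
        final String decodedKey = new String(extractStoreKeyBytes(storeKey), StandardCharsets.UTF_8);
        final long[] window = extractStoreWindow(storeKey, 5_000L);
        System.out.println(decodedKey + " @ [" + window[0] + ", " + window[1] + ")");
    }
}
```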



The new public helper method on TimeWindowedSerde:

Code Block
// Helper method for advanced users who want to read from a changelog windowed topic
TimeWindowedSerde<T> forChangelog(final boolean isChangelogTopic);


As an example usage, a library such as the windowed KTable API can let users pass only the inner serde in the Consumed parameter, wrap it with the TimeWindowedSerde and the window size, and call forChangelog(true) explicitly if the input is a changelog topic:

Code Block
// Wrap the user-provided key serde with the window serde and window size, and mark the input as a changelog topic.
// forChangelog is defined on TimeWindowedSerde, so construct it directly rather than via a method returning Serde.
final TimeWindowedSerde<K> windowedSerde =
      new TimeWindowedSerde<>(consumed.keySerde(), timeWindow.size()).forChangelog(true);

final ConsumedInternal<Windowed<K>, V> consumedInternal =
      new ConsumedInternal<>(Consumed.with(windowedSerde, consumed.valueSerde()));


One remaining step is to make the TimeWindowedDeserializer aware of the input topic type by adding a new constructor and changing the deserialize method to check the changelog topic flag, and to provide a helper method in WindowedSerdes.java that returns the serde to the windowed KTable API. Below are the proposed constructor, the deserialize change, and the WindowedSerdes helper.

Code Block
public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize, final boolean isChangelogTopic) {
    this.inner = inner;
    this.windowSize = windowSize;
    this.isChangelogTopic = isChangelogTopic;
}

@Override
public Windowed<T> deserialize(final String topic, final byte[] data) {
    if (data == null || data.length == 0) {
        return null;
    }

    ...
    // Changelog topic: toStoreKeyBinary was used to serialize the data,
    // so the key ends with a timestamp followed by a sequence number.
    if (isChangelogTopic) {
        return WindowKeySchema.fromStoreKey(data, windowSize, inner, topic);
    }

    // Repartition topic: toBinary was used to serialize the data (key followed by timestamp only).
    return WindowKeySchema.from(data, windowSize, inner, topic);
}

Code Block
/**
 * Construct a {@code TimeWindowedSerde} object for the specified inner class type.
 */
static public <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type) {
    return new TimeWindowedSerde<>(Serdes.serdeFrom(type));
}
	...
}
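The flag-based dispatch in deserialize can be illustrated with a self-contained, Kafka-free Java sketch. FlagAwareWindowedDeserializer is a hypothetical class that uses String keys and returns a formatted string instead of a Windowed object; it assumes the same layouts described above (key then 8-byte start for toBinary, plus a trailing 4-byte sequence number for toStoreKeyBinary) and only loosely mimics the fluent forChangelog style.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical, Kafka-free sketch of a flag-aware windowed deserializer with String keys.
class FlagAwareWindowedDeserializer {
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    private final long windowSize;
    private boolean isChangelogTopic = false; // default: repartition (toBinary) layout

    FlagAwareWindowedDeserializer(final long windowSize) {
        this.windowSize = windowSize;
    }

    // Fluent setter in the spirit of forChangelog(boolean); returns this for chaining.
    FlagAwareWindowedDeserializer forChangelog(final boolean isChangelogTopic) {
        this.isChangelogTopic = isChangelogTopic;
        return this;
    }

    // Returns "key@start-end", or null for empty input.
    String deserialize(final byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }
        // Changelog keys carry a trailing sequence number; repartition keys do not.
        final int suffix = isChangelogTopic ? TIMESTAMP_SIZE + SEQNUM_SIZE : TIMESTAMP_SIZE;
        final int keyLength = data.length - suffix;
        final String key = new String(Arrays.copyOfRange(data, 0, keyLength), StandardCharsets.UTF_8);
        final long start = ByteBuffer.wrap(data).getLong(keyLength); // start follows the key in both layouts
        final long end = start + windowSize;
        return key + "@" + start + "-" + (end < 0 ? Long.MAX_VALUE : end);
    }

    public static void main(final String[] args) {
        final byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        final byte[] repartitionKey = ByteBuffer.allocate(key.length + TIMESTAMP_SIZE).put(key).putLong(1_000L).array();
        final byte[] changelogKey = ByteBuffer.allocate(key.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(key).putLong(1_000L).putInt(7).array();
        System.out.println(new FlagAwareWindowedDeserializer(5_000L).deserialize(repartitionKey));
        System.out.println(new FlagAwareWindowedDeserializer(5_000L).forChangelog(true).deserialize(changelogKey));
    }
}
```

In both layouts the window start begins right after the key bytes, so the flag only changes how many trailing bytes are excluded from the key. Without the flag, the same changelog key decodes to a key with four stray bytes appended and a misread window start.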



Compatibility, Deprecation, and Migration Plan

...