...

Code Block
public static <K> Windowed<K> from(final byte[] binaryKey, final long windowSize, final Deserializer<K> deserializer, final String topic) {
    final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
    System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
    final K key = deserializer.deserialize(topic, bytes);
    final Window window = extractWindow(binaryKey, windowSize);
    return new Windowed<>(key, window);
}

private static Window extractWindow(final byte[] binaryKey, final long windowSize) {
    final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
    final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
    return timeWindowForSize(start, windowSize);
}

/**
 * Safely construct a time window of the given size,
 * taking care of bounding endMs to Long.MAX_VALUE if necessary
 */
public static TimeWindow timeWindowForSize(final long startMs,
                                           final long windowSize) {
    final long endMs = startMs + windowSize;
    return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs);
}
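
The code above reads only the window start timestamp from the serialized key, so the window size has to be supplied by the caller. The following is a minimal, self-contained sketch of that layout, not the library's implementation. It assumes the timestamp is an 8-byte long appended to the serialized key bytes; the class name WindowKeySketch and the helper toBinary are hypothetical and exist only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WindowKeySketch {
    // Assumption: the window start is stored as an 8-byte long at the end of the key.
    public static final int TIMESTAMP_SIZE = 8;

    // Hypothetical helper: append the window start timestamp to the serialized key bytes.
    public static byte[] toBinary(final byte[] serializedKey, final long startMs) {
        final ByteBuffer buffer = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE);
        buffer.put(serializedKey);
        buffer.putLong(startMs);
        return buffer.array();
    }

    // Mirrors the first half of from(): everything before the trailing timestamp is the key.
    public static byte[] extractKeyBytes(final byte[] binaryKey) {
        final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
        return bytes;
    }

    // Mirrors extractWindow(): only the start timestamp can be recovered from the bytes.
    public static long extractStart(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
    }

    // Same bounding rule as timeWindowForSize(): an overflowed end is clamped to Long.MAX_VALUE.
    public static long windowEnd(final long startMs, final long windowSize) {
        final long endMs = startMs + windowSize;
        return endMs < 0 ? Long.MAX_VALUE : endMs;
    }

    public static void main(final String[] args) {
        final byte[] binaryKey = toBinary("user-1".getBytes(StandardCharsets.UTF_8), 1_000L);

        final String key = new String(extractKeyBytes(binaryKey), StandardCharsets.UTF_8);
        final long start = extractStart(binaryKey);

        // The end of the window depends entirely on the size passed in by the caller.
        System.out.println(key + " [" + start + ", " + windowEnd(start, 500L) + ")");

        // A start close to Long.MAX_VALUE overflows, so the end is bounded.
        System.out.println(windowEnd(Long.MAX_VALUE - 10, 100L) == Long.MAX_VALUE);
    }
}
```

Because the end of the window is never written into the key, deserializing the same bytes with a different window size yields a different window, which is why the size needs to reach the deserializer explicitly.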

...

Proposed Changes

The current TimeWindowedSerde constructor:

...

Overloading this constructor would allow us to pass in an explicit time window size without changing the existing constructor.

...


Instead of requiring users to pass a fully constructed windowed serde in Consumed, we will only require them to pass the inner serde in the consumed parameter; the library can then wrap it with the TimeWindowedSerde and the window size:

Code Block
public synchronized <K, V> KTable<Windowed<K>, V> timeWindowedKTable(final String topic,
                                                                     final Consumed<K, V> consumed,
                                                                     final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized,
                                                                     final TimeWindows timeWindow) {
    // Wrap the user-provided inner key serde in a window serde that carries the window size.
    // Note that this needs a new constructor and the helper method timeWindowedChangelogSerdeFrom to be defined.
    final TimeWindowedSerde<K> windowedSerde = WindowedSerdes.timeWindowedChangelogSerdeFrom(consumed.keySerde, timeWindow.size());

    // The key type of the internal Consumed is Windowed<K>, matching the wrapped serde.
    final ConsumedInternal<Windowed<K>, V> consumedInternal =
            new ConsumedInternal<>(Consumed.with(windowedSerde, consumed.valueSerde()));

    return internalStreamsBuilder.windowTable(topic, consumedInternal, materializedInternal);
}


On TimeWindowedSerde, we will add an additional constructor:

...

One remaining step is to make the TimeWindowedDeserializer aware of the input topic type by adding a new constructor, and to define a helper method in WindowedSerdes.java that returns the serde to the caller of the windowed KTable API. Below are the proposed constructor and helper method additions.

...