Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
/**
 * Rebuilds a {@code Windowed} key from its binary representation.
 *
 * <p>The first {@code binaryKey.length - TIMESTAMP_SIZE} bytes are copied out and passed to
 * {@code deserializer} to obtain the key; the window is derived from the full
 * {@code binaryKey} by {@code extractWindow}.
 *
 * @param binaryKey    the binary key; the trailing {@code TIMESTAMP_SIZE} bytes are not
 *                     part of the serialized key
 * @param windowSize   the window size forwarded to {@code extractWindow}
 * @param deserializer deserializer applied to the leading key bytes
 * @param topic        topic name forwarded to {@code deserializer}
 * @param <K>          type of the deserialized key
 * @return the deserialized key paired with its window
 */
public static <K> Windowed<K> from(final byte[] binaryKey, final long windowSize, final Deserializer<K> deserializer, final String topic) {
    final int keyLength = binaryKey.length - TIMESTAMP_SIZE;
    final byte[] keyBytes = new byte[keyLength];
    System.arraycopy(binaryKey, 0, keyBytes, 0, keyLength);
    // Arguments evaluate left to right: the key is deserialized before the window is extracted.
    return new Windowed<>(
        deserializer.deserialize(topic, keyBytes),
        extractWindow(binaryKey, windowSize));
}

/**
 * Builds the window encoded at the tail of a binary key.
 *
 * <p>The window start is read as a {@code long} at absolute offset
 * {@code binaryKey.length - TIMESTAMP_SIZE}; the window itself is produced by
 * {@code timeWindowForSize}.
 *
 * @param binaryKey  the binary key carrying the window start in its trailing bytes
 * @param windowSize the window size forwarded to {@code timeWindowForSize}
 * @return the window starting at the decoded timestamp
 */
private static Window extractWindow(final byte[] binaryKey, final long windowSize) {
    final int startOffset = binaryKey.length - TIMESTAMP_SIZE;
    final long windowStart = ByteBuffer.wrap(binaryKey).getLong(startOffset);
    return timeWindowForSize(windowStart, windowSize);
}



/**
 * Creates a time window that starts at {@code startMs} and spans {@code windowSize}.
 *
 * <p>The end is {@code startMs + windowSize}. When that sum is negative (which is what a
 * wrapped-around {@code long} addition produces), the end is bounded to
 * {@link Long#MAX_VALUE} instead.
 *
 * @param startMs    window start
 * @param windowSize amount added to {@code startMs} to obtain the window end
 * @return a window from {@code startMs} to the (possibly bounded) end
 */
public static TimeWindow timeWindowForSize(final long startMs,
                                           final long windowSize) {
    final long rawEndMs = startMs + windowSize;
    if (rawEndMs < 0) {
        // A negative sum is treated as overflow; bound the end to the largest long.
        return new TimeWindow(startMs, Long.MAX_VALUE);
    }
    return new TimeWindow(startMs, rawEndMs);
}

...