THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
public static <K> Windowed<K> from(final byte[] binaryKey, final long windowSize, final Deserializer<K> deserializer, final String topic) {
final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
final K key = deserializer.deserialize(topic, bytes); final Window window = extractWindow(binaryKey, windowSize);
return new Windowed<>(key, window);
}
private static Window extractWindow(final byte[] binaryKey, final long windowSize) {
final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
return timeWindowForSize(start, windowSize);
}
/**
* Safely construct a time window of the given size,
* taking care of bounding endMs to Long.MAX_VALUE if necessary
*/
public static TimeWindow timeWindowForSize(final long startMs,
final long windowSize) {
final long endMs = startMs + windowSize;
return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs);
} |
...