...
As an example of usage, users pass their serdes in Consumed and only need to supply the inner serde in the consumed
parameter. The library can then wrap it with the TimeWindowedSerde and the window size, and call forChangelog() explicitly if the input is a changelog topic:
Code Block |
---|
// Wrap the user provided serde with the window serde, with the window size.
// Note that this needs a new constructor and the helper method timeWindowedSerdeFrom defined.
TimeWindowedSerde windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(consumed.keySerde, timeWindow.size()).forChangelog(true);
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(Consumed.with(windowedSerde, consumed.valueSerde())); |
...
On TimeWindowedSerde, we will add an additional constructor (with isChangelogTopic set to false by default):
Code Block |
---|
public TimeWindowedSerde(final Serde<T> inner, final long windowSize) {
super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer(), windowSize));
} |
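The "false by default, enabled explicitly" behavior described here can be sketched with a small stand-in class. This is only an illustration: `WindowedSerdeSketch` and its accessors are hypothetical names, not part of Kafka Streams, and the class models nothing beyond the flag handling that the new constructor and `forChangelog()` imply.

```java
// Hypothetical stand-in, NOT the Kafka Streams TimeWindowedSerde.
// It models only the flag handling: the constructor leaves
// isChangelogTopic at false, and forChangelog() overrides it.
final class WindowedSerdeSketch {
    private final long windowSize;
    private boolean isChangelogTopic = false; // default chosen by the constructor

    WindowedSerdeSketch(final long windowSize) {
        this.windowSize = windowSize;
    }

    // Returns this so the call can be chained onto the wrapping expression.
    WindowedSerdeSketch forChangelog(final boolean isChangelogTopic) {
        this.isChangelogTopic = isChangelogTopic;
        return this;
    }

    boolean isChangelogTopic() {
        return isChangelogTopic;
    }

    long windowSize() {
        return windowSize;
    }
}
```

Under these assumptions, a caller reading a regular topic constructs the serde and leaves the flag alone, while a caller reading a changelog topic chains `forChangelog(true)` onto the same expression.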
...
Code Block |
---|
/**
 * Construct a {@code TimeWindowedSerde} object to deserialize changelog topic
 * for the specified inner class type and window size.
 */
public static <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type, final long windowSize) {
    return new TimeWindowedSerde<>(Serdes.serdeFrom(type), windowSize, false);
} |
Compatibility, Deprecation, and Migration Plan
...