Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Window size is not passed to the Kafka consumer during windowed aggregations, so the consumer uses the default Long.MAX_VALUE when initializing the deserializer. Both TimeWindowedSerde and TimeWindowedDeserializer have a default constructor that does not require a window size, causing fatal errors at runtime for some programs. 

Public Interfaces

In ConsumerConfigStreamsConfig.java add window.size.ms

Code Block
languagejava
titleConsumerConfig
linenumberstrue
/** <code>window.size.ms</code> */
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
private static final String WINDOW_SIZE_MS_DOC = "Passes window size to the deserializer in order to calculate window end times. Default is Long.MAX_VALUE";

...

Proposed Changes

Add window.size.ms to the ConsumerConfigs StreamsConfigs class to ensure that the desired window size is set when the consumer creates the TimeWindowedDeserializer instance. This is relevant for consuming TimeWindowed records directly, which is helpful when testing windowed aggregations. Without this change, all windows will have a size and end time of Long.MAX_VALUE.

...