Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here and here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When doing windowed aggregations, the window size is not passed to the Kafka consumer because there is no config for it. When the consumer creates a TimeWindowedDeserializer, it therefore falls back to the default window size of Long.MAX_VALUE, which produces imprecise output when testing. In addition, both TimeWindowedSerde and TimeWindowedDeserializer have a constructor that does not require a window size, which causes fatal errors at runtime for some programs.
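To make the imprecision concrete: a serialized time-windowed key carries only the window start timestamp, so the window end has to be reconstructed from the configured window size. The following is a minimal sketch of that arithmetic, not the actual Kafka implementation. The class name WindowEndSketch, the helper windowEnd, and the clamp-on-overflow behavior are assumptions made for illustration.

```java
final class WindowEndSketch {

    // Hypothetical helper: derive the window end from the window start and the
    // configured window size. Assumes both arguments are non-negative, so a
    // negative sum can only mean the addition overflowed; in that case the end
    // is clamped to Long.MAX_VALUE (an assumption of this sketch).
    static long windowEnd(final long startMs, final long windowSizeMs) {
        final long endMs = startMs + windowSizeMs;
        return endMs < 0 ? Long.MAX_VALUE : endMs;
    }

    public static void main(final String[] args) {
        final long startMs = 60_000L;

        // With the real window size, the end time is exact.
        System.out.println(windowEnd(startMs, 5_000L));         // prints 65000

        // With the Long.MAX_VALUE default, every window gets the same,
        // meaningless end time regardless of its start.
        System.out.println(windowEnd(startMs, Long.MAX_VALUE)); // prints 9223372036854775807
    }
}
```

With the default size, two windows with different start times become indistinguishable by their end time, which is why test output that prints window bounds looks wrong.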


Public Interfaces

In ConsumerConfig.java, add WINDOW_SIZE_MS_CONFIG:

ConsumerConfig
/** <code>window.size.ms</code> */
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
private static final String WINDOW_SIZE_MS_DOC = "Passes window size to the deserializer in order to calculate window end times. Default is Long.MAX_VALUE";
WindowedSerdes
@Deprecated
public TimeWindowedSerde(final Serde<T> inner) {
	super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer()));
}


TimeWindowedDeserializer
@Deprecated
public TimeWindowedDeserializer(final Deserializer<T> inner) {
	this(inner, Long.MAX_VALUE);
}

WindowedSerdes
/**
* Construct a {@code TimeWindowedSerde} object for the specified inner class type.
*/
@Deprecated
public static <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type) {
	return new TimeWindowedSerde<>(Serdes.serdeFrom(type));
}


Proposed Changes

Add window.size.ms to the ConsumerConfig class and ensure that the window size is initialized when the consumer creates the TimeWindowedDeserializer instance. This matters when consuming time-windowed records directly, which is particularly helpful for testing windowed aggregations. Additionally, deprecate the constructors in TimeWindowedDeserializer and WindowedSerdes that do not require a window size, so that the window size is always set before it is used.
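As a rough illustration of how a test consumer might use the proposed config, the sketch below builds the consumer properties with plain string keys so that it compiles without the Kafka jars. The class name WindowedConsumerProps, the broker address, and the group id are placeholders invented for this example. Configuring the inner key type for the windowed deserializer is deliberately left out, and the resulting Properties would still have to be handed to a KafkaConsumer, which is not shown.

```java
import java.util.Properties;

final class WindowedConsumerProps {

    // Hypothetical helper: assemble consumer properties for reading
    // time-windowed keys, including the proposed window.size.ms config.
    static Properties build(final long windowSizeMs) {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "windowed-agg-test");       // placeholder group id
        props.setProperty("key.deserializer",
            "org.apache.kafka.streams.kstream.TimeWindowedDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.LongDeserializer");
        // Proposed config: must match the window size used by the aggregation.
        props.setProperty("window.size.ms", Long.toString(windowSizeMs));
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = build(5_000L);
        System.out.println(props.getProperty("window.size.ms")); // prints 5000
    }
}
```

The value passed here should be the same window size the aggregation was defined with; otherwise the reconstructed window end times will not match the windows that produced the records.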

Note that to use the window.size.ms config through the console consumer (if desired), add the key.deserializer. prefix and pass it in as a property, i.e. as key.deserializer.window.size.ms.

Compatibility, Deprecation, and Migration Plan

Tests that need to be updated:

Classes that need to be updated:

Rejected Alternatives

