Status

Current state: Accepted

Discussion thread: here

Vote thread: here

JIRA: KAFKA-10366

...

Code Block
languagejava
titleStreamsConfig
linenumberstrue
/** <code>window.size.ms</code> */
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
private static final String WINDOW_SIZE_MS_DOC = "Passes window size to the deserializer in order to calculate window end times. Default is Long.MAX_VALUE";


Deprecate the following methods

...

Code Block
languagejava
titleTimeWindowedSerde
@Deprecated
public TimeWindowedSerde(final Serde<T> inner) {
	super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer()));
}

...

Code Block
languagejava
titleWindowedSerdes
linenumberstrue
/**
 * Construct a {@code TimeWindowedSerde} object for the specified inner class type.
 */
@Deprecated
public static <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type) {
	return new TimeWindowedSerde<>(Serdes.serdeFrom(type));
}


Code Block
languagejava
titleTimeWindowedDeserializer
linenumberstrue
@Deprecated
public TimeWindowedDeserializer(final Deserializer<T> inner) {
	this(inner, Long.MAX_VALUE);
}


In Serdes.scala, have the implicit def timeWindowedSerde log an error message and pass through to the deserializer

Code Block
languagescala
titleSerdes
// Log error message and pass through to underlying deserializer 
implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
  new WindowedSerdes.TimeWindowedSerde[T](tSerde)


Proposed Changes

Add window.size.ms to the StreamsConfig class so that the desired window size is set when the consumer creates the TimeWindowedDeserializer instance. This matters when consuming time-windowed records directly, which is helpful when testing windowed aggregations. Without this change, all windows will have a size and end time of Long.MAX_VALUE. Users can set the window size either through the config or through the constructor. To help enforce this, window.size.ms will not have a default value.
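To illustrate why the window size matters, here is a minimal standalone sketch of how a window end time could be derived from a window start and a window size. It is not the Kafka Streams implementation: the class name WindowEndSketch, the method windowEnd, and the clamping of overflow to Long.MAX_VALUE are assumptions made for illustration only.

```java
// Hypothetical illustration, not Kafka code: derive a window end from start + size.
class WindowEndSketch {

    /**
     * Returns the end of a window that starts at startMs and spans windowSizeMs.
     * Both arguments are assumed to be non-negative, so a negative sum can only
     * come from overflow; in that case the end is clamped to Long.MAX_VALUE.
     */
    static long windowEnd(final long startMs, final long windowSizeMs) {
        final long endMs = startMs + windowSizeMs;
        return endMs < 0 ? Long.MAX_VALUE : endMs;
    }

    public static void main(final String[] args) {
        final long startMs = 60_000L;

        // With an explicit window size, the end time is meaningful.
        System.out.println(windowEnd(startMs, 30_000L)); // prints 90000

        // With Long.MAX_VALUE as the size, every window ends at Long.MAX_VALUE.
        System.out.println(windowEnd(startMs, Long.MAX_VALUE) == Long.MAX_VALUE); // prints true
    }
}
```

With a window size of Long.MAX_VALUE, the computed end carries no information about the real window, which is the situation the new config is meant to avoid.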

Additionally, deprecate the constructors and factory method in TimeWindowedDeserializer and WindowedSerdes that don't require a window size. This ensures that the window size is properly set before it is used. Once these are deprecated, the timeWindowedSerde implicit in Serdes.scala will be updated to log an error message explaining that this implicit should no longer be used. To keep backwards compatibility, this implicit won't be deprecated but will pass through to the deserializer.

Note that to use the window.size.ms config through the console consumer (if desired), add the key.deserializer prefix and pass it in as a property.
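As a rough sketch of what the prefixed property might look like, the snippet below builds the property key and a console-consumer style argument from it. The class and helper names are hypothetical, the value 500 is an arbitrary example, and joining the prefix with a dot and passing it via --property are assumptions about how the prefix is applied.

```java
// Hypothetical illustration: build the prefixed property for the console consumer.
class ConsoleConsumerPropsSketch {

    // Config name proposed in this KIP.
    static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";

    // Prefix mentioned in the note above.
    static final String KEY_DESERIALIZER_PREFIX = "key.deserializer";

    /** Joins prefix and config name with a dot (assumed separator). */
    static String prefixed(final String prefix, final String config) {
        return prefix + "." + config;
    }

    /** Formats a key/value pair as a "--property key=value" argument. */
    static String toPropertyArg(final String key, final String value) {
        return "--property " + key + "=" + value;
    }

    public static void main(final String[] args) {
        final String key = prefixed(KEY_DESERIALIZER_PREFIX, WINDOW_SIZE_MS_CONFIG);
        // prints: --property key.deserializer.window.size.ms=500
        System.out.println(toPropertyArg(key, "500"));
    }
}
```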

...