Status

Current state: Adopted (3.0.0)

Discussion thread: here

JIRA: KAFKA-10062

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The motivation is to source the internally cached system timestamp from the Kafka Streams runtime. Testing with the TopologyTestDriver will lead to a time mismatch between wall-clock time and system time, because a test can advance the wall-clock time programmatically using advanceWallClockTime. The goal is to give the user the internal system time instead of having them rely on System.currentTimeMillis(). The same is also true for stream time. A generic example would be executing an action based on the current time when a message arrives, and then potentially again some time later. Currently, if you want to implement a solution that uses wall-clock time and

...

performs actions based on it, you would use System.currentTimeMillis(). The problem is that tests have no control over this value, because the test driver relies on 'MockTime' rather than the real system clock.

For stream time, the time is advanced before processing a new record whose timestamp is higher than the currently known stream time. Therefore, ProcessorContext#timestamp may be earlier than the stream time if records are not processed in timestamp order. It is simply not reliable to depend solely on timestamp() and System.currentTimeMillis() for time-based actions.
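To illustrate the out-of-order case, here is a minimal, self-contained sketch (not Kafka Streams code) that models stream time as the maximum record timestamp observed so far. The class StreamTimeDemo and its observe method are hypothetical names used only for illustration, and the Long.MIN_VALUE "no records yet" sentinel is an assumption of this sketch.

```java
// Hypothetical helper: models stream time as described in this KIP,
// i.e. the maximum record timestamp observed so far (a high-watermark).
public class StreamTimeDemo {
    // Assumed sentinel meaning "no record observed yet".
    private long streamTime = Long.MIN_VALUE;

    // Advances stream time only if the record is newer than anything seen so far,
    // and returns the stream time after observing the record.
    public long observe(final long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
        return streamTime;
    }

    public static void main(final String[] args) {
        final StreamTimeDemo tracker = new StreamTimeDemo();
        // Records arrive out of timestamp order: 100, 300, 200.
        final long[] timestamps = {100L, 300L, 200L};
        for (final long ts : timestamps) {
            final long streamTime = tracker.observe(ts);
            System.out.println("record timestamp=" + ts + ", stream time=" + streamTime);
        }
    }
}
```

The third record has timestamp 200 while the stream time stays at 300, which is exactly the situation where the record timestamp alone gives a misleading view of "now".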

Public Interfaces

Update:

This KIP overlaps with KIP-478, and we also added the new methods currentSystemTimeMs() and currentStreamTimeMs() to api.ProcessorContext in the 3.2.0 release.

Update (2022-04-14):

As part of the KIP-820 implementation, we are also adding the new methods currentSystemTimeMs() and currentStreamTimeMs() to api.MockProcessorContext for completeness.


Add new public API for currentSystemTimeMs:

...

Code Block
languagejava
    /**
     * Return the current system timestamp (also called wall-clock time) in milliseconds.
     *
     * <p>
     * Note: this method returns the internally cached system timestamp from the Kafka Streams runtime.
     * Thus, it may return a different value compared to {@code System.currentTimeMillis()}.
     *
     * @return the current system timestamp in milliseconds
     */
    long currentSystemTimeMs();
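A minimal sketch of why reading the time from the context helps testing, assuming a hypothetical one-method interface SystemTimeSource that stands in for currentSystemTimeMs(). ExpiryChecker and its TTL logic are also hypothetical and are not part of the Kafka Streams API; the point is only that the logic never calls System.currentTimeMillis() directly, so a test can pin the value.

```java
// Hypothetical stand-in for the currentSystemTimeMs() method described above.
interface SystemTimeSource {
    long currentSystemTimeMs();
}

// Hypothetical processing logic: decides whether an event has expired,
// reading "now" from the injected source instead of System.currentTimeMillis().
final class ExpiryChecker {
    private final SystemTimeSource time;
    private final long ttlMs;

    ExpiryChecker(final SystemTimeSource time, final long ttlMs) {
        this.time = time;
        this.ttlMs = ttlMs;
    }

    boolean isExpired(final long eventTimestampMs) {
        return time.currentSystemTimeMs() - eventTimestampMs > ttlMs;
    }
}

public class ExpiryCheckerDemo {
    public static void main(final String[] args) {
        // A fixed clock: every call returns 10_000 ms.
        final SystemTimeSource fixed = () -> 10_000L;
        final ExpiryChecker checker = new ExpiryChecker(fixed, 5_000L);
        System.out.println(checker.isExpired(4_000L)); // age 6_000 > ttl 5_000 -> true
        System.out.println(checker.isExpired(6_000L)); // age 4_000 <= ttl 5_000 -> false
    }
}
```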

...

Code Block
languagejava
    /**
     * Return the current stream-time in milliseconds.
     *
     * <p>
     * Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far
     * (including the currently processed record), i.e., it can be considered a high-watermark.
     * Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration.
     *
     * <p>
     * Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore} (...)
     * and {@link StreamsBuilder#addGlobalStore} (...)),
     * because there is no concept of stream-time for this case.
     * Calling this method in a global processor will result in an {@link UnsupportedOperationException}.
     *
     * @return the current stream-time in milliseconds
     */
    long currentStreamTimeMs();
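The global-processor restriction in the Javadoc above can be sketched with a hypothetical context class, SketchContext, whose global flag marks a processor attached to a global store. This only illustrates the documented contract; it is not the Kafka Streams implementation.

```java
// Hypothetical stand-in for a processor context; not the Kafka Streams class.
final class SketchContext {
    private final boolean global;      // true if the processor belongs to a global store
    private final long streamTimeMs;   // stream-time of the owning task (unused if global)

    SketchContext(final boolean global, final long streamTimeMs) {
        this.global = global;
        this.streamTimeMs = streamTimeMs;
    }

    long currentStreamTimeMs() {
        if (global) {
            // Global processors have no stream-time, so the call is rejected.
            throw new UnsupportedOperationException(
                "currentStreamTimeMs() is not supported for global processors");
        }
        return streamTimeMs;
    }
}

public class GlobalStreamTimeDemo {
    public static void main(final String[] args) {
        System.out.println(new SketchContext(false, 42L).currentStreamTimeMs()); // 42
        try {
            new SketchContext(true, 42L).currentStreamTimeMs();
        } catch (final UnsupportedOperationException e) {
            System.out.println("global processor: " + e.getMessage());
        }
    }
}
```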

Proposed Changes

The goal is to create two separate public APIs that return the current cached system time in milliseconds and the current stream time in milliseconds.

currentSystemTimeMs

...


Add new methods to MockProcessorContext for testing purposes:

org.apache.kafka.streams.processor.

...

MockProcessorContext#setRecordTimestamp

This method sets the record timestamp.

Code Block
languagejava
    public void setRecordTimestamp(final long recordTimestamp) {
        this.recordTimestamp = recordTimestamp;
    }

...

org.apache.kafka.streams.processor.

...

MockProcessorContext#setCurrentSystemTimeMs

This method sets the system timestamp.

Code Block
languagejava
    public void setCurrentSystemTimeMs(final long currentSystemTimeMs) {
        this.currentSystemTimeMs = currentSystemTimeMs;
    }

...

currentStreamTimeMs

Create new public API (org.apache.kafka.streams.processor.ProcessorContext#currentStreamTimeMs).

...

org.apache.kafka.streams.processor.MockProcessorContext#setCurrentStreamTimeMs

This method sets the stream time.

Code Block
languagejava
    public void setCurrentStreamTimeMs(final long currentStreamTimeMs) {
        this.currentStreamTimeMs = currentStreamTimeMs;
    }
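A test might use the three setters together as in the following sketch. FakeTimeContext is a hypothetical stand-in that mirrors the setter names proposed above so the example compiles without Kafka on the classpath; isLate and ageMs are hypothetical pieces of logic under test. With the real MockProcessorContext the calls should look similar, but check the actual class before relying on this.

```java
// Hypothetical test double mirroring the three setters proposed above.
final class FakeTimeContext {
    private long recordTimestamp;
    private long currentSystemTimeMs;
    private long currentStreamTimeMs;

    void setRecordTimestamp(final long recordTimestamp) {
        this.recordTimestamp = recordTimestamp;
    }

    void setCurrentSystemTimeMs(final long currentSystemTimeMs) {
        this.currentSystemTimeMs = currentSystemTimeMs;
    }

    void setCurrentStreamTimeMs(final long currentStreamTimeMs) {
        this.currentStreamTimeMs = currentStreamTimeMs;
    }

    long timestamp() { return recordTimestamp; }
    long currentSystemTimeMs() { return currentSystemTimeMs; }
    long currentStreamTimeMs() { return currentStreamTimeMs; }
}

public class FakeTimeContextDemo {
    // Hypothetical logic under test: a record is late if it is behind stream time.
    static boolean isLate(final FakeTimeContext ctx) {
        return ctx.timestamp() < ctx.currentStreamTimeMs();
    }

    // Hypothetical logic under test: wall-clock age of the record.
    static long ageMs(final FakeTimeContext ctx) {
        return ctx.currentSystemTimeMs() - ctx.timestamp();
    }

    public static void main(final String[] args) {
        final FakeTimeContext ctx = new FakeTimeContext();
        ctx.setCurrentStreamTimeMs(300L);
        ctx.setCurrentSystemTimeMs(1_000L);
        ctx.setRecordTimestamp(200L);
        System.out.println(isLate(ctx) + " " + ageMs(ctx)); // true 800
    }
}
```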



currentSystemTimeMs

It is expected that this will return the internally cached system timestamp from the Kafka Streams runtime. Thus, it may return a different value compared to System.currentTimeMillis(). The cached system time represents the time when processing or punctuation starts, and it does not change for the duration of that process() or punctuate() call. In other words, this method returns the current system time (also called wall-clock time) as known to the Kafka Streams runtime.
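The caching behavior can be sketched as follows, assuming a hypothetical runtime class CachedClockRuntime that reads an injected clock once per process()/punctuate() call. The beginProcessing hook and the fake ticking clock are assumptions of this sketch; it illustrates the described semantics, not how Kafka Streams is structured internally.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: the runtime reads the clock once when a process()/punctuate()
// call starts and serves that cached value for the rest of the call.
final class CachedClockRuntime {
    private final LongSupplier clock;
    private long cachedSystemTimeMs;

    CachedClockRuntime(final LongSupplier clock) {
        this.clock = clock;
    }

    // Called by the (hypothetical) runtime right before invoking user code.
    void beginProcessing() {
        cachedSystemTimeMs = clock.getAsLong();
    }

    // What user code would observe via currentSystemTimeMs().
    long currentSystemTimeMs() {
        return cachedSystemTimeMs;
    }
}

public class CachedClockDemo {
    public static void main(final String[] args) {
        // A fake clock that advances by 10 ms every time it is read.
        final long[] ticks = {0L};
        final CachedClockRuntime runtime = new CachedClockRuntime(() -> ticks[0] += 10L);

        runtime.beginProcessing();                    // clock read once -> 10
        final long first = runtime.currentSystemTimeMs();
        final long second = runtime.currentSystemTimeMs();
        System.out.println(first + " " + second);     // 10 10: unchanged within the call

        runtime.beginProcessing();                    // next call -> 20
        System.out.println(runtime.currentSystemTimeMs()); // 20
    }
}
```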


currentStreamTimeMs

It is expected that this will return the StreamTask's stream time from its partition group, i.e., the maximum timestamp of any record processed by the task so far. This provides the actual stream time, whereas relying on record timestamps alone is not reliable when records may not be processed in timestamp order.

Compatibility, Deprecation, and Migration Plan

This change is backwards compatible because it only adds two new APIs. However, we want to deprecate org.apache.kafka.streams.processor.MockProcessorContext#setTimestamp because its name is misleading, and we are adding a new method, org.apache.kafka.streams.processor.MockProcessorContext#setRecordTimestamp, which does the same work.
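The deprecation can follow the usual Java pattern of having the old setter delegate to the new one. SketchMockContext below is a hypothetical, simplified class used only to show that pattern; it is not the real MockProcessorContext.

```java
// Hypothetical simplified mock: the old, misleadingly named setter
// delegates to the new one, so existing tests keep working.
final class SketchMockContext {
    private long recordTimestamp;

    /** @deprecated use {@link #setRecordTimestamp(long)} instead. */
    @Deprecated
    void setTimestamp(final long timestamp) {
        setRecordTimestamp(timestamp);
    }

    void setRecordTimestamp(final long recordTimestamp) {
        this.recordTimestamp = recordTimestamp;
    }

    long timestamp() {
        return recordTimestamp;
    }
}

public class DeprecationDemo {
    @SuppressWarnings("deprecation")
    public static void main(final String[] args) {
        final SketchMockContext ctx = new SketchMockContext();
        ctx.setTimestamp(123L);                // old name still works
        System.out.println(ctx.timestamp());   // 123
        ctx.setRecordTimestamp(456L);          // new, clearer name
        System.out.println(ctx.timestamp());   // 456
    }
}
```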

Rejected Alternatives

One single API with punctuator type as input

...