...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The motivation is to source the wall-clock time the same way in a normal execution environment and in tests. A generic example is executing an action when a message arrives based on the current time, and then potentially again some time later. Currently, a solution that uses wall-clock time and performs actions based on it would call System.currentTimeMillis(). The problem is that tests have no control over this value: the TopologyTestDriver uses 'MockTime', whose wall-clock is advanced programmatically with advanceWallClockTime, so testing with it leads to a mismatch between the wall-clock time and the system time. The goal is to give the consumer the internal system time instead of having them rely on System.currentTimeMillis().

For stream time, the time is advanced before processing a new record whose timestamp is higher than the currently known stream time. Therefore, ProcessorContext#timestamp may be earlier than the stream time if the records are not processed in timestamp order. It is simply not reliable to rely solely on timestamp and System.currentTimeMillis() for time-based actions.

Public Interfaces

Add new public API for currentSystemTimeMs:

...

The goal is to create two separate public APIs: one that returns the current system time in milliseconds and one that returns the current stream time in milliseconds.


currentSystemTimeMs

Move existing internal API (org.apache.kafka.streams.processor.internals.InternalProcessorContext#currentSystemTimeMs) to become a new public API (org.apache.kafka.streams.processor.ProcessorContext#currentSystemTimeMs).

currentStreamTimeMs

Create new public API (org.apache.kafka.streams.processor.ProcessorContext#currentStreamTimeMs).

Implement this API (org.apache.kafka.streams.processor.ProcessorContextImpl#currentStreamTimeMs).

```java
@Override
public long currentStreamTimeMs() {
    throwUnsupportedOperationExceptionIfStandby("currentStreamTimeMs");
    return streamTask.streamTime();
}
```

currentSystemTimeMs is expected to return the wall-clock time in a normal execution environment and the mocked wall-clock time in a testing environment. This time already exists internally, so the goal is to expose it to the public.
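The benefit of reading time from a provided source rather than from System.currentTimeMillis() can be sketched without any Kafka dependency. In this minimal sketch, `ManualClock` and `ExpiryChecker` are hypothetical names introduced only for illustration: `ManualClock` plays the role a mocked clock plays in tests, and the injected `LongSupplier` stands in for a context method such as currentSystemTimeMs. It is not the Kafka Streams implementation.

```java
import java.util.function.LongSupplier;

// Hypothetical test clock: time only moves when the test advances it.
final class ManualClock implements LongSupplier {
    private long nowMs;

    ManualClock(long startMs) {
        this.nowMs = startMs;
    }

    // Moves the clock forward by the given number of milliseconds.
    void advance(long deltaMs) {
        nowMs += deltaMs;
    }

    @Override
    public long getAsLong() {
        return nowMs;
    }
}

// Hypothetical time-based logic that reads "now" from an injected source
// instead of calling System.currentTimeMillis() directly.
final class ExpiryChecker {
    private final LongSupplier clock;
    private final long ttlMs;

    ExpiryChecker(LongSupplier clock, long ttlMs) {
        this.clock = clock;
        this.ttlMs = ttlMs;
    }

    // True once at least ttlMs milliseconds have passed since createdAtMs.
    boolean isExpired(long createdAtMs) {
        return clock.getAsLong() - createdAtMs >= ttlMs;
    }
}
```

In normal execution the same logic could be constructed with `System::currentTimeMillis` as the source, while a test passes a `ManualClock` and advances it explicitly, so both environments go through the same code path.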


currentStreamTimeMs

It is expected that this will return the StreamTask's time from the partition group. This would provide the actual stream time because relying on the timestamp of records is not reliable when the records might not be processed in order of time.
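To illustrate why a record's timestamp can trail the stream time, here is a minimal self-contained sketch. It assumes the behaviour described above, namely that stream time only advances when a record carries a timestamp higher than the currently known stream time. `StreamTimeTracker` and its `NO_TIMESTAMP` sentinel are hypothetical stand-ins for illustration, not Kafka Streams code.

```java
// Hypothetical model for illustration only; not the Kafka Streams implementation.
final class StreamTimeTracker {
    // Sentinel meaning "no record observed yet" (an assumption of this sketch).
    static final long NO_TIMESTAMP = -1L;

    private long streamTime = NO_TIMESTAMP;

    // Advances stream time only when the record is newer than anything seen so far,
    // then returns the resulting stream time.
    long observe(long recordTimestampMs) {
        streamTime = Math.max(streamTime, recordTimestampMs);
        return streamTime;
    }

    long currentStreamTimeMs() {
        return streamTime;
    }
}

final class StreamTimeDemo {
    public static void main(String[] args) {
        StreamTimeTracker tracker = new StreamTimeTracker();
        long[] timestamps = {100L, 300L, 200L}; // the third record arrives out of order
        for (long ts : timestamps) {
            long streamTime = tracker.observe(ts);
            System.out.println("record timestamp=" + ts + ", stream time=" + streamTime);
        }
    }
}
```

For the out-of-order record with timestamp 200, the tracked stream time stays at 300, which is the case where relying on the record timestamp alone would give an earlier time than the stream time.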

Compatibility, Deprecation, and Migration Plan

This change is backwards compatible because it adds two new APIs and does not change or remove any existing method.

Rejected Alternatives

One single API with punctuator type as input

...